Recording

Commit volatile memory to persistent append-only log

0%

zeromq mailbox

0mq中的reaper_t和io_thread_t的in_event函数大体相同,都是用来处理mailbox里的命令。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
//  Drains the I/O thread's mailbox: receives pending commands one by one
//  (non-blocking, timeout 0) and dispatches each to its destination object.
//  Returns once the mailbox reports that no further command is available.
void zmq::io_thread_t::in_event ()
{
    //  TODO: Do we want to limit number of commands I/O thread can
    //  process in a single go?

    while (true) {

        //  Get the next command. If there is none, exit.
        command_t cmd;
        int rc = mailbox.recv (&cmd, 0);

        //  Interrupted by a signal: simply retry the receive.
        if (rc != 0 && errno == EINTR)
            continue;

        //  Nothing left to read: stop processing.
        if (rc != 0 && errno == EAGAIN)
            break;

        //  Any other failure is unexpected.
        errno_assert (rc == 0);

        //  Process the command.
        cmd.destination->process_command (cmd);
    }
}
1
2
3
4
// Forwards command_ to the entry of slots selected by tid_.
// NOTE(review): slots [tid_] is presumably the mailbox of the target
// thread; confirm against the declaration of slots. No bounds check is
// performed on tid_ here.
void zmq::ctx_t::send_command (uint32_t tid_, const command_t &command_)
{
slots [tid_]->send (command_);
}

之前可以看出:

    • 当in_event函数相关联的fd可读时,in_event函数被调用;
    • reaper_t和io_thread_t的in_event的关联fd都是各自的mailbox成员get_fd函数的返回值。
    1
    2
    3
    4
    // Returns the file descriptor reported by the signaler member, i.e. the
    // descriptor a caller can poll to learn that the mailbox was signaled.
    zmq::fd_t zmq::mailbox_t::get_fd ()
    {
    return signaler.get_fd ();
    }
    1
    2
    3
    4
    // Returns r, the read-side descriptor of the signaling pair (the same
    // descriptor that wait () polls for POLLIN).
    zmq::fd_t zmq::signaler_t::get_fd ()
    {
    return r;
    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    //  Creates the signaling descriptor pair and puts both of its ends into
    //  non-blocking mode. Any failure along the way trips errno_assert.
    zmq::signaler_t::signaler_t ()
    {
        //  Open the connected pair; r is the read side, w the write side.
        int rc = make_fdpair (&r, &w);
        errno_assert (rc == 0);

        //  Add O_NONBLOCK to the existing status flags of each end,
        //  write side first, then read side.
        int ends [2];
        ends [0] = w;
        ends [1] = r;
        for (int i = 0; i != 2; i++) {
            int flags = fcntl (ends [i], F_GETFL, 0);
            errno_assert (flags >= 0);
            rc = fcntl (ends [i], F_SETFL, flags | O_NONBLOCK);
            errno_assert (rc == 0);
        }
    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    //  Creates a connected AF_UNIX stream socket pair and hands its two ends
    //  back through the output parameters: *w_ receives the first socket,
    //  *r_ the second. Returns 0; a socketpair failure trips errno_assert.
    int zmq::signaler_t::make_fdpair (fd_t *r_, fd_t *w_)
    {
        //  Positions of the two ends within the array filled by socketpair.
        enum { write_end = 0, read_end = 1 };

        int fds [2];
        int rc = socketpair (AF_UNIX, SOCK_STREAM, 0, fds);
        errno_assert (rc == 0);

        *w_ = fds [write_end];
        *r_ = fds [read_end];
        return 0;
    }

    mailbox_t的get_fd函数返回的是其signaler成员通过socketpair创建的一对套接字中的读端(即make_fdpair中的sv [1])。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    // Constructs the mailbox in passive state (active == false).
    zmq::mailbox_t::mailbox_t ()
    {
    // Get the pipe into passive state. That way, if the user starts by
    // polling on the associated file descriptor it will get woken up when
    // a new command is posted.
    // The initial read is asserted to return false, i.e. no command is
    // available yet.
    bool ok = cpipe.read (NULL);
    zmq_assert (!ok);
    // Record the passive state in the flag that recv () checks before it
    // tries to read from the pipe directly.
    active = false;
    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    //  Posts cmd_ to the mailbox. The command is written to the command
    //  pipe and flushed while holding sync; if the flush reports false,
    //  the signaler is triggered afterwards (outside the lock).
    void zmq::mailbox_t::send (const command_t &cmd_)
    {
        //  Enqueue and publish the command under the lock.
        sync.lock ();
        cpipe.write (cmd_, false);
        const bool signal_needed = !cpipe.flush ();
        sync.unlock ();

        //  A failed flush means a signal has to be sent to the receiver.
        if (signal_needed)
            signaler.send ();
    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    //  Signals the reader by writing a single zero byte to the write side
    //  of the pair. The call is repeated while it is interrupted (EINTR);
    //  any other result than one byte written trips zmq_assert.
    void zmq::signaler_t::send ()
    {
        unsigned char dummy = 0;
        ssize_t nbytes;

        //  Keep retrying for as long as the send is interrupted.
        do {
            nbytes = ::send (w, &dummy, sizeof (dummy), 0);
        } while (unlikely (nbytes == -1 && errno == EINTR));

        //  The whole byte must have been written.
        zmq_assert (nbytes == sizeof (dummy));
    }

    当用户第一次调用mailbox_t的send函数时,cpipe.flush返回false,于是会调用其signaler成员的send函数,这使得signaler创建的socketpair的读端变为可读,其关联对象的in_event函数随之得到调用。reaper_t和io_thread_t的in_event函数循环调用各自mailbox的recv函数取得command。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    //  Retrieves the next command into *cmd_, waiting up to timeout_ for a
    //  signal when no command can be read straight away.
    //  Returns 0 on success. Returns -1 when the wait ends with errno set
    //  to EAGAIN or EINTR; any other wait failure trips errno_assert.
    int zmq::mailbox_t::recv (command_t *cmd_, int timeout_)
    {
        //  In active state commands can be read without waiting first.
        if (active) {
            if (cpipe.read (cmd_))
                return 0;

            //  Nothing left in the pipe: go passive and consume the
            //  signal that put us into active state.
            active = false;
            signaler.recv ();
        }

        //  Passive state: wait until a sender signals.
        int rc = signaler.wait (timeout_);
        if (rc != 0) {
            if (errno == EAGAIN || errno == EINTR)
                return -1;
        }

        //  The signal has arrived, so the mailbox becomes active again.
        active = true;

        //  A command must be available now.
        errno_assert (rc == 0);
        bool ok = cpipe.read (cmd_);
        zmq_assert (ok);
        return 0;
    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    //  Polls the read side of the pair for incoming data, for at most
    //  timeout_ (passed to poll unchanged).
    //  Returns 0 when the descriptor is readable. Returns -1 with errno
    //  EINTR when poll fails, and -1 with errno EAGAIN when it times out.
    int zmq::signaler_t::wait (int timeout_)
    {
        struct pollfd pfd;
        pfd.fd = r;
        pfd.events = POLLIN;

        int rc = poll (&pfd, 1, timeout_);

        //  poll itself failed; the only tolerated cause is an interruption.
        if (unlikely (rc < 0)) {
            zmq_assert (errno == EINTR);
            return -1;
        }

        //  Timed out with nothing to read.
        if (unlikely (rc == 0)) {
            errno = EAGAIN;
            return -1;
        }

        //  Exactly our one descriptor must be reported as readable.
        zmq_assert (rc == 1);
        zmq_assert (pfd.revents & POLLIN);
        return 0;
    }

    当command被取完时,mailbox_t::recv调用signaler.recv将之前由mailbox_t::send触发的signaler.send发送的内容清空;随后signaler.wait(超时为0)因没有数据可读而将errno设为EAGAIN并返回-1,in_event函数据此跳出循环并终止。