Recording

Commit volatile memory to persistent append-only log

0%

zmq_connect直接调用socket_base_t的connect函数。这里暂且只看tcp和ipc部分。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
int zmq::socket_base_t::connect (const char *addr_)
{
if (unlikely (ctx_terminated)) {
errno = ETERM;
return -1;
}

// Parse addr_ string.
std::string protocol;
std::string address;
int rc = parse_uri (addr_, protocol, address);
if (rc != 0)
return -1;
// omit...
// Parsed address for validation
sockaddr_storage addr;
socklen_t addr_len;

if (protocol == "tcp")
rc = resolve_ip_hostname (&addr, &addr_len, address.c_str ());
else
if (protocol == "ipc")
rc = resolve_local_path (&addr, &addr_len, address.c_str ());
if (rc != 0)
return -1;

// Choose the I/O thread to run the session in.
io_thread_t *io_thread = choose_io_thread (options.affinity);
if (!io_thread) {
errno = EMTHREAD;
return -1;
}

// Create session.
connect_session_t *session = new (std::nothrow) connect_session_t (
io_thread, this, options, protocol.c_str (), address.c_str ());
alloc_assert (session);

// If 'immediate connect' feature is required, we'll create the pipes
// to the session straight away. Otherwise, they'll be created by the
// session once the connection is established.
if (options.immediate_connect) {

reader_t *inpipe_reader = NULL;
writer_t *inpipe_writer = NULL;
reader_t *outpipe_reader = NULL;
writer_t *outpipe_writer = NULL;

// Create inbound pipe, if required.
if (options.requires_in)
create_pipe (this, session, options.hwm, options.swap,
&inpipe_reader, &inpipe_writer);

// Create outbound pipe, if required.
if (options.requires_out)
create_pipe (session, this, options.hwm, options.swap,
&outpipe_reader, &outpipe_writer);

// Attach the pipes to the socket object.
attach_pipes (inpipe_reader, outpipe_writer, blob_t ());

// Attach the pipes to the session object.
session->attach_pipes (outpipe_reader, inpipe_writer, blob_t ());
}

// Activate the session. Make it a child of this socket.
launch_child (session);

return 0;
}
1
2
3
4
5
6
7
8
9
10
11
void zmq::own_t::launch_child (own_t *object_)
{
// Specify the owner of the object.
object_->set_owner (this);

// Plug the object into the I/O thread.
send_plug (object_);

// Take ownership of the object.
send_own (this, object_);
}

0mq socket为每个connect创建一个connect_session_t,代表主动发起的连接。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
void zmq::connect_session_t::process_plug ()
{
// Start connection process immediately.
start_connecting (false);
}

void zmq::connect_session_t::start_connecting (bool wait_)
{
// Choose I/O thread to run connecter in. Given that we are already
// running in an I/O thread, there must be at least one available.
io_thread_t *io_thread = choose_io_thread (options.affinity);
zmq_assert (io_thread);

// Create the connecter object.

// Both TCP and IPC transports are using the same infrastructure.
if (protocol == "tcp" || protocol == "ipc") {

zmq_connecter_t *connecter = new (std::nothrow) zmq_connecter_t (
io_thread, this, options, protocol.c_str (), address.c_str (),
wait_);
alloc_assert (connecter);
launch_child (connecter);
return;
}

zmq_assert (false);
}

zmq_connect_t负责发起连接以及连接失败后重连。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
void zmq::zmq_connecter_t::process_plug ()
{
if (wait)
add_reconnect_timer();
else
start_connecting ();
}
void zmq::zmq_connecter_t::start_connecting ()
{
// Open the connecting socket.
int rc = tcp_connecter.open ();

// Connect may succeed in synchronous manner.
if (rc == 0) {
handle = add_fd (tcp_connecter.get_fd ());
handle_valid = true;
out_event ();
return;
}

// Connection establishment may be dealyed. Poll for its completion.
else if (rc == -1 && errno == EAGAIN) {
handle = add_fd (tcp_connecter.get_fd ());
handle_valid = true;
set_pollout (handle);
return;
}

// Handle any other error condition by eventual reconnect.
wait = true;
add_reconnect_timer();
}

对于ipc或者本机tcp端口的连接可能立即成功,对于处于连接过程中的fd将其置入io_thread_t的线程中polling。如失败,则重连。下面是zmq_connect_t的out_event函数,该函数在连接成功时被调用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
void zmq::zmq_connecter_t::out_event ()
{
fd_t fd = tcp_connecter.connect ();
rm_fd (handle);
handle_valid = false;

// Handle the error condition by attempt to reconnect.
if (fd == retired_fd) {
tcp_connecter.close ();
wait = true;
add_reconnect_timer();
return;
}

// Choose I/O thread to run connecter in. Given that we are already
// running in an I/O thread, there must be at least one available.
io_thread_t *io_thread = choose_io_thread (options.affinity);
zmq_assert (io_thread);

// Create an init object.
zmq_init_t *init = new (std::nothrow) zmq_init_t (io_thread, NULL,
session, fd, options);
alloc_assert (init);
launch_sibling (init);

// Shut the connecter down.
terminate ();
}

当连接建立起来之后,通过zmq_init_t进行最出的数据(identity)交换。zmq_init_t被直接挂接到了session_t(connect_session_t)上去了,而zmq_connecter_t被terminate()。

Read more »

0mq中的reaper_t和io_thread_t的in_event函数大体相同,都是用来处理mailbox里的命令。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
void zmq::io_thread_t::in_event ()
{
// TODO: Do we want to limit number of commands I/O thread can
// process in a single go?

while (true) {

// Get the next command. If there is none, exit.
command_t cmd;
int rc = mailbox.recv (&cmd, 0);
if (rc != 0 && errno == EINTR)
continue;
if (rc != 0 && errno == EAGAIN)
break;
errno_assert (rc == 0);

// Process the command.
cmd.destination->;process_command (cmd);
}
}
1
2
3
4
void zmq::ctx_t::send_command (uint32_t tid_, const command_t &command_)
{
slots [tid_]->send (command_);
}

之前可以看出:

    • 当in_event函数相关联的fd可读时,in_event函数被调用;
    • reaper_t和io_thread_t的in_event的关联fd都是各自的mailbox成员get_fd函数的返回值。
    1
    2
    3
    4
    zmq::fd_t zmq::mailbox_t::get_fd ()
    {
    return signaler.get_fd ();
    }
    1
    2
    3
    4
    zmq::fd_t zmq::signaler_t::get_fd ()
    {
    return r;
    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    zmq::signaler_t::signaler_t ()
    {
    // Create the socketpair for signaling.
    int rc = make_fdpair (&r, &w);
    errno_assert (rc == 0);

    // Set both fds to non-blocking mode.
    int flags = fcntl (w, F_GETFL, 0);
    errno_assert (flags >= 0);
    rc = fcntl (w, F_SETFL, flags | O_NONBLOCK);
    errno_assert (rc == 0);
    flags = fcntl (r, F_GETFL, 0);
    errno_assert (flags >= 0);
    rc = fcntl (r, F_SETFL, flags | O_NONBLOCK);
    errno_assert (rc == 0);
    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    int zmq::signaler_t::make_fdpair (fd_t *r_, fd_t *w_)
    {
    int sv [2];
    int rc = socketpair (AF_UNIX, SOCK_STREAM, 0, sv);
    errno_assert (rc == 0);
    *w_ = sv [0];
    *r_ = sv [1];
    return 0;
    }

    mailbox_t的get_fd函数返回的是其signaler成员所创建的管道的读端。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    zmq::mailbox_t::mailbox_t ()
    {
    // Get the pipe into passive state. That way, if the users starts by
    // polling on the associated file descriptor it will get woken up when
    // new command is posted.
    bool ok = cpipe.read (NULL);
    zmq_assert (!ok);
    active = false;
    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    void zmq::mailbox_t::send (const command_t &cmd_)
    {
    sync.lock ();
    cpipe.write (cmd_, false);
    bool ok = cpipe.flush ();
    sync.unlock ();
    if (!ok)
    signaler.send ();
    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    void zmq::signaler_t::send ()
    {
    unsigned char dummy = 0;
    while (true) {
    ssize_t nbytes = ::send (w, &dummy, sizeof (dummy), 0);
    if (unlikely (nbytes == -1 && errno == EINTR))
    continue;
    zmq_assert (nbytes == sizeof (dummy));
    break;
    }
    }

    当用户第一次调用mailbox_t的send函数时,会调用其signaler成员的send函数,这使得signaler创建的管道的读端变为可读。其关联对象的in_event函数得到调用。reaper_t和io_thread_t的in_event函数循环调用各自mailbox的recv函数取得command。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    int zmq::mailbox_t::recv (command_t *cmd_, int timeout_)
    {
    // Try to get the command straight away.
    if (active) {
    bool ok = cpipe.read (cmd_);
    if (ok)
    return 0;

    // If there are no more commands available, switch into passive state.
    active = false;
    signaler.recv ();
    }

    // Wait for signal from the command sender.
    int rc = signaler.wait (timeout_);
    if (rc != 0 && (errno == EAGAIN || errno == EINTR))
    return -1;

    // We've got the signal. Now we can switch into active state.
    active = true;

    // Get a command.
    errno_assert (rc == 0);
    bool ok = cpipe.read (cmd_);
    zmq_assert (ok);
    return 0;
    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    int zmq::signaler_t::wait (int timeout_)
    {
    struct pollfd pfd;
    pfd.fd = r;
    pfd.events = POLLIN;
    int rc = poll (&pfd, 1, timeout_);
    if (unlikely (rc < 0)) {
    zmq_assert (errno == EINTR);
    return -1;
    }
    else if (unlikely (rc == 0)) {
    errno = EAGAIN;
    return -1;
    }
    zmq_assert (rc == 1);
    zmq_assert (pfd.revents & POLLIN);
    return 0;
    }

    当command被取完时,调用singlar.recv将之前由mailbox_t::send触发的singlar.send发送的内容清空;并将errno设为EAGAIN。in_event函数终止。

Read more »

A 0MQ context is thread safe and may be shared among as many application threads as necessary, without any additional locking required on the part of the caller.

在C++中ctx_t这个类型代表context。

1
2
3
4
5
6
7
8
void *zmq_init (int io_threads_)
{
// ...
// Create 0MQ context.
zmq::ctx_t *ctx = new (std::nothrow) zmq::ctx_t ((uint32_t) io_threads_);
alloc_assert (ctx);
return (void*) ctx;
}

The io_threads argument specifies the size of the 0MQ thread pool to handle I/O operations. If your application is using only the inproc transport for messaging you may set this to zero, otherwise set it to at least one.

下面来看ctx_t的构造函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
//  Initialise the array of mailboxes. Additional three slots are for
// internal log socket and the zmq_term thread the reaper thread.
slot_count = max_sockets + io_threads_ + 3;
slots = (mailbox_t**) malloc (sizeof (mailbox_t*) * slot_count);
alloc_assert (slots);

// Initialise the infrastructure for zmq_term thread.
slots [term_tid] = &term_mailbox;

// Create the reaper thread.
reaper = new (std::nothrow) reaper_t (this, reaper_tid);
alloc_assert (reaper);
slots [reaper_tid] = reaper-&gt;get_mailbox ();
reaper-&gt;start ();
1
2
3
4
5
//  Create the logging infrastructure.
log_socket = create_socket (ZMQ_PUB);
zmq_assert (log_socket);
rc = log_socket-&gt;bind ("sys://log");
zmq_assert (rc == 0);
1
2
3
4
5
6
7
8
//  Create I/O thread objects and launch them.
for (uint32_t i = 2; i != io_threads_ + 2; i++) {
io_thread_t *io_thread = new (std::nothrow) io_thread_t (this, i);
alloc_assert (io_thread);
io_threads.push_back (io_thread);
slots [i] = io_thread-&gt;get_mailbox ();
io_thread-&gt;start ();
}
1
2
3
4
5
6
//  In the unused part of the slot array, create a list of empty slots.
for (int32_t i = (int32_t) slot_count - 1;
i >= (int32_t) io_threads_ + 2; i--) {
empty_slots.push_back (i);
slots [i] = NULL;
}

以上就是ctx_t构造函数函数体的全部。

Read more »