zeromq context

A 0MQ context is thread safe and may be shared among as many application threads as necessary, without any additional locking required on the part of the caller.

在C++中ctx_t这个类型代表context。

1
2
3
4
5
6
7
8
//  Creates a new 0MQ context and hands it back as an opaque pointer.
//  io_threads_ is forwarded (converted to uint32_t) to the ctx_t constructor.
//  Allocation failure is caught by alloc_assert rather than reported via
//  the return value.
void *zmq_init (int io_threads_)
{
    // ...
    //  Allocate the context without throwing; a null result is handled by
    //  the assertion that follows.
    zmq::ctx_t *ctx =
        new (std::nothrow) zmq::ctx_t (static_cast <uint32_t> (io_threads_));
    alloc_assert (ctx);

    //  Callers only ever see the context as a void pointer.
    return static_cast <void*> (ctx);
}

The io_threads argument specifies the size of the 0MQ thread pool to handle I/O operations. If your application is using only the inproc transport for messaging you may set this to zero, otherwise set it to at least one.

下面来看ctx_t的构造函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
//  Initialise the array of mailboxes. The three additional slots are for
// the internal log socket, the zmq_term thread and the reaper thread.
slot_count = max_sockets + io_threads_ + 3;
// The array stores mailbox_t pointers only. It comes from malloc, so the
// entries hold indeterminate values until they are assigned below.
slots = (mailbox_t**) malloc (sizeof (mailbox_t*) * slot_count);
alloc_assert (slots);

// Initialise the infrastructure for zmq_term thread.
// Its slot points at the term_mailbox member rather than at a separately
// allocated object.
slots [term_tid] = &term_mailbox;

// Create the reaper thread.
// The reaper's mailbox is registered under reaper_tid before the reaper
// is started.
reaper = new (std::nothrow) reaper_t (this, reaper_tid);
alloc_assert (reaper);
slots [reaper_tid] = reaper->get_mailbox ();
reaper->start ();

1
2
3
4
5
//  Create the logging infrastructure.
// A ZMQ_PUB socket is created and bound to the "sys://log" endpoint; both
// steps are asserted to succeed.
// NOTE(review): rc is declared outside this excerpt.
log_socket = create_socket (ZMQ_PUB);
zmq_assert (log_socket);
rc = log_socket->bind ("sys://log");
zmq_assert (rc == 0);
1
2
3
4
5
6
7
8
//  Create I/O thread objects and launch them.
for (uint32_t i = 2; i != io_threads_ + 2; i++) {
io_thread_t *io_thread = new (std::nothrow) io_thread_t (this, i);
alloc_assert (io_thread);
io_threads.push_back (io_thread);
slots [i] = io_thread->get_mailbox ();
io_thread->start ();
}
1
2
3
4
5
6
//  In the unused part of the slot array, create a list of empty slots.
// Indices are visited from the highest (slot_count - 1) down to
// io_threads_ + 2, so the lowest free index ends up at the back of
// empty_slots. Both loop bounds are cast to int32_t, so the comparison
// is performed on signed values.
for (int32_t i = (int32_t) slot_count - 1;
i >= (int32_t) io_threads_ + 2; i--) {
empty_slots.push_back (i);
// Mark the slot as unoccupied.
slots [i] = NULL;
}

以上就是ctx_t构造函数函数体的全部。

接下来看reaper_t这个类型,它是用来销毁0mq的socket的。

1
2
3
4
5
6
7
8
9
10
11
//  Takes over a socket handed to the reaper: the socket is attached to the
//  reaper's poller, its termination is started and the reaper's socket
//  counter is incremented.
void zmq::reaper_t::process_reap (socket_base_t *socket_)
{
    //  Hook the socket up to the reaper's poller.
    socket_->start_reaping (poller);

    //  Begin tearing down the I/O objects owned by the socket, then let the
    //  socket run its check_destroy step.
    socket_->terminate ();
    socket_->check_destroy ();

    //  One more socket is now tracked by the reaper.
    ++sockets;
}

下面是reaper_t的构造函数:

1
2
3
4
5
6
7
8
9
10
11
//  Builds a reaper bound to context ctx_ with thread ID tid_. The reaper
//  starts with no sockets and is not terminating. It owns a poller on which
//  its own mailbox is registered.
zmq::reaper_t::reaper_t (class ctx_t *ctx_, uint32_t tid_) :
    object_t (ctx_, tid_),
    sockets (0),
    terminating (false)
{
    //  Non-throwing allocation; failure is caught by the assertion.
    poller = new (std::nothrow) poller_t;
    alloc_assert (poller);

    //  Register the mailbox's file descriptor with the poller, using this
    //  object as the event sink, and enable poll-in on the returned handle.
    mailbox_handle = poller->add_fd (mailbox.get_fd (), this);
    poller->set_pollin (mailbox_handle);
}

其中poller_t这个类型在FreeBSD上定义为kqueue_t,基于kqueue/kevent做polling。

1
//  On this platform the generic poller type is the kqueue-based poller.
typedef kqueue_t poller_t;
1
2
3
4
5
6
7
//  Constructs the poller in the non-stopping state and opens the kernel
//  event queue it will wait on.
zmq::kqueue_t::kqueue_t () :
    stopping (false)
{
    //  Obtain a new kqueue descriptor; a return value of -1 is treated as a
    //  fatal error by the assertion.
    kqueue_fd = kqueue ();
    errno_assert (kqueue_fd != -1);
}

ctx_t的构造函数调用了reaper_t的start函数,而该函数又调用了kqueue_t的start函数。

1
2
3
4
5
//  Starts the reaper by starting its poller.
void zmq::reaper_t::start ()
{
    //  All the work is delegated to the poller owned by this reaper.
    poller->start ();
}
1
2
3
4
//  Starts the poller's worker, passing worker_routine as the entry point
//  and this poller as its argument.
void zmq::kqueue_t::start ()
{
    worker.start (worker_routine, this);
}

kqueue_t的worker成员的类型是thread_t,其start函数创建了一个新的线程。

1
2
3
4
5
6
7
//  Launches a new POSIX thread that will invoke tfn_ with arg_.
//  The function and its argument are stored in this object; the new thread
//  receives `this` and enters through thread_routine. The pthread_create
//  result is checked with posix_assert.
void zmq::thread_t::start (thread_fn *tfn_, void *arg_)
{
    //  Store the callback and its argument before the thread is created.
    arg = arg_;
    tfn = tfn_;

    const int rc = pthread_create (&descriptor, NULL, thread_routine, this);
    posix_assert (rc);
}
1
2
3
4
5
6
static void *thread_routine (void *arg_)
{
zmq::thread_t *self = (zmq::thread_t*) arg_;
self->tfn (self->arg);
return NULL;
}

回过头来再看kqueue_t的worker_routine函数。

1
2
3
4
void zmq::kqueue_t::worker_routine (void *arg_)
{
((kqueue_t*) arg_)->loop ();
}

其实就是在一个新的线程里面调用kqueue_t的loop函数。0mq通过thread_t这层封装,将poller的逻辑与具体平台的线程接口隔离开了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
// Event loop of the kqueue-based poller. It repeats until `stopping` is
// set. Each iteration runs the timers, waits in kevent(), dispatches the
// returned events to their reactors and finally frees retired entries.
void zmq::kqueue_t::loop ()
{
while (!stopping) {

// Execute any due timers.
// The returned value is used below as the wait timeout; the arithmetic
// on it treats it as a number of milliseconds.
int timeout = (int) execute_timers ();

// Wait for events.
struct kevent ev_buf [max_io_events];
// Split the millisecond timeout into seconds and nanoseconds.
timespec ts = {timeout / 1000, (timeout % 1000) * 1000000};
// A zero timeout passes NULL to kevent(), i.e. wait with no time limit.
// No changes are submitted here (NULL change list of length 0).
int n = kevent (kqueue_fd, NULL, 0, &ev_buf [0], max_io_events,
timeout ? &ts: NULL);
// An interrupted call is not an error: start the next iteration.
if (n == -1 && errno == EINTR)
continue;
errno_assert (n != -1);

for (int i = 0; i < n; i ++) {
// The event's udata field is interpreted as the poll entry.
poll_entry_t *pe = (poll_entry_t*) ev_buf [i].udata;

// The fd is compared with retired_fd before every callback, presumably
// because an earlier callback may have retired this entry.
if (pe->fd == retired_fd)
continue;
// End-of-file is reported to the reactor through in_event.
if (ev_buf [i].flags & EV_EOF)
pe->reactor->in_event ();
if (pe->fd == retired_fd)
continue;
if (ev_buf [i].filter == EVFILT_WRITE)
pe->reactor->out_event ();
if (pe->fd == retired_fd)
continue;
if (ev_buf [i].filter == EVFILT_READ)
pe->reactor->in_event ();
}

// Destroy retired event sources.
// Deletion happens only here, after all events of this iteration have
// been dispatched.
for (retired_t::iterator it = retired.begin (); it != retired.end ();
++it)
delete *it;
retired.clear ();
}
}

kqueue_t的loop函数主要做三件事:执行到期的timer、对fd做polling并调用相应的事件处理接口、清理已经retired的资源。

ctx_t构造函数创建的io_thread_t与reaper_t类似。

1
2
3
4
5
6
7
8
9
//  Builds an I/O thread object bound to context ctx_ with thread ID tid_.
//  It owns a poller on which its own mailbox is registered.
zmq::io_thread_t::io_thread_t (ctx_t *ctx_, uint32_t tid_) :
    object_t (ctx_, tid_)
{
    //  Non-throwing allocation; failure is caught by the assertion.
    poller = new (std::nothrow) poller_t;
    alloc_assert (poller);

    //  Register the mailbox's file descriptor with the poller, using this
    //  object as the event sink, and enable poll-in on the returned handle.
    mailbox_handle = poller->add_fd (mailbox.get_fd (), this);
    poller->set_pollin (mailbox_handle);
}
1
2
3
4
5
//  Starts the I/O thread by starting its poller.
void zmq::io_thread_t::start ()
{
    //  All the work is delegated to the poller owned by this object.
    poller->start ();
}

再来看io_thread_t的in_event函数,reaper_t的in_event函数和io_thread_t的一样。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
//  Drains the mailbox: commands are received one at a time and dispatched
//  to their destination until recv fails with EAGAIN. A recv that fails
//  with EINTR is retried; any other failure trips errno_assert.
void zmq::io_thread_t::in_event ()
{
    //  TODO: Do we want to limit number of commands I/O thread can
    //  process in a single go?

    for (;;) {

        //  Fetch the next command; the second argument to recv is 0.
        command_t cmd;
        int rc = mailbox.recv (&cmd, 0);
        if (rc != 0) {
            //  Interrupted: try again.
            if (errno == EINTR)
                continue;
            //  EAGAIN ends the loop (per the original comment: no command).
            if (errno == EAGAIN)
                break;
        }
        errno_assert (rc == 0);

        //  Hand the command over to the object it is addressed to.
        cmd.destination->process_command (cmd);
    }
}