Commit volatile memory to persistent append-only log
zeromq context
A 0MQ context is thread safe and may be shared among as many application threads as necessary, without any additional locking required on the part of the caller.
The io_threads argument specifies the size of the 0MQ thread pool to handle I/O operations. If your application is using only the inproc transport for messaging you may set this to zero, otherwise set it to at least one.
Let's look at the ctx_t constructor:
    //  Initialise the array of mailboxes. Additional three slots are for
    //  internal log socket and the zmq_term thread the reaper thread.
    slot_count = max_sockets + io_threads_ + 3;
    slots = (mailbox_t**) malloc (sizeof (mailbox_t*) * slot_count);
    alloc_assert (slots);

    //  Initialise the infrastructure for zmq_term thread.
    slots [term_tid] = &term_mailbox;

    //  Create I/O thread objects and launch them.
    for (uint32_t i = 2; i != io_threads_ + 2; i++) {
        io_thread_t *io_thread = new (std::nothrow) io_thread_t (this, i);
        alloc_assert (io_thread);
        io_threads.push_back (io_thread);
        slots [i] = io_thread->get_mailbox ();
        io_thread->start ();
    }
    //  In the unused part of the slot array, create a list of empty slots.
    for (int32_t i = (int32_t) slot_count - 1;
          i >= (int32_t) io_threads_ + 2; i--) {
        empty_slots.push_back (i);
        slots [i] = NULL;
    }
That is the entire body of the ctx_t constructor.
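To make the slot layout concrete, here is a small self-contained sketch that models only the index bookkeeping. It is not libzmq code: slot_table_t, acquire and release are names made up for illustration, and it assumes that free slots are taken from the back of empty_slots.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical model of the slot bookkeeping in ctx_t; not libzmq code.
struct slot_table_t {
    enum {
        term_tid = 0,   // slot reserved for the zmq_term thread
        reaper_tid = 1  // slot reserved for the reaper thread
    };

    uint32_t slot_count;
    std::vector<uint32_t> empty_slots;

    slot_table_t (uint32_t max_sockets_, uint32_t io_threads_)
    {
        // Same arithmetic as the constructor: sockets + I/O threads + 3.
        slot_count = max_sockets_ + io_threads_ + 3;

        // Slots 0 and 1 are reserved and slots 2 .. io_threads_ + 1 belong
        // to the I/O threads. Everything above is free. Pushing from the
        // top down means taking from the back yields the lowest index first.
        for (int32_t i = (int32_t) slot_count - 1;
              i >= (int32_t) io_threads_ + 2; i--)
            empty_slots.push_back ((uint32_t) i);
    }

    // Take the next free slot, or return -1 when none is left (assumption:
    // slots are handed out from the back of the list).
    int32_t acquire ()
    {
        if (empty_slots.empty ())
            return -1;
        uint32_t slot = empty_slots.back ();
        empty_slots.pop_back ();
        return (int32_t) slot;
    }

    // Give a slot back so that it can be reused.
    void release (uint32_t slot_) { empty_slots.push_back (slot_); }
};
```

With max_sockets_ = 4 and io_threads_ = 2 the table has 9 slots, indices 4 to 8 are free, and the first call to acquire () returns 4.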
Next, let's look at the reaper_t type, which is used to destroy 0MQ sockets.
    void zmq::reaper_t::process_reap (socket_base_t *socket_)
    {
        //  Add the socket to the poller.
        socket_->start_reaping (poller);

    void zmq::io_thread_t::in_event ()
    {
        //  TODO: Do we want to limit number of commands I/O thread can
        //  process in a single go?

        while (true) {

            //  Get the next command. If there is none, exit.
            command_t cmd;
            int rc = mailbox.recv (&cmd, 0);
            if (rc != 0 && errno == EINTR)
                continue;
            if (rc != 0 && errno == EAGAIN)
                break;
            errno_assert (rc == 0);

            //  Process the command.
            cmd.destination->process_command (cmd);
        }
    }
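The loop in in_event keeps reading until the mailbox is empty: EINTR means "try again", EAGAIN means "nothing left". A minimal sketch of that pattern follows. fake_command_t, fake_mailbox_t and drain are hypothetical stand-ins, not libzmq types, and the fake mailbox only simulates the two error codes.

```cpp
#include <cassert>
#include <cerrno>
#include <deque>

// Hypothetical stand-in for command_t; not libzmq code.
struct fake_command_t {
    int id;
};

// Hypothetical stand-in for mailbox_t with a non-blocking recv.
struct fake_mailbox_t {
    std::deque<fake_command_t> queue;
    int pending_interrupts;  // number of simulated EINTR failures

    fake_mailbox_t () : pending_interrupts (0) {}

    // Returns 0 on success, -1 with errno set to EINTR or EAGAIN otherwise.
    int recv (fake_command_t *cmd_)
    {
        if (pending_interrupts > 0) {
            pending_interrupts--;
            errno = EINTR;
            return -1;
        }
        if (queue.empty ()) {
            errno = EAGAIN;
            return -1;
        }
        *cmd_ = queue.front ();
        queue.pop_front ();
        return 0;
    }
};

// Drain the mailbox in the same way as in_event: retry on EINTR, stop on
// EAGAIN. Returns the number of commands that were processed.
int drain (fake_mailbox_t &mailbox_)
{
    int processed = 0;
    while (true) {
        fake_command_t cmd;
        int rc = mailbox_.recv (&cmd);
        if (rc != 0 && errno == EINTR)
            continue;
        if (rc != 0 && errno == EAGAIN)
            break;
        assert (rc == 0);
        processed++;  // a real I/O thread would dispatch the command here
    }
    return processed;
}
```

Interrupted reads are retried rather than counted as failures, so a mailbox holding three commands still yields three processed commands even when some reads are interrupted first.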