void zmq::io_thread_t::in_event () { // TODO: Do we want to limit number of commands I/O thread can // process in a single go?
while (true) {
// Get the next command. If there is none, exit. command_t cmd; int rc = mailbox.recv (&cmd, 0); if (rc != 0 && errno == EINTR) continue; if (rc != 0 && errno == EAGAIN) break; errno_assert (rc == 0);
// Process the command. cmd.destination->;process_command (cmd); } }
//  Constructs the mailbox in the passive (not active) state.
zmq::mailbox_t::mailbox_t ()
{
    //  Get the pipe into passive state. That way, if the user starts by
    //  polling on the associated file descriptor it will get woken up when
    //  new command is posted.
    bool ok = cpipe.read (NULL);

    //  The read is expected to fail here; the assertion enforces that.
    zmq_assert (!ok);
    active = false;
}
//  Receives a single command from the mailbox.
//
//  cmd_     - passed to cpipe.read (), which is expected to store the
//             command there.
//  timeout_ - forwarded unchanged to signaler.wait (); its units and any
//             special values are defined by the signaler (not visible here).
//
//  Returns 0 when a command was read. Returns -1 when waiting for the
//  signal failed with errno set to EAGAIN or EINTR; errno is left as set by
//  signaler.wait () so the caller can tell the two apart. Any other wait
//  failure trips errno_assert.
int zmq::mailbox_t::recv (command_t *cmd_, int timeout_)
{
    //  Try to get the command straight away.
    if (active) {
        bool ok = cpipe.read (cmd_);
        if (ok)
            return 0;

        //  If there are no more commands available, switch into passive
        //  state.
        active = false;
        signaler.recv ();
    }

    //  Wait for signal from the command sender.
    int rc = signaler.wait (timeout_);
    if (rc != 0 && (errno == EAGAIN || errno == EINTR))
        return -1;

    //  We've got the signal. Now we can switch into active state.
    active = true;

    //  Get a command. Having been signalled, the pipe must hold at least
    //  one command, so a failed read here is treated as a fatal error.
    errno_assert (rc == 0);
    bool ok = cpipe.read (cmd_);
    zmq_assert (ok);
    return 0;
}