if (protocol == "tcp") rc = resolve_ip_hostname (&addr, &addr_len, address.c_str ()); else if (protocol == "ipc") rc = resolve_local_path (&addr, &addr_len, address.c_str ()); if (rc != 0) return-1;
// Choose the I/O thread to run the session in. io_thread_t *io_thread = choose_io_thread (options.affinity); if (!io_thread) { errno = EMTHREAD; return-1; }
// If 'immediate connect' feature is required, we'll create the pipes // to the session straight away. Otherwise, they'll be created by the // session once the connection is established. if (options.immediate_connect) {
void zmq::connect_session_t::start_connecting (bool wait_) { // Choose I/O thread to run connecter in. Given that we are already // running in an I/O thread, there must be at least one available. io_thread_t *io_thread = choose_io_thread (options.affinity); zmq_assert (io_thread);
// Create the connecter object.
// Both TCP and IPC transports are using the same infrastructure. if (protocol == "tcp" || protocol == "ipc") {
// Handle the error condition by attempt to reconnect. if (fd == retired_fd) { tcp_connecter.close (); wait = true; add_reconnect_timer(); return; }
// Choose I/O thread to run connecter in. Given that we are already // running in an I/O thread, there must be at least one available. io_thread_t *io_thread = choose_io_thread (options.affinity); zmq_assert (io_thread);
// Create an init object. zmq_init_t *init = new (std::nothrow) zmq_init_t (io_thread, NULL, session, fd, options); alloc_assert (init); launch_sibling (init);
bool zmq::zmq_init_t::write (::zmq_msg_t *msg_) { // If identity was already received, we are not interested // in subsequent messages. if (received) returnfalse;
void zmq::zmq_init_t::dispatch_engine () { if (sent && received) {
// Engine must be detached. zmq_assert (!engine); zmq_assert (ephemeral_engine);
// If we know what session we belong to, it's easy, just send the // engine to that session and destroy the init object. Note that we // know about the session only if this object is owned by it. Thus, // lifetime of this object in contained in the lifetime of the session // so the pointer cannot become invalid without notice. if (session) { send_attach (session, ephemeral_engine, peer_identity, true); terminate (); return; }
// All the cases below are listener-based. Therefore we need the socket // reference so that new sessions can bind to that socket. zmq_assert (socket);
// We have no associated session. If the peer has no identity we'll // create a transient session for the connection. Note that // seqnum is incremented to account for attach command before the // session is launched. That way we are sure it won't terminate before // being attached. if (peer_identity [0] == 0) { session = new (std::nothrow) transient_session_t (io_thread, socket, options); alloc_assert (session); session->inc_seqnum (); launch_sibling (session); send_attach (session, ephemeral_engine, peer_identity, false); terminate (); return; } // Try to find the session corresponding to the peer's identity. // If found, send the engine to that session and destroy this object. // Note that session's seqnum is incremented by find_session rather // than by send_attach. session = socket->find_session (peer_identity); if (session) { send_attach (session, ephemeral_engine, peer_identity, false); terminate (); return; }
// There's no such named session. We have to create one. Note that // seqnum is incremented to account for attach command before the // session is launched. That way we are sure it won't terminate before // being attached. session = new (std::nothrow) named_session_t (io_thread, socket, options, peer_identity); alloc_assert (session); session->inc_seqnum (); launch_sibling (session); send_attach (session, ephemeral_engine, peer_identity, false); terminate (); return; } }
// Check whether the required pipes already exist. If not so, we'll // create them and bind them to the socket object. if (!pipes_attached) { zmq_assert (!in_pipe && !out_pipe); pipes_attached = true; reader_t *socket_reader = NULL; writer_t *socket_writer = NULL;
// Create the pipes, as required. if (options.requires_in) { create_pipe (socket, this, options.hwm, options.swap, &socket_reader, &out_pipe); out_pipe->set_event_sink (this); } if (options.requires_out) { create_pipe (this, socket, options.hwm, options.swap, &in_pipe, &socket_writer); in_pipe->set_event_sink (this); }
// Bind the pipes to the socket object. if (socket_reader || socket_writer) send_bind (socket, socket_reader, socket_writer, peer_identity_); }
// Plug in the engine. engine = engine_; engine->plug (io_thread, this);
// Trigger the notfication about the attachment. attached (peer_identity_); }
// Create all three objects pipe consists of: the pipe per se, reader and // writer. The pipe will be handled by reader and writer, its never passed // to the user. Reader and writer are returned to the user. pipe_t *pipe = new (std::nothrow) pipe_t (); alloc_assert (pipe); *reader_ = new (std::nothrow) reader_t (reader_parent_, pipe, lwm); alloc_assert (*reader_); *writer_ = new (std::nothrow) writer_t (writer_parent_, pipe, *reader_, hwm_, swap_size_); alloc_assert (*writer_); }