1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70
| int zmq::socket_base_t::connect (const char *addr_) { if (unlikely (ctx_terminated)) { errno = ETERM; return -1; }
std::string protocol; std::string address; int rc = parse_uri (addr_, protocol, address); if (rc != 0) return -1; sockaddr_storage addr; socklen_t addr_len;
if (protocol == "tcp") rc = resolve_ip_hostname (&addr, &addr_len, address.c_str ()); else if (protocol == "ipc") rc = resolve_local_path (&addr, &addr_len, address.c_str ()); if (rc != 0) return -1;
io_thread_t *io_thread = choose_io_thread (options.affinity); if (!io_thread) { errno = EMTHREAD; return -1; }
connect_session_t *session = new (std::nothrow) connect_session_t ( io_thread, this, options, protocol.c_str (), address.c_str ()); alloc_assert (session);
if (options.immediate_connect) {
reader_t *inpipe_reader = NULL; writer_t *inpipe_writer = NULL; reader_t *outpipe_reader = NULL; writer_t *outpipe_writer = NULL;
if (options.requires_in) create_pipe (this, session, options.hwm, options.swap, &inpipe_reader, &inpipe_writer);
if (options.requires_out) create_pipe (session, this, options.hwm, options.swap, &outpipe_reader, &outpipe_writer);
attach_pipes (inpipe_reader, outpipe_writer, blob_t ());
session->attach_pipes (outpipe_reader, inpipe_writer, blob_t ()); }
launch_child (session);
return 0; }
|