zeromq connect

zmq_connect直接调用socket_base_t的connect函数。这里暂且只看tcp和ipc部分。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
//  Establish an outbound connection to the endpoint described by addr_
//  (e.g. "tcp://host:port" or "ipc://path"). Returns 0 on success, or
//  -1 with errno set (ETERM if the context is shutting down, EMTHREAD
//  if no I/O thread is available, or a parse/resolve error).
int zmq::socket_base_t::connect (const char *addr_)
{
//  Refuse new connections once the context is being terminated.
if (unlikely (ctx_terminated)) {
errno = ETERM;
return -1;
}

// Parse addr_ string into "protocol" and "address" parts.
std::string protocol;
std::string address;
int rc = parse_uri (addr_, protocol, address);
if (rc != 0)
return -1;
// omit...
//  Resolve the address now purely for validation; the actual connect
//  happens later inside the I/O thread (see zmq_connecter_t).
sockaddr_storage addr;
socklen_t addr_len;

//  NOTE(review): if protocol is neither "tcp" nor "ipc", rc keeps its
//  previous value (0) and execution falls through -- presumably the
//  omitted code above rejects unknown protocols; confirm upstream.
if (protocol == "tcp")
rc = resolve_ip_hostname (&addr, &addr_len, address.c_str ());
else
if (protocol == "ipc")
rc = resolve_local_path (&addr, &addr_len, address.c_str ());
if (rc != 0)
return -1;

// Choose the I/O thread to run the session in.
io_thread_t *io_thread = choose_io_thread (options.affinity);
if (!io_thread) {
errno = EMTHREAD;
return -1;
}

//  Create the session object representing this outbound connection.
//  It will live in the chosen I/O thread.
connect_session_t *session = new (std::nothrow) connect_session_t (
io_thread, this, options, protocol.c_str (), address.c_str ());
alloc_assert (session);

// If 'immediate connect' feature is required, we'll create the pipes
// to the session straight away. Otherwise, they'll be created by the
// session once the connection is established.
if (options.immediate_connect) {

reader_t *inpipe_reader = NULL;
writer_t *inpipe_writer = NULL;
reader_t *outpipe_reader = NULL;
writer_t *outpipe_writer = NULL;

// Create inbound pipe (session -> socket), if required.
if (options.requires_in)
create_pipe (this, session, options.hwm, options.swap,
&inpipe_reader, &inpipe_writer);

// Create outbound pipe (socket -> session), if required.
if (options.requires_out)
create_pipe (session, this, options.hwm, options.swap,
&outpipe_reader, &outpipe_writer);

// Attach the pipes to the socket object.
attach_pipes (inpipe_reader, outpipe_writer, blob_t ());

// Attach the pipes to the session object.
session->attach_pipes (outpipe_reader, inpipe_writer, blob_t ());
}

//  Activate the session: plug it into its I/O thread and make it a
//  child of this socket in the ownership tree.
launch_child (session);

return 0;
}
1
2
3
4
5
6
7
8
9
10
11
//  Make object_ a child of this object: record the ownership link,
//  send the command that plugs it into its I/O thread, and register
//  it in this object's list of owned children.
void zmq::own_t::launch_child (own_t *object_)
{
// Specify the owner of the object.
object_->set_owner (this);

// Plug the object into the I/O thread.
send_plug (object_);

// Take ownership of the object.
send_own (this, object_);
}

0mq socket为每个connect创建一个connect_session_t,代表主动发起的连接。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
//  Handler for the plug command: once the session is running in its
//  I/O thread, kick off the connection attempt. The false argument
//  means "do not wait" -- no reconnect back-off before the first try.
void zmq::connect_session_t::process_plug ()
{
// Start connection process immediately.
start_connecting (false);
}

void zmq::connect_session_t::start_connecting (bool wait_)
{
// Choose I/O thread to run connecter in. Given that we are already
// running in an I/O thread, there must be at least one available.
io_thread_t *io_thread = choose_io_thread (options.affinity);
zmq_assert (io_thread);

// Create the connecter object.

// Both TCP and IPC transports are using the same infrastructure.
if (protocol == "tcp" || protocol == "ipc") {

zmq_connecter_t *connecter = new (std::nothrow) zmq_connecter_t (
io_thread, this, options, protocol.c_str (), address.c_str (),
wait_);
alloc_assert (connecter);
launch_child (connecter);
return;
}

zmq_assert (false);
}

zmq_connecter_t负责发起连接以及连接失败后重连。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
//  Handler for the plug command: either try to connect right away or,
//  when this connecter was created for a retry, back off first via
//  the reconnect timer.
void zmq::zmq_connecter_t::process_plug ()
{
    if (!wait)
        start_connecting ();
    else
        add_reconnect_timer ();
}
//  Open the underlying socket and issue the connect call. A connect
//  may complete synchronously (typical for ipc or local tcp), stay in
//  progress (polled for completion), or fail outright (reconnect).
void zmq::zmq_connecter_t::start_connecting ()
{
    int rc = tcp_connecter.open ();

    bool done_now = (rc == 0);
    bool in_progress = (rc == -1 && errno == EAGAIN);

    if (done_now || in_progress) {
        //  Either way the fd goes into the I/O thread's poller.
        handle = add_fd (tcp_connecter.get_fd ());
        handle_valid = true;
        if (done_now)
            //  Synchronous success: run the completion path at once.
            out_event ();
        else
            //  Connection establishment may be delayed. Poll for its
            //  completion (fd becomes writable once connected).
            set_pollout (handle);
        return;
    }

    //  Handle any other error condition by eventual reconnect.
    wait = true;
    add_reconnect_timer ();
}

对于ipc或者本机tcp端口的连接可能立即成功,对于处于连接过程中的fd将其置入io_thread_t的线程中polling。如失败,则重连。下面是zmq_connecter_t的out_event函数,该函数在连接成功时被调用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
//  Invoked by the poller when the connecting fd becomes writable, i.e.
//  when the asynchronous connect has finished -- successfully or not.
//  On success the fd is handed to a fresh zmq_init_t (which performs
//  the identity exchange) and this connecter destroys itself; on
//  failure a reconnect is scheduled.
void zmq::zmq_connecter_t::out_event ()
{
//  Retrieve the result of the connect call and stop polling the fd.
fd_t fd = tcp_connecter.connect ();
rm_fd (handle);
handle_valid = false;

// Handle the error condition by attempt to reconnect.
if (fd == retired_fd) {
tcp_connecter.close ();
wait = true;
add_reconnect_timer();
return;
}

// Choose I/O thread to run connecter in. Given that we are already
// running in an I/O thread, there must be at least one available.
io_thread_t *io_thread = choose_io_thread (options.affinity);
zmq_assert (io_thread);

//  Create an init object for the new connection. socket is NULL here
//  because on the connect side the session is already known; the init
//  object is launched as a sibling, i.e. attached to the session.
zmq_init_t *init = new (std::nothrow) zmq_init_t (io_thread, NULL,
session, fd, options);
alloc_assert (init);
launch_sibling (init);

// Shut the connecter down.
terminate ();
}

当连接建立起来之后,通过zmq_init_t进行最初的数据(identity)交换。zmq_init_t被直接挂接到了session_t(connect_session_t)上去了,而zmq_connecter_t被terminate()。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
//  Constructs the init object for a freshly established connection and
//  creates the engine that will drive I/O on fd_. On the connect side
//  socket_ is NULL (the session is already known); on the listener
//  side session_ is NULL and is found/created later from the peer's
//  identity (see dispatch_engine).
zmq::zmq_init_t::zmq_init_t (io_thread_t *io_thread_,
socket_base_t *socket_, session_t *session_, fd_t fd_,
const options_t &options_) :
own_t (io_thread_, options_),
ephemeral_engine (NULL),
sent (false),
received (false),
socket (socket_),
session (session_),
io_thread (io_thread_)
{
// Create the engine object for this connection.
engine = new (std::nothrow) zmq_engine_t (fd_, options);
alloc_assert (engine);
}
//  Handler for the plug command: attach the engine, with this init
//  object acting as the engine's message source/sink (i_inout) for
//  the duration of the identity handshake.
void zmq::zmq_init_t::process_plug ()
{
zmq_assert (engine);
engine->plug (io_thread, this);
}
//  Attaches the engine to an i_inout object (zmq_init_t during the
//  identity handshake, the session afterwards) and registers the
//  transport fd with the I/O thread's poller for both directions.
void zmq::zmq_engine_t::plug (io_thread_t *io_thread_, i_inout *inout_)
{
//  An engine can be plugged only once at a time.
zmq_assert (!plugged);
plugged = true;
ephemeral_inout = NULL;

// Connect to session/init object: both encoder (outgoing messages)
// and decoder (incoming messages) pull from / push to inout_.
zmq_assert (!inout);
zmq_assert (inout_);
encoder.set_inout (inout_);
decoder.set_inout (inout_);
inout = inout_;

// Connect to I/O threads poller object.
io_object_t::plug (io_thread_);
handle = add_fd (tcp_socket.get_fd ());
set_pollin (handle);
set_pollout (handle);

// Flush all the data that may have been already received downstream.
in_event ();
}

这一次被polling的是真正的用于tcp/ipc数据传输的socket fd。zmq_engine_t的encoder和decoder的类型分别为encoder_t和decoder_t。encoder_t用于发送数据时负责从inout_(这里是zmq_init_t)取得(调用inout_的read函数)并打包数据。decoder_t用于接收数据时负责解包并将数据发送(调用inout_的write函数)到inout_。具体可看二者的实现及zmq_engine_t的in_event和out_event函数。下面是zmq_init_t的read、write函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
//  Engine pull-side callback: the only message this object ever
//  produces is the local identity, emitted exactly once. Returns
//  false when there is nothing (more) to send.
bool zmq::zmq_init_t::read (::zmq_msg_t *msg_)
{
// If the identity was already sent, do nothing.
if (sent)
return false;

// Send the identity.
int rc = zmq_msg_init_size (msg_, options.identity.size ());
zmq_assert (rc == 0);
memcpy (zmq_msg_data (msg_), options.identity.c_str (),
options.identity.size ());
sent = true;

//  Try finalize initialization -- done once the identity has been
//  both sent and received.
finalise_initialisation ();

return true;
}

//  Engine push-side callback: consumes exactly one message -- the
//  peer's identity. An empty identity is replaced by a generated
//  unique name whose first byte is 0 (dispatch_engine treats such
//  identities as transient).
bool zmq::zmq_init_t::write (::zmq_msg_t *msg_)
{
// If identity was already received, we are not interested
// in subsequent messages.
if (received)
return false;

// Retrieve the remote identity. If it's empty, generate a unique name.
if (!zmq_msg_size (msg_)) {
unsigned char identity [uuid_t::uuid_blob_len + 1];
identity [0] = 0;
memcpy (identity + 1, uuid_t ().to_blob (), uuid_t::uuid_blob_len);
peer_identity.assign (identity, uuid_t::uuid_blob_len + 1);
}
else {
peer_identity.assign ((const unsigned char*) zmq_msg_data (msg_),
zmq_msg_size (msg_));
}
int rc = zmq_msg_close (msg_);
zmq_assert (rc == 0);

received = true;

//  Try finalize initialization -- done once the identity has been
//  both sent and received.
finalise_initialisation ();

return true;
}
//  Once the identity has travelled in both directions, detach the
//  engine from this init object; flush() will later hand it over to
//  the proper session via dispatch_engine().
void zmq::zmq_init_t::finalise_initialisation ()
{
    //  Handshake not complete yet -- nothing to do.
    if (!sent || !received)
        return;

    //  Unplug and park the engine until it is dispatched.
    ephemeral_engine = engine;
    engine = NULL;
    ephemeral_engine->unplug ();
}
//  Called by the engine's in_event/out_event. Once the peer identity
//  has arrived and finalise_initialisation has detached the engine,
//  hand the engine over to its session.
void zmq::zmq_init_t::flush ()
{
    if (received && ephemeral_engine)
        dispatch_engine ();
}

可以看出zmq_init_t的read只是将其identity发送出去,write只是接收连接另一方的identity。当连接的双方交换过identity之后,zmq_init_t不再发送或接收任何数据。zmq_init_t的flush函数被zmq_engine_t的in_event或out_event调用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
//  Hands the detached engine over to the session it belongs to --
//  creating a transient or named session first when necessary (the
//  listener side) -- and then destroys this init object.
void zmq::zmq_init_t::dispatch_engine ()
{
if (sent && received) {

// Engine must be detached.
zmq_assert (!engine);
zmq_assert (ephemeral_engine);

// If we know what session we belong to, it's easy, just send the
// engine to that session and destroy the init object. Note that we
// know about the session only if this object is owned by it. Thus,
// lifetime of this object in contained in the lifetime of the session
// so the pointer cannot become invalid without notice.
if (session) {
send_attach (session, ephemeral_engine, peer_identity, true);
terminate ();
return;
}

// All the cases below are listener-based. Therefore we need the socket
// reference so that new sessions can bind to that socket.
zmq_assert (socket);

// We have no associated session. If the peer has no identity we'll
// create a transient session for the connection. Note that
// seqnum is incremented to account for attach command before the
// session is launched. That way we are sure it won't terminate before
// being attached.
if (peer_identity [0] == 0) {
session = new (std::nothrow) transient_session_t (io_thread,
socket, options);
alloc_assert (session);
session->inc_seqnum ();
launch_sibling (session);
send_attach (session, ephemeral_engine, peer_identity, false);
terminate ();
return;
}

// Try to find the session corresponding to the peer's identity.
// If found, send the engine to that session and destroy this object.
// Note that session's seqnum is incremented by find_session rather
// than by send_attach.
session = socket->find_session (peer_identity);
if (session) {
send_attach (session, ephemeral_engine, peer_identity, false);
terminate ();
return;
}

// There's no such named session. We have to create one. Note that
// seqnum is incremented to account for attach command before the
// session is launched. That way we are sure it won't terminate before
// being attached.
session = new (std::nothrow) named_session_t (io_thread, socket,
options, peer_identity);
alloc_assert (session);
session->inc_seqnum ();
launch_sibling (session);
send_attach (session, ephemeral_engine, peer_identity, false);
terminate ();
return;
}
}

在主动发起connect时,zmq_init_t是由zmq_connecter_t创建的,其session成员指向connect_session_t(继承自session_t)。当zmq_init_t的dispatch_engine被调用时,直接向connect_session_t发送attach zmq_engine_t的请求。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
//  Handler for the attach command: lazily creates the zmq_msg_t pipes
//  between the socket and this session (unless they already exist)
//  and plugs the freshly arrived engine into the session, which from
//  now on acts as the engine's i_inout.
void zmq::session_t::process_attach (i_engine *engine_,
const blob_t &peer_identity_)
{
// omit ...

// Check whether the required pipes already exist. If not so, we'll
// create them and bind them to the socket object.
if (!pipes_attached) {
zmq_assert (!in_pipe && !out_pipe);
pipes_attached = true;
reader_t *socket_reader = NULL;
writer_t *socket_writer = NULL;

// Create the pipes, as required.
if (options.requires_in) {
create_pipe (socket, this, options.hwm, options.swap, &socket_reader,
&out_pipe);
out_pipe->set_event_sink (this);
}
if (options.requires_out) {
create_pipe (this, socket, options.hwm, options.swap, &in_pipe,
&socket_writer);
in_pipe->set_event_sink (this);
}

// Bind the pipes to the socket object.
if (socket_reader || socket_writer)
send_bind (socket, socket_reader, socket_writer, peer_identity_);
}

// Plug in the engine.
engine = engine_;
engine->plug (io_thread, this);

// Trigger the notification about the attachment.
attached (peer_identity_);
}

在if中执行的代码其实和socket_base_t::connect函数靠后的if中的代码作用相似。都是在0mq的socket和session_t之间建立传输zmq_msg_t的管道。这一次zmq_engine_t plug函数的inout_参数是session_t。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
//  Builds one unidirectional message pipe: the pipe_t itself plus its
//  reader and writer endpoints, returned through reader_/writer_. The
//  pipe_t is owned by its endpoints and never exposed to the caller.
void zmq::create_pipe (object_t *reader_parent_, object_t *writer_parent_,
    uint64_t hwm_, int64_t swap_size_, reader_t **reader_, writer_t **writer_)
{
    //  Derive the low watermark from the high watermark: hwm minus a
    //  fixed delta for large pipes, roughly half of hwm for small ones.
    uint64_t lwm;
    if (hwm_ > max_wm_delta * 2)
        lwm = hwm_ - max_wm_delta;
    else
        lwm = (hwm_ + 1) / 2;

    //  The pipe per se, handled only by its reader and writer.
    pipe_t *p = new (std::nothrow) pipe_t ();
    alloc_assert (p);

    //  Reader endpoint, attached to reader_parent_.
    reader_t *r = new (std::nothrow) reader_t (reader_parent_, p, lwm);
    alloc_assert (r);
    *reader_ = r;

    //  Writer endpoint, attached to writer_parent_; it needs to know
    //  its peer reader for flow control.
    writer_t *w = new (std::nothrow) writer_t (writer_parent_, p, r,
        hwm_, swap_size_);
    alloc_assert (w);
    *writer_ = w;
}

当tcp/ipc的socket可读或可写时分别调用的是session_t的write和read函数,即从tcp/ipc读出数据,由decoder解包,由session_t写入传输zmq_msg_t的管道,再由0mq socket读给应用程序;应用程序写入zmq_msg_t,0mq socket将之写入传输管道,session_t从中读取zmq_msg_t,再由encoder_t打包发送到tcp/ipc。