Recording

Commit volatile memory to persistent append-only log

0%

zeromq bind

这里只看tcp/ipc部分,下面代码摘自socket_base_t的bind函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
if (protocol == "tcp" || protocol == "ipc") {

// Choose I/O thread to run the listerner in.
io_thread_t *io_thread = choose_io_thread (options.affinity);
if (!io_thread) {
errno = EMTHREAD;
return -1;
}

// Create and run the listener.
zmq_listener_t *listener = new (std::nothrow) zmq_listener_t (
io_thread, this, options);
alloc_assert (listener);
int rc = listener->set_address (protocol.c_str(), address.c_str ());
if (rc != 0) {
delete listener;
return -1;
}
launch_child (listener);

return 0;
}

bind创建一个zmq_listener_t挂到socket上去了。zmq_listener_t在其set_address函数中就开始监听tcp/ipc连接。

1
2
3
4
5
int zmq::zmq_listener_t::set_address (const char *protocol_, const char *addr_)
{
return tcp_listener.set_address (protocol_, addr_, options.backlog);
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
int zmq::tcp_listener_t::set_address (const char *protocol_, const char *addr_,
int backlog_)
{
if (strcmp (protocol_, "tcp") == 0 ) {

// Resolve the sockaddr to bind to.
int rc = resolve_ip_interface (&addr, &addr_len, addr_);
if (rc != 0)
return -1;

// Create a listening socket.
s = open_socket (addr.ss_family, SOCK_STREAM, IPPROTO_TCP);
if (s == -1)
return -1;

// Allow reusing of the address.
int flag = 1;
rc = setsockopt (s, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof (int));
errno_assert (rc == 0);

// Set the non-blocking flag.
flag = fcntl (s, F_GETFL, 0);
if (flag == -1)
flag = 0;
rc = fcntl (s, F_SETFL, flag | O_NONBLOCK);
errno_assert (rc != -1);

// Bind the socket to the network interface and port.
rc = bind (s, (struct sockaddr*) &addr, addr_len);
if (rc != 0) {
int err = errno;
if (close () != 0)
return -1;
errno = err;
return -1;
}

// Listen for incomming connections.
rc = listen (s, backlog_);
if (rc != 0) {
int err = errno;
if (close () != 0)
return -1;
errno = err;
return -1;
}

return 0;
}
else if (strcmp (protocol_, "ipc") == 0) {

// Get rid of the file associated with the UNIX domain socket that
// may have been left behind by the previous run of the application.
::unlink (addr_);

// Convert the address into sockaddr_un structure.
int rc = resolve_local_path (&addr, &addr_len, addr_);
if (rc != 0)
return -1;

// Create a listening socket.
s = socket (AF_UNIX, SOCK_STREAM, 0);
if (s == -1)
return -1;

// Set the non-blocking flag.
int flag = fcntl (s, F_GETFL, 0);
if (flag == -1)
flag = 0;
rc = fcntl (s, F_SETFL, flag | O_NONBLOCK);
errno_assert (rc != -1);

// Bind the socket to the file path.
rc = bind (s, (struct sockaddr*) &addr, addr_len);
if (rc != 0) {
int err = errno;
if (close () != 0)
return -1;
errno = err;
return -1;
}
has_file = true;

// Listen for incomming connections.
rc = listen (s, backlog_);
if (rc != 0) {
int err = errno;
if (close () != 0)
return -1;
errno = err;
return -1;
}

return 0;
}
else {
errno = EPROTONOSUPPORT;
return -1;
}
}

set_address函数用一个非阻塞的 tcp/ipc socket 去监听 tcp/ipc 连接。下面是 zmq_listener_t 的process_plug函数,这个函数被 socket_base_t 的 bind 调用的 launch_child 触发,被 io_thread 调用。

1
2
3
4
5
6
void zmq::zmq_listener_t::process_plug ()
{
// Start polling for incoming connections.
handle = add_fd (tcp_listener.get_fd ());
set_pollin (handle);
}
1
2
3
4
zmq::fd_t zmq::tcp_listener_t::get_fd ()
{
return s;
}

process_plug将正在进行监听的 tcp/ipc socket 加入轮询。下面看 zmq_listener_t 的 in_event 函数,该函数在有新连接请求时被调用(负责监听的 socket 可读)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
void zmq::zmq_listener_t::in_event ()
{
fd_t fd = tcp_listener.accept ();

// If connection was reset by the peer in the meantime, just ignore it.
// TODO: Handle specific errors like ENFILE/EMFILE etc.
if (fd == retired_fd)
return;

// Choose I/O thread to run connecter in. Given that we are already
// running in an I/O thread, there must be at least one available.
io_thread_t *io_thread = choose_io_thread (options.affinity);
zmq_assert (io_thread);

// Create and launch an init object.
zmq_init_t *init = new (std::nothrow) zmq_init_t (io_thread, socket,
NULL, fd, options);
alloc_assert (init);
launch_child (init);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
zmq::fd_t zmq::tcp_listener_t::accept ()
{
zmq_assert (s != retired_fd);

// Accept one incoming connection.
fd_t sock = ::accept (s, NULL, NULL);

if (sock == -1 &&
(errno == EAGAIN || errno == EWOULDBLOCK ||
errno == EINTR || errno == ECONNABORTED))
return retired_fd;

errno_assert (sock != -1);

// Set to non-blocking mode.
int flags = fcntl (s, F_GETFL, 0);
if (flags == -1)
flags = 0;
int rc = fcntl (sock, F_SETFL, flags | O_NONBLOCK);
errno_assert (rc != -1);

struct sockaddr *sa = (struct sockaddr*) &addr;
if (AF_UNIX != sa->sa_family) {

// Disable Nagle's algorithm.
int flag = 1;
rc = setsockopt (sock, IPPROTO_TCP, TCP_NODELAY, (char*) &flag,
sizeof (int));
errno_assert (rc == 0);
}

return sock;
}

zmq_listener_t 的 in_event 函数为每个新连接创建一个 zmq_init_t 进行 identity 交换。当连接双方交换完 identity 后,zmq_init_t 的 dispatch_engine 函数根据对方的 identity 创建相应的 session_t 类型(named_session_t 或 transient_session_t)。具体可见 zmq_init_t 的 dispatch_engine 函数。

tcp_listener_t 的 accept 函数在 io thread 判断 listening socket 可读时调用,但其依然对其返回值进行了判断,这是因为在 accept 之前,连接可能被客户 RST 。