Recording

Commit volatile memory to persistent append-only log

0%

学习一门新的语言时,我觉得最重要的是学习该语言的惯例用法(idioms),即其编程范式。

很遗憾的是 C++ 是一门日渐进化并且日趋复杂的语言,是一个具有多重编程范型的语言,被视为一个语言联邦1

即使这样,我认为在日常 C++ 编程时,也应该选择一个相对固定的编程模型,实验并改进之。

在用 C++ 编程时,应该尽量保持谦卑、进取与怀疑。

下面主要记录一下我写 C++ 的不长的时间里的一些经验。

避免编译时依赖。

C++ 没有现代的 package, import 机制,而是使用了继承自 C 的 #include 头文件包含机制。C++ 的复杂和 C 的编译模型导致了 C++ 项目冗长的编译时间。在其他语言中 import/require 可以解决的问题,在 C++ 中必须要程序员自己付出努力。[2]

隐式 (implicit) 的编译依赖不像头文件依赖会导致编译期的错误,却会在运行期招致不可预估的致命错误。比如,当你在编译单元 CompilationUnitA 中 p = new(CompilationUnitB::ClassB) 时,你已经埋下了一颗地雷。以后当你改变 CompilationUnitB::ClassB 的成员而导致其大小 (size) 变化时,你编译 CompilationUnitB ,链接 CompilationUnitA ,运行,然后不知何时那颗地雷就会炸掉。我觉得就 new 的编译期行为,C++ 能做的更好,也应该做的更好。好吧,永远不要跨编译单元调用 new

虚函数调用、跨编译单元的栈上变量/全局变量同样是隐式的编译依赖。

跨编译单元(.so, .dll, .lib)时,区分接口与实现;同一编译单元内,不过度区分接口与实现。

Read more »

封装是 C++ 面向对象三大特性之一。C++ 通过将数据和对数据操作的函数绑定在一个类里来表达封装。从字面意义和 C++ 的历史(C with class)来看,封装对于 C++ 来说是根本的。

《代码大全2》6.2节 中提到了一个良好的封装应该“避免把私有的实现细节放入类的接口中”,并介绍了 《Effective C++》 中提到的 pImpl 惯用技法。

在我看来 C++ 中类的实现方式明显无法很好的表达封装,只是粗暴的将函数及其操作的数据绑在一起,而忽视了可见性的控制。C++ 对运行时性能苛求可能是其罪魁祸首。

实际上我认为封装从根本上就是一个错误的观点,数据和对数据操作的函数从来就不是一个整体。

好吧,本文要说只是另一种定义 C++ 类的方式,类似 Objective-C 类的 interface/implementation 。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
// foo_interface.hpp
#include <boost/noncopyable.hpp>

namespace foo {

class Interface : boost::noncopyable {
public:
int Bar();
};

}

// foo.hpp
#include "foo_interface.hpp"

#include <boost/shared_ptr.hpp>

namespace foo {

typedef boost::shared_ptr<Interface> Reference;

Reference New();

}

// foo.cpp
#include "foo.hpp"

namespace foo {

class implementation : public Interface {
public:
int Bar() {
// ...
return bar0();
}

implementation() {
// ...
}

private:
int priv_data0;
// ...

int bar0() {
// ...
}

};

int Interface::Bar() {
BOOST_AUTO(impl, static_cast<implementation>(this));
return impl->Bar();
}

Reference New() {
return Reference(new implementation());
}

}

foo::Interface 中只包含 foo 对外提供的接口函数,没有数据成员。

Read more »

先引用一下,0mq manual 的话:
0MQ ensures atomic delivery of messages; peers shall receive either all message parts of a message or none at all.

在 0mq 中最终数据的发送和接收是由 encoder_t 和 decoder_t 负责包装。他们都是由 io 线程调用,从应用发送 zmq_msg_t 的管道中取得数据。

1
typedef ypipe_t < zmq_msg_t, message_pipe_granularity > pipe_t;
1
2
3
4
5
6
//  Lock-free queue implementation.
// Only a single thread can read from the pipe at any specific moment.
// Only a single thread can write to the pipe at any specific moment.
// T is the type of the object in the queue.
// N is granularity of the pipe, i.e. how many items are needed to
// perform next memory allocation.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
//  Write an item to the pipe.  Don't flush it yet. If incomplete is
// set to true the item is assumed to be continued by items
// subsequently written to the pipe. Incomplete items are never
// flushed down the stream.
inline void write (const T &value_, bool incomplete_)
{
// Place the value to the queue, add new terminator element.
queue.back () = value_;
queue.push ();

// Move the "flush up to here" poiter.
if (!incomplete_)
f = &queue.back ();
}

write 函数的 incomplete_ 参数由外部的包装函数根据 ZMQ_SNDMORE 标志决定。只有当 multipart message 的最后一部分写入管道时,f 才会更新。当 encoder_t 读取 zmq_msg_t ,准备发送时,他不会读到一个未全部写完的 multipart message。decoder_t 从网络上接收数据后,在未读到最好一个部分时不会更新 f。

其实 ypipe_t 是个挺有意思的两端可以同时允许一个线程读、一个线程写的 lock-free 的数据结构实现。

没什么好写的啊,看代码就OVER了。凑数。

Read more »

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// Virtual interface to be exposed by object that want to be notified
// about events on file descriptors.

struct i_poll_events
{
virtual ~i_poll_events () {}

// Called by I/O thread when file descriptor is ready for reading.
virtual void in_event () = 0;

// Called by I/O thread when file descriptor is ready for writing.
virtual void out_event () = 0;

// Called when timer expires.
virtual void timer_event (int id_) = 0;
};

0mq 中实现(或部分实现)了这些接口的类包括: io_thread_t, reaper_t, zmq_connect_t, zmq_listener_t, zmq_engine_t等。

Read more »

这里只看tcp/ipc部分,下面代码摘自socket_base_t的bind函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
if (protocol == "tcp" || protocol == "ipc") {

// Choose I/O thread to run the listerner in.
io_thread_t *io_thread = choose_io_thread (options.affinity);
if (!io_thread) {
errno = EMTHREAD;
return -1;
}

// Create and run the listener.
zmq_listener_t *listener = new (std::nothrow) zmq_listener_t (
io_thread, this, options);
alloc_assert (listener);
int rc = listener->set_address (protocol.c_str(), address.c_str ());
if (rc != 0) {
delete listener;
return -1;
}
launch_child (listener);

return 0;
}

bind创建一个zmq_listener_t挂到socket上去了。zmq_listener_t在其set_address函数中就开始监听tcp/ipc连接。

1
2
3
4
5
int zmq::zmq_listener_t::set_address (const char *protocol_, const char *addr_)
{
return tcp_listener.set_address (protocol_, addr_, options.backlog);
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
int zmq::tcp_listener_t::set_address (const char *protocol_, const char *addr_,
int backlog_)
{
if (strcmp (protocol_, "tcp") == 0 ) {

// Resolve the sockaddr to bind to.
int rc = resolve_ip_interface (&addr, &addr_len, addr_);
if (rc != 0)
return -1;

// Create a listening socket.
s = open_socket (addr.ss_family, SOCK_STREAM, IPPROTO_TCP);
if (s == -1)
return -1;

// Allow reusing of the address.
int flag = 1;
rc = setsockopt (s, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof (int));
errno_assert (rc == 0);

// Set the non-blocking flag.
flag = fcntl (s, F_GETFL, 0);
if (flag == -1)
flag = 0;
rc = fcntl (s, F_SETFL, flag | O_NONBLOCK);
errno_assert (rc != -1);

// Bind the socket to the network interface and port.
rc = bind (s, (struct sockaddr*) &addr, addr_len);
if (rc != 0) {
int err = errno;
if (close () != 0)
return -1;
errno = err;
return -1;
}

// Listen for incomming connections.
rc = listen (s, backlog_);
if (rc != 0) {
int err = errno;
if (close () != 0)
return -1;
errno = err;
return -1;
}

return 0;
}
else if (strcmp (protocol_, "ipc") == 0) {

// Get rid of the file associated with the UNIX domain socket that
// may have been left behind by the previous run of the application.
::unlink (addr_);

// Convert the address into sockaddr_un structure.
int rc = resolve_local_path (&addr, &addr_len, addr_);
if (rc != 0)
return -1;

// Create a listening socket.
s = socket (AF_UNIX, SOCK_STREAM, 0);
if (s == -1)
return -1;

// Set the non-blocking flag.
int flag = fcntl (s, F_GETFL, 0);
if (flag == -1)
flag = 0;
rc = fcntl (s, F_SETFL, flag | O_NONBLOCK);
errno_assert (rc != -1);

// Bind the socket to the file path.
rc = bind (s, (struct sockaddr*) &addr, addr_len);
if (rc != 0) {
int err = errno;
if (close () != 0)
return -1;
errno = err;
return -1;
}
has_file = true;

// Listen for incomming connections.
rc = listen (s, backlog_);
if (rc != 0) {
int err = errno;
if (close () != 0)
return -1;
errno = err;
return -1;
}

return 0;
}
else {
errno = EPROTONOSUPPORT;
return -1;
}
}

set_address函数用一个非阻塞的 tcp/ipc socket 去监听 tcp/ipc 连接。下面是 zmq_listener_t 的process_plug函数,这个函数被 socket_base_t 的 bind 调用的 launch_child 触发,被 io_thread 调用。

1
2
3
4
5
6
void zmq::zmq_listener_t::process_plug ()
{
// Start polling for incoming connections.
handle = add_fd (tcp_listener.get_fd ());
set_pollin (handle);
}
1
2
3
4
zmq::fd_t zmq::tcp_listener_t::get_fd ()
{
return s;
}

process_plug将正在进行监听的 tcp/ipc socket 加入轮询。下面看 zmq_listener_t 的 in_event 函数,该函数在有新连接请求时被调用(负责监听的 socket 可读)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
void zmq::zmq_listener_t::in_event ()
{
fd_t fd = tcp_listener.accept ();

// If connection was reset by the peer in the meantime, just ignore it.
// TODO: Handle specific errors like ENFILE/EMFILE etc.
if (fd == retired_fd)
return;

// Choose I/O thread to run connecter in. Given that we are already
// running in an I/O thread, there must be at least one available.
io_thread_t *io_thread = choose_io_thread (options.affinity);
zmq_assert (io_thread);

// Create and launch an init object.
zmq_init_t *init = new (std::nothrow) zmq_init_t (io_thread, socket,
NULL, fd, options);
alloc_assert (init);
launch_child (init);
}
Read more »