zeromq atomic msg

先引用一下,0mq manual 的话: 0MQ ensures atomic delivery of messages; peers shall receive either all message parts of a message or none at all.

在 0mq 中最终数据的发送和接收是由 encoder_t 和 decoder_t 负责包装。他们都是由 io 线程调用,从应用发送 zmq_msg_t 的管道中取得数据。

1
typedef ypipe_t < zmq_msg_t, message_pipe_granularity > pipe_t;
1
2
3
4
5
6
//  Lock-free queue implementation.
// Only a single thread can read from the pipe at any specific moment.
// Only a single thread can write to the pipe at any specific moment.
// T is the type of the object in the queue.
// N is granularity of the pipe, i.e. how many items are needed to
// perform next memory allocation.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
//  Write an item to the pipe.  Don't flush it yet. If incomplete is
// set to true the item is assumed to be continued by items
// subsequently written to the pipe. Incomplete items are never
// flushed down the stream.
inline void write (const T &value_, bool incomplete_)
{
// Place the value to the queue, add new terminator element.
queue.back () = value_;
queue.push ();

// Move the "flush up to here" poiter.
if (!incomplete_)
f = &queue.back ();
}

write 函数的 incomplete_ 参数由外部的包装函数根据 ZMQ_SNDMORE 标志决定。只有当 multipart message 的最后一部分写入管道时,f 才会更新。当 encoder_t 读取 zmq_msg_t ,准备发送时,他不会读到一个未全部写完的 multipart message。decoder_t 从网络上接收数据后,在未读到最好一个部分时不会更新 f。

其实 ypipe_t 是个挺有意思的两端可以同时允许一个线程读、一个线程写的 lock-free 的数据结构实现。

没什么好写的啊,看代码就OVER了。凑数。