epoll 的读写分离

FreeBSD 下的 kqueue 监听的单位是 (ident, filter),而 Linux 下的 epoll 监听的单位是单个 fd。因此在 Linux 下,通常需要对 epoll 监听的 fd 做一些额外的记录工作(记住该 fd 当前已监听了哪些事件),以便下次更改监听事件时查询。这里直接用 Redis 的代码做个示例。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// FreeBSD kqueue
/* Register `fd` with the kqueue instance for every event kind set in `mask`.
 *
 * Each requested kind (AE_READABLE, AE_WRITABLE) is submitted as its own
 * kevent() change with EV_ADD; the read filter is submitted before the write
 * filter.
 *
 * Returns 0 when every requested registration succeeded, or -1 as soon as
 * one kevent() call fails. A registration made before the failing call is
 * not undone. */
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
    /* AE mask bit -> kqueue filter, listed in submission order. */
    const struct {
        int flag;
        short filter;
    } wanted[] = {
        { AE_READABLE, EVFILT_READ },
        { AE_WRITABLE, EVFILT_WRITE },
    };
    const int nwanted = (int)(sizeof(wanted) / sizeof(wanted[0]));
    aeApiState *state = eventLoop->apidata;
    int i;

    for (i = 0; i < nwanted; i++) {
        struct kevent change;

        if ((mask & wanted[i].flag) == 0) continue;
        EV_SET(&change, fd, wanted[i].filter, EV_ADD, 0, 0, NULL);
        if (kevent(state->kqfd, &change, 1, NULL, 0, NULL) == -1) return -1;
    }
    return 0;
}

/* Remove `fd` from the kqueue instance for every event kind set in `mask`.
 *
 * Each requested kind (AE_READABLE, AE_WRITABLE) is submitted as its own
 * kevent() change with EV_DELETE, read filter first. The result of kevent()
 * is intentionally not inspected, so the removal is best-effort. No record
 * of previously registered events is consulted. */
static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int mask) {
    /* AE mask bit -> kqueue filter, listed in submission order. */
    const struct {
        int flag;
        short filter;
    } unwanted[] = {
        { AE_READABLE, EVFILT_READ },
        { AE_WRITABLE, EVFILT_WRITE },
    };
    const int nunwanted = (int)(sizeof(unwanted) / sizeof(unwanted[0]));
    aeApiState *state = eventLoop->apidata;
    int i;

    for (i = 0; i < nunwanted; i++) {
        struct kevent change;

        if ((mask & unwanted[i].flag) == 0) continue;
        EV_SET(&change, fd, unwanted[i].filter, EV_DELETE, 0, 0, NULL);
        kevent(state->kqfd, &change, 1, NULL, 0, NULL);
    }
}

从上面的代码可以看到,FreeBSD kqueue 在改变 fd 的监听事件时不需要做额外的记录工作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// Linux epoll
/* Start watching `fd` for the event kinds in `mask`, in addition to whatever
 * the event loop already records for that fd.
 *
 * epoll keeps one registration per fd, so the new mask is merged with the
 * mask stored in eventLoop->events[fd] and the combined interest set is
 * submitted in one epoll_ctl() call.
 *
 * Returns 0 on success, -1 if epoll_ctl() fails. */
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
    aeApiState *state = eventLoop->apidata;
    struct epoll_event ee = {0}; /* fully zeroed: avoids a valgrind warning */
    int wanted = mask | eventLoop->events[fd].mask; /* merge old events */
    int op = EPOLL_CTL_MOD;

    /* An fd with no recorded events is not registered yet and needs ADD;
     * otherwise the existing registration is modified. */
    if (eventLoop->events[fd].mask == AE_NONE) {
        op = EPOLL_CTL_ADD;
    }

    if (wanted & AE_READABLE) {
        ee.events |= EPOLLIN;
    }
    if (wanted & AE_WRITABLE) {
        ee.events |= EPOLLOUT;
    }
    ee.data.fd = fd;

    return (epoll_ctl(state->epfd, op, fd, &ee) == -1) ? -1 : 0;
}

/* Stop watching `fd` for the event kinds in `delmask`.
 *
 * The remaining interest set is the mask recorded in eventLoop->events[fd]
 * with the `delmask` bits cleared. If anything remains, the registration is
 * modified; if nothing remains, the fd is removed from the epoll instance.
 * The result of epoll_ctl() is not inspected. */
static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int delmask) {
    aeApiState *state = eventLoop->apidata;
    struct epoll_event ee = {0}; /* fully zeroed: avoids a valgrind warning */
    int remaining = eventLoop->events[fd].mask & (~delmask);
    int op = (remaining == AE_NONE) ? EPOLL_CTL_DEL : EPOLL_CTL_MOD;

    if (remaining & AE_READABLE) {
        ee.events |= EPOLLIN;
    }
    if (remaining & AE_WRITABLE) {
        ee.events |= EPOLLOUT;
    }
    ee.data.fd = fd;

    /* Note: kernels < 2.6.9 require a non-null event pointer even for
     * EPOLL_CTL_DEL, so `ee` is passed for both operations. */
    epoll_ctl(state->epfd, op, fd, &ee);
}

从上面的代码可以看到,Linux epoll 在改变 fd 的监听事件时,需要先查询该 fd 之前监听的事件(eventLoop->events[fd].mask),并记录当前监听的事件。这样 API 的调用者无需自己做额外的记录工作;但由于读事件和写事件共用同一份记录,如果读和写由不同的线程处理,就需要额外的同步机制来保证线程安全。

Q3 Is the epoll file descriptor itself poll/epoll/selectable?

A3 Yes. If an epoll file descriptor has events waiting, then it will indicate as being readable.

—— epoll(7)

下面的代码利用了 epoll fd 本身在有等待事件时是可读的这一特性,展示了一种新的在 Linux epoll 下对 fd 读写事件进行监听的方法。该方法分离了 fd 的读和写,这样我们就可以把 fd 的读和写交给不同的线程去处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
// initialization
/* Three epoll instances are created: `epfd` is the top-level one, while
 * `readfd` and `writefd` are nested instances that get registered on it.
 * Error checking of epoll_create()/epoll_ctl() is omitted in this sample. */
int epfd = epoll_create(1);
int readfd = epoll_create(1);
int writefd = epoll_create(1);

/* Zero the whole struct first: `data` is a union and only `.fd` is assigned
 * below, so without this uninitialized bytes would be handed to epoll_ctl()
 * (the valgrind warning the Redis code avoids the same way). */
struct epoll_event event = {0};
event.events = EPOLLIN;

/* An epoll fd with pending events reports as readable, so each nested
 * instance is watched for EPOLLIN on the top-level instance. */
event.data.fd = readfd;
epoll_ctl(epfd, EPOLL_CTL_ADD, readfd, &event);

event.data.fd = writefd;
epoll_ctl(epfd, EPOLL_CTL_ADD, writefd, &event);
1
2
3
4
5
// wait read event
/* Register `fd` for read readiness on the read-side epoll instance only.
 * The struct is zeroed first so that no uninitialized bytes of the `data`
 * union are passed to epoll_ctl(). */
struct epoll_event ev = {0};
ev.data.fd = fd;
ev.events = EPOLLIN; // with possible other flags.
epoll_ctl(readfd, EPOLL_CTL_ADD, fd, &ev);
1
2
3
4
5
6
// wait write event
/* Register `fd` for write readiness on the write-side epoll instance only.
 * The struct is zeroed first so that no uninitialized bytes of the `data`
 * union are passed to epoll_ctl(). */
struct epoll_event ev = {0};
ev.data.fd = fd;
ev.events = EPOLLOUT; // with possible other flags.
// No bookkeeping needed for read flags.
epoll_ctl(writefd, EPOLL_CTL_ADD, fd, &ev);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// polling
enum {
    kEventsSize = 512,
};

/* `eps` receives events from the top-level instance, which has only the two
 * nested epoll fds registered; `evs` receives the events drained from
 * whichever nested instance became readable. */
struct epoll_event eps[2];
struct epoll_event evs[kEventsSize];

int n = epoll_wait(epfd, eps, 2, timeout);
/* A return of 0 (timeout) or -1 (error) skips the loop entirely. */
while (n-- > 0) {
    int fd = eps[n].data.fd;
    /* Timeout 0: the nested instance was already reported readable, so only
     * collect what is pending. The buffer is `evs`, the array declared
     * above (the original passed an undeclared `events`). */
    int nevent = epoll_wait(fd, evs, kEventsSize, 0);
    if (fd == readfd) {
        // read events: handle evs[0 .. nevent-1] when nevent > 0
    } else {
        // write events: handle evs[0 .. nevent-1] when nevent > 0
    }
}

现在,我们有了一个 kqueue 式的 epoll 接口。 Enjoy it!