Redis源码剖析——事件

对RDB处理事件的的过程实现进行分析

概述

Redis服务器是一个事件驱动程序,服务器需要处理以下两类事件

  • 文件事件( file event): Redis服务器通过套接字与客户端(或者其他 Redis服务器)进行连接,而文件事件就是服务器对套接字操作的抽象。服务器与客户端(或者其他服务器)的通信会产生相应的文件事件,而服务器则通过监听并处理这些事件来完成一系列网络通信操作。
  • 时间事件( time event): Redis服务器中的一些操作需要在给定的时间点执行,而时间事件就是服务器对这类定时操作的抽象

文件事件

Redis基于Reactor模式开发了自己的网络事件处理器:这个处理器被称为文件事件处理器( fle event handler)

  • 文件事件处理器使用I/o多路复用( multiplexing)程序来同时监听多个套接字,并根据套接字目前执行的任务来为套接字关联不同的事件处理器
  • 当被监听的套接字准备好执行连接应答( accept)、读取(read)、写人( wrte)关闭( close)等操作时,与操作相对应的文件事件就会产生,这时文件事件处理器就会调用套接字之前关联好的事件处理器来处理这些事件

虽然文件事件处理器以单线程方式运行,但通过使用I/O多路复用程序来监听多个套接字,文件事件处理器既实现了高性能的网络通信模型,又可以很好地与 Redis服务器中其他同样以单线程方式运行的模块进行对接,这保持了 Redis内部单线程设计的简单性。

事件结构定义

在Redis中事件结构体的定义如下:

1
2
3
4
5
6
typedef struct aeFileEvent {
int mask; // 读or写标记
aeFileProc *rfileProc; // 读处理函数
aeFileProc *wfileProc; // 写处理函数
void *clientData; // 私有数据
} aeFileEvent;

事件的创建和删除

针对事件的创建和删除的API有:

1
2
3
4
5
6
7
// 创建文件事件
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
aeFileProc *proc, void *clientData);
// 删除文件事件
void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask);
// 根据文件描述符获取文件事件
int aeGetFileEvents(aeEventLoop *eventLoop, int fd);

这些接口的实现都比较简单,就是在eventLoop这个事件池中创建(删除)指定属性的事件

需要使用到事件的创建的地方有两个:

  • 一个是在初始化服务器的时候,需要添加一个对应套接字描述符的监听套接字来监听新的客户端连接
  • 新的客户端连接的时候,需要添加一个文件事件来监听这个客户端的请求

I/O多路复用

在Linux/Unix中实现I/O多路复用的方法有非常多,大致有select、 epoll、 export和 kqueue这些IO多路复用函数库来实现的

各种实现的性能也是不一样的,之前我写了一篇博客对比了三种I/O多路复用

在Redis中,其会根据具体底层操作系统的不同自动选择系统中性能最高的I/O多路复用函数库来作为 Redis的I/O多路复用程序的底层实现(从程序中看,其性能的排行应该是evport > epoll > kqueue > select ):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/* Include the best multiplexing layer supported by this system.
* The following should be ordered by performances, descending. */
#ifdef HAVE_EVPORT
#include "ae_evport.c"
#else
#ifdef HAVE_EPOLL
#include "ae_epoll.c"
#else
#ifdef HAVE_KQUEUE
#include "ae_kqueue.c"
#else
#include "ae_select.c"
#endif
#endif
#endif

各种不同的I/O多路复用库的使用方式是不一样的,所以Redis对功能进行了统一的封装,方便在不同的环境下的使用:

1
2
3
4
5
6
7
8
9
10
11
12
// 创建,初始化
static int aeApiCreate(aeEventLoop *eventLoop);
// 改变能够监听事件的大小值
static int aeApiResize(aeEventLoop *eventLoop, int setsize);
// 清空
static void aeApiFree(aeEventLoop *eventLoop);
// 添加监听事件
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask);
// 删除监听事件
static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int delmask);
// 取出已经就绪的文件描述符
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp)

下面以我比较熟悉的epoll为例查看封装的实现:

首先定义一个ae状态结构体,事实上就是epoll的文件描述符和一个获取监听事件中就绪文件描述符的文件表

1
2
3
4
typedef struct aeApiState {
int epfd;
struct epoll_event *events;
} aeApiState;

创建的过程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
static int aeApiCreate(aeEventLoop *eventLoop) {
aeApiState *state = zmalloc(sizeof(aeApiState));
if (!state) return -1;
// 监听指定大小的事件数量
state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);
if (!state->events) {
zfree(state);
return -1;
}
// 创建epoll
state->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */
if (state->epfd == -1) {
zfree(state->events);
zfree(state);
return -1;
}
// 指定数据
eventLoop->apidata = state;
return 0;
}

添加监听事件过程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
aeApiState *state = eventLoop->apidata;
struct epoll_event ee;
/* If the fd was already monitored for some event, we need a MOD
* operation. Otherwise we need an ADD operation. */
int op = eventLoop->events[fd].mask == AE_NONE ?
EPOLL_CTL_ADD : EPOLL_CTL_MOD;
ee.events = 0;
mask |= eventLoop->events[fd].mask; /* Merge old events */
// 根据时间的mask来决定监听读or写就绪
if (mask & AE_READABLE) ee.events |= EPOLLIN;
if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
ee.data.u64 = 0; /* avoid valgrind warning */
ee.data.fd = fd;
// 添加监听事件到内核中
if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;
return 0;
}

文件事件的处理

I/O多路复用接收到了就绪的事件的时候,就需要对事件进行处理,通过文件事件分派器来分派给不同的文件事件处理器,具体需要处理的文件事件类型如下:

  • 为了对连接服务器的各个客户端进行应答,服务器要为监听套接字关联连接应答处理器。
  • 为了接收客户端传来的命令请求,服务器要为客户端套接字关联命令请求处理器。
  • 为了向客户端返回命令的执行结果,服务器要为客户端套接字关联命令回复处理器。
  • 当主服务器和从服务器进行复制操作时,主从服务器都需要关联特别为复制功能编写的复制处理器。

值得注意的是连接应答处理时,需要新添加一个监听事件

连接应答处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
int cport, cfd, max = MAX_ACCEPTS_PER_CALL;
char cip[REDIS_IP_STR_LEN];
REDIS_NOTUSED(el);
REDIS_NOTUSED(mask);
REDIS_NOTUSED(privdata);
while(max--) {
// accept 客户端连接
cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
if (cfd == ANET_ERR) {
if (errno != EWOULDBLOCK)
redisLog(REDIS_WARNING,
"Accepting client connection: %s", server.neterr);
return;
}
redisLog(REDIS_VERBOSE,"Accepted %s:%d", cip, cport);
// 为客户端创建客户端状态(redisClient)
acceptCommonHandler(cfd,0);
}
}

通过Redis对上面几种事件的应答处理,我们可以得出客户端和服务端的通信模型如下:

时间事件

Redis的时间事件分为以下两类:

  • 定时事件:让一段程序在指定的时间之后执行一次。比如说,让程序X在当前时间的30毫秒之后执行一次。
  • 周期性事件:让一段程序每隔指定时间就执行一次。比如说,让程序Y每隔30毫秒就执行一次。

事件结构定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
typedef struct aeTimeEvent {
// 时间事件的唯一标识符
long long id; /* time event identifier. */
// 事件的到达时间
long when_sec; /* seconds */
long when_ms; /* milliseconds */
// 事件处理函数
aeTimeProc *timeProc;
// 事件释放函数
aeEventFinalizerProc *finalizerProc;
// 多路复用库的私有数据
void *clientData;
// 指向下个时间事件结构,形成链表
struct aeTimeEvent *next;
} aeTimeEvent;

服务器将所有时间事件都放在一个无序链表中,每当时间事件执行器运行时,它就遍历整个链表,查找所有已到达的时间事件,并调用相应的事件处理器。

1
2
3
4
5
6
7
8
9
10
11
12
// 已就绪事件
typedef struct aeFiredEvent {
// 已就绪文件描述符
int fd;
// 事件类型掩码,
// 值可以是 AE_READABLE 或 AE_WRITABLE
// 或者是两者的或
int mask;
} aeFiredEvent;

时间事件相关API

时间事件相关API如下:

1
2
3
4
5
6
7
8
9
10
// 创建时间事件
long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,
aeTimeProc *proc, void *clientData,
aeEventFinalizerProc *finalizerProc);
// 删除时间事件
int aeDeleteTimeEvent(aeEventLoop *eventLoop, long long id);
// 时间事件的执行器
int aeProcessEvents(aeEventLoop *eventLoop, int flags);
// 返回最近的时间事件
static aeTimeEvent *aeSearchNearestTimer(aeEventLoop *eventLoop);

创建和删除时间事件的实现都比较简单,相当于构造和析构函数,我们先看看时间事件执行器的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
static int processTimeEvents(aeEventLoop *eventLoop) {
int processed = 0;
aeTimeEvent *te;
long long maxId;
time_t now = time(NULL);
// 通过重置事件的运行时间,
// 防止因时间穿插(skew)而造成的事件处理混乱
if (now < eventLoop->lastTime) {
te = eventLoop->timeEventHead;
while(te) {
te->when_sec = 0;
te = te->next;
}
}
// 更新最后一次处理时间事件的时间
eventLoop->lastTime = now;
// 遍历链表
// 执行那些已经到达的事件
te = eventLoop->timeEventHead;
maxId = eventLoop->timeEventNextId-1;
while(te) {
long now_sec, now_ms;
long long id;
// 跳过无效事件
if (te->id > maxId) {
te = te->next;
continue;
}
// 获取当前时间
aeGetTime(&now_sec, &now_ms);
// 如果当前时间等于或等于事件的执行时间,那么说明事件已到达,执行这个事件
if (now_sec > te->when_sec ||
(now_sec == te->when_sec && now_ms >= te->when_ms))
{
int retval;
id = te->id;
// 执行事件处理器,并获取返回值
retval = te->timeProc(eventLoop, id, te->clientData);
processed++;
// 记录是否有需要循环执行这个事件时间
if (retval != AE_NOMORE) {
// 是的, retval 毫秒之后继续执行这个时间事件
aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms);
} else {
// 不,将这个事件删除
aeDeleteTimeEvent(eventLoop, id);
}
// 因为执行事件之后,事件列表可能已经被改变了
// 因此需要将 te 放回表头,继续开始执行事件
te = eventLoop->timeEventHead;
} else {
te = te->next;
}
}
return processed;
}

其总体的思想是:遍历所有已到达的时间事件并调用这些事件的处理器。已到达指的是,时间事件的when属性记录的UNIX时间截等于或小于当前时间的UNIX时间戳。

aeSearchNearestTimer返回目前时间最近的时间事件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 寻找里目前时间最近的时间事件
// 因为链表是乱序的,所以查找复杂度为 O(N)
static aeTimeEvent *aeSearchNearestTimer(aeEventLoop *eventLoop)
{
aeTimeEvent *te = eventLoop->timeEventHead;
aeTimeEvent *nearest = NULL;
while(te) {
if (!nearest || te->when_sec < nearest->when_sec ||
(te->when_sec == nearest->when_sec &&
te->when_ms < nearest->when_ms))
nearest = te;
te = te->next;
}
return nearest;
}

时间事件的处理

时间事件的主要处理应用在serverCron中,其函数的主要工作有:

  • 更新服务器的各类统计信息,比如时间、内存占用、数据库占用等
  • 清理数据库中的过期键值对
  • 关闭和清理连接失效的客户端
  • 尝试进行AOF和RDB持久化操作
  • 如果是主服务器,就对从服务器进行定期同步
  • 如果是集群模式,对集群进行定期同步和连接测试

事件循环

时间事件和文件事件都在一个事件循环结构体中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
typedef struct aeEventLoop {
// 目前已注册的最大描述符
int maxfd; /* highest file descriptor currently registered */
// 目前已追踪的最大描述符
int setsize; /* max number of file descriptors tracked */
// 用于生成时间事件 id
long long timeEventNextId;
// 最后一次执行时间事件的时间
time_t lastTime; /* Used to detect system clock skew */
// 已注册的文件事件
aeFileEvent *events; /* Registered events */
// 已就绪的文件事件
aeFiredEvent *fired; /* Fired events */
// 时间事件
aeTimeEvent *timeEventHead;
// 事件处理器的开关
int stop;
// 多路复用库的私有数据
void *apidata; /* This is used for polling API specific data */
// 在处理事件前要执行的函数
aeBeforeSleepProc *beforesleep;
} aeEventLoop;

在加入事件到进行处理事件中间的环节就是事件循环了,其调用的是aeMain函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
void aeMain(aeEventLoop *eventLoop) {
eventLoop->stop = 0;
while (!eventLoop->stop) {
// 如果有需要在事件处理前执行的函数,那么运行它
if (eventLoop->beforesleep != NULL)
eventLoop->beforesleep(eventLoop);
// 开始处理事件
aeProcessEvents(eventLoop, AE_ALL_EVENTS);
}
}

可以看到,当服务器开始运行的时候,事件循环就不停运行,其事件处理函数aeProcessEvents实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
int processed = 0, numevents;
/* Nothing to do? return ASAP */
if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;
if (eventLoop->maxfd != -1 ||
((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
int j;
aeTimeEvent *shortest = NULL;
struct timeval tv, *tvp;
// 获取最近的时间事件
if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
shortest = aeSearchNearestTimer(eventLoop);
if (shortest) {
// 如果时间事件存在的话
// 那么根据最近可执行时间事件和现在时间的时间差来决定文件事件的阻塞时间
long now_sec, now_ms;
// 计算距今最近的时间事件还要多久才能达到
// 并将该时间距保存在 tv 结构中
aeGetTime(&now_sec, &now_ms);
tvp = &tv;
tvp->tv_sec = shortest->when_sec - now_sec;
if (shortest->when_ms < now_ms) {
tvp->tv_usec = ((shortest->when_ms+1000) - now_ms)*1000;
tvp->tv_sec --;
} else {
tvp->tv_usec = (shortest->when_ms - now_ms)*1000;
}
// 时间差小于 0 ,说明事件已经可以执行了,将秒和毫秒设为 0 (不阻塞)
if (tvp->tv_sec < 0) tvp->tv_sec = 0;
if (tvp->tv_usec < 0) tvp->tv_usec = 0;
} else {
// 执行到这一步,说明没有时间事件
// 那么根据 AE_DONT_WAIT 是否设置来决定是否阻塞,以及阻塞的时间长度
if (flags & AE_DONT_WAIT) {
// 设置文件事件不阻塞
tv.tv_sec = tv.tv_usec = 0;
tvp = &tv;
} else {
/* Otherwise we can block */
// 文件事件可以阻塞直到有事件到达为止
tvp = NULL; /* wait forever */
}
}
// 处理文件事件,阻塞时间由 tvp 决定
numevents = aeApiPoll(eventLoop, tvp);
for (j = 0; j < numevents; j++) {
// 从已就绪数组中获取事件
aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
int mask = eventLoop->fired[j].mask;
int fd = eventLoop->fired[j].fd;
int rfired = 0;
// 读事件
if (fe->mask & mask & AE_READABLE) {
// rfired 确保读/写事件只能执行其中一个
rfired = 1;
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
}
// 写事件
if (fe->mask & mask & AE_WRITABLE) {
if (!rfired || fe->wfileProc != fe->rfileProc)
fe->wfileProc(eventLoop,fd,fe->clientData,mask);
}
processed++;
}
}
/* Check time events */
// 执行时间事件
if (flags & AE_TIME_EVENTS)
processed += processTimeEvents(eventLoop);
return processed; /* return the number of processed file/time events */
}

其主题的逻辑如下:

  • 查找最早的时间事件,判断是否需要执行,如需要,就标记下来,等待处理,并确定后面处理文件事件的阻塞时间
  • 获取已准备好的文件事件描述符集
  • 优先处理读事件
  • 处理写事件
  • 如有时间事件,就处理时间事件

小结

通过对Redis的时间事件和文件事件的解析,能够了解Redis客户端和服务端交互的基本过程,同时也能够了解到Redis是单线程的,整个事件循环是串行的