Redis源码剖析——AOF持久化

对RDB持久化的实现过程进行剖析

前面讲了RDB持久化的实现,其主要保存的是数据库中的键值对。除了RDB持久化功能之外, Redis还提供了AOF(Append Only File)持久化功能。与RDB持久化通过保存数据库中的键值对来记录数据库状态不同,AOF持久化是通过保存Redis服务器所执行的写命令来记录数据库状态的。

实现

服务器中的缓冲区

在Redis中服务器状态结构体redisServer中有一个字段是aof_buf的缓冲区,当服务器执行完一条命令以后会以协议格式将被执行的写命令追加到服务器状态的aof_buf缓冲区的末尾:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
struct redisServer {
...
// AOF 状态(开启/关闭/可写)
int aof_state; /* REDIS_AOF_(ON|OFF|WAIT_REWRITE) */

// AOF 缓冲区
sds aof_buf; /* AOF buffer, written before entering the event loop */

// AOF 文件的描述符
int aof_fd; /* File descriptor of currently selected AOF file */

// AOF 的当前目标数据库
int aof_selected_db; /* Currently selected DB in AOF */
...

AOF文件的写入与同步

Redis的服务器进程就是一个事件循环(loop),这个循环中的文件事件负责接收客户端的命令请求,以及向客户端发送命令回复,而时间事件则负责执行像server_cron函数这样需要定时运行的函数。

因为服务器在处理文件事件时可能会执行写命令,使得一些内容被追加到 aof_buf缓冲区里面,所以在服务器每次结束一个事件循环之前,它都会调用flushAppendOnlyFile函数,考虑是否需要将 aof_buf缓冲区中的内容写人和保存到AOF文件里面

flushAppendOnlyFile函数的行为由服务器配置的appendfsync选项的值来决定,各个不同值产生的行为

命令写入实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) {
sds buf = sdsempty();
robj *tmpargv[3];

/* The DB this command was targeting is not the same as the last command
* we appendend. To issue a SELECT command is needed.
*
* 使用 SELECT 命令,显式设置数据库,确保之后的命令被设置到正确的数据库
*/
if (dictid != server.aof_selected_db) {
char seldb[64];

snprintf(seldb,sizeof(seldb),"%d",dictid);
buf = sdscatprintf(buf,"*2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n",
(unsigned long)strlen(seldb),seldb);

server.aof_selected_db = dictid;
}

...
// 省略其他命令的加入buff过程
...

/* Append to the AOF buffer. This will be flushed on disk just before
* of re-entering the event loop, so before the client will get a
* positive reply about the operation performed.
*
* 将命令追加到 AOF 缓存中,
* 在重新进入事件循环之前,这些命令会被冲洗到磁盘上,
* 并向客户端返回一个回复。
*/
if (server.aof_state == REDIS_AOF_ON)
server.aof_buf = sdscatlen(server.aof_buf,buf,sdslen(buf));

/* If a background append only file rewriting is in progress we want to
* accumulate the differences between the child DB and the current one
* in a buffer, so that when the child process will do its work we
* can append the differences to the new append only file.
*
* 如果 BGREWRITEAOF 正在进行,
* 那么我们还需要将命令追加到重写缓存中,
* 从而记录当前正在重写的 AOF 文件和数据库当前状态的差异。
*/
if (server.aof_child_pid != -1)
aofRewriteBufferAppend((unsigned char*)buf,sdslen(buf));

// 释放
sdsfree(buf);
}

缓冲区人间写入AOF实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
void flushAppendOnlyFile(int force) {
ssize_t nwritten;
int sync_in_progress = 0;

// 缓冲区中没有任何内容,直接返回
if (sdslen(server.aof_buf) == 0) return;

// 策略为每秒 FSYNC
if (server.aof_fsync == AOF_FSYNC_EVERYSEC)
// 是否有 SYNC 正在后台进行?
sync_in_progress = bioPendingJobsOfType(REDIS_BIO_AOF_FSYNC) != 0;

// 每秒 fsync ,并且强制写入为假
if (server.aof_fsync == AOF_FSYNC_EVERYSEC && !force) {

/* With this append fsync policy we do background fsyncing.
*
* 当 fsync 策略为每秒钟一次时, fsync 在后台执行。
*
* If the fsync is still in progress we can try to delay
* the write for a couple of seconds.
*
* 如果后台仍在执行 FSYNC ,那么我们可以延迟写操作一两秒
* (如果强制执行 write 的话,服务器主线程将阻塞在 write 上面)
*/
if (sync_in_progress) {

// 有 fsync 正在后台进行 。。。

if (server.aof_flush_postponed_start == 0) {
/* No previous write postponinig, remember that we are
* postponing the flush and return.
*
* 前面没有推迟过 write 操作,这里将推迟写操作的时间记录下来
* 然后就返回,不执行 write 或者 fsync
*/
server.aof_flush_postponed_start = server.unixtime;
return;

} else if (server.unixtime - server.aof_flush_postponed_start < 2) {
/* We were already waiting for fsync to finish, but for less
* than two seconds this is still ok. Postpone again.
*
* 如果之前已经因为 fsync 而推迟了 write 操作
* 但是推迟的时间不超过 2 秒,那么直接返回
* 不执行 write 或者 fsync
*/
return;

}

/* Otherwise fall trough, and go write since we can't wait
* over two seconds.
*
* 如果后台还有 fsync 在执行,并且 write 已经推迟 >= 2 秒
* 那么执行写操作(write 将被阻塞)
*/
server.aof_delayed_fsync++;
redisLog(REDIS_NOTICE,"Asynchronous AOF fsync is taking too long (disk is busy?). Writing the AOF buffer without waiting for fsync to complete, this may slow down Redis.");
}
}

/* If you are following this code path, then we are going to write so
* set reset the postponed flush sentinel to zero.
*
* 执行到这里,程序会对 AOF 文件进行写入。
*
* 清零延迟 write 的时间记录
*/
server.aof_flush_postponed_start = 0;

/* We want to perform a single write. This should be guaranteed atomic
* at least if the filesystem we are writing is a real physical one.
*
* 执行单个 write 操作,如果写入设备是物理的话,那么这个操作应该是原子的
*
* While this will save us against the server being killed I don't think
* there is much to do about the whole server stopping for power problems
* or alike
*
* 当然,如果出现像电源中断这样的不可抗现象,那么 AOF 文件也是可能会出现问题的
* 这时就要用 redis-check-aof 程序来进行修复。
*/
nwritten = write(server.aof_fd,server.aof_buf,sdslen(server.aof_buf));
if (nwritten != (signed)sdslen(server.aof_buf)) {

static time_t last_write_error_log = 0;
int can_log = 0;

/* Limit logging rate to 1 line per AOF_WRITE_LOG_ERROR_RATE seconds. */
// 将日志的记录频率限制在每行 AOF_WRITE_LOG_ERROR_RATE 秒
if ((server.unixtime - last_write_error_log) > AOF_WRITE_LOG_ERROR_RATE) {
can_log = 1;
last_write_error_log = server.unixtime;
}

/* Lof the AOF write error and record the error code. */
// 如果写入出错,那么尝试将该情况写入到日志里面
if (nwritten == -1) {
if (can_log) {
redisLog(REDIS_WARNING,"Error writing to the AOF file: %s",
strerror(errno));
server.aof_last_write_errno = errno;
}
} else {
if (can_log) {
redisLog(REDIS_WARNING,"Short write while writing to "
"the AOF file: (nwritten=%lld, "
"expected=%lld)",
(long long)nwritten,
(long long)sdslen(server.aof_buf));
}

// 尝试移除新追加的不完整内容
if (ftruncate(server.aof_fd, server.aof_current_size) == -1) {
if (can_log) {
redisLog(REDIS_WARNING, "Could not remove short write "
"from the append-only file. Redis may refuse "
"to load the AOF the next time it starts. "
"ftruncate: %s", strerror(errno));
}
} else {
/* If the ftrunacate() succeeded we can set nwritten to
* -1 since there is no longer partial data into the AOF. */
nwritten = -1;
}
server.aof_last_write_errno = ENOSPC;
}

/* Handle the AOF write error. */
// 处理写入 AOF 文件时出现的错误
if (server.aof_fsync == AOF_FSYNC_ALWAYS) {
/* We can't recover when the fsync policy is ALWAYS since the
* reply for the client is already in the output buffers, and we
* have the contract with the user that on acknowledged write data
* is synched on disk. */
redisLog(REDIS_WARNING,"Can't recover from AOF write error when the AOF fsync policy is 'always'. Exiting...");
exit(1);
} else {
/* Recover from failed write leaving data into the buffer. However
* set an error to stop accepting writes as long as the error
* condition is not cleared. */
server.aof_last_write_status = REDIS_ERR;

/* Trim the sds buffer if there was a partial write, and there
* was no way to undo it with ftruncate(2). */
if (nwritten > 0) {
server.aof_current_size += nwritten;
sdsrange(server.aof_buf,nwritten,-1);
}
return; /* We'll try again on the next call... */
}
} else {
/* Successful write(2). If AOF was in error state, restore the
* OK state and log the event. */
// 写入成功,更新最后写入状态
if (server.aof_last_write_status == REDIS_ERR) {
redisLog(REDIS_WARNING,
"AOF write error looks solved, Redis can write again.");
server.aof_last_write_status = REDIS_OK;
}
}

// 更新写入后的 AOF 文件大小
server.aof_current_size += nwritten;

/* Re-use AOF buffer when it is small enough. The maximum comes from the
* arena size of 4k minus some overhead (but is otherwise arbitrary).
*
* 如果 AOF 缓存的大小足够小的话,那么重用这个缓存,
* 否则的话,释放 AOF 缓存。
*/
if ((sdslen(server.aof_buf)+sdsavail(server.aof_buf)) < 4000) {
// 清空缓存中的内容,等待重用
sdsclear(server.aof_buf);
} else {
// 释放缓存
sdsfree(server.aof_buf);
server.aof_buf = sdsempty();
}

/* Don't fsync if no-appendfsync-on-rewrite is set to yes and there are
* children doing I/O in the background.
*
* 如果 no-appendfsync-on-rewrite 选项为开启状态,
* 并且有 BGSAVE 或者 BGREWRITEAOF 正在进行的话,
* 那么不执行 fsync
*/
if (server.aof_no_fsync_on_rewrite &&
(server.aof_child_pid != -1 || server.rdb_child_pid != -1))
return;

/* Perform the fsync if needed. */

// 总是执行 fsnyc
if (server.aof_fsync == AOF_FSYNC_ALWAYS) {
/* aof_fsync is defined as fdatasync() for Linux in order to avoid
* flushing metadata. */
aof_fsync(server.aof_fd); /* Let's try to get this data on the disk */

// 更新最后一次执行 fsnyc 的时间
server.aof_last_fsync = server.unixtime;

// 策略为每秒 fsnyc ,并且距离上次 fsync 已经超过 1 秒
} else if ((server.aof_fsync == AOF_FSYNC_EVERYSEC &&
server.unixtime > server.aof_last_fsync)) {
// 放到后台执行
if (!sync_in_progress) aof_background_fsync(server.aof_fd);
// 更新最后一次执行 fsync 的时间
server.aof_last_fsync = server.unixtime;
}

// 其实上面无论执行 if 部分还是 else 部分都要更新 fsync 的时间
// 可以将代码挪到下面来
// server.aof_last_fsync = server.unixtime;
}

对于三种不同的策略其效率和安全性能是不一样的:

  • always因为在每一个事件循环中都需要将所有的内容写入AOF文件,所以其效率是最低的,但是其安全性是最高的
  • everysec是一个效率和安全性的折中。从效率上来讲, everysec模式足够快,并且就算出现故障停机,数据库也只丢失一秒钟的命令数据。
  • no的同步时间需要有操作系统控制,这样其效率比较高但是安全性就比较差了

文件载入和数据的还原

因为AOF文件里面包含了重建数据库状态所需的所有写命令,所以服务器只要读入并重新执行一遍AOF文件里面保存的写命令,就可以还原服务器关闭之前的数据库状态。

其底层主要通过loadAppendOnlyFile这个函数实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
/* 
* 执行 AOF 文件中的命令。
*
* 出错时返回 REDIS_OK 。
*
* 出现非执行错误(比如文件长度为 0 )时返回 REDIS_ERR 。
*
* 出现致命错误时打印信息到日志,并且程序退出。
*/
int loadAppendOnlyFile(char *filename) {

// 为客户端
struct redisClient *fakeClient;

// 打开 AOF 文件
FILE *fp = fopen(filename,"r");

struct redis_stat sb;
int old_aof_state = server.aof_state;
long loops = 0;

// 检查文件的正确性
if (fp && redis_fstat(fileno(fp),&sb) != -1 && sb.st_size == 0) {
server.aof_current_size = 0;
fclose(fp);
return REDIS_ERR;
}

// 检查文件是否正常打开
if (fp == NULL) {
redisLog(REDIS_WARNING,"Fatal error: can't open the append log file for reading: %s",strerror(errno));
exit(1);
}

/*
* 暂时性地关闭 AOF ,防止在执行 MULTI 时,
* EXEC 命令被传播到正在打开的 AOF 文件中。
*/
server.aof_state = REDIS_AOF_OFF;

fakeClient = createFakeClient();

// 设置服务器的状态为:正在载入
// startLoading 定义于 rdb.c
startLoading(fp);

...
// 省略载入过程
...

/*
* 如果能执行到这里,说明 AOF 文件的全部内容都可以正确地读取,
* 但是,还要检查 AOF 是否包含未正确结束的事务
*/
if (fakeClient->flags & REDIS_MULTI) goto readerr;

// 关闭 AOF 文件
fclose(fp);
// 释放伪客户端
freeFakeClient(fakeClient);
// 复原 AOF 状态
server.aof_state = old_aof_state;
// 停止载入
stopLoading();
// 更新服务器状态中, AOF 文件的当前大小
aofUpdateCurrentSize();
// 记录前一次重写时的大小
server.aof_rewrite_base_size = server.aof_current_size;

return REDIS_OK;

AOF重写

因为AOF持久化是通过保存被执行的写命令来记录数据库状态的,所以随着服务器运行时间的流逝,AOF文件中的内容会越来越多,文件的体积也会越来越大,如果不加以控制的话,体积过大的AOF文件很可能对 Redis服务器、甚至整个宿主计算机造成影响,并且AOF文件的体积越大,便用AOF文件来进行数据还原所需的时间就越多。

为了解决AOF文件体积膨胀的问题, Redis提供了AOF文件重写( rewrite)功能。通过该功能, Redis服务器可以创建一个新的AOF文件来替代现有的AOF文件,新旧两个AOF文件所保存的数据库状态相同,但新AOF文件不会包含任何浪费空间的冗余命令,所以新AOF文件的体积通常会比旧AOF文件的体积要小得多。

AOF重写实现

AOF重写部分的主要实现(遍历所有的键,通过命令进行插入)如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
int rewriteAppendOnlyFile(char *filename) {

...

/* Iterate this DB writing every entry
*
* 遍历数据库所有键,并通过命令将它们的当前状态(值)记录到新 AOF 文件中
*/
while((de = dictNext(di)) != NULL) {
sds keystr;
robj key, *o;
long long expiretime;

// 取出键
keystr = dictGetKey(de);

// 取出值
o = dictGetVal(de);
initStaticStringObject(key,keystr);

// 取出过期时间
expiretime = getExpire(db,&key);

/* If this key is already expired skip it
*
* 如果键已经过期,那么跳过它,不保存
*/
if (expiretime != -1 && expiretime < now) continue;

/* Save the key and associated value
*
* 根据值的类型,选择适当的命令来保存值
*/
if (o->type == REDIS_STRING) {
/* Emit a SET command */
char cmd[]="*3\r\n$3\r\nSET\r\n";
if (rioWrite(&aof,cmd,sizeof(cmd)-1) == 0) goto werr;
/* Key and value */
if (rioWriteBulkObject(&aof,&key) == 0) goto werr;
if (rioWriteBulkObject(&aof,o) == 0) goto werr;
} else if (o->type == REDIS_LIST) {
if (rewriteListObject(&aof,&key,o) == 0) goto werr;
} else if (o->type == REDIS_SET) {
if (rewriteSetObject(&aof,&key,o) == 0) goto werr;
} else if (o->type == REDIS_ZSET) {
if (rewriteSortedSetObject(&aof,&key,o) == 0) goto werr;
} else if (o->type == REDIS_HASH) {
if (rewriteHashObject(&aof,&key,o) == 0) goto werr;
} else {
redisPanic("Unknown object type");
}

/* Save the expire time
*
* 保存键的过期时间
*/
if (expiretime != -1) {
char cmd[]="*3\r\n$9\r\nPEXPIREAT\r\n";

// 写入 PEXPIREAT expiretime 命令
if (rioWrite(&aof,cmd,sizeof(cmd)-1) == 0) goto werr;
if (rioWriteBulkObject(&aof,&key) == 0) goto werr;
if (rioWriteBulkLongLong(&aof,expiretime) == 0) goto werr;
}
}

...

因为新的AOF文件只包含还原当前数据库所必须的命令,所以新AOF文件不会浪费任何空间。