Redis源码剖析——RDB持久化

对RDB实现持久化的过程进行剖析

持久化概述

持久化

因为Redis数据库是基于内存的Key-Value型数据库,他的所有数据库状态都存储在内存里面,如果服务器进程退出,那么存放在内存中的所有Redis数据库的状态都将消失。为了避免这种情况发生,就需要一种将数据库状态从内存转移到磁盘中的技术。

持久化就是这样一种将数据库的状态从内存存储到磁盘中的技术,Redis提供两种持久化:RDB持久化和AOF持久化。

RDB持久化

Redis提供了RDB持久化功能,这个功能可以将 Redis在内存中的数据库状态保存到磁盘里面,避免数据意外丢失。RDB持久化功能所生成的RDB文件是一个经过压缩的二进制文件,通过该文件可以还原生成RDB文件时的数据库状态。

RDB命令

Redis中生成RDB文件有两个命令:SAVE和BGSAVE。

SAVE命令会阻塞Redis服务器进程,直到RDB文件创建完毕为止,在服务器进程阻塞期间,服务器不能处理任何命令请求;BGSAVE命令会派生出一个子进程,然后由子进程负责创建RDB文件,服务器进程(父进程)继续处理命令请求

由此可见BGSAVE的实现是在SAVE的基础上进行的。

RDB文件的载入工作是在服务器启动时自动执行的,所以Redis并没有专门用于载入RDB文件的命令,只要 Redis服务器在启动时检测到RDB文件存在,它就会自动载入RDB文件。

生成RDB文件实现

在讲生成RDB文件前首先要讲一个Redis中对I/O进行封装的一个结构体:rio,这个结构体在基本I/O的基础上加了一些计算校验和,获取文件指针所在偏移量等功能。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
struct _rio {

/* Backend functions.
* Since this functions do not tolerate short writes or reads the return
* value is simplified to: zero on error, non zero on complete success. */
// API
size_t (*read)(struct _rio *, void *buf, size_t len);
size_t (*write)(struct _rio *, const void *buf, size_t len);
off_t (*tell)(struct _rio *);

/* The update_cksum method if not NULL is used to compute the checksum of
* all the data that was read or written so far. The method should be
* designed so that can be called with the current checksum, and the buf
* and len fields pointing to the new block of data to add to the checksum
* computation. */
// 校验和计算函数,每次有写入/读取新数据时都要计算一次
void (*update_cksum)(struct _rio *, const void *buf, size_t len);

/* The current checksum */
// 当前校验和
uint64_t cksum;

/* number of bytes read or written */
size_t processed_bytes;

/* maximum single read or write chunk size */
size_t max_processing_chunk;

/* Backend-specific vars. */
union {

struct {
// 缓存指针
sds ptr;
// 偏移量
off_t pos;
} buffer;

struct {
// 被打开文件的指针
FILE *fp;
// 最近一次 fsync() 以来,写入的字节量
off_t buffered; /* Bytes written since last fsync. */
// 写入多少字节之后,才会自动执行一次 fsync()
off_t autosync; /* fsync after 'autosync' bytes written. */
} file;
} io;
};

SAVE的实现就是依靠rio的,其所有的I/O都是通过rio进行的。SAVE实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
int rdbSave(char *filename) {
dictIterator *di = NULL;
dictEntry *de;
char tmpfile[256];
char magic[10];
int j;
long long now = mstime();
FILE *fp;
rio rdb;
uint64_t cksum;

// 创建临时文件
snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());
fp = fopen(tmpfile,"w");
if (!fp) {
redisLog(REDIS_WARNING, "Failed opening .rdb for saving: %s",
strerror(errno));
return REDIS_ERR;
}

// 初始化 I/O
rioInitWithFile(&rdb,fp);

// 设置校验和函数
if (server.rdb_checksum)
rdb.update_cksum = rioGenericUpdateChecksum;

// 写入 RDB 版本号
snprintf(magic,sizeof(magic),"REDIS%04d",REDIS_RDB_VERSION);
if (rdbWriteRaw(&rdb,magic,9) == -1) goto werr;

// 遍历所有数据库
for (j = 0; j < server.dbnum; j++) {

// 指向数据库
redisDb *db = server.db+j;

// 指向数据库键空间
dict *d = db->dict;

// 跳过空数据库
if (dictSize(d) == 0) continue;

// 创建键空间迭代器
di = dictGetSafeIterator(d);
if (!di) {
fclose(fp);
return REDIS_ERR;
}

/* Write the SELECT DB opcode
*
* 写入 DB 选择器
*/
if (rdbSaveType(&rdb,REDIS_RDB_OPCODE_SELECTDB) == -1) goto werr;
if (rdbSaveLen(&rdb,j) == -1) goto werr;

/* Iterate this DB writing every entry
*
* 遍历数据库,并写入每个键值对的数据
*/
while((de = dictNext(di)) != NULL) {
sds keystr = dictGetKey(de);
robj key, *o = dictGetVal(de);
long long expire;

// 根据 keystr ,在栈中创建一个 key 对象
initStaticStringObject(key,keystr);

// 获取键的过期时间
expire = getExpire(db,&key);

// 保存键值对数据
if (rdbSaveKeyValuePair(&rdb,&key,o,expire,now) == -1) goto werr;
}
dictReleaseIterator(di);
}
di = NULL; /* So that we don't release it again on error. */

/* EOF opcode
*
* 写入 EOF 代码
*/
if (rdbSaveType(&rdb,REDIS_RDB_OPCODE_EOF) == -1) goto werr;

/* CRC64 checksum. It will be zero if checksum computation is disabled, the
* loading code skips the check in this case.
*
* CRC64 校验和。
*
* 如果校验和功能已关闭,那么 rdb.cksum 将为 0 ,
* 在这种情况下, RDB 载入时会跳过校验和检查。
*/
cksum = rdb.cksum;
memrev64ifbe(&cksum);
rioWrite(&rdb,&cksum,8);

/* Make sure data will not remain on the OS's output buffers */
// 冲洗缓存,确保数据已写入磁盘
if (fflush(fp) == EOF) goto werr;
if (fsync(fileno(fp)) == -1) goto werr;
if (fclose(fp) == EOF) goto werr;

/* Use RENAME to make sure the DB file is changed atomically only
* if the generate DB file is ok.
*
* 使用 RENAME ,原子性地对临时文件进行改名,覆盖原来的 RDB 文件。
*/
if (rename(tmpfile,filename) == -1) {
redisLog(REDIS_WARNING,"Error moving temp DB file on the final destination: %s", strerror(errno));
unlink(tmpfile);
return REDIS_ERR;
}

// 写入完成,打印日志
redisLog(REDIS_NOTICE,"DB saved on disk");

// 清零数据库脏状态
server.dirty = 0;

// 记录最后一次完成 SAVE 的时间
server.lastsave = time(NULL);

// 记录最后一次执行 SAVE 的状态
server.lastbgsave_status = REDIS_OK;

return REDIS_OK;

werr:
// 关闭文件
fclose(fp);
// 删除文件
unlink(tmpfile);

redisLog(REDIS_WARNING,"Write error saving DB on disk: %s", strerror(errno));

if (di) dictReleaseIterator(di);

return REDIS_ERR;
}

从代码逻辑上看,SAVE的实现还是比较简单的,首先写入一些Redis版本信息,然后将数据库中的所有数据写入文件,最后加上一个校验和。

BESAVE在SAVE的基础上还要实现一些新进程创建等任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
int rdbSaveBackground(char *filename) {
pid_t childpid;
long long start;

// 如果 BGSAVE 已经在执行,那么出错
if (server.rdb_child_pid != -1) return REDIS_ERR;

// 记录 BGSAVE 执行前的数据库被修改次数
server.dirty_before_bgsave = server.dirty;

// 最近一次尝试执行 BGSAVE 的时间
server.lastbgsave_try = time(NULL);

// fork() 开始前的时间,记录 fork() 返回耗时用
start = ustime();

if ((childpid = fork()) == 0) {
int retval;

/* Child */

// 关闭网络连接 fd
closeListeningSockets(0);

// 设置进程的标题,方便识别
redisSetProcTitle("redis-rdb-bgsave");

// 执行保存操作
retval = rdbSave(filename);

// 打印 copy-on-write 时使用的内存数
if (retval == REDIS_OK) {
size_t private_dirty = zmalloc_get_private_dirty();

if (private_dirty) {
redisLog(REDIS_NOTICE,
"RDB: %zu MB of memory used by copy-on-write",
private_dirty/(1024*1024));
}
}

// 向父进程发送信号
exitFromChild((retval == REDIS_OK) ? 0 : 1);

} else {

/* Parent */

// 计算 fork() 执行的时间
server.stat_fork_time = ustime()-start;

// 如果 fork() 出错,那么报告错误
if (childpid == -1) {
server.lastbgsave_status = REDIS_ERR;
redisLog(REDIS_WARNING,"Can't save in background: fork: %s",
strerror(errno));
return REDIS_ERR;
}

// 打印 BGSAVE 开始的日志
redisLog(REDIS_NOTICE,"Background saving started by pid %d",childpid);

// 记录数据库开始 BGSAVE 的时间
server.rdb_save_time_start = time(NULL);

// 记录负责执行 BGSAVE 的子进程 ID
server.rdb_child_pid = childpid;

// 关闭自动 rehash
updateDictResizePolicy();

return REDIS_OK;
}

return REDIS_OK; /* unreached */
}

RDB文件载入实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
int rdbLoad(char *filename) {
uint32_t dbid;
int type, rdbver;
redisDb *db = server.db+0;
char buf[1024];
long long expiretime, now = mstime();
FILE *fp;
rio rdb;

// 打开 rdb 文件
if ((fp = fopen(filename,"r")) == NULL) return REDIS_ERR;

// 初始化写入流
rioInitWithFile(&rdb,fp);
rdb.update_cksum = rdbLoadProgressCallback;
rdb.max_processing_chunk = server.loading_process_events_interval_bytes;
if (rioRead(&rdb,buf,9) == 0) goto eoferr;
buf[9] = '\0';

// 检查版本号
if (memcmp(buf,"REDIS",5) != 0) {
fclose(fp);
redisLog(REDIS_WARNING,"Wrong signature trying to load DB from file");
errno = EINVAL;
return REDIS_ERR;
}
rdbver = atoi(buf+5);
if (rdbver < 1 || rdbver > REDIS_RDB_VERSION) {
fclose(fp);
redisLog(REDIS_WARNING,"Can't handle RDB format version %d",rdbver);
errno = EINVAL;
return REDIS_ERR;
}

// 将服务器状态调整到开始载入状态
startLoading(fp);
while(1) {
robj *key, *val;
expiretime = -1;

/* Read type.
*
* 读入类型指示,决定该如何读入之后跟着的数据。
*
* 这个指示可以是 rdb.h 中定义的所有以
* REDIS_RDB_TYPE_* 为前缀的常量的其中一个
* 或者所有以 REDIS_RDB_OPCODE_* 为前缀的常量的其中一个
*/
if ((type = rdbLoadType(&rdb)) == -1) goto eoferr;

// 读入过期时间值
if (type == REDIS_RDB_OPCODE_EXPIRETIME) {

// 以秒计算的过期时间

if ((expiretime = rdbLoadTime(&rdb)) == -1) goto eoferr;

/* We read the time so we need to read the object type again.
*
* 在过期时间之后会跟着一个键值对,我们要读入这个键值对的类型
*/
if ((type = rdbLoadType(&rdb)) == -1) goto eoferr;

/* the EXPIRETIME opcode specifies time in seconds, so convert
* into milliseconds.
*
* 将格式转换为毫秒*/
expiretime *= 1000;
} else if (type == REDIS_RDB_OPCODE_EXPIRETIME_MS) {

// 以毫秒计算的过期时间

/* Milliseconds precision expire times introduced with RDB
* version 3. */
if ((expiretime = rdbLoadMillisecondTime(&rdb)) == -1) goto eoferr;

/* We read the time so we need to read the object type again.
*
* 在过期时间之后会跟着一个键值对,我们要读入这个键值对的类型
*/
if ((type = rdbLoadType(&rdb)) == -1) goto eoferr;
}

// 读入数据 EOF (不是 rdb 文件的 EOF)
if (type == REDIS_RDB_OPCODE_EOF)
break;

/* Handle SELECT DB opcode as a special case
*
* 读入切换数据库指示
*/
if (type == REDIS_RDB_OPCODE_SELECTDB) {

// 读入数据库号码
if ((dbid = rdbLoadLen(&rdb,NULL)) == REDIS_RDB_LENERR)
goto eoferr;

// 检查数据库号码的正确性
if (dbid >= (unsigned)server.dbnum) {
redisLog(REDIS_WARNING,"FATAL: Data file was created with a Redis server configured to handle more than %d databases. Exiting\n", server.dbnum);
exit(1);
}

// 在程序内容切换数据库
db = server.db+dbid;

// 跳过
continue;
}

/* Read key
*
* 读入键
*/
if ((key = rdbLoadStringObject(&rdb)) == NULL) goto eoferr;

/* Read value
*
* 读入值
*/
if ((val = rdbLoadObject(type,&rdb)) == NULL) goto eoferr;

/* Check if the key already expired. This function is used when loading
* an RDB file from disk, either at startup, or when an RDB was
* received from the master. In the latter case, the master is
* responsible for key expiry. If we would expire keys here, the
* snapshot taken by the master may not be reflected on the slave.
*
* 如果服务器为主节点的话,
* 那么在键已经过期的时候,不再将它们关联到数据库中去
*/
if (server.masterhost == NULL && expiretime != -1 && expiretime < now) {
decrRefCount(key);
decrRefCount(val);
// 跳过
continue;
}

/* Add the new object in the hash table
*
* 将键值对关联到数据库中
*/
dbAdd(db,key,val);

/* Set the expire time if needed
*
* 设置过期时间
*/
if (expiretime != -1) setExpire(db,key,expiretime);

decrRefCount(key);
}

/* Verify the checksum if RDB version is >= 5
*
* 如果 RDB 版本 >= 5 ,那么比对校验和
*/
if (rdbver >= 5 && server.rdb_checksum) {
uint64_t cksum, expected = rdb.cksum;

// 读入文件的校验和
if (rioRead(&rdb,&cksum,8) == 0) goto eoferr;
memrev64ifbe(&cksum);

// 比对校验和
if (cksum == 0) {
redisLog(REDIS_WARNING,"RDB file was saved with checksum disabled: no check performed.");
} else if (cksum != expected) {
redisLog(REDIS_WARNING,"Wrong RDB checksum. Aborting now.");
exit(1);
}
}

// 关闭 RDB
fclose(fp);

// 服务器从载入状态中退出
stopLoading();

return REDIS_OK;

eoferr: /* unexpected end of file is handled here with a fatal exit */
redisLog(REDIS_WARNING,"Short read or OOM loading DB. Unrecoverable error, aborting now.");
exit(1);
return REDIS_ERR; /* Just to avoid warning */
}

RDB文件结构

从SAVE命令的实现过程,我们能够知道一个RDB文件总体结构可以这样划分:

在数据库数据部分其最基本的元素是键值对,存储在RDB的数据又可以分含有过期键的键值对和没有过期键的键值对两种

没有过期键的键值对组织形式:

含有过期键的键值对组织形式: