redis(六) AOF日志详解
之前已经大致的讲解过aof的整体执行流程.
主要有四点:
- 命令追加
- 文件写入
- 日志重写
- 日志重载.
本文会根据以上四点,分析源码来了解redis具体的执行过程,以更好的理解机制.
命令追加
当AOF持久化功能处于打开状态时,服务器在执行完一个写命令之后,会以协议格式将被执行的写命令追加到服务器状态的aof_buf缓冲区的末尾.
追加时机:
- 客户端命令
- 过期健删除,内存淘汰等发生时,会删除对应的key,也会把删除命令通过propagate函数传播到AOF中
- 事务追加传播命令时,会在头和尾加上”multi”和”exec”
流程图

call(接收客户端命令) – server.c
void call(client *c, int flags) {
/** 省略其他逻辑*/
/** */
/* Propagate the command into the AOF and replication link */
if (flags & CMD_CALL_PROPAGATE &&
(c->flags & CLIENT_PREVENT_PROP) != CLIENT_PREVENT_PROP)
{
/** 省略其他逻辑 */
/** 当需要同步或者aof日志保存时,调用propagate方法 */
/* Call propagate() only if at least one of AOF / replication
* propagation is needed. Note that modules commands handle replication
* in an explicit way, so we never replicate them automatically. */
if (propagate_flags != PROPAGATE_NONE && !(c->cmd->flags & CMD_MODULE))
propagate(c->cmd,c->db->id,c->argv,c->argc,propagate_flags);
}
}
propagate(传播命令) - -server.c
void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
int flags)
{
/** aof日志配置开启,则追加命令到aof日志 */
if (server.aof_state != AOF_OFF && flags & PROPAGATE_AOF)
feedAppendOnlyFile(cmd,dbid,argv,argc);
if (flags & PROPAGATE_REPL)
replicationFeedSlaves(server.slaves,dbid,argv,argc);
}
feedAppendOnlyFile() – aof.c
void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) {
sds buf = sdsempty();
robj *tmpargv[3];
/** 选择正确的db */
/* The DB this command was targeting is not the same as the last command
* we appended. To issue a SELECT command is needed. */
if (dictid != server.aof_selected_db) {
char seldb[64];
snprintf(seldb,sizeof(seldb),"%d",dictid);
buf = sdscatprintf(buf,"*2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n",
(unsigned long)strlen(seldb),seldb);
server.aof_selected_db = dictid;
}
/** 如果命令中带有过期时间,把相对时间转化成绝对时间 */
if (cmd->proc == expireCommand || cmd->proc == pexpireCommand ||
cmd->proc == expireatCommand) {
/* Translate EXPIRE/PEXPIRE/EXPIREAT into PEXPIREAT */
buf = catAppendOnlyExpireAtCommand(buf,cmd,argv[1],argv[2]);
} else if (cmd->proc == setexCommand || cmd->proc == psetexCommand) {
/* Translate SETEX/PSETEX to SET and PEXPIREAT */
tmpargv[0] = createStringObject("SET",3);
tmpargv[1] = argv[1];
tmpargv[2] = argv[3];
buf = catAppendOnlyGenericCommand(buf,3,tmpargv);
decrRefCount(tmpargv[0]);
buf = catAppendOnlyExpireAtCommand(buf,cmd,argv[1],argv[2]);
} else if (cmd->proc == setCommand && argc > 3) {
int i;
robj *exarg = NULL, *pxarg = NULL;
/* Translate SET [EX seconds][PX milliseconds] to SET and PEXPIREAT */
/** 按照redis的协议对命令编码 */
buf = catAppendOnlyGenericCommand(buf,3,argv);
for (i = 3; i < argc; i ++) {
if (!strcasecmp(argv[i]->ptr, "ex")) exarg = argv[i+1];
if (!strcasecmp(argv[i]->ptr, "px")) pxarg = argv[i+1];
}
serverAssert(!(exarg && pxarg));
if (exarg)
buf = catAppendOnlyExpireAtCommand(buf,server.expireCommand,argv[1],
exarg);
if (pxarg)
buf = catAppendOnlyExpireAtCommand(buf,server.pexpireCommand,argv[1],
pxarg);
} else {
/* All the other commands don't need translation or need the
* same translation already operated in the command vector
* for the replication itself. */
buf = catAppendOnlyGenericCommand(buf,argc,argv);
}
/* Append to the AOF buffer. This will be flushed on disk just before
* of re-entering the event loop, so before the client will get a
* positive reply about the operation performed. */
/** 把命令追加到aof缓存中
在重新进入事件循环之前,这些命令会被冲洗到磁盘上,
* 并向客户端返回一个回复*/
if (server.aof_state == AOF_ON)
server.aof_buf = sdscatlen(server.aof_buf,buf,sdslen(buf));
/* If a background append only file rewriting is in progress we want to
* accumulate the differences between the child DB and the current one
* in a buffer, so that when the child process will do its work we
* can append the differences to the new append only file. */
/** 如果存在aof重写子进程,
将命令追加到重写缓存中,
* 从而记录当前正在重写的 AOF 文件和数据库当前状态的差异。 */
if (server.aof_child_pid != -1)
aofRewriteBufferAppend((unsigned char*)buf,sdslen(buf));
/** 释放缓存文件持有 */
sdsfree(buf);
}
文件写入
讲缓存中的命令刷入磁盘中.
写入时机:
服务关闭前
sleep事件前
定时查询是否满足同步机制即serverCron函数调用flushAppendOnlyFile函数.
flushAppendOnlyFile函数的行为由服务器配置的appendfsync选项的值决定
- always:将aof_buf中的所有内容写入并同步到aof文件。
- everysec:将aof_buf中的所有内容写入到aof文件,如果上次同步的时间距离现在超过1s,那么对aof文件进行同 步,同步操作由一个线程专门负责执行。
- no:将aof_buf中的所有内容写入到aof文件,但不对aof文件同步,同步有操作系统执行。
flushAppendOnlyFile(刷入文件) – aof.c
#define AOF_WRITE_LOG_ERROR_RATE 30 /* Seconds between errors logging. */
void flushAppendOnlyFile(int force) {
ssize_t nwritten;
int sync_in_progress = 0;
mstime_t latency;
/** 判断aof_buf缓存是否为空,但怕还是有缓存数据到打aof_buf中,所以需要再进行刷入文件操作. */
if (sdslen(server.aof_buf) == 0) {
/* Check if we need to do fsync even the aof buffer is empty,
* because previously in AOF_FSYNC_EVERYSEC mode, fsync is
* called only when aof buffer is not empty, so if users
* stop write commands before fsync called in one second,
* the data in page cache cannot be flushed in time. */
if (server.aof_fsync == AOF_FSYNC_EVERYSEC &&
server.aof_fsync_offset != server.aof_current_size &&
server.unixtime > server.aof_last_fsync &&
!(sync_in_progress = aofFsyncInProgress())) {
goto try_fsync;
} else {
return;
}
}
/** 判断fsync处于每秒同步,则获取是否存在flush进程状态 */
if (server.aof_fsync == AOF_FSYNC_EVERYSEC)
sync_in_progress = aofFsyncInProgress();
/** 当fsync处于每秒刷新时且后台执行(force=0)flush写入 */
if (server.aof_fsync == AOF_FSYNC_EVERYSEC && !force) {
/* With this append fsync policy we do background fsyncing.
* If the fsync is still in progress we can try to delay
* the write for a couple of seconds. */
/** 当前还有正在执行的flush的后台线程,则进行延时等待执行 */
if (sync_in_progress) {
if (server.aof_flush_postponed_start == 0) {
/* No previous write postponing, remember that we are
* postponing the flush and return. */
server.aof_flush_postponed_start = server.unixtime;
return;
/** 正有异步线程在fsync操作时需要等2秒钟才进行写磁盘操作*/
} else if (server.unixtime - server.aof_flush_postponed_start < 2) {
/* We were already waiting for fsync to finish, but for less
* than two seconds this is still ok. Postpone again. */
return;
}
/* Otherwise fall trough, and go write since we can't wait
* over two seconds. */
server.aof_delayed_fsync++;
serverLog(LL_NOTICE,"Asynchronous AOF fsync is taking too long (disk is busy?). Writing the AOF buffer without waiting for fsync to complete, this may slow down Redis.");
}
}
/* We want to perform a single write. This should be guaranteed atomic
* at least if the filesystem we are writing is a real physical one.
* While this will save us against the server being killed I don't think
* there is much to do about the whole server stopping for power problems
* or alike */
/** 把aof_buf数据写入aof_fd中 */
latencyStartMonitor(latency);
nwritten = aofWrite(server.aof_fd,server.aof_buf,sdslen(server.aof_buf));
latencyEndMonitor(latency);
/* We want to capture different events for delayed writes:
* when the delay happens with a pending fsync, or with a saving child
* active, and when the above two conditions are missing.
* We also use an additional event name to save all samples which is
* useful for graphing / monitoring purposes. */
if (sync_in_progress) {
latencyAddSampleIfNeeded("aof-write-pending-fsync",latency);
} else if (server.aof_child_pid != -1 || server.rdb_child_pid != -1) {
latencyAddSampleIfNeeded("aof-write-active-child",latency);
} else {
latencyAddSampleIfNeeded("aof-write-alone",latency);
}
latencyAddSampleIfNeeded("aof-write",latency);
/* We performed the write so reset the postponed flush sentinel to zero. */
server.aof_flush_postponed_start = 0;
/** aof_buf的数据没有完全到文件缓冲 */
if (nwritten != (ssize_t)sdslen(server.aof_buf)) {
static time_t last_write_error_log = 0;
int can_log = 0;
/** 每隔30秒打印一次错误日志 */
/* Limit logging rate to 1 line per AOF_WRITE_LOG_ERROR_RATE seconds. */
if ((server.unixtime - last_write_error_log) > AOF_WRITE_LOG_ERROR_RATE) {
can_log = 1;
last_write_error_log = server.unixtime;
}
/* Log the AOF write error and record the error code. */
/** 如果写入出错,那么尝试将该情况写入到日志里面 */
if (nwritten == -1) {
if (can_log) {
serverLog(LL_WARNING,"Error writing to the AOF file: %s",
strerror(errno));
server.aof_last_write_errno = errno;
}
} else {
if (can_log) {
serverLog(LL_WARNING,"Short write while writing to "
"the AOF file: (nwritten=%lld, "
"expected=%lld)",
(long long)nwritten,
(long long)sdslen(server.aof_buf));
}
/** 尝试移除新追加的不完整内容 */
if (ftruncate(server.aof_fd, server.aof_current_size) == -1) {
if (can_log) {
serverLog(LL_WARNING, "Could not remove short write "
"from the append-only file. Redis may refuse "
"to load the AOF the next time it starts. "
"ftruncate: %s", strerror(errno));
}
} else {
/* If the ftruncate() succeeded we can set nwritten to
* -1 since there is no longer partial data into the AOF. */
nwritten = -1;
}
server.aof_last_write_errno = ENOSPC;
}
/* Handle the AOF write error. */
/**处理写入 AOF 文件时出现的错误 */
if (server.aof_fsync == AOF_FSYNC_ALWAYS) {
/* We can't recover when the fsync policy is ALWAYS since the
* reply for the client is already in the output buffers, and we
* have the contract with the user that on acknowledged write data
* is synced on disk. */
serverLog(LL_WARNING,"Can't recover from AOF write error when the AOF fsync policy is 'always'. Exiting...");
exit(1);
} else {
/* Recover from failed write leaving data into the buffer. However
* set an error to stop accepting writes as long as the error
* condition is not cleared. */
server.aof_last_write_status = C_ERR;
/* Trim the sds buffer if there was a partial write, and there
* was no way to undo it with ftruncate(2). */
if (nwritten > 0) {
server.aof_current_size += nwritten;
sdsrange(server.aof_buf,nwritten,-1);
}
return; /* We'll try again on the next call... */
}
} else {
/* Successful write(2). If AOF was in error state, restore the
* OK state and log the event. */
/** 写入成功,更新最后写入状态 */
if (server.aof_last_write_status == C_ERR) {
serverLog(LL_WARNING,
"AOF write error looks solved, Redis can write again.");
server.aof_last_write_status = C_OK;
}
}
/**更新写入后的 AOF 文件大小*/
server.aof_current_size += nwritten;
/* Re-use AOF buffer when it is small enough. The maximum comes from the
* arena size of 4k minus some overhead (but is otherwise arbitrary). */
/**如果 AOF 缓存的大小足够小的话,那么重用这个缓存,
* 否则的话,释放 AOF 缓存。*/
if ((sdslen(server.aof_buf)+sdsavail(server.aof_buf)) < 4000) {
/** 清空缓存中的内容,等待重用 */
sdsclear(server.aof_buf);
} else {
/** 释放缓存 */
sdsfree(server.aof_buf);
server.aof_buf = sdsempty();
}
try_fsync:
/* Don't fsync if no-appendfsync-on-rewrite is set to yes and there are
* children doing I/O in the background. */
/** 如果 no-appendfsync-on-rewrite 选项为开启状态,
* 并且有 BGSAVE 或者 BGREWRITEAOF 正在进行的话,
* 那么不执行 fsync */
if (server.aof_no_fsync_on_rewrite &&
(server.aof_child_pid != -1 || server.rdb_child_pid != -1))
return;
/* Perform the fsync if needed. */
if (server.aof_fsync == AOF_FSYNC_ALWAYS) {
/* redis_fsync is defined as fdatasync() for Linux in order to avoid
* flushing metadata. */
latencyStartMonitor(latency);
redis_fsync(server.aof_fd); /* Let's try to get this data on the disk */
latencyEndMonitor(latency);
latencyAddSampleIfNeeded("aof-fsync-always",latency);
/**更新aof的同步指针 */
server.aof_fsync_offset = server.aof_current_size;
/**更新最后一次执行 fsnyc 的时间*/
server.aof_last_fsync = server.unixtime;
/** 策略为每秒 fsnyc ,并且距离上次 fsync 已经超过 1 秒*/
} else if ((server.aof_fsync == AOF_FSYNC_EVERYSEC &&
server.unixtime > server.aof_last_fsync)) {
if (!sync_in_progress) {
/** 后台执行flash*/
aof_background_fsync(server.aof_fd);
server.aof_fsync_offset = server.aof_current_size;
}
/** 更新最后一次执行 fsync 的时间 */
server.aof_last_fsync = server.unixtime;
}
}
rewriteAppendOnlyFileBackground(后台执行写入) –aof.c
int rewriteAppendOnlyFileBackground(void) {
pid_t childpid;
long long start;
/** 已经有进程在进行 AOF或者rdb 重写了, 返回异常*/
if (server.aof_child_pid != -1 || server.rdb_child_pid != -1) return C_ERR;
/** 创建子进程失败,返回异常 */
if (aofCreatePipes() != C_OK) return C_ERR;
openChildInfoPipe();
start = ustime();
/** fork主进程 */
if ((childpid = fork()) == 0) {
char tmpfile[256];
/* Child */
/** 关闭无用的资源*/
closeClildUnusedResourceAfterFork();
/** 设置进程名称*/
redisSetProcTitle("redis-aof-rewrite");
snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid());
/** 创建临时文件, 并进行 AOF 重写 */
if (rewriteAppendOnlyFile(tmpfile) == C_OK) {
size_t private_dirty = zmalloc_get_private_dirty(-1);
if (private_dirty) {
serverLog(LL_NOTICE,
"AOF rewrite: %zu MB of memory used by copy-on-write",
private_dirty/(1024*1024));
}
server.child_info_data.cow_size = private_dirty;
sendChildInfo(CHILD_INFO_TYPE_AOF);
/**发送重写成功信号*/
exitFromChild(0);
} else {
/**发送重写失败信号*/
exitFromChild(1);
}
} else {
/* Parent */
/**记录执行 fork 所消耗的时间*/
server.stat_fork_time = ustime()-start;
/** 记录执行fork内存拷贝速度,gb/s */
server.stat_fork_rate = (double) zmalloc_used_memory() * 1000000 / server.stat_fork_time / (1024*1024*1024); /* GB per second. */
latencyAddSampleIfNeeded("fork",server.stat_fork_time/1000);
if (childpid == -1) {
closeChildInfoPipe();
serverLog(LL_WARNING,
"Can't rewrite append only file in background: fork: %s",
strerror(errno));
aofClosePipes();
return C_ERR;
}
serverLog(LL_NOTICE,
"Background append only file rewriting started by pid %d",childpid);
/**记录 AOF 重写的信息*/
server.aof_rewrite_scheduled = 0;
server.aof_rewrite_time_start = time(NULL);
server.aof_child_pid = childpid;
/**关闭字典自动 rehash*/
updateDictResizePolicy();
/* We set appendseldb to -1 in order to force the next call to the
* feedAppendOnlyFile() to issue a SELECT command, so the differences
* accumulated by the parent into server.aof_rewrite_buf will start
* with a SELECT statement and it will be safe to merge. */
/**将 aof_selected_db 设为 -1 ,
* 强制让 feedAppendOnlyFile() 下次执行时引发一个 SELECT 命令,
* 从而确保之后新添加的命令会设置到正确的数据库中*/
server.aof_selected_db = -1;
replicationScriptCacheFlush();
return C_OK;
}
return C_OK; /* unreached */
}
日志重写
aof是通过不断追加写命令来记录数据库状态,所以服务器执行比较久之后,aof文件中的内容会越来越多,磁盘占有量越来越大,同时也是使通过过aof文件还原数据库的需要的时间也变得很久。所以就需要通过读取服务器当前的数据库状态来重写新的aof文件。
redis设置了AOF重写缓冲区,在创建子进程后,主进程每执行一个写命令都会写到重写缓冲区。在子进程完成重写后,主进程会将AOF重写缓冲区的数据写入到重写的AOF文件,保证数据状态的一致。
重写时机: 定时任务扫描到aof日志文件过大时.
serverCron(判断重写条件) – server.c
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
/** ... */
/* Start a scheduled AOF rewrite if this was requested by the user while
* a BGSAVE was in progress. */
if (server.rdb_child_pid == -1 && server.aof_child_pid == -1 &&
server.aof_rewrite_scheduled)
{
rewriteAppendOnlyFileBackground();
}
/* Check if a background saving or AOF rewrite in progress terminated. */
if (server.rdb_child_pid != -1 || server.aof_child_pid != -1 ||
ldbPendingChildren())
{
/** .... */
} else {
/** ... */
/* Trigger an AOF rewrite if needed. */
if (server.aof_state == AOF_ON &&
server.rdb_child_pid == -1 &&
server.aof_child_pid == -1 &&
server.aof_rewrite_perc &&
server.aof_current_size > server.aof_rewrite_min_size)
{
long long base = server.aof_rewrite_base_size ?
server.aof_rewrite_base_size : 1;
long long growth = (server.aof_current_size*100/base) - 100;
if (growth >= server.aof_rewrite_perc) {
serverLog(LL_NOTICE,"Starting automatic rewriting of AOF on %lld%% growth",growth);
rewriteAppendOnlyFileBackground();
}
}
}
}