中间件 七月 10, 2022

redis (五) RDB日志详解

文章字数 27k 阅读约需 25 mins. 阅读次数 0

redis (五) RDB日志详解

本文中的源码来自 Redis 5.0 ,RDB持久化过程的相关源码都在 rdb.c 文件中.

整体流程

rbd持久化(详细)

​ 上图表明了三种触发 RDB 持久化的手段之间的整体关系。通过 serverCron 自动触发的 RDB 相当于直接调用了 bgsave 指令的流程进行处理。而 bgsave 的处理流程启动子进程后,调用了 save 指令的处理流程。

各流程详解

serverCron(自动触发持久化逻辑)

redisServer 结构体的 save_params指向拥有三个值的数组,该数组的值与 redis.conf 文件中 save 配置项一一对应。dirty 记录着有多少键值发生变化, lastsave记录着上次 RDB 持久化的时间。

serverCron 函数就是遍历该数组的值,检查当前 Redis 状态是否符合触发 RDB 持久化的条件,比如说距离上次 RDB 持久化过去了 900 秒并且有至少一条数据发生变更。

rdb涉及到的参数

struct redisServer {
    /** 省略其他参数 */
     /* RDB persistence */
    long long dirty;                /* Changes to DB from the last save */
    long long dirty_before_bgsave;  /* Used to restore dirty on failed BGSAVE */
    pid_t rdb_child_pid;            /* PID of RDB saving child */
    struct saveparam *saveparams;   /* Save points array for RDB */
    int saveparamslen;              /* Number of saving points */
    char *rdb_filename;             /* Name of RDB file */
    int rdb_compression;            /* Use compression in RDB? */
    int rdb_checksum;               /* Use RDB checksum? */
    time_t lastsave;                /* Unix time of last successful save */
    time_t lastbgsave_try;          /* Unix time of last attempted bgsave */
    time_t rdb_save_time_last;      /* Time used by last RDB save run. */
    time_t rdb_save_time_start;     /* Current RDB save start time. */
    int rdb_bgsave_scheduled;       /* BGSAVE when possible if true. */
    int rdb_child_type;             /* Type of save by active child. */
    int lastbgsave_status;          /* C_OK or C_ERR */
    int stop_writes_on_bgsave_err;  /* Don't allow writes if can't BGSAVE */
    int rdb_pipe_write_result_to_parent; /* RDB pipes used to return the state */
    int rdb_pipe_read_result_from_child; /* of each slave in diskless SYNC. */
    /* Pipe and data structures for child -> parent info sharing. */
    int child_info_pipe[2];         /* Pipe used to write the child_info_data. */
    struct {
        int process_type;           /* AOF or RDB child? */
        size_t cow_size;            /* Copy on write size. */
        unsigned long long magic;   /* Magic value to make sure data is valid. */
    } child_info_data;
}

serverCron函数相关逻辑-server.c


int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
   /* 省略其他逻辑 */
    /* 检查后台是否正在进行 rdb 或者 aof 操作 */
    /* Check if a background saving or AOF rewrite in progress terminated. */
    if (server.rdb_child_pid != -1 || server.aof_child_pid != -1 ||
        ldbPendingChildren())
    {
        int statloc;
        pid_t pid;

        if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) {
            int exitcode = WEXITSTATUS(statloc);
            int bysignal = 0;

            if (WIFSIGNALED(statloc)) bysignal = WTERMSIG(statloc);

            if (pid == -1) {
                serverLog(LL_WARNING,"wait3() returned an error: %s. "
                    "rdb_child_pid = %d, aof_child_pid = %d",
                    strerror(errno),
                    (int) server.rdb_child_pid,
                    (int) server.aof_child_pid);
            } else if (pid == server.rdb_child_pid) {
                backgroundSaveDoneHandler(exitcode,bysignal);
                if (!bysignal && exitcode == 0) receiveChildInfo();
            } else if (pid == server.aof_child_pid) {
                backgroundRewriteDoneHandler(exitcode,bysignal);
                if (!bysignal && exitcode == 0) receiveChildInfo();
            } else {
                if (!ldbRemoveChild(pid)) {
                    serverLog(LL_WARNING,
                        "Warning, detected child with unmatched pid: %ld",
                        (long)pid);
                }
            }
            updateDictResizePolicy();
            closeChildInfoPipe();
        }
    } else {
        /* If there is not a background saving/rewrite in progress check if
         * we have to save/rewrite now. */
        /** 遍历每一个rdb 保存条件*/
        for (j = 0; j < server.saveparamslen; j++) {
            struct saveparam *sp = server.saveparams+j;

            /* Save if we reached the given amount of changes,
             * the given amount of seconds, and if the latest bgsave was
             * successful or if, in case of an error, at least
             * CONFIG_BGSAVE_RETRY_DELAY seconds already elapsed. */
            /** 如果数据保存记录 大于规定的修改次数 且距离 上一次保存的时间大于规定时间或者上次BGSAVE命令执行成功,
            才执行 BGSAVE 操作*/
            if (server.dirty >= sp->changes &&
                server.unixtime-server.lastsave > sp->seconds &&
                (server.unixtime-server.lastbgsave_try >
                 CONFIG_BGSAVE_RETRY_DELAY ||
                 server.lastbgsave_status == C_OK))
            {
                /** 记录日志*/
                serverLog(LL_NOTICE,"%d changes in %d seconds. Saving...",
                    sp->changes, (int)sp->seconds);
                rdbSaveInfo rsi, *rsiptr;
                rsiptr = rdbPopulateSaveInfo(&rsi);
                /** 异步保存*/
                rdbSaveBackground(server.rdb_filename,rsiptr);
                break;
            }
        }
    /* 省略其他逻辑 */
}

bgsaveCommand(子进程后台执行rdb持久化检查状态)

执行 bgsave 指令时,Redis 会先触发 bgsaveCommand 进行当前状态检查,然后才会调用 rdbSaveBackground.

bgsaveCommand相关逻辑 – rdb.c


/* BGSAVE [SCHEDULE] */
void bgsaveCommand(client *c) {
    int schedule = 0;

    /* The SCHEDULE option changes the behavior of BGSAVE when an AOF rewrite
     * is in progress. Instead of returning an error a BGSAVE gets scheduled. */
    if (c->argc > 1) {
        if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr,"schedule")) {
            schedule = 1;
        } else {
            addReply(c,shared.syntaxerr);
            return;
        }
    }

    rdbSaveInfo rsi, *rsiptr;
    rsiptr = rdbPopulateSaveInfo(&rsi);

    
    if (server.rdb_child_pid != -1) {
        /** 是否存在rdb持久化的进程id, 若进程id不为-1,则返回,并记录异常文案*/
        addReplyError(c,"Background save already in progress");
    } else if (server.aof_child_pid != -1) {        
        /** 是否存在aof持久化的进程id, 若进程id不为-1,怎判断是否存在定时任务参数*/
        if (schedule) {
            /** 若存在定时任务,则打上定时保存标记,并直接返回,记录状态日志 */
            server.rdb_bgsave_scheduled = 1;
            addReplyStatus(c,"Background saving scheduled");
        } else {
            /** 若干不存在定时任务,则返回,并记录异常文案 */
            addReplyError(c,
                "An AOF log rewriting in progress: can't BGSAVE right now. "
                "Use BGSAVE SCHEDULE in order to schedule a BGSAVE whenever "
                "possible.");
        }
    } else if (rdbSaveBackground(server.rdb_filename,rsiptr) == C_OK) {/**若rdb与aof都没有进行,则调用rdbSaveBackground方法来fork子进程处理rdb持久化,并返回,记录状态日志. */
        addReplyStatus(c,"Background saving started");
    } else {
        /** 记录其他异常日志*/
        addReply(c,shared.err);
    }
}

rdbSaveBackground(fork子进程)

rdbSaveBackground 函数中最主要的工作就是调用 fork 命令生成子流程,然后在子流程中执行 rdbSave函数,也就是 save 指令最终会触发的函数。

根据调用方(父进程或者子进程)不同,有两种不同的执行逻辑。

  • 如果调用方是父进程,则fork出子进程,保存子进程信息后直接返回。
  • 如果调用方是子进程则调用rdbSave执行RDB持久化逻辑,持久化完成后退出子进程。

父进程内存占用过大,fork过程会比较耗时,在这个过程中父进程无法对外提供服务;fork的过程会拷贝一份与主进程一样的内存,所以redis内存空间会翻倍,需要注意剩余内存的大小.

info stats命令查看latest_fork_usec选项,可以获取最近一个fork以操作的耗时。

rdbSaveBackground函数相关逻辑–rdb.c


int rdbSaveBackground(char *filename, rdbSaveInfo *rsi) {
    pid_t childpid;
    long long start;
    /**检查后台是否正在执行 aof 或者 rdb 操作*/
    if (server.aof_child_pid != -1 || server.rdb_child_pid != -1) return C_ERR;
    /**拿出 数据保存记录,保存为 上次记录*/
    server.dirty_before_bgsave = server.dirty;
    /**bgsave 时间*/
    server.lastbgsave_try = time(NULL);
    /** 创建子进程*/
    openChildInfoPipe();

    /** 记录开始时间*/
    start = ustime();
    /** fork子进程*/
    if ((childpid = fork()) == 0) {
        int retval;

        /* Child */
        /** 关闭子进程中无用的资源,在frok之后, 比如:关闭子进程继承的 socket 监听, 关闭集群锁定(若有), 释放对service.pidfile文件的持有并设置为null来不可达处理*/
        closeClildUnusedResourceAfterFork();
        /** 子进程 title 修改*/
        redisSetProcTitle("redis-rdb-bgsave");
        /**  执行rdb 写入操作*/
        retval = rdbSave(filename,rsi);
        /** 执行完毕后*/
        if (retval == C_OK) {
            size_t private_dirty = zmalloc_get_private_dirty(-1);
            if (private_dirty) {
                serverLog(LL_NOTICE,
                    "RDB: %zu MB of memory used by copy-on-write",
                    private_dirty/(1024*1024));
            }

            server.child_info_data.cow_size = private_dirty;
            sendChildInfo(CHILD_INFO_TYPE_RDB);
        }
        /** 退出子进程*/
        exitFromChild((retval == C_OK) ? 0 : 1);
    } else {
        /* Parent */
        /** 进行fork时间的统计和信息记录,比如说rdb_save_time_start、rdb_child_pid、和rdb_child_type */
        server.stat_fork_time = ustime()-start;
        server.stat_fork_rate = (double) zmalloc_used_memory() * 1000000 / server.stat_fork_time / (1024*1024*1024); /* GB per second. */
        latencyAddSampleIfNeeded("fork",server.stat_fork_time/1000);
        if (childpid == -1) {
            closeChildInfoPipe();
            server.lastbgsave_status = C_ERR;
            serverLog(LL_WARNING,"Can't save in background: fork: %s",
                strerror(errno));
            return C_ERR;
        }
        serverLog(LL_NOTICE,"Background saving started by pid %d",childpid);
        server.rdb_save_time_start = time(NULL);
        server.rdb_child_pid = childpid;
        server.rdb_child_type = RDB_CHILD_TYPE_DISK;
        updateDictResizePolicy();
        return C_OK;
    }
    return C_OK; /* unreached */
}

rdbSave(RDB 持久化函数)

真正进行 RDB 持久化的函数

大致的流程:

  1. 打开一个临时文件,

  2. 调用 rdbSaveRio函数,将当前 Redis 的内存信息写入到这个临时文件中,

  3. 调用 fflush、 fsync 和 fclose 接口将文件写入磁盘中,

  4. 使用 rename 将临时文件改名为 正式的 RDB 文件,

  5. 记录 dirty 和 lastsave等状态信息。

**rdbSave–rdb.c **


/* Save the DB on disk. Return C_ERR on error, C_OK on success. */
int rdbSave(char *filename, rdbSaveInfo *rsi) {
    
    char tmpfile[256];
    char cwd[MAXPATHLEN]; /* Current working dir path for error messages. */
    FILE *fp;
    rio rdb;
    int error = 0;

    snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());
    /** 生成tmpfile文件名 temp-[pid].rdb,并打开 */
    fp = fopen(tmpfile,"w");
    if (!fp) {
        /**tmp文件打开异常日志记录 */
        char *cwdp = getcwd(cwd,MAXPATHLEN);
        serverLog(LL_WARNING,
            "Failed opening the RDB file %s (in server root dir %s) "
            "for saving: %s",
            filename,
            cwdp ? cwdp : "unknown",
            strerror(errno));
        return C_ERR;
    }
    /** 初始化rio结构 */
    rioInitWithFile(&rdb,fp);

    if (server.rdb_save_incremental_fsync)
        rioSetAutoSync(&rdb,REDIS_AUTOSYNC_BYTES);
    
    /** 主要逻辑:内存写入临时文件.*/
    if (rdbSaveRio(&rdb,&error,RDB_SAVE_NONE,rsi) == C_ERR) {
        errno = error;
        goto werr;
    }

    /* Make sure data will not remain on the OS's output buffers */
    /**缓存数据从应用层缓存刷新到内核*/
    if (fflush(fp) == EOF) goto werr;
    /**作用于文件描述符,用于将内核缓存刷新到物理设备上。*/
    if (fsync(fileno(fp)) == -1) goto werr;
    /**关闭文件*/
    if (fclose(fp) == EOF) goto werr;

    /* Use RENAME to make sure the DB file is changed atomically only
     * if the generate DB file is ok. */
    /**重新命名 rdb 文件,把之前临时的名称修改为正式的 rdb 文件名称*/
    if (rename(tmpfile,filename) == -1) {
        /** 异常处理,断开临时文件的持有.*/
        char *cwdp = getcwd(cwd,MAXPATHLEN);
        serverLog(LL_WARNING,
            "Error moving temp DB file %s on the final "
            "destination %s (in server root dir %s): %s",
            tmpfile,
            filename,
            cwdp ? cwdp : "unknown",
            strerror(errno));
        unlink(tmpfile);
        return C_ERR;
    }

    /**写入完成,打印日志*/
    serverLog(LL_NOTICE,"DB saved on disk");
    /** 清理数据保存记录*/
    server.dirty = 0;
    /** 最后一次完成 SAVE 命令的时间*/
    server.lastsave = time(NULL);
    /** 最后一次 bgsave 的状态置位 成功*/
    server.lastbgsave_status = C_OK;
    return C_OK;

    /** 当发生异常, 保存日志,关闭文件,断开临时文件的持有.*/
werr:
    serverLog(LL_WARNING,"Write error saving DB on disk: %s", strerror(errno));
    fclose(fp);
    unlink(tmpfile);
    return C_ERR;
}

rdbSaveRio(格式化保存)

rdbSaveRio 会将 Redis 内存中的数据以相对紧凑的格式写入到文件.

大致流程:

  1. 先写入 REDIS 魔法值,然后是 RDB 文件的版本( rdb_version ),额外辅助信息 ( aux )。辅助信息中包含了 Redis 的版本,内存占用和复制库( repl-id )和偏移量( repl-offset )等
  2. 遍历数据库,先写入 RDB_OPCODE_SELECTDB识别码和数据库编号,接着写入 RDB_OPCODE_RESIZEDB识别码和数据库键值数量和待失效键值数量,最后会遍历所有的键值,依次写入。在写入键值时,当该键值有失效时间时,会先写入 RDB_OPCODE_EXPIRETIME_MS识别码和失效时间,然后写入键值类型的识别码,最后再写入键和值。
  3. 完数据库信息后,还会把 Lua 相关的信息写入,最后再写入 RDB_OPCODE_EOF结束符识别码和校验值.

rdbSaveRio-rdb.c


/* Produces a dump of the database in RDB format sending it to the specified
 * Redis I/O channel. On success C_OK is returned, otherwise C_ERR
 * is returned and part of the output, or all the output, can be
 * missing because of I/O errors.
 *
 * When the function returns C_ERR and if 'error' is not NULL, the
 * integer pointed by 'error' is set to the value of errno just after the I/O
 * error. */
int rdbSaveRio(rio *rdb, int *error, int flags, rdbSaveInfo *rsi) {
    dictIterator *di = NULL;
    dictEntry *de;
    char magic[10];
    int j;
    uint64_t cksum;
    size_t processed = 0;

    if (server.rdb_checksum)
        rdb->update_cksum = rioGenericUpdateChecksum;
    snprintf(magic,sizeof(magic),"REDIS%04d",RDB_VERSION);
    /**写入 REDIS 魔法值*/
    if (rdbWriteRaw(rdb,magic,9) == -1) goto werr;
    /**写入辅助信息 REDIS版本,服务器操作系统位数,当前时间,复制信息比如repl-stream-db,repl-id和repl-offset等等数据 */
    if (rdbSaveInfoAuxFields(rdb,flags,rsi) == -1) goto werr;
    /** 写入额外辅助信息. */
    if (rdbSaveModulesAux(rdb, REDISMODULE_AUX_BEFORE_RDB) == -1) goto werr;

    /** 遍历数据库*/
    for (j = 0; j < server.dbnum; j++) {
        /**获取数据库指针地址和数据库字典*/
        redisDb *db = server.db+j;
        dict *d = db->dict;
        if (dictSize(d) == 0) continue;
        di = dictGetSafeIterator(d);

        /* Write the SELECT DB opcode */
        /**写入数据库部分的开始标识*/
        if (rdbSaveType(rdb,RDB_OPCODE_SELECTDB) == -1) goto werr;
        /** 写入当前数据库号*/
        if (rdbSaveLen(rdb,j) == -1) goto werr;

        /* Write the RESIZE DB opcode. We trim the size to UINT32_MAX, which
         * is currently the largest type we are able to represent in RDB sizes.
         * However this does not limit the actual size of the DB to load since
         * these sizes are just hints to resize the hash tables. */
        uint64_t db_size, expires_size;
        /**获取数据库字典大小和过期键字典大小*/
        db_size = dictSize(db->dict);
        expires_size = dictSize(db->expires);
        /**写入当前待写入数据的类型,此处为 RDB_OPCODE_RESIZEDB,表示数据库大小*/
        if (rdbSaveType(rdb,RDB_OPCODE_RESIZEDB) == -1) goto werr;
        /**写入获取数据库字典大小和过期键字典大小*/
        if (rdbSaveLen(rdb,db_size) == -1) goto werr;
        if (rdbSaveLen(rdb,expires_size) == -1) goto werr;

        /* Iterate this DB writing every entry */
        /**遍历当前数据库的键值对*/
        while((de = dictNext(di)) != NULL) {
            sds keystr = dictGetKey(de);
            robj key, *o = dictGetVal(de);
            long long expire;
            /**初始化 key*/
            initStaticStringObject(key,keystr);
            /**获取键的过期数据 */
            expire = getExpire(db,&key);
            /** 保存键值对数据*/
            if (rdbSaveKeyValuePair(rdb,&key,o,expire) == -1) goto werr;

            /* When this RDB is produced as part of an AOF rewrite, move
             * accumulated diff from parent to child while rewriting in
             * order to have a smaller final write. */
            if (flags & RDB_SAVE_AOF_PREAMBLE &&
                rdb->processed_bytes > processed+AOF_READ_DIFF_INTERVAL_BYTES)
            {
                processed = rdb->processed_bytes;
                aofReadDiffFromParent();
            }
        }
        dictReleaseIterator(di);
        di = NULL; /* So that we don't release it again on error. */
    }

    /* If we are storing the replication information on disk, persist
     * the script cache as well: on successful PSYNC after a restart, we need
     * to be able to process any EVALSHA inside the replication backlog the
     * master will send us. */
    /**保存 Lua 脚本*/
    if (rsi && dictSize(server.lua_scripts)) {
        di = dictGetIterator(server.lua_scripts);
        while((de = dictNext(di)) != NULL) {
            robj *body = dictGetVal(de);
            if (rdbSaveAuxField(rdb,"lua",3,body->ptr,sdslen(body->ptr)) == -1)
                goto werr;
        }
        dictReleaseIterator(di);
        di = NULL; /* So that we don't release it again on error. */
    }

    if (rdbSaveModulesAux(rdb, REDISMODULE_AUX_AFTER_RDB) == -1) goto werr;

    /* EOF opcode */
    /**写入结束符*/
    if (rdbSaveType(rdb,RDB_OPCODE_EOF) == -1) goto werr;

    /* CRC64 checksum. It will be zero if checksum computation is disabled, the
     * loading code skips the check in this case. */
    cksum = rdb->cksum;
    memrev64ifbe(&cksum);
    /**写入CRC64校验*/
    if (rioWrite(rdb,&cksum,8) == 0) goto werr;
    return C_OK;

werr:
    if (error) *error = errno;
    if (di) dictReleaseIterator(di);
    return C_ERR;
}

rdbSaveKeyValuePair(键值对写入)

rdbSaveKeyValuePair-rdb.c

int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, long long expiretime) {
    int savelru = server.maxmemory_policy & MAXMEMORY_FLAG_LRU;
    int savelfu = server.maxmemory_policy & MAXMEMORY_FLAG_LFU;

    /* Save the expire time */
    if (expiretime != -1) {
        /**写入过期时间操作码标识*/
        if (rdbSaveType(rdb,RDB_OPCODE_EXPIRETIME_MS) == -1) return -1;
        if (rdbSaveMillisecondTime(rdb,expiretime) == -1) return -1;
    }

    /* Save the LRU info. */
    if (savelru) {
        /**写入LRU空闲时间操作码标识*/
        uint64_t idletime = estimateObjectIdleTime(val);
        idletime /= 1000; /* Using seconds is enough and requires less space.*/
        if (rdbSaveType(rdb,RDB_OPCODE_IDLE) == -1) return -1;
        if (rdbSaveLen(rdb,idletime) == -1) return -1;
    }

    /* Save the LFU info. */
    if (savelfu) {
        /** 写入LFU访问频率操作码标识*/
        uint8_t buf[1];
        buf[0] = LFUDecrAndReturn(val);
        /* We can encode this in exactly two bytes: the opcode and an 8
         * bit counter, since the frequency is logarithmic with a 0-255 range.
         * Note that we do not store the halving time because to reset it
         * a single time when loading does not affect the frequency much. */
        if (rdbSaveType(rdb,RDB_OPCODE_FREQ) == -1) return -1;
        if (rdbWriteRaw(rdb,buf,1) == -1) return -1;
    }

    /* Save type, key, value */
    /**写入键值对的类型标识*/
    if (rdbSaveObjectType(rdb,val) == -1) return -1;
    /**写入键值对的key*/
    if (rdbSaveStringObject(rdb,key) == -1) return -1;
    /**写入键值对的value*/
    if (rdbSaveObject(rdb,val,key) == -1) return -1;
    return 1;
}

rdb文件格式

RDB文件为二进制格式保存.

解析rdb文件示例

rdb文件

前置知识

Length Encoding 长度编码

字节数 长度 含义
00pppppp 1个字节 长度小于或等于63个字节(6位)的字符串值
01pppppp(qqqqqqqq) 2字节 长度小于或等于16383字节(14位)的字符串值
10______<4 byte> 5字节 接下来的4个字节包含一个无符号的int。长度大于或等于16384个字节的字符串值
1100____ 3个字节 整数编码为16位带符号(2个字节)
1101____ 5字节 整数编码为32位带符号(4个字节)
1110____ 9字节 整数编码为64位带符号(8个字节)
1111____ 4字节 整数编码为24位带符号(3个字节)

总体结构– rdbSave方法

rdb文件结构(总)

  • REDIS(常量):RDB文件的最开头是REDIS部分,这个部分的长度为5字节,保存着“REDIS”五个字符。 通过这五个字符,程序可以在载入文件时,快速检查所载入的文件是否RDB文件

  • db_version(变量):长度为4字节,它的值是一个字符串表示的整数,这个整数记录了RDB文件的 版本号,

    比如二进制中的30303039,转成十进制是48 48 48 57, 对应的ascii码值是 0 0 0 9,就代表RDB文件的版本为第九版

  • aux_fields(辅助字段) : 通用字符串字段,用于向 RDB 添加状态,Version 7 加入的,向后兼容。AUX 字段由键和值两个字符串组成。主要有以下字段

    • redis-ver:版本号

    • redis-bits:OS Arch

    • ctime:RDB 文件创建时间

    • used-mem:使用内存大小

    • repl-stream-db:Redis 服务器的 db 的索引

    • repl-id:Redis 主实例的 ID(replication id)

    • repl-offset:Redis 主实例的偏称量(replication offset)

    • aof-preamble:是否启动aof快照

      结构图:

      aux结构

  • databases(变量):databases部分包含着零个或任意多个数据库,以及各个数据库中的键值对数据

    • 如果服务器的数据库状态为空(所有数据库都是空的),那么这个部分也为空,长度为 0字节
    • 如果服务器的数据库状态为非空(有至少一个数据库非空),那么这个部分也为非空, 根据数据库所保存键值对的数量、类型和内容不同,这个部分的长度也会有所不同
  • EOF(常量):EOF常量的长度为1字节,这个常量标志着RDB文件正文内容的结束,当读入程序遇到这 个值的时候,它知道所有数据库的所有键值对都已经载入完毕了

  • check_sum(变量):check_sum是一个8字节长的无符号整数,保存着一个校验和,这个校验和是程序通过对 REDIS、db_version、databases、EOF四个部分的内容进行计算得出的。服务器在载入RDB文件时,会将载入数据所计算出的校验和与check_sum所记录的校验和进行对比,以此来检查RDB文件是否有出错或者损坏的情况出现

例子:

rdb文件结构例子

fa(0xfa): 250 RDB_OPCODE_AUX 辅助键值对开始标志,后面跟着扩展键值对.

ff(0xff): 255 RDB_OPCODE_EOF 文件结束标志.

database文件结构

rdb文件结构(database)

  • 每个非空数据库在RDB文件中都可以保存为SELECTDB、db_number、REDISDB、db_size、expires_size、key_value_pairs六个部分:
  • SELECTDB常量:长度为1字节,当读入程序遇到这个值的时候,它知道接下来要读入的将是一个数据库号码
  • db_number:保存着一个数据库号码,根据号码的大小不同,这个部分的长度可以是1字 节、2字节或者5字节。当程序读入db_number部分之后,服务器会调用SELECT命令,根据读入的数据库号码进行数据库切换,使得之后读入的键值对可以载入到正确的数据库中
  • db_size: db中key的个数
  • expires_size: db中存在过期时间的key个数
  • key_value_pairs部分:保存了数据库中的所有键值对数据,如果键值对带有过期时间,那 么过期时间也会和键值对保存在一起。根据键值对的数量、类型、内容以及是否有过期时间 等条件的不同,key_value_pairs部分的长度也会有所不同

例子:

rdb文件结构(database)例子

fe(0xfe): 254 RDB_OPCODE_SELECTDB db选择标志,后面跟着二字节的数据库index.

fb(0xfb): 251 RDB_OPCODE_RESIZEDB 后面跟着当前db存在的key个数与存在过期时间的key个数.

keyValuePair结构

rdbSaveRio在写键值时,会调用 rdbSaveKeyValuePair 函数。

每个键值对都有4个部分:

  • key到期时间戳。这是可选的。

    该部分以一个字节标志开始。该标志是:

    • 0xfd:以秒为单位指定以下过期值。以下4个字节将Unix时间戳表示为无符号整数。
    • 0xfc:指定以下过期值(以毫秒为单位)。以下8个字节将Unix时间戳表示为无符号长。
  • 1个字节的标志,指示值的类型。

  • key,编码为Redis字符串。

  • value,根据值类型进行编码的值。

根据键的不同类型写入不同格式,各种键值的类型和格式如下所示:

RDB_TYPE_STRING 0

rdb存储(Stirng结构)

Redis字符串是二进制安全的——这意味着您可以在其中存储任何内容。它们没有任何特殊的字符串结尾标记。

三种类型的字符串:

  • 长度前缀字符串: 字符串的长度(以字节为单位)首先使用Length Encoding进行编码。此后,将存储字符串的原始字节。

  • 8、16或32位整数: 根据长度编码规则,来确定需要读取的位数作为值.

  • LZF压缩字符串:

    读取方式:

    • 使用长度编码从流中读取压缩的长度 clen
    • 使用长度编码从流中读取未压缩的长度
    • 从流中读取下一个 clen 长度的压缩字节
    • 最后,使用LZF算法解压这些字节

RDB_TYPE_LIST 1

rdb存储(list结构)

  • zlhead: 字符串编码解码,存储当前 key 所属 value 的 bytes 数目以及是否启用了 lzf 等信息。 如 ziplist 以 1B 开头,对应 2 进制为 0b00011011 ,后 6 bit 表示为 十进制 27 ,表示当前 ziplist 共有 27 bytes。从 开始读取直到 。

  • zlbytes: 4 byte 无符号整数,采用小端字节序编码。表示当前 ziplist 总占用字节数。

  • zltail: 4 byte 无符号整数,采用小端字节序编码。代表到达最后一个 entry 需要跳过的字节数。

  • zllen: 2 byte 无符号整数,采用小端字节序编码。ziplist entry 数目。当用于存储 hash 数据时,entry 数为 key 数 + value数。

  • entrys: 存储 entry 列表。每个 entry 按如下方法存储。

    • : 可变长编码,存储前一个 entry 的占用的字节数。使用整数编码。
    • : 同样是可变长编码,存储当前 entry 的类型以及长度。
    • : 存储实际数据。依据 指定的类型和长度。采用整数编码或字符串编码。
  • zlend: 固定以 0xFF 结尾。

0%