1. 首页
  2. redis实践

Redis源码分析–AOF文件增量追写源码阅读

重要说明,在看这篇文章之前,最好先通过 剖析Redis协议 了解Redis协议,AOF文件增量追写就是根据Redis协议生成的;

这个方法主要负责实时追写AOF文件的业务逻辑。比如在配置了appendonly yes的场景下,执行set、hset、lpush等(会导致内存数据变化的)命令时,就会调用这个方法实时刷新AOF文件:

  /* Write the append only file buffer on disk.
     *
     * Since we are required to write the AOF before replying to the client,
     * and the only way the client socket can get a write is when entering
     * the event loop, we accumulate all the AOF writes in a memory
     * buffer and write it on disk using this function just before entering
     * the event loop again.
     *
     * About the 'force' argument:
     *
     * When the fsync policy is set to 'everysec' we may delay the flush if there
     * is still an fsync() going on in the background thread, since for instance
     * on Linux write(2) will be blocked by the background fsync anyway.
     * When this happens we remember that there is some aof buffer to be
     * flushed ASAP, and will try to do that in the serverCron() function.
     *
     * However if force is set to 1 we'll write regardless of the background
     * fsync.
     *
     * NOTE(review): the call sites are not visible here; presumably force is 1
     * only when AOF is being turned off at runtime (CONFIG SET appendonly no)
     * and 0 everywhere else -- confirm against the callers.
     *
     * Error handling summary:
     *  - write(2) failure: the errno of the failed write is always stored in
     *    server.aof_last_write_errno (logging is rate limited, recording is not).
     *  - short write: we try to ftruncate(2) the partial data away and record
     *    ENOSPC as the error code.
     *  - with the 'always' policy any write error is fatal (exit(1)); otherwise
     *    server.aof_last_write_status is set to REDIS_ERR, the unwritten data
     *    stays in server.aof_buf and the write is retried on the next call. */
    #define AOF_WRITE_LOG_ERROR_RATE 30 /* Seconds between errors logging. */

    void flushAppendOnlyFile(int force) {
        ssize_t nwritten;
        size_t buflen;
        int sync_in_progress = 0;
        int write_errno = 0;
        mstime_t latency;

        /* Nothing accumulated in the AOF buffer: nothing to flush. */
        buflen = sdslen(server.aof_buf);
        if (buflen == 0) return;

        /* With 'everysec' check whether a background fsync job is pending. */
        if (server.aof_fsync == AOF_FSYNC_EVERYSEC)
            sync_in_progress = bioPendingJobsOfType(REDIS_BIO_AOF_FSYNC) != 0;

        if (server.aof_fsync == AOF_FSYNC_EVERYSEC && !force) {
            /* With this append fsync policy we do background fsyncing.
             * If the fsync is still in progress we can try to delay
             * the write for a couple of seconds. */
            if (sync_in_progress) {
                if (server.aof_flush_postponed_start == 0) {
                    /* No previous write postponing, remember that we are
                     * postponing the flush and return. */
                    server.aof_flush_postponed_start = server.unixtime;
                    return;
                } else if (server.unixtime - server.aof_flush_postponed_start < 2) {
                    /* We were already waiting for fsync to finish, but for less
                     * than two seconds this is still ok. Postpone again. */
                    return;
                }
                /* Otherwise fall through, and go write since we can't wait
                 * over two seconds. Count the event and tell the user that
                 * the disk looks too slow. */
                server.aof_delayed_fsync++;
                redisLog(REDIS_NOTICE,"Asynchronous AOF fsync is taking too long (disk is busy?). Writing the AOF buffer without waiting for fsync to complete, this may slow down Redis.");
            }
        }
        /* We want to perform a single write. This should be guaranteed atomic
         * at least if the filesystem we are writing is a real physical one.
         * While this will save us against the server being killed I don't think
         * there is much to do about the whole server stopping for power problems
         * or alike */

        latencyStartMonitor(latency);
        nwritten = write(server.aof_fd,server.aof_buf,buflen);
        /* Capture errno right away: the latency helpers and the logging code
         * that run before we report the error are free to modify it. */
        if (nwritten == -1) write_errno = errno;
        latencyEndMonitor(latency);
        /* We want to capture different events for delayed writes:
         * when the delay happens with a pending fsync, or with a saving child
         * active, and when the above two conditions are missing.
         * We also use an additional event name to save all samples which is
         * useful for graphing / monitoring purposes. */
        if (sync_in_progress) {
            latencyAddSampleIfNeeded("aof-write-pending-fsync",latency);
        } else if (server.aof_child_pid != -1 || server.rdb_child_pid != -1) {
            latencyAddSampleIfNeeded("aof-write-active-child",latency);
        } else {
            latencyAddSampleIfNeeded("aof-write-alone",latency);
        }
        latencyAddSampleIfNeeded("aof-write",latency);

        /* We performed the write so reset the postponed flush sentinel to zero. */
        server.aof_flush_postponed_start = 0;

        /* Anything other than a full write of the buffer is a failure.
         * Compare as ssize_t so large buffers are not truncated to int. */
        if (nwritten != (ssize_t)buflen) {
            static time_t last_write_error_log = 0;
            int can_log = 0;

            /* Limit logging rate to 1 line per AOF_WRITE_LOG_ERROR_RATE seconds. */
            if ((server.unixtime - last_write_error_log) > AOF_WRITE_LOG_ERROR_RATE) {
                can_log = 1;
                last_write_error_log = server.unixtime;
            }

            /* Log the AOF write error and record the error code. */
            if (nwritten == -1) {
                if (can_log) {
                    redisLog(REDIS_WARNING,"Error writing to the AOF file: %s",
                        strerror(write_errno));
                }
                /* Always record the error code, even when the log line is
                 * suppressed by the rate limiter. */
                server.aof_last_write_errno = write_errno;
            } else {
                /* Short write: fewer bytes than requested reached the file. */
                if (can_log) {
                    redisLog(REDIS_WARNING,"Short write while writing to "
                                           "the AOF file: (nwritten=%lld, "
                                           "expected=%lld)",
                                           (long long)nwritten,
                                           (long long)buflen);
                }

                /* Try to remove the partial data by truncating the file back
                 * to server.aof_current_size, the size before this write. */
                if (ftruncate(server.aof_fd, server.aof_current_size) == -1) {
                    if (can_log) {
                        redisLog(REDIS_WARNING, "Could not remove short write "
                                 "from the append-only file.  Redis may refuse "
                                 "to load the AOF the next time it starts.  "
                                 "ftruncate: %s", strerror(errno));
                    }
                } else {
                    /* If the ftruncate() succeeded we can set nwritten to
                     * -1 since there is no longer partial data into the AOF. */
                    nwritten = -1;
                }
                server.aof_last_write_errno = ENOSPC;
            }

            /* Handle the AOF write error. */
            if (server.aof_fsync == AOF_FSYNC_ALWAYS) {
                /* We can't recover when the fsync policy is ALWAYS since the
                 * reply for the client is already in the output buffers, and we
                 * have the contract with the user that on acknowledged write data
                 * is synced on disk. */
                redisLog(REDIS_WARNING,"Can't recover from AOF write error when the AOF fsync policy is 'always'. Exiting...");
                exit(1);
            } else {
                /* Recover from failed write leaving data into the buffer. However
                 * set an error to stop accepting writes as long as the error
                 * condition is not cleared. */
                server.aof_last_write_status = REDIS_ERR;

                /* Trim the sds buffer if there was a partial write, and there
                 * was no way to undo it with ftruncate(2). */
                if (nwritten > 0) {
                    server.aof_current_size += nwritten;
                    sdsrange(server.aof_buf,nwritten,-1);
                }
                return; /* We'll try again on the next call... */
            }
        } else {
            /* Successful write(2). If AOF was in error state, restore the
             * OK state and log the event. */
            if (server.aof_last_write_status == REDIS_ERR) {
                redisLog(REDIS_WARNING,
                    "AOF write error looks solved, Redis can write again.");
                server.aof_last_write_status = REDIS_OK;
            }
        }
        /* The whole buffer reached the file: account for the new bytes. */
        server.aof_current_size += nwritten;

        /* Re-use AOF buffer when it is small enough. The maximum comes from the
         * arena size of 4k minus some overhead (but is otherwise arbitrary). */
        if ((sdslen(server.aof_buf)+sdsavail(server.aof_buf)) < 4000) {
            sdsclear(server.aof_buf);
        } else {
            sdsfree(server.aof_buf);
            server.aof_buf = sdsempty();
        }

        /* Don't fsync if no-appendfsync-on-rewrite is set to yes and there are
         * children doing I/O in the background. */
        if (server.aof_no_fsync_on_rewrite &&
            (server.aof_child_pid != -1 || server.rdb_child_pid != -1))
                return;

        /* Perform the fsync if needed: synchronously for 'always', as a
         * background job (at most once per second) for 'everysec'. In both
         * cases the time of the attempt is stored in server.aof_last_fsync. */
        if (server.aof_fsync == AOF_FSYNC_ALWAYS) {
            /* aof_fsync is defined as fdatasync() for Linux in order to avoid
             * flushing metadata.
             * NOTE(review): the result of aof_fsync() is not checked here, so
             * a failed sync goes unnoticed -- confirm whether this is intended. */
            latencyStartMonitor(latency);
            aof_fsync(server.aof_fd); /* Let's try to get this data on the disk */
            latencyEndMonitor(latency);
            latencyAddSampleIfNeeded("aof-fsync-always",latency);
            server.aof_last_fsync = server.unixtime;
        } else if ((server.aof_fsync == AOF_FSYNC_EVERYSEC &&
                    server.unixtime > server.aof_last_fsync)) {
            /* Only queue a new background fsync if none is already pending. */
            if (!sync_in_progress) aof_background_fsync(server.aof_fd);
            server.aof_last_fsync = server.unixtime;
        }
    }

作者:阿飞的博客

来源:https://www.jianshu.com/p/91cf48c8c082


JS中文网(Javascriptc中文网)是中国领先的新一代开发者社区和专业的技术媒体,一个帮助开发者成长的社区,是给开发者用的 Hacker News。这里为你筛选出最优质的技术干货文章,内容涵盖Android、iOS、前端、后端等方面。目前已经覆盖和服务了超过 300 万开发者,你每天都可以在这里找到技术世界的头条内容。

本文著作权归作者所有,如若转载,请注明出处

转载请注明:文章转载自「 Java极客技术学习 」https://www.javajike.com

标题:Redis源码分析–AOF文件增量追写源码阅读

链接:https://www.javajike.com/article/1853.html

« Redis源码分析–AOF文件全量重写源码阅读
index»

相关推荐

QR code