Redis服务器

机会只留给那些准备充分的人

Posted by irving.gx on April 18, 2020

在这一篇当中我们对于redis服务器做简要的分析。redis服务器的代码位于src/server.c当中,这一个文件有4000+行,如果对每个api进行分析的话, 反倒有一种一叶障目不见泰山的感觉,所以这一篇我们首先对于服务器的整体流程进行分析,然后对于其中重要的代码进行解读。 

首先我们说下redis服务器执行命令的简要流程。不管是什么服务器,一般的流程都分为三步,客户端发出命令,服务器进行解析,服务器把执行结果发给 客户端,和把一只大象装进冰箱里的流程一样都是三步。下面我们对于执行命令的简要流程进行分析。       

解析命令

redis服务器接收到一个客户端发出的命令的时候首先要知道这个命令是要做什么的,具体怎么做,在server.c这个文件里面有一个映射表,存储的是redis命令 与具体需要执行的函数以及关于这个命令的一些参数,我们先看下其中一部分:

这些只是命令的一部分,每个命令在这个结构当中都有对应的映射关系。对于每一条命令,

第一部分name是这个命令的名字,比如第三行set命令,set就是这个命令的名字。

第二部分function表示要执行这个命令的具体的函数,set命令要执行的函数就是setCommand这个。

第三部分arity表示命令的参数个数,如果是正数表示就是几个参数,如果是负数,表示大于等于多少个参数,比如set这个命令的参数就要大于等于3个。

第四部分sflags表示命令的标志位。下面是每个标志位的具体含义:

  • r:读命令
  • w: 写命令
  • m: 一旦调用可能会增加内存使用量
  • a: 管理员命令,比如SAVE
  • p:发布/订阅相关命令
  • f: 强制命令复制
  • s: 脚本中不允许这个命令
  • R: 随机命令,也就是说命令参数一样,但是执行结果可能不同,比如SPOP
  • S: 从脚本中调用会将输出结果排序
  • l: 加载数据库的时候也会允许命令执行
  • t: 如果从服务器数据过期也会允许这个命令执行,一般这种情况下是没有命令被允许的,只有少部分可以
  • M: 当处于MONITOR调试的时候不要自动传播这个命令
  • k: 对于这个命令执行一下隐式的ASKING,这样这个命令可以用于集群模式
  • F: 快速命令,O(1)或者O(log(N))命令,不应该被延迟执行

第五部分flags表示redis通过标志位计算出来的掩码,我们可以看到上图中大部分命令是0

第六部分get_keys_proc是一个可选函数来获取键的参数,只有当第七,八,九三部分无法确定参数的时候才会用到这部分,一般这部分为null

第七部分first_key_index表示第一个参数是一个键

第八部分last_key_index表示最后一个参数是一个键

第九部分key_step表示哪些参数是键,比如一个命令可以有多个键,MSET key1,val1,key2,val2这样的

第十部分microseconds表示执行这个命令所需要的微秒数

最后一部分calls表示这个命令被调用的总次数

初始化检查以及执行命令

在解析完命令之后,接下来就要为执行命令做准备,这部分代码仍然在src/redis.c这个文件里面,函数名称为processCommand,下面是这个函数的具 体执行步骤和细节

int processCommand(client *c) {
//单独处理quit命令
    if (!strcasecmp(c->argv[0]->ptr,"quit")) {
        addReply(c,shared.ok);
        c->flags |= CLIENT_CLOSE_AFTER_REPLY;
        return C_ERR;
    }
//查找命令并且检查命令参数是否有问题
    c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
    if (!c->cmd) {
        flagTransaction(c);
        sds args = sdsempty();
        int i;
        for (i=1; i < c->argc && sdslen(args) < 128; i++)
            args = sdscatprintf(args, "`%.*s`, ", 128-(int)sdslen(args), (char*)c->argv[i]->ptr);
        addReplyErrorFormat(c,"unknown command `%s`, with args beginning with: %s",
            (char*)c->argv[0]->ptr, args);
        sdsfree(args);
        return C_OK;
    } else if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) ||
               (c->argc < -c->cmd->arity)) {
        flagTransaction(c);
        addReplyErrorFormat(c,"wrong number of arguments for '%s' command",
            c->cmd->name);
        return C_OK;
    }
/* 检查用户是否通过身份验证 */
    if (server.requirepass && !c->authenticated && c->cmd->proc != authCommand)
    {
        flagTransaction(c);
        addReply(c,shared.noautherr);
        return C_OK;
    }
/* 查看集群模式是否开启,是否执行集群重定向,如果命令发出者是主服务器或者命令没有
     *  键的参数,就不执行重定向
      */
    if (server.cluster_enabled &&
        !(c->flags & CLIENT_MASTER) &&
        !(c->flags & CLIENT_LUA &&
          server.lua_caller->flags & CLIENT_MASTER) &&
        !(c->cmd->getkeys_proc == NULL && c->cmd->firstkey == 0 &&
          c->cmd->proc != execCommand))
    {
        int hashslot;
        int error_code;
        clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc,
                                        &hashslot,&error_code);
        if (n == NULL || n != server.cluster->myself) {
            if (c->cmd->proc == execCommand) {
                discardTransaction(c);
            } else {
                flagTransaction(c);
            }
            clusterRedirectClient(c,n,hashslot,error_code);
            return C_OK;
        }
    }
/* 
     如果打开了maxmemory功能,检查内存占用情况,看是否有足够的内存
     */
    if (server.maxmemory && !server.lua_timedout) {
        int out_of_memory = freeMemoryIfNeededAndSafe() == C_ERR;
if (server.current_client == NULL) return C_ERR;
/* 如果内存不够 */
        if (out_of_memory &&
            (c->cmd->flags & CMD_DENYOOM ||
             (c->flags & CLIENT_MULTI && c->cmd->proc != execCommand))) {
            flagTransaction(c);
            addReply(c, shared.oomerr);
            return C_OK;
        }
    }
/* 如果硬盘持久化有问题的话不执行写命令 */
    int deny_write_type = writeCommandsDeniedByDiskError();
    if (deny_write_type != DISK_ERROR_TYPE_NONE &&
        server.masterhost == NULL &&
        (c->cmd->flags & CMD_WRITE ||
         c->cmd->proc == pingCommand))
    {
        flagTransaction(c);
        if (deny_write_type == DISK_ERROR_TYPE_RDB)
            addReply(c, shared.bgsaveerr);
        else
            addReplySds(c,
                sdscatprintf(sdsempty(),
                "-MISCONF Errors writing to the AOF file: %s\r\n",
                strerror(server.aof_last_write_errno)));
        return C_OK;
    }
/* 没有足够的从服务器的话也不执行写命令 */
    if (server.masterhost == NULL &&
        server.repl_min_slaves_to_write &&
        server.repl_min_slaves_max_lag &&
        c->cmd->flags & CMD_WRITE &&
        server.repl_good_slaves_count < server.repl_min_slaves_to_write)
    {
        flagTransaction(c);
        addReply(c, shared.noreplicaserr);
        return C_OK;
    }
/*
     * 如果是只读的从服务器不执行写命令,如果是主服务器的话接受写命令
     */
    if (server.masterhost && server.repl_slave_ro &&
        !(c->flags & CLIENT_MASTER) &&
        c->cmd->flags & CMD_WRITE)
    {
        addReply(c, shared.roslaveerr);
        return C_OK;
    }
/*
     * 在发布/订阅模式下只接受SUBSCRIBE以及UNSUBSCRIBE命令
    */
    if (c->flags & CLIENT_PUBSUB &&
        c->cmd->proc != pingCommand &&
        c->cmd->proc != subscribeCommand &&
        c->cmd->proc != unsubscribeCommand &&
        c->cmd->proc != psubscribeCommand &&
        c->cmd->proc != punsubscribeCommand) {
        addReplyError(c,"only (P)SUBSCRIBE / (P)UNSUBSCRIBE / PING / QUIT allowed in this context");
        return C_OK;
    }
/* 当主从数据不同步的时候只允许带有t的标志的命令执行 */
    if (server.masterhost && server.repl_state != REPL_STATE_CONNECTED &&
        server.repl_serve_stale_data == 0 &&
        !(c->cmd->flags & CMD_STALE))
    {
        flagTransaction(c);
        addReply(c, shared.masterdownerr);
        return C_OK;
    }
/*
     * 命令没有CMD_LOADING标志位的时候并且在加载数据库
     */
    if (server.loading && !(c->cmd->flags & CMD_LOADING)) {
        addReply(c, shared.loadingerr);
        return C_OK;
    }
/*
     * 如果lua脚本执行太慢,只允许一部分受限的命令执行
     */
    if (server.lua_timedout &&
          c->cmd->proc != authCommand &&
          c->cmd->proc != replconfCommand &&
        !(c->cmd->proc == shutdownCommand &&
          c->argc == 2 &&
          tolower(((char*)c->argv[1]->ptr)[0]) == 'n') &&
        !(c->cmd->proc == scriptCommand &&
          c->argc == 2 &&
          tolower(((char*)c->argv[1]->ptr)[0]) == 'k'))
    {
        flagTransaction(c);
        addReply(c, shared.slowscripterr);
        return C_OK;
    }
/* 执行命令 */
    if (c->flags & CLIENT_MULTI &&
        c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
        c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)
    {
        queueMultiCommand(c);
        addReply(c,shared.queued);
    } else {
        call(c,CMD_CALL_FULL);
        c->woff = server.master_repl_offset;
        if (listLength(server.ready_keys))
            handleClientsBlockedOnKeys();
    }
    return C_OK;
}

在执行完上述的检查工作之后服务器就会调用对应命令的执行函数来执行命令,执行完之后就会把结果返回给客户端。

   以上就是服务器处理命令执行结果的基本过程。


如果对你有帮助,请作者喝一杯牛奶吧