Redis事件处理

机会只留给那些准备充分的人

Posted by irving.gx on April 18, 2020

redis是一种事件驱动的程序,什么是事件呢,比如我们向redis输入一个命令执行或者redis在某个时间点到来的时候执行一定的动作,这些都是事件。 redis服务器会处理文件事件以及时间事件两类,前者是服务器和客户端通过套接字进行通信或者和其他服务器进行通信时所采取动作的抽象,后者是服务器 对定时事件操作的抽象。

服务器对于socket所采取的连接,读取,写入,关闭等等操作都是文件事件,因为一台服务器可能同时要和多个客户端或者服务器进行通信,而且和同一个对象 进行通信的时候可能同时会有多种操作,因此就会涉及多种事件流的同时处理。

如果我们是redis的设计者,一种可能想到的方式就是采取多线程多进程的方式,构建一个线程池,每当需要有事件进行处理的时候我们就将这个事件指定给某个 空闲的线程进行处理,但是这样的话切换线程,保存上下文就是一种开销。而且redis是一种单线程的设计,之所以设计成单线程的模式,是因为redis是基于 内存的操作,而CPU不是瓶颈,设计成单线程更加简单方便。所以这种方式不太适合redis。另外一种可能的方案是采取轮询的方式,构建一个while循环, 然后在每次遍历的时候对于所有的socket都询问一次,但是这样的方式会消耗大量的CPU,造成了资源无端的浪费。

而redis采用的方式是基于reactor模式构建的事件处理器,一个关于reactor模式经常被提到的现实中的例子就是餐厅的服务员的例子,传统的多线程是 每当有一个任务需要处理就指派一个线程去处理,就好像餐厅里每来一个客人就要有一个专门的服务员去接待客人,从接待到点菜,如果采用这种模式的话这个 服务员的很多时间都在等待,这个服务员在等待顾客点菜的时候完全可以去接待别的新来的顾客,reactor模式就是基于这样的思想,一个线程可以处理多个 事件流,当有事件准备好读状态或者写状态的时候可以通知线程去处理。redis采用的是I/O多路复用技术来同时监听多个套接字,将常用的I/O多路复用函数 进行封装,这些函数有select,epoll,kqueue,以及evport函数,对应的文件都是以”ae_“为前缀,后面加上对应函数的文件,redis会根据不同的编译 器采用不同的I/O多路复用技术。

 下面我们看一下文件事件结构的代码

/* File event structure */
typedef struct aeFileEvent {
    int mask; /* one of AE_(READABLE|WRITABLE|BARRIER) */
    aeFileProc *rfileProc;
    aeFileProc *wfileProc;
    void *clientData;
} aeFileEvent;

其中mask表示文件事件的状态,分别是可读,可写以及阻塞,rfileProc为读事件函数,wfileProc为写文件函数,clientData为客户端传入的数据。

   我们再看一下时间事件的结构的代码

/* Time event structure */
typedef struct aeTimeEvent {
    long long id; /* time event identifier.时间事件唯一标识 */
    long when_sec; /* seconds 时间事件到达的秒数*/
    long when_ms; /* milliseconds 时间事件到达的毫秒数*/
    aeTimeProc *timeProc;//事件处理函数
    aeEventFinalizerProc *finalizerProc;//事件终结函数
    void *clientData;//客户端传入的数据
    struct aeTimeEvent *prev;//上一个时间事件
    struct aeTimeEvent *next;//下一个时间事件
} aeTimeEvent;

事件处理函数的主要逻辑如下所示

void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    while (!eventLoop->stop) {
        if (eventLoop->beforesleep != NULL)
            eventLoop->beforesleep(eventLoop);
        aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
    }
}

下面是aeProcessEvents函数的主要逻辑,在主要逻辑后面加入了注释

/* Process every pending time event, then every pending file event
 * (that may be registered by time event callbacks just processed).
 * Without special flags the function sleeps until some file event
 * fires, or when the next time event occurs (if any).
 *
 * If flags is 0, the function does nothing and returns.
 * if flags has AE_ALL_EVENTS set, all the kind of events are processed.
 * if flags has AE_FILE_EVENTS set, file events are processed.
 * if flags has AE_TIME_EVENTS set, time events are processed.
 * if flags has AE_DONT_WAIT set the function returns ASAP until all
 * if flags has AE_CALL_AFTER_SLEEP set, the aftersleep callback is called.
 * the events that's possible to process without to wait are processed.
 *
 * The function returns the number of events processed. 返回处理过的事件数目*/
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
    //初始化处理过的事件数目为0
    int processed = 0, numevents;
/* Nothing to do? return ASAP 没有事件直接返回*/
    if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;
/* Note that we want call select() even if there are no
     * file events to process as long as we want to process time
     * events, in order to sleep until the next time event is ready
     * to fire. */
     //没有文件事件或者有时间事件并且未设置不阻塞标识
    if (eventLoop->maxfd != -1 ||
        ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
        int j;
        aeTimeEvent *shortest = NULL;
        struct timeval tv, *tvp;
if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
            //获取最近发生的时间事件
            shortest = aeSearchNearestTimer(eventLoop);
        if (shortest) {
            long now_sec, now_ms;
            //获取时间
            aeGetTime(&now_sec, &now_ms);
            tvp = &tv;
/* How many milliseconds we need to wait for the next
             * time event to fire? 计算时长*/
            long long ms =
                (shortest->when_sec - now_sec)*1000 +
                shortest->when_ms - now_ms;
            //未到时
            if (ms > 0) {
                tvp->tv_sec = ms/1000;
                tvp->tv_usec = (ms % 1000)*1000;
            } else {
                tvp->tv_sec = 0;
                tvp->tv_usec = 0;
            }
            //时间事件链表为空
        } else {
            /* If we have to check for events but need to return
             * ASAP because of AE_DONT_WAIT we need to set the timeout
             * to zero */
            if (flags & AE_DONT_WAIT) {
                tv.tv_sec = tv.tv_usec = 0;
                tvp = &tv;
            } else {
                /* Otherwise we can block */
                tvp = NULL; /* wait forever */
            }
        }
/* Call the multiplexing API, will return only on timeout or when
         * some event fires.调用多路复用接口,只有当时间事件到来或者文件事件触发才会返回 */
        numevents = aeApiPoll(eventLoop, tvp);
/* After sleep callback.睡眠之后回调 */
        if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
            eventLoop->aftersleep(eventLoop);
        //遍历文件事件表
        for (j = 0; j < numevents; j++) {
            aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
            //获取文件类型,文件描述符
            int mask = eventLoop->fired[j].mask;
            int fd = eventLoop->fired[j].fd;
            int fired = 0; /* Number of events fired for current fd. */
/* Normally we execute the readable event first, and the writable
             * event laster. This is useful as sometimes we may be able
             * to serve the reply of a query immediately after processing the
             * query.
             *
             * However if AE_BARRIER is set in the mask, our application is
             * asking us to do the reverse: never fire the writable event
             * after the readable. In such a case, we invert the calls.
             * This is useful when, for instance, we want to do things
             * in the beforeSleep() hook, like fsynching a file to disk,
             * before replying to a client.
             一般情况下优先读事件,其次写事件,但是如果设置AE_BARRIER的话就不会在读之后触发写,这样我们就可以使操作倒置*/
            int invert = fe->mask & AE_BARRIER;
/* Note the "fe->mask & mask & ..." code: maybe an already
             * processed event removed an element that fired and we still
             * didn't processed, so we check if the event is still valid.
             *
             * Fire the readable event if the call sequence is not
             * inverted. 可读事件发生*/
            if (!invert && fe->mask & mask & AE_READABLE) {
                fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                fired++;
            }
/* Fire the writable event. 可写事件发生*/
            if (fe->mask & mask & AE_WRITABLE) {
                if (!fired || fe->wfileProc != fe->rfileProc) {
                    fe->wfileProc(eventLoop,fd,fe->clientData,mask);
                    fired++;
                }
            }
/* If we have to invert the call, fire the readable event now
             * after the writable one. 如果设置了倒置,可读事件发生*/
            if (invert && fe->mask & mask & AE_READABLE) {
                if (!fired || fe->wfileProc != fe->rfileProc) {
                    fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                    fired++;
                }
            }
processed++;
        }
    }
    /* Check time events 执行时间事件*/
    if (flags & AE_TIME_EVENTS)
        processed += processTimeEvents(eventLoop);
return processed; /* return the number of processed file/time events */
}

以上就是redis事件处理的一些简单分析。


如果对你有帮助,请作者喝一杯牛奶吧