一、redis多路复用涉及的结构
struct redisServer {
// 描述符
int ipfd[REDIS_BINDADDR_MAX]; /* TCP socket file descriptors */
// 描述符数量
int ipfd_count; /* Used slots in ipfd[] */
// 事件状态
aeEventLoop *el;
}
/* State of an event based program
*
* 事件处理器的状态
*/
typedef struct aeEventLoop {
// 目前已注册的最大描述符
int maxfd; /* highest file descriptor currently registered */ // 目前已追踪的最大描述符
int setsize; /* max number of file descriptors tracked */
// 用于生成时间事件 id
long long timeEventNextId;
// 最后一次执行时间事件的时间
time_t lastTime; /* Used to detect system clock skew */
// 已注册的文件事件
aeFileEvent *events; /* Registered events */
// 已就绪的文件事件
aeFiredEvent *fired; /* Fired events */
// 时间事件
aeTimeEvent *timeEventHead;
// 事件处理器的开关
int stop;
// 多路复用库的私有数据, 这个结构在epoll中是aeApiState结构
void *apidata; /* This is used for polling API specific data */
// 在处理事件前要执行的函数
aeBeforeSleepProc *beforesleep;
} aeEventLoop;
typedef struct aeFileEvent {
// 监听事件类型掩码,
// 值可以是 AE_READABLE 或 AE_WRITABLE ,
// 或者 AE_READABLE | AE_WRITABLE
int mask; /* one of AE_(READABLE|WRITABLE) */
// 读事件处理器
aeFileProc *rfileProc;
// 写事件处理器
aeFileProc *wfileProc;
// 调用rfileProc或wfileProc会读的私有数据,这个私有数据的初始化在aeCreateFileEvent中传入。
void *clientData;
} aeFileEvent;
/* A fired event
*
* 已就绪事件
*/
typedef struct aeFiredEvent {
// 已就绪文件描述符
int fd;
// 事件类型掩码,
// 值可以是 AE_READABLE 或 AE_WRITABLE
// 或者是两者的或
int mask;
} aeFiredEvent;
/* 事件状态*/
typedef struct aeApiState {
// epoll_event 实例描述符
int epfd;
// 事件槽
struct epoll_event *events;
} aeApiState;
struct epoll_event {
uint32_t events; /* Epoll events */
epoll_data_t data; /* User data variable */
} __EPOLL_PACKED;
typedef union epoll_data
{
void *ptr;
int fd;
uint32_t u32;
uint64_t u64;
} epoll_data_t;
二、redis多路复用服务端调用流程
1.初始化服务
在InitServer中,调用
- server.el = aeCreateEventLoop(server.maxclients+REDIS_EVENTLOOP_FDSET_INCR);
- aeCreateEventLoop函数返回eventLoop(aeEventLoop结构),函数中赋值eventLoop各结构,包括分配eventLoop->events、eventLoop->fired空间,
- aeCreateEventLoop会调用aeApiCreate(eventLoop),赋值eventLoop->apidata = state(aeApiState结构),包括分配state->events空间,调用state->epfd = epoll_create(1024);
- 调用listenToPort(server.port,server.ipfd,&server.ipfd_count),bind和listen端口
- listenToPort函数会遍历server.bindaddr_count,根据server.bindaddr地址格式创建ipv4(anetTcpServer函数)或者ipv6的tcp服务(anetTcp6Server函数),若server.bindaddr_count为0,则默认创建一个ipv4和一个ipv6的tcp服务。创建好后,调用anetNonBlock去设置这个fd为非阻塞I/O。创建的anetTcp6Server和anetTcp6Server返回的socket的fd赋值给server.ipfd数组。
- anetTcpServer和anetTcp6Server均调用_anetTcpServer(err, port, bindaddr, AF_INET/AF_INET6, backlog)函数
- 调用s=socket()函数,通过anetSetReuseAddr(会进一步调用setsockopt)设置端口可重用,根据bindaddr构建addr的信息,调用anetListen(err,s,ai_addr,ai_addrlen,backlog),_anetTcpServer的返回值就是s。
- anetListen会调用bind(s,sa,len)和listen(s, backlog),若返回-1则会调用close(s)关闭套接字
- 为 TCP 连接关联连接应答(accept)处理器用于接受并应答客户端的 connect() 调用,即遍历server.ipfd,调用aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE, acceptTcpHandler,NULL)
- fd = server.ipfd[j],调用aeApiAddEvent(eventLoop, fd, mask),取出aeFileEvent *fe = &eventLoop->events[fd],赋值fe->mask为AE_READABLE或AE_WRITABLE,赋值fe->clientData为传进来的clientData(监听套接字为NULL),通过fd更新最大值eventLoop->maxfd,根据mask赋值fe->rfileProc或fe->wfileProc为传进来的proc执行函数指针(如accept是acceptTcpHandler关联的事这个事件处理器)。
- 在aeApiAddEvent中调用epoll_ctl(state->epfd,op,fd,&ee),
- 其中state->epfd为epoll自身描述符,
- 当eventLoop->events[fd].mask为AE_NONE时,op是EPOLL_CTL_ADD事件,否则是EPOLL_CTL_MOD事件,
- fd为aeApiAddEvent传进来的fd。
- 构建epoll_event ee,若mask是AE_READABLE时,赋值ee.events为EPOLLIN,若mask是AE_WRITABLE时,赋值ee.events为EPOLLOUT,ee.data.fd = fd。
- 调用epoll_ctl函数。
2.服务器运行期间
aeMain(server.el),运行事件处理器(包括事件事件处理器和文件事件处理器),这里就是服务器运行的主逻辑。
- 当eventloop->stop == 0时(仅在benchmark模式下可能会设置stop=1),循环执行eventLoop->beforesleep(eventLoop)和aeProcessEvents(eventLoop, AE_ALL_EVENTS);
- aeProcessEvents开始处理时间事件和文件事件
- 获取最近的时间事件shortest=aeSearchNearestTimer(eventLoop);
- 计算距今最近的时间事件还要多久才能到达aeGetTime,存在tvp结构中,若小于0直接赋0
- 根据tvp设置aeApiPoll的阻塞事件,调用numevents = aeApiPoll(eventLoop, tvp),获取就绪的文件事件数目。
- aeApiState *state = eventLoop->apidata;
- numevents = epoll_wait(state->epfd,state->events,eventLoop->setsize, tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);
- 用j下标根据numevents(到达事件数,遍历次数)遍历struct epoll_event *e = state->events+j,通过e.events去赋值eventLoop->fired[j].mask,通过e->data.fd赋值eventLoop->fired[j].fd
- 返回numevents数目
- 用j下标根据numevents遍历eventLoop->events[eventLoop->fired[j].fd]
- 当eventLoop->events的mask和eventLoop->fired的mask都是AE_READABLE时,调用之前设置在eventLoop->events的rfileProc(eventLoop,fd,fe->clientData,mask)函数
- 当eventLoop->events的mask和eventLoop->fired的mask都是AE_WRITABLE时,调用之前设置在eventLoop->events的wfileProc(eventLoop,fd,fe->clientData,mask)函数
- 注意对于同一个fd,读或者写只能执行其中一个,先检查是否可执行读,再检查是否可执行写。
3.服务器关闭
这种关闭方式目前只用于benchmark模式,需要eventLoop->stop=1去退出aeMain循环,才能进入下面的函数。
调用aeDeleteEventLoop(server.el);
- 调用aeApiFree(eventLoop),然后释放eventLoop相关的空间。
- aeApiState *state = eventLoop->apidata,调用close(state->epfd),关闭epoll描述符。
其他关闭方式:
(1)通过信号,捕捉后设置shutdown_asap标记,在servercron中调用prepareForShutdown函数,然后exit。
参考:
https://www.sidney.wiki/system/1175
(2)通过shutdown命令,直接在命令处理中调用prepareForShutdown函数,然后exit。
4.rfileProc或者wfileProc事件处理器的执行内容举例
(1)事件处理器简介
初始化事件处理器时都会调用aeCreateFileEvent去调用epoll_ctl,去增加或者修改描述符。
事件处理器有以下这些。
acceptTcpHandler accept事件处理器
readQueryFromClient 读事件处理器
sendReplyToClient 写事件处理器
sendBulkToSlave master写slave RDB的事件处理器
readSyncBulkPayload slave读master RDB事件处理器
syncWithMaster 同步主服务器的 读+写事件 处理器
其他还包括script脚本环境的事件处理器、sentinel的事件处理器、cluster的事件处理器
(2)acceptTcpHandler处理器的执行流程
acceptTcpHandler是在initServer时,调用aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE, acceptTcpHandler,NULL)去初始化的rfileProc。
它是用来做accept操作的事件处理器。
acceptTcpHandler的执行流程:
调用一个max次的循环,循环内调用cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);和acceptCommonHandler(cfd,0);,若anetTcpAccept返回的cfd是ANET_ERR的,代表没有连接到来,则在当前循环就break掉了,而如果fd有返回,则accept一个fd后,会继续循环并accept下一个fd。
- anetTcpAccept会调用fd = anetGenericAccept(err,s,(struct sockaddr*)&sa,&salen),然后记录接收到的ip和port,从网络序转为主机序
- anetGenericAccept会在一死循环里调用fd=accept,当fd为-1时,且errno是EINTR(被其他的系统调用中断了, 对于句柄进行操作比较容易出现,一般裸用如recv都是需要判断的, 处理也很简单, 再进行一次操作就可以了)时,会continue继续accept;如果errno是其他值,则会直接返回ANET_ERR;若有可用的fd,则也会直接返回fd
- anetTcpAccept调用acceptCommonHandler(cfd, 0),(cfd是连接客户端套接字fd,0会复制给clients->flags,代表无状态)为客户端创建客户端状态
- 调用createClient(fd)创建客户端结构。(fd不为-1时会创建带网络连接的客户端;如果 fd 为 -1 ,那么创建无网络连接的伪客户端)
- fd不为-1时,网络连接设置创建:
- anetNonBlock(fd)设置非阻塞
- 禁用 Nagle 算法(在建立连接时,为什么要禁用Nagle 算法)
- 若server.tcpkeepalive,调用setsocket,设置tcp的keep alive选项
- 调用aeCreateFileEvent(server.el,fd,AE_READABLE,readQueryFromClient, c),注意这里的c是redisClient指针,它会被赋值到具体某个时间的aeFileEvent->clientData私有数据中,后续在调用事件处理器时(如readQueryFromClient),将这个clientData作为事件处理器的一个参数。
- 不管fd是否为-1,即不管是否是虚拟客户端,都需要初始化客户端的各个属性
- 初始化完客户端状态后,对于真实客户端,需要添加到服务器的客户端链表中server.clients的表尾中(调用listAddNodeTail函数)
- 当server.clients数组的数量超过server.maxclients时,write一个err回去,并且释放拒绝当前处理的client,即调用freeClient(c)释放客户端状态
- freeClient除了清除相关客户端状态外,当c->fd不为-1时,会调用aeDeleteFileEvent(server.el,c->fd,AE_READABLE)和aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE)从事件处理器中删除套接字的事件,并调用close(c->fd)删除套接字。
- aeDeleteFileEvent会调用aeApiDelEvent取消对给定 fd 的给定事件的监视
- aeApiDelEvent会调用epoll_ctl(state->epfd,EPOLL_CTL_MOD,fd,&ee)将指定的ee.events(对应mask)删掉,或者epoll_ctl(state->epfd,EPOLL_CTL_DEL,fd,&ee)删掉对fd的整个监视。
- 更新客户端clients->flags
(3)readQueryFromClient处理器的执行流程
readQueryFromClient的执行流程:
检查如果是REDIS_REQ_MULTIBULK的reqtype,设置剩余长度为要读取的readlen,否则默认读取16M。
获取查询缓冲区querybuf的当前内容的长度qblen,并且为querybuf这个sds新分配至少qblen+readlen空间(按照sds的分配策略,小于1M时,分配2倍大小,否则分配1M大小)
(注意 如果读取出现 short read ,那么可能会有内容滞留在读取缓冲区里面,qblen为没有处理命令的滞留的缓冲区长度,因为这些滞留内容也许不能完整构成一个符合协议的命令)
从缓存的querybuf的qblen开始写入数据,最多一次写入readlen的长度,nread = read(fd, c->querybuf+qblen, readlen),nread为读取的长度。
检查读取状态,若nread == -1且errno == EAGAIN,设置nread=0,等待下次读事件处理器的调用,其他的(nread<=0)则会调用freeClient(c)释放客户端状态;如果read后缓冲区超过服务器最大缓冲区长度,则会清空缓冲区,并调用freeClient(c)释放客户端状态;
若读取成功,更新querybuf的这个sds的len和free属性,若客户端是master,还会更新它的复制偏移量c->reploff += nread
调用processInputBuffer(c),从查询缓存重读取内容,创建参数,并执行命令,函数会执行到缓存中的所有内容都被处理完为止。
- processInputBuffer的处理是一个循环,当querybuf不为空时,循环处理querybuf。尽可能地处理查询缓冲区中的内容,但如果读取出现 short read ,那么可能会有内容滞留在读取缓冲区里面,这些滞留内容也许不能完整构成一个符合协议的命令,需要等待下次读事件的就绪。
- 若客户端被暂停、阻塞、关闭,直接返回
- 若未设置c->reqtype请求类型,则通过querybuf[0]是否是“*”决定是否是REDIS_REQ_MULTIBULK或者REDIS_REQ_INLINE;
- 当命令类型是REDIS_REQ_INLINE时,调用processInlineBuffer读取命令,querybuf会被resize到原始大小-2(即去掉了 \r\n),读取单条命令到c->argc、c->argv中(e.g.SET msg hello \r\n,会去掉\r \n,然后填充个数argc和参数argv),然后释放querybuf
- 当命令类型是REDIS_REQ_MULTIBULK时,调用processMultibulkBuffer读取命令
- 每次循环只读取一条完整命令,(比如
*3\r\n$3\r\nSET\r\n$3\r\nMSG\r\n$5\r\nHELLO\r\n
,将被转换为:argv[0] = SET、argv[1] = MSG 、argv[2] = HELLO 、argc=3 - 从querybuf中删除当前已经读取的内容,sdsrange(c->querybuf,pos,-1);(相当于读到pos了)
- 若processInlineBuffer、processMultibulkBuffer返回值不是REDIS_OK,可能代表short read,则会break掉当前循环,等待下一次read进更多的数据进缓冲区,再做处理
- 调用processCommand(c)执行命令,返回值代表执行状态是REDIS_OK或者REDIS_ERR,下面是简化版(详细版)
- 调用lookupCommand查找命令,包括执行函数,参数合法性等,若有异常则调用addReply或者addReplyErrorFormat
- 调用call执行命令,call会调用c->cmd->proc(c)执行命令的实现函数,实现完后同样也会addReply
- addReply会调用prepareClientToWrite(c),为客户端安装写处理器到事件循环,然后写的输出缓冲区
- prepareClientToWrite中会判断客户端是否可写(除lua外无连接的为客户端不可写,主服务器且不接受查询不可写),然后调用aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,acceptUnixHandler,sendReplyToClient, c),安装写处理器sendReplyToClient
- 写输出缓冲区
- 首先调用_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr),复制内容到c->buf中,这样可以避免内存分配
- 如果c->buf中的空间不够,就调用_addReplyObjectToList(c,obj),复制到c->reply链表中,可能会引起内存分配
- 执行完命令后,调用resetClient(c),重置argc、grgv、reqtype、multibulklen、bulklen等
(4)sendReplyToClient写处理器的执行流程
循环while(c->bufpos > 0 || listLength(c->reply)),一直到回复缓冲区为空
- 当c->bufpos > 0时
- 调用nwritten = write(fd,c->buf+c->sentlen,c->bufpos-c->sentlen),
- 写入失败时(nwritten <= 0),退出循环
- 出现short read时,写入未能一次完成,通过nwritten记录写成功的sentlen,c->sentlen += nwritten,下一次循环时,c->buf+c->sentlen 就会偏移到正确(未写入)内容的位置上。
- 写入完毕时,修改c->bufpos=0,
- c->reply的处理类似,只是缓冲区是链表,并且写入完毕后会直接删除写入完毕的节点listDelNode(c->reply,listFirst(c->reply))。
- 使用towriteten已经写入的数据,为了避免一个非常大的回复独占服务器,当写入的总数量大于 REDIS_MAX_WRITE_PER_EVENT(64k),临时中断写入(break循环),将处理时间让给其他客户端,剩余的内容等下次写入就绪再继续写入。
- 不过,上述条件中,如果服务器的内存占用已经超过了限制,那么为了将回复缓冲区中的内容尽快写入给客户端,然后释放回复缓冲区的空间来回收内存,这时即使写入量超过了 REDIS_MAX_WRITE_PER_EVENT,也不会中断写入(不会break)
写入出错检查:当nwritten == -1且errno == EAGAIN时,设置nwritten=0,等下一次写事件处理器的调用;其他情况的nwritten == -1时,直接调用freeClient(t)清空客户端状态并退出。
写完后,调用aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE),删除写相关的event,若指定了写入后需要关闭客户端c->flags & REDIS_CLOSE_AFTER_REPLY,则调用freeClient清空客户端状态。