C1000K之Libevent源码分析

简介

说到异步IO,高并发之类的名词, 可能很多人第一反应就是 select, poll, epoll, kqueue 之类的底层代码库。 但是其实除非你要写一个 Nginx 性能级别的服务器, 否则直接使用 epoll 之类的还是太过底层, 诸多不便,要榨干整个异步编程的高并发性能还需要开发很多相关组件, 而 Libevent 就是作为更好用的高性能异步编程网络库而生, 他帮你包装了各种 buffer 和 event, 甚至也提供了更加高层的 http 和 rpc 等接口, 可以让你脱离底层细节,更加专注于服务的其他核心功能的实现。 当然,要真正用好它,还是需要懂不少关于他的实现原理。

如果是第一次接触 Libevent 的可以先看一篇非常好的入门文章: Libevent-book , 文章主要从 C10K 问题的发展循序渐进, 分别讲了在高并发连接的情况下, 多线程解决方案, 多进程解放方案会遇到的问题, 从而引出为什么异步IO是当前解决高并发连接最有效的方案。

ideawu 实现的 C1000K 服务器 icomet 核心就是基于 Libevent 实现的。

本文源码分析基于 Libevent master 分支 commit 6dba1694c89119c44cef03528945e5a5978ab43a 版本的代码。

事件循环

既然是异步IO,事件驱动,自然会有事件循环(event loop) 。 Libevent 的事件循环是通过调用 event_base_dispatch 来实现, 其实 event_base_dispatch 函数也是调用了 event_base_loop, 代码如下:

int
event_base_dispatch(struct event_base *event_base)
{
    return (event_base_loop(event_base, 0));
}

运行事件循环的函数肯定是阻塞函数, 拿 linux 平台来说, libevent 的事件循环其实就是循环调用 epoll_wait 函数,

int epoll_wait(int epfd, struct epoll_event *events,
              int maxevents, int timeout);

在没有任何事件被触发的时候,epoll_wait 是阻塞等待的, 而且,在 Libevent 里,定时器的实现其实就是通过 epoll_wait 的 timeout 参数实现的。

如果只有一个单线程的话,一旦调用了 event_base_dispatch 之后,这个线程就会被事件完完全全的霸占, 无法进行任何其他的操作。

如果我们需要临时手动激活任何其他事件的话, 则需要借助另一个线程来操作(因为主线程仍然在阻塞等待中)。

在 event_active 函数的解释里面就有一句话说的就是这个事情:

One common use in multithreaded programs is to wake the thread running event_base_loop() from another thread.

事件是 Libevent 的最小单位

这里的事件指的就是 struct event 数据结构。

事件可以注册的各种信号如下:

#define EV_TIMEOUT   0x01
   Indicates that a timeout has occurred.
#define EV_READ   0x02
   Wait for a socket or FD to become readable.
#define EV_WRITE   0x04
   Wait for a socket or FD to become writeable.
#define EV_SIGNAL   0x08
   Wait for a POSIX signal to be raised.
#define EV_PERSIST   0x10
   Persistent event: won't get removed automatically when
   activated.
#define EV_ET   0x20
   Select edge-triggered behavior, if supported by the backend.

几乎所有其它更上层的数据结构都是基于 struct event 的包装来完成的。

核心数据结构

就拿 基于 Libevent 写一个 HTTP 服务器举例来说说, 编程时需要理解的几个核心数据结构是:

  • evhttp_request
  • evhttp_connection
  • bufferevent
  • evbuffer

从上到下是从高层到底层的关系, 下文的顺序也是从高层往底层分析。

evhttp_request

struct evhttp_request {
    // 每个 evhttp_request 都内含一个 evhttp_connection 来负责数据传输
    struct evhttp_connection *evcon;


    //输入输出的两个buffer(这两个buffer的数据是从 evhttp_connection 里面的 input_buffer 和 output_buffer 拷贝过来的。)

    struct evbuffer *input_buffer;  /* read data */
    ev_int64_t ntoread;
    unsigned chunked:1,     /* a chunked request */
        userdone:1;         /* the user has sent all data */

    struct evbuffer *output_buffer; /* outgoing post or data */

    //和HTTP协议有关的各种回调函数:

    void (*cb)(struct evhttp_request *, void *);
    void *cb_arg;

    void (*chunk_cb)(struct evhttp_request *, void *);

    int (*header_cb)(struct evhttp_request *, void *);

    void (*error_cb)(enum evhttp_request_error, void *);

    void (*on_complete_cb)(struct evhttp_request *, void *);
    void *on_complete_cb_arg;

    // 其它
    // ......
};

evhttp_connection

evhttp_request 和 evhttp_connection 的关系很简单,拿协议栈来对比的话, 前者代表的是 HTTP 协议,即应用层协议, 后者代表的是 TCP 协议,即传输层协议。 前者需要管理所有和 HTTP 相关的数据内容,比如 HTTP header 数据和 body 数据。

struct evhttp_connection {

    // socket文件描述符
    evutil_socket_t fd;

    // evhttp_connection 含有一个bufferevent,基于它进行数据传输。
    struct bufferevent *bufev;

    // 和数据传输有关的状态
    enum evhttp_connection_state state;

    //和 HTTP 协议无关的数据传输回调函数
    void (*cb)(struct evhttp_connection *, void *);
    void *cb_arg;

    void (*closecb)(struct evhttp_connection *, void *);
    void *closecb_arg;

    // 其它
    // ......
};

evhttp_connection 结构里面含有 enum evhttp_connection_state state 变量, 这个和 Thrift异步IO服务器源码分析 里面的 保持每个连接的状态是一个道理。 维护状态变化是异步IO服务编程的必要条件。

enum evhttp_connection_state {
    EVCON_DISCONNECTED, /**< not currently connected not trying either*/
    EVCON_CONNECTING,   /**< tries to currently connect */
    EVCON_IDLE,     /**< connection is established */
    EVCON_READING_FIRSTLINE,/**< reading Request-Line (incoming conn) or
                 **< Status-Line (outgoing conn) */
    EVCON_READING_HEADERS,  /**< reading request/response headers */
    EVCON_READING_BODY, /**< reading request/response body */
    EVCON_READING_TRAILER,  /**< reading request/response chunked trailer */
    EVCON_WRITING       /**< writing request/response headers/body */
};

bufferevent

bufferevent 就是包装了 EV_READ event 和 EV_WRITE event , 并且带有读写缓冲区(evbuffer)的更高层的单位。

struct bufferevent {
    // 读写事件
    struct event ev_read;
    struct event ev_write;

    // 读写缓冲区
    struct evbuffer *input;
    struct evbuffer *output;

    // 注意三个回调函数是核心
    bufferevent_data_cb readcb;
    bufferevent_data_cb writecb;
    bufferevent_event_cb errorcb;
    void *cbarg;

    // 其他
    // ......
};

我觉得上面代码就非常简洁易懂了, 两个读写时间没什么好说的, 两个读写缓冲区也是必须的(evbuffer的实现在后面会谈到), 三个回调函数就是核心。 读和写的回调函数没什么好说的, 唯一需要注意的是 bufferevent_event_cb errorcb 这个回调函数是必须注册的, 它关系到当该 bufferevent 对应的时间发生读写外的任何行为(比如socket关闭)时, 都会触发。

整理一遍 bufferevent 的事件处理过程就是:

  • 当可读事件发生时,调用 readcb 将 socket 的数据 通过 recv 读出来存入 input 缓冲区;
  • 当可写事件发生时,调用 writecb 将 output 缓冲区里面的数据通过 socket send 发送出去;
  • 当其他事件发生时,比如 socket close 发生,进行相应的数据清理退出工作。

evbuffer

基本上的异步IO服务里的buffer都是一个德行(包括Nginx也是这样), 都是即是数组又是链表(类似C++ STL里面的deque)。 对于 libevent 来说, evbuffer 是一个链表,管理整个缓冲区的头指针和尾指针,

struct evbuffer {
    struct evbuffer_chain *first;
    struct evbuffer_chain *last;
    size_t total_len;
    //......
};

对于 evbuffer 这个链表的每个单元,也就是 evbuffer_chain 来说, 则是数组(连续内存空间),

struct evbuffer_chain {
    struct evbuffer_chain *next;
    size_t buffer_len;
    size_t off;
    unsigned char *buffer;
    //......
};

总结

对于我个人而言,读源码的时候主要是从核心数据结构入手, 如果理解了这几个核心数据结构, 一般就能猜到这些数据结构的相关函数都有哪些。 可以围绕着这些结构去找相关的函数为己所用。

转载请注明出处: C1000K之Libevent源码分析