libevent入门

libevent简介和使用

测试实例

简单定时器:实现程序每秒输出一个“Game Over!”

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
#include <stdio.h>
#include <iostream>
// libevent头文件
#include <event.h>
using namespace std;
// 定时事件回调函数
void onTime(int sock, short event, void *arg)
{
cout << "Game Over!" << endl;
struct timeval tv;
tv.tv_sec = 1;
tv.tv_usec = 0;
// 重新添加定时事件(定时事件触发后默认自动删除)
event_add((struct event*)arg, &tv);
}
int main()
{
// 初始化
event_init();
struct event evTime;
// 设置定时事件
evtimer_set(&evTime, onTime, &evTime);
struct timeval tv;
tv.tv_sec = 1;
tv.tv_usec = 0;
// 添加定时事件
event_add(&evTime, &tv);
// 事件循环
event_dispatch();
return 0;
}

组成部分

  • 事件管理包括各种IO(socket)、定时器、信号等事件,也是libevent应用最广的模块;
  • 缓存管理是指evbuffer功能;
  • DNS是libevent提供的一个异步DNS查询功能;
  • HTTP是libevent的一个轻量级http实现,包括服务器和客户端

libevent使用

    每个线程有且只有一个event_base,对应一个struct event_base结构体,用来schedule托管给它的一系列event。当一个事件发生后,event_base会在合适的时间去调用绑定在这个事件上的函数(传入一些预定义的参数,以及在绑定时指定的一个参数),直到这个函数执行完,再返回schedule其他事件。

创建一个event_base

1
2
struct event_base *base = event_base_new();
assert(base != NULL);

event_base内部有一个循环,循环阻塞在epoll/kqueue等系统调用上,直到有一个/一些事件发生,然后去处理这些事件。当然,这些事件要被绑定在这个event_base上。每个事件对应一个struct event,可以是监听一个fd或者POSIX信号量之类。

创建并绑定一个event

struct event使用event_new来创建和绑定,使用event_add来启用:

1
2
3
4
5
6
//创建并绑定一个event
struct event *listen_event;
//参数:event_base, 监听的fd,事件类型及属性,绑定的回调函数,给回调函数的参数
listen_event = event_new(base, listener, EV_READ|EV_PERSIST, callback_func, (void*)base);
//参数:event,超时时间(struct timeval *类型的,NULL表示无超时设置)
event_add(listen_event, NULL);

libevent支持的事件及属性包括(使用bitfield实现,用 | 来让它们合体)

  • EV_TIMEOUT: 超时
  • EV_READ: 只要网络缓冲中还有数据,回调函数就会被触发
  • EV_WRITE: 只要塞给网络缓冲的数据被写完,回调函数就会被触发
  • EV_SIGNAL: POSIX信号量,参考manual吧
  • EV_PERSIST: 不指定这个属性的话,回调函数被触发后事件会被删除
  • EV_ET: Edge-Trigger边缘触发,参考EPOLL_ET

启动event_base的循环

  循环的启动使用event_base_dispatch,循环将一直持续,直到不再有需要关注的事件,或者是遇到event_loopbreak()/event_loopexit()函数。

1
2
//启动事件循环
event_base_dispatch(base);

回调函数callback_func

typedef void(* event_callback_fn)(evutil_socket_t sockfd, short event_type, void *arg)
  绑定到event的回调函数callback_func:传递给它的是一个socket fd、一个event类型及属性bit_field、以及传递给event_new的最后一个参数.

使用libevent创建服务器服务

基础流程

  1. listener = socket(),bind(),listen(),设置nonblocking
  2. 创建一个event_base
  3. 创建一个event,将该socket托管给event_base,指定要监听的事件类型,并绑定上相应的回调函数(及需要给它的参数)。对于listener socket来说,只需要监听EV_READ|EV_PERSIST
  4. 启用该事件
  5. 进入事件循环
  6. (异步) 当有client发起请求的时候,调用该回调函数,进行处理。

创建event处理read和write

  1. 将这个sockfd设置为nonblocking
  2. 创建2个event:
    • event_read,绑上sockfd的EV_READ|EV_PERSIST,设置回调函数和参数
    • event_write,绑上sockfd的EV_WRITE|EV_PERSIST,设置回调函数和参数
  3. 启用event_read事件
  4. (异步) 等待event_read事件的发生, 调用相应的回调函数。回调函数用recv读入的数据,不能直接用send丢给sockfd了事——因为sockfd是nonblocking的。所以需要一个自己管理的缓存用来保存读入的数据中(在accept以后就创建一个struct,作为第2步回调函数的arg传进来)。在合适的时间启用event_write事件[event_add(event_write, NULL)],等待EV_WRITE事件的触发
  5. (异步) 当event_write事件的回调函数被调用的时候,往sockfd写入数据,然后删除event_write事件[event_del(event_write)],等待event_read事件的下一次执行。

使用bufferevent改进服务

struct bufferevent内建了两个event(read/write)和对应的缓冲区[struct evbuffer input, output],并提供相应的函数用来操作缓冲区(或者直接操作bufferevent)。每当有数据被读入input的时候,read_cb函数被调用;每当output被输出完的时候,write_cb被调用;在网络IO操作出现错误的情况(连接中断、超时、其他错误),error_cb被调用。

  1. 设置sockfd为nonblocking
  2. 使用bufferevent_socket_new创建一个struct bufferevent *bev,关联该sockfd,托管给event_base.
  3. 使用bufferevent_setcb(bev, read_cb, write_cb, error_cb, (void *)arg)将EV_READ/EV_WRITE对应的函数.
  4. 使用bufferevent_enable(bev, EV_READ|EV_WRITE|EV_PERSIST)来启用read/write事件
  5. (异步)处理函数
  • 在read_cb里面从input读取数据,处理完毕后塞到output里(会被自动写入到sockfd)
  • 在write_cb里面
  • error_cb里面处理遇到的错误
1
2
3
4
5
6
7
8
9
10
11
void read_cb(struct bufferevent *bev, void *arg) {
char line[256];
int n;
evutil_socket_t fd = bufferevent_getfd(bev);
while (n = bufferevent_read(bev, line, 256), n > 0)
bufferevent_write(bev, line, n);
}
void error_cb(struct bufferevent *bev, short event, void *arg) {
bufferevent_free(bev);
}

可以从bev中用libevent的API提取出event_base、sockfd、input/output等相关数据

范例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#include <assert.h>
#include <event2/event.h>
#include <event2/bufferevent.h>
#define LISTEN_PORT 9999
#define LISTEN_BACKLOG 32
void do_accept(evutil_socket_t listener, short event, void *arg);
void read_cb(struct bufferevent *bev, void *arg);
void error_cb(struct bufferevent *bev, short event, void *arg);
void write_cb(struct bufferevent *bev, void *arg);
int main(int argc, char *argv[])
{
int ret;
evutil_socket_t listener;
listener = socket(AF_INET, SOCK_STREAM, 0);
assert(listener > 0);
evutil_make_listen_socket_reuseable(listener);
struct sockaddr_in sin;
sin.sin_family = AF_INET;
sin.sin_addr.s_addr = 0;
sin.sin_port = htons(LISTEN_PORT);
if (bind(listener, (struct sockaddr *)&sin, sizeof(sin)) < 0) {
perror("bind");
return 1;
}
if (listen(listener, LISTEN_BACKLOG) < 0) {
perror("listen");
return 1;
}
printf ("Listening...\n");
evutil_make_socket_nonblocking(listener);
struct event_base *base = event_base_new();
assert(base != NULL);
struct event *listen_event;
listen_event = event_new(base, listener, EV_READ|EV_PERSIST, do_accept, (void*)base);
event_add(listen_event, NULL);
event_base_dispatch(base);
printf("The End.");
return 0;
}
void do_accept(evutil_socket_t listener, short event, void *arg)
{
struct event_base *base = (struct event_base *)arg;
evutil_socket_t fd;
struct sockaddr_in sin;
socklen_t slen = sizeof(sin);
fd = accept(listener, (struct sockaddr *)&sin, &slen);
if (fd < 0) {
perror("accept");
return;
}
if (fd > FD_SETSIZE) {
perror("fd > FD_SETSIZE\n");
return;
}
printf("ACCEPT: fd = %u\n", fd);
struct bufferevent *bev = bufferevent_socket_new(base, fd, BEV_OPT_CLOSE_ON_FREE);
bufferevent_setcb(bev, read_cb, NULL, error_cb, arg);
bufferevent_enable(bev, EV_READ|EV_WRITE|EV_PERSIST);
}
void read_cb(struct bufferevent *bev, void *arg)
{
#define MAX_LINE 256
char line[MAX_LINE+1];
int n;
evutil_socket_t fd = bufferevent_getfd(bev);
while (n = bufferevent_read(bev, line, MAX_LINE), n > 0) {
line[n] = '\0';
printf("fd=%u, read line: %s\n", fd, line);
bufferevent_write(bev, line, n);
}
}
void write_cb(struct bufferevent *bev, void *arg) {}
void error_cb(struct bufferevent *bev, short event, void *arg)
{
evutil_socket_t fd = bufferevent_getfd(bev);
printf("fd = %u, ", fd);
if (event & BEV_EVENT_TIMEOUT) {
printf("Timed out\n"); //if bufferevent_set_timeouts() called
}
else if (event & BEV_EVENT_EOF) {
printf("connection closed\n");
}
else if (event & BEV_EVENT_ERROR) {
printf("some other error\n");
}
bufferevent_free(bev);
}