基于0.6上对0.2的补充

总结

  • tree.h中的RB_ENTRY用于存放EVLIST_TIMEOUT的event事件
  • 延迟时间的处理函数
  • EVLIST_ACTIVE状态:系统将所有待回调的函数放到activequeue队列中,并在loop中调用event_process_active来回调所有处于EVLIST_ACTIVE的事件
  • select中实现signal的方法

event状态图

libevent0.6状态图

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
// event.c 核心循环过程
int event_loop(int flags)
{
struct timeval tv;
int res, done;
/* Calculate the initial events that we are waiting for */
if (evsel->recalc(evbase, 0) == -1)
return (-1);
// event_sigcb消息signal回调函数; event_gotsig消息signal的消息量
done = 0;
while (!done) {
while (event_gotsig) {
event_gotsig = 0;
if (event_sigcb) {
res = (*event_sigcb)();
if (res == -1) {
errno = EINTR;
return (-1);
}
}
}
//计算当前到第一个延时函数到期的时间,存放到tv中
/* Check if time is running backwards */
gettimeofday(&tv, NULL);
if (timercmp(&tv, &event_tv, <)) {
struct timeval off;
LOG_DBG((LOG_MIST, 10,
"%s: time is running backwards, corrected",
__FUNCTION__));
timersub(&event_tv, &tv, &off);
timeout_correct(&off);
}
event_tv = tv;
if (!(flags & EVLOOP_NONBLOCK))
timeout_next(&tv);
else
timerclear(&tv);
//判断是否存在事件event需要被调用
if (!event_haveevents())
return (1);
res = evsel->dispatch(evbase, &tv);
if (res == -1)
return (-1);
//处理所有的延时event
timeout_process();
if (TAILQ_FIRST(&activequeue)) {
event_process_active();
if (flags & EVLOOP_ONCE)
done = 1;
} else if (flags & EVLOOP_NONBLOCK)
done = 1;
if (evsel->recalc(evbase, 0) == -1)
return (-1);
}
return (0);
}
// select.c 调用的底层实现机制
int select_dispatch(void *arg, struct timeval *tv)
{
int maxfd, res;
struct event *ev, *next;
struct selectop *sop = arg;
memset(sop->event_readset, 0, sop->event_fdsz);
memset(sop->event_writeset, 0, sop->event_fdsz);
TAILQ_FOREACH(ev, &eventqueue, ev_next) {
if (ev->ev_events & EV_WRITE)
FD_SET(ev->ev_fd, sop->event_writeset);
if (ev->ev_events & EV_READ)
FD_SET(ev->ev_fd, sop->event_readset);
}
if (signal_deliver() == -1)
return (-1);
res = select(sop->event_fds + 1, sop->event_readset,
sop->event_writeset, NULL, tv);
if (signal_recalc() == -1)
return (-1);
if (res == -1) {
if (errno != EINTR) {
log_error("select");
return (-1);
}
signal_process();
return (0);
} else if (signal_caught)
signal_process();
LOG_DBG((LOG_MISC, 80, __FUNCTION__": select reports %d",
res));
maxfd = 0;
for (ev = TAILQ_FIRST(&eventqueue); ev != NULL; ev = next) {
next = TAILQ_NEXT(ev, ev_next);
res = 0;
if (FD_ISSET(ev->ev_fd, sop->event_readset))
res |= EV_READ;
if (FD_ISSET(ev->ev_fd, sop->event_writeset))
res |= EV_WRITE;
res &= ev->ev_events;
if (res) {
if (!(ev->ev_events & EV_PERSIST))
event_del(ev);
event_active(ev, res, 1);
} else if (ev->ev_fd > maxfd)
maxfd = ev->ev_fd;
}
sop->event_fds = maxfd;
return (0);
}

常用变量

全局队列

  • static RB_HEAD(event_tree, event) timetree:以红黑树存储所有超时的event
  • static struct event_list activequeue: 存储所有已响应待处理的event,signal和RW事件响应后自动转入该队列
  • struct event_list signalqueue:存储所有signal事件未处理
  • struct event_list eventqueue:存储所有READ|WRITE的event未处理
  • static struct timeval event_tv: 保存系统当前时间
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 树的响应处理函数tree.h
#define RB_PROTOTYPE(name, type, field, cmp) \
void name##_RB_INSERT_COLOR(struct name *, struct type *); \
void name##_RB_REMOVE_COLOR(struct name *, struct type *, struct type *);\
void name##_RB_REMOVE(struct name *, struct type *); \
struct type *name##_RB_INSERT(struct name *, struct type *); \
struct type *name##_RB_FIND(struct name *, struct type *); \
struct type *name##_RB_NEXT(struct name *, struct type *); \
struct type *name##_RB_MINMAX(struct name *, int); \
#define RB_GENERATE(name, type, field, cmp)
... // RB_PROTOTYPE的各种实现
// event.c
RB_PROTOTYPE(event_tree, event, ev_timeout_node, compare);
RB_GENERATE(event_tree, event, ev_timeout_node, compare);

event结构体和状态

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
struct event {
TAILQ_ENTRY (event) ev_next;
TAILQ_ENTRY (event) ev_active_next;
TAILQ_ENTRY (event) ev_signal_next;
RB_ENTRY (event) ev_timeout_node;
int ev_fd; // 文件描述符
short ev_events; //set该event将处理的事件类型
short ev_ncalls; //event将处理的事件类型将出现的次数,即ev_callback调用次数
short *ev_pncalls; /* Allows deletes in callback */
struct timeval ev_timeout;
void (*ev_callback)(int, short, void *arg);
void *ev_arg;
int ev_res; /* result passed to event callback */
int ev_flags; // event当前状态
};
// ev_flags
#define EVLIST_TIMEOUT 0x01
#define EVLIST_INSERTED 0x02
#define EVLIST_SIGNAL 0x04
#define EVLIST_ACTIVE 0x08
#define EVLIST_INIT 0x80
/* EVLIST_X_ Private space: 0x1000-0xf000 */
#define EVLIST_ALL (0xf000 | 0x8f)
// ev_events
#define EV_TIMEOUT 0x01
#define EV_READ 0x02
#define EV_WRITE 0x04
#define EV_SIGNAL 0x08
#define EV_PERSIST 0x10 /* Persistant event */

event(event.h和event.c)

  • event_process_active: 对activequeue中所有的event调用ev_callback回调函数
  • event_haveevents: int XXX(void)判断当前是否有事件
  • event_set: (struct event *ev, int fd, short events, void (*callback)(int, short, void *), void *arg)初始化事件ev_flags = EVLIST_INIT
  • event_pending:挂起事件
  • event_add: 添加事件==添加到对应queue、设置flag、设置tv
  • event_del: 删除时间
  • event_active: 参数(struct event *ev, int res, short ncalls)==加入avtive队列、设置ncalls执行次数、设定res
  • event_queue_remove: (struct event *ev, int queue)将事件移除响应的queue队列
  • event_queue_insert: (struct event *ev, int queue)将事件加入响应的queue队列
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
// event.c
void event_process_active(void)
{
struct event *ev;
short ncalls;
for (ev = TAILQ_FIRST(&activequeue); ev;
ev = TAILQ_FIRST(&activequeue)) {
event_queue_remove(ev, EVLIST_ACTIVE);
/* Allows deletes to work */
ncalls = ev->ev_ncalls;
ev->ev_pncalls = &ncalls;
while (ncalls) {
ncalls--;
ev->ev_ncalls = ncalls;
(*ev->ev_callback)(ev->ev_fd, ev->ev_res, ev->ev_arg);
}
}
}
int event_pending(struct event *ev, short event, struct timeval *tv)
{
int flags = 0;
if (ev->ev_flags & EVLIST_INSERTED)
flags |= (ev->ev_events & (EV_READ|EV_WRITE));
if (ev->ev_flags & EVLIST_ACTIVE)
flags |= ev->ev_res;
if (ev->ev_flags & EVLIST_TIMEOUT)
flags |= EV_TIMEOUT;
event &= (EV_TIMEOUT|EV_READ|EV_WRITE);
/* See if there is a timeout that we should report */
if (tv != NULL && (flags & event & EV_TIMEOUT))
*tv = ev->ev_timeout;
return (flags & event);
}

time

数据结构

libevent将所有设置里EVLIST_TIMEOUT的event存放到一个event_tree结构体的timetree树中,同时创建以event_tree为节点类型的树结构的响应函数:

  • event_tree结构体
  • timetree:实际存储超时event的红黑树
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    #define RB_ENTRY(type) \
    struct { \
    struct type *rbe_left; /* left element */ \
    struct type *rbe_right; /* right element */ \
    struct type *rbe_parent; /* parent element */ \
    int rbe_color; /* node color */ \
    }
    // 使用tree.h的RB_HEAD结构创建以event_tree为名的结构体
    #define RB_HEAD(name, type) \
    struct name { \
    struct type *rbh_root; /* root of the tree */ \
    }
    static RB_HEAD(event_tree, event) timetree;

相关函数

  • timeout_next: 获取eventree中最小时间的tv用于dispatch中的tv参数
  • timeout_correct: event.c中设有全局变量event_tv保存当前时间,用于校验时间
  • timeout_process: 处理所有超时的时间,并将其加入到active队列中
  • timeout_insert: (struct event *ev)将事件插入超时queue
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    // off是event_tv与当前时间的差
    void timeout_correct(struct timeval *off)
    {
    struct event *ev;
    /* We can modify the key element of the node without destroying
    * the key, beause we apply it to all in the right order.
    */
    RB_FOREACH(ev, event_tree, &timetree)
    timersub(&ev->ev_timeout, off, &ev->ev_timeout);
    }
    void timeout_process(void)
    {
    struct timeval now;
    struct event *ev, *next;
    gettimeofday(&now, NULL);
    for (ev = RB_MIN(event_tree, &timetree); ev; ev = next) {
    if (timercmp(&ev->ev_timeout, &now, >))
    break;
    next = RB_NEXT(event_tree, &timetree, ev);
    event_queue_remove(ev, EVLIST_TIMEOUT);
    /* delete this event from the I/O queues */
    event_del(ev);
    LOG_DBG((LOG_MISC, 60, "timeout_process: call %p",
    ev->ev_callback));
    event_active(ev, EV_TIMEOUT, 1);
    }
    }

signal相关的函数

  • signal_deliver: 解除sop中的信号中断屏蔽
  • signal_process: 将signalqueue中所有event转到event_queue中
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    // event.c event_dispatch中
    int signal_deliver(void)
    {
    //sigprocmask用于解除(SIG_UNBLOCK)进程中的信号中断屏蔽
    return (sigprocmask(SIG_UNBLOCK, &sop.evsigmask, NULL));
    /* XXX - pending signals handled here */
    }
    // event.c event_dispatch中
    void signal_process(void)
    {
    struct event *ev;
    short ncalls;
    TAILQ_FOREACH(ev, &signalqueue, ev_signal_next) {
    ncalls = evsigcaught[EVENT_SIGNAL(ev)]; //#define EVENT_SIGNAL(ev) ev->ev_fd
    if (ncalls) {
    if (!(ev->ev_events & EV_PERSIST))
    event_del(ev);
    event_active(ev, EV_SIGNAL, ncalls); //ev->ev_ncalls = ncalls;
    //event_queue_insert(ev, EVLIST_ACTIVE);
    //将中断事件和中断次数放到event,并添加到activequeue
    }
    }
    memset(evsigcaught, 0, sizeof(evsigcaught));
    signal_caught = 0;
    }

select中实现signal的方法

libevent使用evsigcaught表示每次第i=fd的终端事件发生的次数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
short evsigcaught[NSIG];
//设置每次终端,增加响应的evsigcaught的值,记录终端次数
static void signal_handler(int sig)
{
evsigcaught[sig]++;
signal_caught = 1;
}
int signal_recalc(void)
{
struct sigaction sa;
struct event *ev;
if (sigprocmask(SIG_BLOCK, &sop.evsigmask, NULL) == -1)
return (-1);
/* Reinstall our signal handler. */
memset(&sa, 0, sizeof(sa));
sa.sa_handler = signal_handler;
sa.sa_mask = sop.evsigmask;
sa.sa_flags |= SA_RESTART;
TAILQ_FOREACH(ev, &signalqueue, ev_signal_next) {
if (sigaction(EVENT_SIGNAL(ev), &sa, NULL) == -1)
return (-1);
}
return (0);
}
void signal_process(void)
{
struct event *ev;
short ncalls;
TAILQ_FOREACH(ev, &signalqueue, ev_signal_next) {
ncalls = evsigcaught[EVENT_SIGNAL(ev)];
if (ncalls) {
if (!(ev->ev_events & EV_PERSIST))
event_del(ev);
event_active(ev, EV_SIGNAL, ncalls); //ev->ev_ncalls = ncalls;
//event_queue_insert(ev, EVLIST_ACTIVE);
//将中断事件和中断次数放到event,并添加到activequeue
}
}
memset(evsigcaught, 0, sizeof(evsigcaught));
signal_caught = 0;
}