基于1.0对0.6的补充

总结

  • 添加对于Windows的兼容
  • 核心添加部分是bufferevent,模拟一个RW的两个缓存池用于管理,同时添加两个RW事件进行将socket的输入输出数据读到缓存池,再由用户读取
  • 将activequeue使用多优先级队列形式实现

evbuffer结构体

evbuffer结构体

evbuffer(event.h)结构体用于保存和管理缓存数据

  • evbuffer的buffer使用队列的方式进行管理,实现数据流的先进先出
  • evbuffer的结构体:libevent的缓冲是一个连续的内存区域,其处理数据的方式是一个队列操作方式:从后写入,从前读出。
  • evbuffer分别设置相关指针(一个指标)用于指示读出位置和写入位置。
  • orig_buffer指向由realloc分配的连续内存区域,buffer指向有效数据的内存区域,
  • totallen表示orig_buffer指向的内存区域的大小,misalign表示buffer相对于orig_buffer的偏移
  • off表示有效数据的长度。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    #define EVBUFFER_READ 0x01
    #define EVBUFFER_WRITE 0x02
    #define EVBUFFER_EOF 0x10
    #define EVBUFFER_ERROR 0x20
    #define EVBUFFER_TIMEOUT 0x40
    struct evbuffer{
    u_char *buffer; // 当前有效缓冲区的内存起始地址
    u_char *orig_buffer; // 整个分配(realloc)用来缓冲的内存起始地址
    size_t misalign; // origin_buffer和buffer之间的字节数
    size_t totallen; // 整个分配用来缓冲的内存字节数
    size_t off; // 当前有效缓冲区的长度(字节数)
    void (*cb)(struct evbuffer *, size_t, size_t, void *); //回调函数,当缓冲区有变化的时候会被调用
    void *cbarg; //回调函数的参数
    };

evbuffer相关函数

  • evbuffer_new: 使用calloc创建一个evbuffer块
  • evbuffer_write:(struct evbuffer *buffer, int fd)把缓冲区中的数据,调用send/write函数写入文件描述符fd上,如果sendwrite函数写入的字节数大于0,则调用evbuffer_drain删除已写的数据
  • evbuffer_drain:(struct evbuffer *buf, size_t len)将buf中有效空间减少len
  • evbuffer_read: (struct evbuffer *buf, int fd, int howmuch)调用read/recv函数,从文件描述符fd上读取数据到evbuffer中。如果缓冲区不够,调用evbuffer_expand扩充缓冲区。
  • evbuffer_free: 先释放buffer->orig_buffer中空间在释放buffer自身空间

  • evbuffer_align: 处理先进先出的读写时,有效数值之前的已经被读写的废弃部分

  • evbuffer_add:(struct evbuffer *buf, void *data, size_t datlen)将data中datlen长的数据读入buf中,该函数用于添加一段用户数据到evbuffer中。很简单,就是先判断是否有足够的空闲内存,如果没有则调用evbuffer_expand扩充之,然后直接memcpy,更新off指标。
  • evbuffer_add_buffer:(struct evbuffer *outbuf, struct evbuffer *inbuf)移动数据从一个evbuffer到另一个evbuffer。实际上还是调用了evbuffer_add添加数据到outbuf中。但会清除inbuf中的数据。返回值:成功返回0, 失败返回-1。
  • int evbuffer_expand(struct evbuffer *buf, size_t datlen)将buf中的有效空间大小扩张到buf->misalign + buf->off + datlen;

  • evbuffer_add_printf(struct evbuffer *buf, char *fmt, ...)将数据以fmt格式输出到buf中(类似printf)

  • evbuffer_remove(struct evbuffer *buf, void *data, size_t datlen)该函数用于将evbuffer中的数据复制给用户空间(读数据)
  • evbuffer_find(struct evbuffer *buffer, u_char *what, size_t len)在buffer中查找what
  • void evbuffer_setcb(struct evbuffer *buffer,void (*cb)(struct evbuffer *, size_t, size_t, void *),void *cbarg)为buffer设置回调函数

evbuffer_align和evbuffer_expand调整原来buffer的空间

  • evbuffer_add:(struct evbuffer *buf, void *data, size_t datlen)将data中datlen长的数据读入buf中
  • evbuffer_free: 先释放buffer->orig_buffer中空间在释放buffer自身空间
  • evbuffer_align: 处理先进先出的读写时,有效数值之前的已经被读写的废弃部分
  • int evbuffer_expand(struct evbuffer *buf, size_t datlen)将buf中的有效空间大小扩张到buf->misalign + buf->off + datlen;
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    // buffer.c
    int evbuffer_add(struct evbuffer *buf, void *data, size_t datlen)
    {
    size_t need = buf->misalign + buf->off + datlen;
    size_t oldoff = buf->off;
    // 调整空间使得buf有足够的空间
    if (buf->totallen < need) {
    if (evbuffer_expand(buf, datlen) == -1)
    return (-1);
    }
    memcpy(buf->buffer + buf->off, data, datlen);
    buf->off += datlen;
    // 回调buf中的cb函数
    if (datlen && buf->cb != NULL)
    (*buf->cb)(buf, oldoff, buf->off, buf->cbarg);
    return (0);
    }
    static inline void evbuffer_align(struct evbuffer *buf)
    {
    memmove(buf->orig_buffer, buf->buffer, buf->off);
    buf->buffer = buf->orig_buffer;
    buf->misalign = 0;
    }
    int evbuffer_expand(struct evbuffer *buf, size_t datlen)
    {
    size_t need = buf->misalign + buf->off + datlen;
    if (buf->totallen >= need)
    return (0);
    if (buf->misalign >= datlen) {
    evbuffer_align(buf);
    } else {
    void *newbuf;
    size_t length = buf->totallen;
    if (length < 256)
    length = 256;
    while (length < need)
    length <<= 1;
    if (buf->orig_buffer != buf->buffer)
    evbuffer_align(buf);
    if ((newbuf = realloc(buf->buffer, length)) == NULL)
    return (-1);
    buf->orig_buffer = buf->buffer = newbuf;
    buf->totallen = length;
    }
    return (0);
    }
    // 将buf中的有效空间大小扩张到buf->misalign + buf->off + datlen;
    int evbuffer_expand(struct evbuffer *buf, size_t datlen)
    {
    size_t need = buf->misalign + buf->off + datlen;
    /* If we can fit all the data, then we don't have to do anything */
    if (buf->totallen >= need)
    return (0);
    if (buf->misalign >= datlen) {
    evbuffer_align(buf);
    } else {
    void *newbuf;
    size_t length = buf->totallen;
    if (length < 256)
    length = 256;
    while (length < need)
    length <<= 1;
    if (buf->orig_buffer != buf->buffer)
    evbuffer_align(buf);
    if ((newbuf = realloc(buf->buffer, length)) == NULL)
    return (-1);
    buf->orig_buffer = buf->buffer = newbuf;
    buf->totallen = length;
    }
    return (0);
    }

evbuffer_write和evbuffer_read

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
int evbuffer_read(struct evbuffer *buf, int fd, int howmuch)
{
u_char *p;
size_t oldoff = buf->off;
if (howmuch < 0 || howmuch > n)
howmuch = n =EVBUFFER_MAX_READ;
/* If we don't have FIONREAD, we might waste some space here */
if (evbuffer_expand(buf, howmuch) == -1)
return (-1);
/* We can append new data at this point */
p = buf->buffer + buf->off;
n = read(fd, p, howmuch);
if (n == -1)
return (-1);
if (n == 0)
return (0);
buf->off += n;
/* Tell someone about changes in this buffer */
// buf->cb 一般为bufferevent_read_pressure_cb
if (buf->off != oldoff && buf->cb != NULL)
(*buf->cb)(buf, oldoff, buf->off, buf->cbarg);
return (n);
}
void bufferevent_read_pressure_cb(struct evbuffer *buf, size_t old, size_t now,void *arg) {
struct bufferevent *bufev = arg;
if (bufev->wm_read.high == 0 || now < bufev->wm_read.high) {
evbuffer_setcb(buf, NULL, NULL);
if (bufev->enabled & EV_READ)
bufferevent_add(&bufev->ev_read, bufev->timeout_read);
}
}
// write函数
int evbuffer_write(struct evbuffer *buffer, int fd){
int n;
n = write(fd, buffer->buffer, buffer->off);
if (n == -1)
return (-1);
if (n == 0)
return (0);
evbuffer_drain(buffer, n);
return (n);
}
voidevbuffer_drain(struct evbuffer *buf, size_t len){
size_t oldoff = buf->off;
if (len >= buf->off) {
buf->off = 0;
buf->buffer = buf->orig_buffer;
buf->misalign = 0;
goto done;
}
buf->buffer += len;
buf->misalign += len;
buf->off -= len;
done:
/* Tell someone about changes in this buffer */
if (buf->off != oldoff && buf->cb != NULL)
(*buf->cb)(buf, oldoff, buf->off, buf->cbarg);
}

bufferevent

bufferevent结构体用于管理input和output缓存,模拟socket通信中的两个端口,同时使用ev_read和ev_write事件作为控制
bufferevent使用ev_read和ev_write作为读写事件,用于将跟event的整个loop做关联,完成fd的读写控制. bufferevent中用于存放数据的缓存(input和output), bufferevent_readcb数据从fd读入到input(这是在ev_read事件的回调函数触发的),即bufferevent_readcb是ev_read事件的回调函数,并在合适时候调用readcb. bufferevent_readcb数据从output读入到fd(这是在ev_write事件的回调函数触发的),即bufferevent_writecb是ev_write事件的回调函数,并在合适时候调用writecb.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
struct bufferevent {
//bufferevent的读写事件
struct event ev_read;
struct event ev_write;
//bufferevent中用于存放数据的缓存,bufferevent_readcb数据从fd读入到input(这是在ev_read事件的回调函数触发的),
//即bufferevent_readcb是ev_read事件的回调函数,并在合适时候调用readcb
//bufferevent_readcb数据从output读入到fd(这是在ev_write事件的回调函数触发的),
//即bufferevent_writecb是ev_write事件的回调函数,并在合适时候调用writecb
struct evbuffer *input;
struct evbuffer *output;
//用于记录当前input可以读写的大小范围,如果超过bufev->wm_read.high,则停止读写
struct event_watermark wm_read;
struct event_watermark wm_write;
evbuffercb readcb; //可读时的回调函数指针
evbuffercb writecb; //可写时的回调函数指针
everrorcb errorcb; //错误发生时的回调函数指针
void *cbarg; //在初始化时被赋值,一般为struct bufferevent *bufev
int timeout_read; /* in seconds */
int timeout_write; /* in seconds */
//用于标注bufferevent的EV_READ或EV_WRITE功能
short enabled; /* events that are currently enabled */
};

bufferevent处理函数

  • bufferevent_new:(int fd, evbuffercb readcb, evbuffercb writecb,everrorcb errorcb, void *cbarg)创建input和output,并添加RW两个event,将其处理函数设为bufferevent_readcb和bufferevent_writecb(调用readcb和writecb)
  • bufferevent_add:(struct event *ev, int timeout)调用event_add将ev(一般来源于bufferevent的ev_write和ev_read事件)加入消息控制当中
  • bufferevent_writecb:处理buffer中的写消息函数
  • bufferevent_readcb:处理buffer中的读消息函数

  • bufferevent_write(struct bufferevent bufev, void data, size_t size)将data写进bufev的缓存区,并添加写event

  • bufferevent_write_buffer(struct bufferevent bufev, struct evbuffer buf)将buf缓存中的数据加入bufev的缓存区,并添加写event
  • size_t bufferevent_read(struct bufferevent *bufev, void *data, size_t size)从bufev中读取数据到data中

  • bufferevent_disable(struct bufferevent *bufev, short event)根据event移除bufev中的R或W事件,并从队列中删除

  • bufferevent_enable(struct bufferevent *bufev, short event)根据event添加bufev中的R或W事件,并加入队列中
  • bufferevent_free(struct bufferevent *bufev)销毁RW的struct event和RW的struct evbuffer
  • bufferevent_priority_set(struct bufferevent *bufev, int priority)调用event的event_priority_set函数将bufev中RW都设置为priority优先级
  • bufferevent_settimeout(struct bufferevent *bufev, int timeout_read, int timeout_write)设置bufev中RW时间的timeout_write和timeout_read
  • bufferevent_setwatermark(struct bufferevent *bufev, short events, size_t lowmark, size_t highmark)设置bufev中events事件的lowmark和highmark
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    struct bufferevent *bufferevent_new(int fd, evbuffercb readcb, evbuffercb writecb, everrorcb errorcb, void *cbarg)
    {
    struct bufferevent *bufev;
    if ((bufev = calloc(1, sizeof(struct bufferevent))) == NULL)
    return (NULL);
    if ((bufev->input = evbuffer_new()) == NULL) {
    free(bufev);
    return (NULL);
    }
    if ((bufev->output = evbuffer_new()) == NULL) {
    evbuffer_free(bufev->input);
    free(bufev);
    return (NULL);
    }
    event_set(&bufev->ev_read, fd, EV_READ, bufferevent_readcb, bufev);
    event_set(&bufev->ev_write, fd, EV_WRITE, bufferevent_writecb, bufev);
    bufev->readcb = readcb;
    bufev->writecb = writecb;
    bufev->errorcb = errorcb;
    bufev->cbarg = cbarg;
    bufev->enabled = EV_READ | EV_WRITE;
    return (bufev);
    }

处理buffer中的读写消息函数

  • bufferevent_writecb:处理buffer中的写消息函数
  • bufferevent_readcb:处理buffer中的读消息函数

bufferevent读取流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
static void
bufferevent_writecb(int fd, short event, void *arg)
{
struct bufferevent *bufev = arg;
int res = 0;
short what = EVBUFFER_WRITE;
if (event == EV_TIMEOUT) {
what |= EVBUFFER_TIMEOUT;
goto error;
}
if (EVBUFFER_LENGTH(bufev->output)) {
res = evbuffer_write(bufev->output, fd);
if (res == -1) {
if (errno == EAGAIN ||
errno == EINTR ||
errno == EINPROGRESS)
goto reschedule;
/* error case */
what |= EVBUFFER_ERROR;
} else if (res == 0) {
/* eof case */
what |= EVBUFFER_EOF;
}
if (res <= 0)
goto error;
}
// 每次event被处理过去之后都会移除队列,需要重新加入
if (EVBUFFER_LENGTH(bufev->output) != 0)
bufferevent_add(&bufev->ev_write, bufev->timeout_write);
// 当off<low时说明buff中已写的缓存够多,才发送数据
if (EVBUFFER_LENGTH(bufev->output) <= bufev->wm_write.low)
(*bufev->writecb)(bufev, bufev->cbarg);
return;
reschedule:
if (EVBUFFER_LENGTH(bufev->output) != 0)
bufferevent_add(&bufev->ev_write, bufev->timeout_write);
return;
error:
(*bufev->errorcb)(bufev, what, bufev->cbarg);
}
static void bufferevent_readcb(int fd, short event, void *arg)
{
struct bufferevent *bufev = arg;
int res = 0;
short what = EVBUFFER_READ;
size_t len;
if (event == EV_TIMEOUT) {
what |= EVBUFFER_TIMEOUT;
goto error;
}
res = evbuffer_read(bufev->input, fd, -1);
if (res == -1) {
if (errno == EAGAIN || errno == EINTR)
goto reschedule;
/* error case */
what |= EVBUFFER_ERROR;
} else if (res == 0) {
/* eof case */
what |= EVBUFFER_EOF;
}
if (res <= 0)
goto error;
bufferevent_add(&bufev->ev_read, bufev->timeout_read);
//当input缓存超出high水位线之后,系统将bufev->input的回调函数设为bufferevent_read_pressure_cb,并将将bufev->ev_read拉出就绪队列
//每次在操作input缓存时,都进行判断其是否低于high水位,若是将其加入到就绪队列
len = EVBUFFER_LENGTH(bufev->input);
if (bufev->wm_read.low != 0 && len < bufev->wm_read.low)
return;
if (bufev->wm_read.high != 0 && len > bufev->wm_read.high) {
struct evbuffer *buf = bufev->input;
event_del(&bufev->ev_read);
evbuffer_setcb(buf, bufferevent_read_pressure_cb, bufev);
return;
}
(*bufev->readcb)(bufev, bufev->cbarg);
return;
reschedule:
bufferevent_add(&bufev->ev_read, bufev->timeout_read);
return;
error:
(*bufev->errorcb)(bufev, what, bufev->cbarg);
}

event_base结构体

event_base结构体用于作为整个event库的管理中心类,定义了event控制类(evsel和evbase),各种消息队列(除了全局队列,eventqueue、activequeues和timetree)。其中activequeues表示多个队列,用于保存不同优先级的事件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
struct event_base {
const struct eventop *evsel; //在select版本中evsel为sop
// struct selectop {
// int event_fds; /* Highest fd in fd set */
// int event_fdsz;
// fd_set *event_readset;
// fd_set *event_writeset;
// sigset_t evsigmask;
// } sop;
void *evbase; //在select版本中evsel为selectops
// const struct eventop selectops = {
// "select",
// select_init,
// select_add,
// select_del,
// select_recalc,
// select_dispatch
// };
//
int event_count; //用于记录当前base中事件数
int event_count_active; //用于记录当前base中被激活的事件数
/* Handle signals - This is a deprecated interface */
int (*event_sigcb)(void); /* Signal callback when gotsig is set */
int event_gotsig; //为1时表示触发中断,回调event_sigcb函数
int event_gotterm; //为1(调用event_loopexit将其设为1)时,event_base_loop结束循环,为0时,继续循环
/* active event management */
struct event_list **activequeues; //activequeues最高维的长度为nactivequeues
int nactivequeues;
struct event_list eventqueue;
struct timeval event_tv;
RB_HEAD(event_tree, event) timetree;
};

event_priority_init和event_base_priority_init

1
2
3
4
5
6
//重新分配base中activequeues数组的大小,将其设为nactivequeues = npriorities
int event_priority_init(int npriorities){
return event_base_priority_init(current_base, npriorities);
}
int event_base_priority_init(struct event_base *base, int npriorities)

loop相关函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
int event_base_loop(struct event_base *base, int flags){
const struct eventop *evsel = base->evsel;
void *evbase = base->evbase;
struct timeval tv;
int res, done;
/* Calculate the initial events that we are waiting for */
if (evsel->recalc(base, evbase, 0) == -1)
return (-1);
done = 0;
while (!done) {
/* Terminate the loop if we have been asked to */
//使用event_gotterm控制循环是否结束,event_loopexit用来将其置1
if (base->event_gotterm) {
base->event_gotterm = 0;
break;
}
//回调event_sigcb处理中断
while (base->event_gotsig) {
base->event_gotsig = 0;
if (base->event_sigcb) {
res = (*base->event_sigcb)();
if (res == -1) {
errno = EINTR;
return (-1);
}
}
}
//调整运行的时间event_tv
gettimeofday(&tv, NULL);
if (timercmp(&tv, &base->event_tv, <)) {
struct timeval off;
LOG_DBG((LOG_MISC, 10,
"%s: time is running backwards, corrected",
__func__));
timersub(&base->event_tv, &tv, &off);
timeout_correct(base, &off);
}
base->event_tv = tv;
if (!base->event_count_active && !(flags & EVLOOP_NONBLOCK))
timeout_next(base, &tv);
else
timerclear(&tv);
/* If we have no events, we just exit */
if (!event_haveevents(base))
return (1);
res = evsel->dispatch(base, evbase, &tv);
if (res == -1)
return (-1);
//处理延迟事件
timeout_process(base);
//处理active事件
if (base->event_count_active) {
event_process_active(base);
if (!base->event_count_active && (flags & EVLOOP_ONCE))
done = 1;
} else if (flags & EVLOOP_NONBLOCK)
done = 1;
if (evsel->recalc(base, evbase, 0) == -1)
return (-1);
}
return (0);
}
int event_dispatch(void){
return (event_loop(0));
}
int event_base_dispatch(struct event_base *event_base){
return (event_base_loop(event_base, 0));
}
static void event_loopexit_cb(int fd, short what, void *arg){
struct event_base *base = arg;
base->event_gotterm = 1;
}
int event_loopexit(struct timeval *tv){
return (event_once(-1, EV_TIMEOUT, event_loopexit_cb,
current_base, tv));
}
int event_loop(int flags){
return event_base_loop(current_base, flags);
}

event_once函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
struct event_once {
struct event ev;
void (*cb)(int, short, void *);
void *arg;
};
static void event_once_cb(int fd, short events, void *arg)
{
struct event_once *eonce = arg;
(*eonce->cb)(fd, events, eonce->arg);
free(eonce);
}
int event_once(int fd, short events,void (*callback)(int, short, void *), void *arg, struct timeval *tv)

其他函数

1
2
3
4
int event_haveevents(struct event_base *base) //判断是否有激活事件
static void event_process_active(struct event_base *base) //处理激活事件
void event_set(struct event *ev, int fd, short events,void (*callback)(int, short, void *), void *arg)//用于设置event的响应参数
int event_base_set(struct event_base *base, struct event *ev) //用于设置eventbase的响应参数