一文读懂 Linux epoll 实现原理

1.epoll 的用法先复习下 epoll 的用法。

如下的代码中,先用 epoll_create 创建一个 epoll 文件描述符 epfd,再通过 epoll_ctl 将需要监听的 socket 添加到 epfd 中,最后调用 epoll_wait 等待数据。

代码语言:javascript复制int s = socket(AF_INET, SOCK_STREAM, 0);

bind(s, ...);

listen(s, ...)

int epfd = epoll_create(...);

epoll_ctl(epfd, ...); // 将所有需要监听的 socket 添加到 epfd 中。

#define MAX_EVENTS 10

struct epoll_event events[MAX_EVENTS];

while(1) {

int n = epoll_wait(epfd, events, ...); // 返回就绪的 socket 数量。

for(int i = 0; i < n; ++i) {

if (events[n].data.fd == listen_sock) {

// 处理

}

}

}创建 socket 时,操作系统会创建一个由文件系统管理的 socket 对象。这个 socket 对象包含了发送缓冲区、接收缓冲区、等待队列等成员。等待队列是个非常重要的结构,它指向所有需要等待该 socket 事件的进程。

2.epoll 的创建要使用 epoll 首先需要调用 epoll_create() 函数创建一个 epoll 的文件描述符,函数原型如下:

代码语言:javascript复制int epoll_create(int size);参数 size 是由于历史原因遗留下来的,自 Linux 2.6.8 以来,已不起作用,但必须大于零。

当用户调用 epoll_create() 函数时,会进入到内核空间,并且调用 do_epoll_create() 内核函数来创建 epoll 句柄,do_epoll_create() 函数代码如下:

代码语言:javascript复制/*

* Open an eventpoll file descriptor.

*/

static int do_epoll_create(int flags)

{

int error, fd;

struct eventpoll *ep = NULL;

struct file *file;

/* Check the EPOLL_* constant for consistency. */

BUILD_BUG_ON(EPOLL_CLOEXEC != O_CLOEXEC);

if (flags & ~EPOLL_CLOEXEC)

return -EINVAL;

/*

* Create the internal data structure ("struct eventpoll").

*/

error = ep_alloc(&ep);

if (error < 0)

return error;

/*

* Creates all the items needed to setup an eventpoll file. That is,

* a file structure and a free file descriptor.

*/

fd = get_unused_fd_flags(O_RDWR | (flags & O_CLOEXEC));

if (fd < 0) {

error = fd;

goto out_free_ep;

}

file = anon_inode_getfile("[eventpoll]", &eventpoll_fops, ep,

O_RDWR | (flags & O_CLOEXEC));

if (IS_ERR(file)) {

error = PTR_ERR(file);

goto out_free_fd;

}

ep->file = file;

fd_install(fd, file);

return fd;

out_free_fd:

put_unused_fd(fd);

out_free_ep:

ep_free(ep);

return error;

}sys_epoll_create() 主要做两件事情:

调用 ep_alloc() 函数创建并初始化一个 eventpoll 对象。调用 anon_inode_getfd() 函数把 eventpoll 对象映射到一个文件描述符,并返回这个文件描述符。3.epoll 对象结构从 do_epoll_create() 源码可以看出,epoll 对象实际上是一个 eventpoll,定义如下:

代码语言:javascript复制struct eventpoll {

/* Protect the access to this structure */

spinlock_t lock;

/*

* This mutex is used to ensure that files are not removed

* while epoll is using them. This is held during the event

* collection loop, the file cleanup path, the epoll file exit

* code and the ctl operations.

*/

struct mutex mtx;

/* Wait queue used by sys_epoll_wait() */

wait_queue_head_t wq;

/* Wait queue used by file->poll() */

wait_queue_head_t poll_wait;

/* List of ready file descriptors */

struct list_head rdllist;

/* RB tree root used to store monitored fd structs */

struct rb_root rbr;

/*

* This is a single linked list that chains all the "struct epitem" that

* happened while transferring ready events to userspace w/out

* holding ->lock.

*/

struct epitem *ovflist;

/* wakeup_source used when ep_scan_ready_list is running */

struct wakeup_source *ws;

/* The user that created the eventpoll descriptor */

struct user_struct *user;

struct file *file;

/* used to optimize loop detection check */

int visited;

struct list_head visited_list_link;

}其中有几个成员需要重点关注:

(1)被监听的 Socket 列表(rbr )。

通过 epoll_ctl(2) 将监控的 socket 添加到以红黑树保存的列表中。红黑树结点键是文件描述符,而值是与文件描述符相关的信息 epitem。

(2)等待队列(wq)。

和 socket 一样,它也会有等待队列,当调用 epoll_wait(epfd, ...) 时会把进程添加到 eventpoll 对象的 wq 等待队列中。

(3)就绪 socket 列表(rdllist)。

保存已经就绪的 socket 文件描述符列表。当程序执行到 epoll_wait 时,如果 rdlist 已经存在就绪的 socket,那么 epoll_wait 直接返回,如果 rdlist 为空,阻塞进程。

下图展示了 eventpoll 对象与被监听的文件关系:

由于被监听的文件是通过 epitem 对象来管理的,所以上图中的节点都是以 epitem 对象的形式存在的。为什么要使用红黑树来管理被监听的文件呢?这是为了能够通过文件描述符快速查找到其对应的 epitem 对象。

代码语言:javascript复制/*

* Each file descriptor added to the eventpoll interface will

* have an entry of this type linked to the "rbr" RB tree.

* Avoid increasing the size of this struct, there can be many thousands

* of these on a server and we do not want this to take another cache line.

*/

struct epitem {

union {

/* RB tree node links this structure to the eventpoll RB tree */

struct rb_node rbn;

/* Used to free the struct epitem */

struct rcu_head rcu;

};

/* List header used to link this structure to the eventpoll ready list */

struct list_head rdllink;

/*

* Works together "struct eventpoll"->ovflist in keeping the

* single linked chain of items.

*/

struct epitem *next;

/* The file descriptor information this item refers to */

struct epoll_filefd ffd;

/*

* Protected by file->f_lock, true for to-be-released epitem already

* removed from the "struct file" items list; together with

* eventpoll->refcount orchestrates "struct eventpoll" disposal

*/

bool dying;

/* List containing poll wait queues */

struct eppoll_entry *pwqlist;

/* The "container" of this item */

struct eventpoll *ep;

/* List header used to link this item to the "struct file" items list */

struct hlist_node fllink;

/* wakeup_source used when EPOLLWAKEUP is set */

struct wakeup_source __rcu *ws;

/* The structure that describe the interested events and the source fd */

struct epoll_event event;

};epitem 用于表示 epoll 中的每个被监视的文件描述符的信息。以下是一些重要的字段:

rbn:红黑树结点,用于将 epitem 结构体链接到 eventpoll 的红黑树中。rdllink:链接到 eventpoll 的就绪链表,以便进行快速访问。ffd:该项所引用的文件描述符信息。pwqlist:等待队列。ep:包含当前项的 eventpoll 对象指针。event:描述感兴趣的事件和源 fd。4.向 epoll 添加文件描述符前面介绍了怎么创建 epoll,接下来介绍一下怎么向 epoll 添加要监听的 socket。

通过调用 epoll_ctl() 函数可以向 epoll 添加要监听的文件,其原型如下:

代码语言:javascript复制long epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);下面说明一下各个参数的作用:

epfd: 通过调用 epoll_create() 函数返回的文件描述符。op: 要进行的操作,有 3 个选项。 EPOLL_CTL_ADD:表示要进行添加操作。EPOLL_CTL_DEL:表示要进行删除操作。EPOLL_CTL_MOD:表示要进行修改操作。fd: 要监听的文件描述符。event: 告诉内核需要监听什么事件。其定义如下:代码语言:javascript复制struct epoll_event {

__uint32_t events; /* Epoll events */

epoll_data_t data; /* User data variable */

};events 可以是以下几个宏的集合:

EPOLLIN :表示对应的文件描述符可以读(包括对端 SOCKET 正常关闭)。EPOLLOUT:表示对应的文件描述符可以写。EPOLLPRI:表示对应的文件描述符有紧急的数据可读。EPOLLERR:表示对应的文件描述符发生错误。EPOLLHUP:表示对应的文件描述符被挂断。EPOLLET:将 EPOLL 设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)来说的。EPOLLONESHOT:只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个 socket 的话,需要再次把这个 socket 加入到 EPOLL 队列里。data 用来保存用户自定义数据。

epoll_ctl() 函数会调用 do_epoll_ctl() 内核函数,do_epoll_ctl() 的实现如下:

代码语言:javascript复制int do_epoll_ctl(int epfd, int op, int fd, struct epoll_event *epds, bool nonblock)

{

...

f = fdget(epfd);

if (!f.file)

goto error_return;

/* Get the "struct file *" for the target file */

tf = fdget(fd);

if (!tf.file)

goto error_fput;

...

/*

* At this point it is safe to assume that the "private_data" contains

* our own data structure.

*/

ep = f.file->private_data;

error = epoll_mutex_lock(&ep->mtx, 0, nonblock);

if (error)

goto error_tgt_fput;

...

/*

* Try to lookup the file inside our RB tree. Since we grabbed "mtx"

* above, we can be sure to be able to use the item looked up by

* ep_find() till we release the mutex.

*/

epi = ep_find(ep, tf.file, fd);

error = -EINVAL;

switch (op) {

case EPOLL_CTL_ADD:

if (!epi) {

epds->events |= EPOLLERR | EPOLLHUP;

error = ep_insert(ep, epds, tf.file, fd, full_check);

} else

error = -EEXIST;

break;

case EPOLL_CTL_DEL:

if (epi)

error = ep_remove(ep, epi);

else

error = -ENOENT;

break;

case EPOLL_CTL_MOD:

if (epi) {

if (!(epi->event.events & EPOLLEXCLUSIVE)) {

epds->events |= EPOLLERR | EPOLLHUP;

error = ep_modify(ep, epi, epds);

}

} else

error = -ENOENT;

break;

}

mutex_unlock(&ep->mtx);

error_tgt_fput:

if (full_check) {

clear_tfile_check_list();

loop_check_gen++;

mutex_unlock(&epmutex);

}

fdput(tf);

error_fput:

fdput(f);

error_return:

return error;

}sys_epoll_ctl() 函数会根据传入不同 op 的值来进行不同操作,比如传入 EPOLL_CTL_ADD 表示要进行添加操作,那么就调用 ep_insert() 函数来进行添加操作。

我们继续来分析添加操作 ep_insert() 函数的实现:

代码语言:javascript复制static int ep_insert(struct eventpoll *ep, struct epoll_event *event, struct file *tfile, int fd)

{

...

error = -ENOMEM;

// 申请一个 epitem 对象

if (!(epi = kmem_cache_alloc(epi_cache, GFP_KERNEL)))

goto error_return;

// 初始化 epitem 对象

INIT_LIST_HEAD(&epi->rdllink);

INIT_LIST_HEAD(&epi->fllink);

INIT_LIST_HEAD(&epi->pwqlist);

epi->ep = ep;

ep_set_ffd(&epi->ffd, tfile, fd);

epi->event = *event;

epi->nwait = 0;

epi->next = EP_UNACTIVE_PTR;

epq.epi = epi;

// 等价于: epq.pt->qproc = ep_ptable_queue_proc

init_poll_funcptr(&epq.pt, ep_ptable_queue_proc);

// 调用被监听文件的 poll 接口.

// 这个接口由各自文件系统实现, 如 socket 的 tcp_poll().

revents = tfile->f_op->poll(tfile, &epq.pt);

...

ep_rbtree_insert(ep, epi); // 把 epitem 对象添加到epoll的红黑树中进行管理

spin_lock_irqsave(&ep->lock, flags);

// 如果被监听的文件已经可以进行对应的读写操作

// 那么就把文件添加到 epoll 的就绪队列 rdllist 中, 并且唤醒调用 epoll_wait() 的进程.

if ((revents & event->events) && !ep_is_linked(&epi->rdllink)) {

list_add_tail(&epi->rdllink, &ep->rdllist);

if (waitqueue_active(&ep->wq))

wake_up_locked(&ep->wq);

if (waitqueue_active(&ep->poll_wait))

pwake++;

}

spin_unlock_irqrestore(&ep->lock, flags);

...

return 0;

...

}被监听的文件是通过 epitem 对象进行管理的,也就是说被监听的文件会被封装成 epitem 对象,然后会被添加到 eventpoll 对象的红黑树中进行管理(如上述代码中的 ep_rbtree_insert(ep, epi))。

tfile->f_op->poll(tfile, &epq.pt) 这行代码的作用是调用被监听文件的 poll() 接口,如果被监听的文件是一个 socket 句柄,那么就会调用 tcp_poll(),我们来看看 tcp_poll() 做了什么操作:

代码语言:javascript复制unsigned int tcp_poll(struct file *file, struct socket *sock, poll_table *wait)

{

struct sock *sk = sock->sk;

...

poll_wait(file, sk->sk_sleep, wait);

...

return mask;

}每个 socket 对象都有个等待队列用于存放等待 socket 状态更改的进程。

从上述代码可以知道,tcp_poll() 调用了 poll_wait() 函数,而 poll_wait() 最终会调用 ep_ptable_queue_proc() 函数,其实现如下:

代码语言:javascript复制static void ep_ptable_queue_proc(struct file *file, wait_queue_head_t *whead, poll_table *pt)

{

struct epitem *epi = ep_item_from_epqueue(pt);

struct eppoll_entry *pwq;

if (epi->nwait >= 0 && (pwq = kmem_cache_alloc(pwq_cache, GFP_KERNEL))) {

init_waitqueue_func_entry(&pwq->wait, ep_poll_callback);

pwq->whead = whead;

pwq->base = epi;

add_wait_queue(whead, &pwq->wait);

list_add_tail(&pwq->llink, &epi->pwqlist);

epi->nwait++;

} else {

epi->nwait = -1;

}

}ep_ptable_queue_proc() 函数主要工作是把当前 epitem 对象添加到 socket 对象的等待队列中,并且设置唤醒函数为 ep_poll_callback(),也就是说,当 socket 状态发生变化时,会触发调用 ep_poll_callback() 函数。

ep_poll_callback() 函数实现如下:

代码语言:javascript复制static int ep_poll_callback(wait_queue_t *wait, unsigned mode, int sync, void *key)

{

...

// 把就绪的文件添加到就绪队列中

list_add_tail(&epi->rdllink, &ep->rdllist);

is_linked:

// 唤醒调用 epoll_wait() 而被阻塞的进程。

if (waitqueue_active(&ep->wq))

wake_up_locked(&ep->wq);

...

return 1;

}ep_poll_callback() 函数的主要工作是把就绪的文件描述符添加到 eventepoll 对象的就绪队列中,然后唤醒调用 epoll_wait() 被阻塞的进程。

5.阻塞和唤醒进程把被监听的文件描述符添加到 epoll 后,就可以通过调用 epoll_wait() 等待被监听的文件状态发生改变。epoll_wait() 调用会阻塞当前进程,当被监听的文件状态发生改变时,epoll_wait() 调用便会返回。

epoll_wait() 系统调用的原型如下:

代码语言:javascript复制long epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);参数说明:

epfd: 调用 epoll_create() 函数创建的epoll句柄。events: 用来存放就绪文件列表。maxevents: events 数组的大小。timeout: 设置等待的超时时间。关于返回值:

成功时,epoll_wait() 返回做好准备的文件描述符的数量,如果在请求的超时毫秒内没有文件描述符准备就绪,则返回零。 当发生错误时,epoll_wait()返回-1,并适当设置errno。

epoll_wait() 函数会调用 sys_epoll_wait() 内核函数,而 sys_epoll_wait() 函数最终会调用 ep_poll() 函数,我们来看看 ep_poll() 函数的实现:

代码语言:javascript复制static int ep_poll(struct eventpoll *ep,

struct epoll_event __user *events, int maxevents, long timeout)

{

...

// 如果就绪文件列表为空

if (list_empty(&ep->rdllist)) {

// 把当前进程添加到epoll的等待队列中

init_waitqueue_entry(&wait, current);

wait.flags |= WQ_FLAG_EXCLUSIVE;

__add_wait_queue(&ep->wq, &wait);

for (;;) {

set_current_state(TASK_INTERRUPTIBLE); // 把当前进程设置为睡眠状态

if (!list_empty(&ep->rdllist) || !jtimeout) // 如果有就绪文件或者超时, 退出循环

break;

if (signal_pending(current)) { // 接收到信号也要退出

res = -EINTR;

break;

}

spin_unlock_irqrestore(&ep->lock, flags);

jtimeout = schedule_timeout(jtimeout); // 让出CPU, 切换到其他进程进行执行

spin_lock_irqsave(&ep->lock, flags);

}

// 有3种情况会执行到这里:

// 1. 被监听的文件集合中有就绪的文件

// 2. 设置了超时时间并且超时了

// 3. 接收到信号

__remove_wait_queue(&ep->wq, &wait);

set_current_state(TASK_RUNNING);

}

/* 是否有就绪的文件? */

eavail = !list_empty(&ep->rdllist);

spin_unlock_irqrestore(&ep->lock, flags);

if (!res && eavail

&& !(res = ep_send_events(ep, events, maxevents)) && jtimeout)

goto retry;

return res;

}ep_poll() 函数主要做以下几件事:

判断被监听的文件集合中是否有就绪的文件,如果有就返回。如果没有就把当前进程添加到 epoll 的等待队列中,并且进入睡眠。进程会一直睡眠直到有以下几种情况发生: 被监听的文件集合中有就绪的文件设置了超时时间并且超时了接收到信号如果有就绪的文件,那么就调用 ep_send_events() 函数把就绪文件复制到 events 参数中。返回就绪文件描述符个数。6.小结下面通过文字来描述一下这个 epoll 实现 IO 多路复用的整个过程:

通过调用 epoll_create() 函数创建并初始化一个 eventpoll 对象。通过调用 epoll_ctl() 函数把被监听的文件描述符 (如 socket 文件描述符) 封装成 epitem 对象并且添加到 eventpoll 对象的红黑树中进行管理。通过调用 epoll_wait() 函数等待被监听的文件状态发生改变。当被监听的文件状态发生改变时(如 socket 接收到数据): 会把文件描述符对应 epitem 对象添加到 eventpoll 对象的就绪队列 rdllist 中。并且把就绪队列的文件列表复制到 epoll_wait() 函数的 events 参数中。操作系统将该 socket 等待队列上的进程重新放回到工作队列,该进程变成运行状态,即唤醒调用 epoll_wait() 函数被阻塞(睡眠)的进程。参考文献epoll_create(2) - Linux manual page - man7.org

linux内核Epoll 实现原理

Linux source code (v6.0) - Elixir Bootlin

如果这篇文章说不清epoll的本质,那就过来掐死我吧!


梦幻西游搜索页面
模拟山羊祭祀仪式怎么操作 祭祀仪式操作方法分享