libuv Source Code: Event Loop

This article covers libuv's event loop, starting with the flow of a single loop iteration. libuv implements network I/O on top of each platform's polling mechanism and file I/O on top of a thread pool, and the worker threads in turn signal the loop through that same polling mechanism. The second half of the article explains how the thread pool hooks into the event loop.

Event loop flow

The overall flow of the event loop is implemented in uv_run(). The code is as follows:

int uv_run(uv_loop_t* loop, uv_run_mode mode) {
  int timeout;
  int r;
  int ran_pending;

  // are there any active handles or reqs?
  r = uv__loop_alive(loop);
  if (!r)
    uv__update_time(loop);

  while (r != 0 && loop->stop_flag == 0) {
    uv__update_time(loop);
    uv__run_timers(loop);
    // run pending queue
    ran_pending = uv__run_pending(loop);
    // defined by the UV_LOOP_WATCHER_DEFINE macro: drain the idle/prepare queues
    uv__run_idle(loop);
    uv__run_prepare(loop);

    timeout = 0;
    if ((mode == UV_RUN_ONCE && !ran_pending) || mode == UV_RUN_DEFAULT)
      // if there are still active handles, return the time until the next timer is due
      timeout = uv_backend_timeout(loop);

    uv__io_poll(loop, timeout);
    uv__run_check(loop);
    uv__run_closing_handles(loop);

    if (mode == UV_RUN_ONCE) {
      /* UV_RUN_ONCE implies forward progress: at least one callback must have
       * been invoked when it returns. uv__io_poll() can return without doing
       * I/O (meaning: no callbacks) when its timeout expires - which means we
       * have pending timers that satisfy the forward progress constraint.
       *
       * UV_RUN_NOWAIT makes no guarantees about progress so it's omitted from
       * the check.
       */
      uv__update_time(loop);
      uv__run_timers(loop);
    }

    r = uv__loop_alive(loop);
    if (mode == UV_RUN_ONCE || mode == UV_RUN_NOWAIT)
      break;
  }

  /* The if statement lets gcc compile it to a conditional store. Avoids
   * dirtying a cache line.
   */
  if (loop->stop_flag != 0)
    loop->stop_flag = 0;

  return r;
}

An iteration of the event loop can be broken into the following steps (a runnable sketch follows the list):

1. Cache the current time.
2. Run the callbacks of the due timers in the timer queue (a min-heap).
3. Run the I/O callbacks left pending from the previous iteration.
4. Run the callbacks in the idle queue.
5. Run the callbacks in the prepare queue.
6. Compute the poll timeout, i.e. the interval until the next timer is due.
7. Block in poll I/O for at most the timeout computed in the previous step.
8. Run the callbacks in the check queue.
9. Run the callbacks in the close queue.
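
To see these phases from user code, here is a minimal sketch using the public libuv API (assuming libuv 1.x and linking with -luv; handle and callback names are illustrative). It registers one handle per observable phase and prints when each callback runs; on the first two iterations a likely ordering is idle, prepare, check and then timer.

#include <stdio.h>
#include <uv.h>

static void on_timer(uv_timer_t* handle)  { printf("timer\n");   uv_timer_stop(handle); }
static void on_idle(uv_idle_t* handle)    { printf("idle\n");    uv_idle_stop(handle); }
static void on_prepare(uv_prepare_t* h)   { printf("prepare\n"); uv_prepare_stop(h); }
static void on_check(uv_check_t* handle)  { printf("check\n");   uv_check_stop(handle); }

int main(void) {
  uv_loop_t* loop = uv_default_loop();

  uv_timer_t timer;     uv_timer_init(loop, &timer);
  uv_idle_t idle;       uv_idle_init(loop, &idle);
  uv_prepare_t prepare; uv_prepare_init(loop, &prepare);
  uv_check_t check;     uv_check_init(loop, &check);

  uv_timer_start(&timer, on_timer, 10 /* ms */, 0);
  uv_idle_start(&idle, on_idle);
  uv_prepare_start(&prepare, on_prepare);
  uv_check_start(&check, on_check);

  /* uv_run drives the steps above until no active handles remain. */
  uv_run(loop, UV_RUN_DEFAULT);
  return 0;
}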

The loop exits when either of the following holds:

1. The loop is no longer alive, i.e. there are no active handles or reqs.
2. The run mode is UV_RUN_ONCE or UV_RUN_NOWAIT, in which case the loop breaks after a single iteration.

Let us walk through the most important pieces.

Determining whether the loop is alive

Whether the loop is alive depends on whether there are active handles or reqs, or handles that are still being closed (the stop_flag is checked separately in uv_run). The code is as follows:

static int uv__loop_alive(const uv_loop_t* loop) {
  return uv__has_active_handles(loop) ||
         uv__has_active_reqs(loop) ||
         loop->closing_handles != NULL;
}
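
The active-handle count is also what uv_ref/uv_unref manipulate. A minimal sketch (assuming libuv 1.x): a 60-second timer would normally keep the loop alive, but unreferencing it removes it from uv__loop_alive's accounting, so uv_run returns immediately.

#include <stdio.h>
#include <uv.h>

static void on_timer(uv_timer_t* handle) {
  printf("timer fired\n");  /* never reached in this sketch */
}

int main(void) {
  uv_loop_t* loop = uv_default_loop();

  uv_timer_t timer;
  uv_timer_init(loop, &timer);
  uv_timer_start(&timer, on_timer, 60 * 1000, 0);  /* would keep the loop alive for 60 s */

  /* The handle no longer counts as active, so uv__loop_alive() is false
   * and uv_run() returns right away instead of blocking for a minute. */
  uv_unref((uv_handle_t*) &timer);

  uv_run(loop, UV_RUN_DEFAULT);
  printf("loop exited immediately\n");
  return 0;
}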

uv__run_timers

The code of uv__run_timers is as follows:

void uv__run_timers(uv_loop_t* loop) {
  struct heap_node* heap_node;
  uv_timer_t* handle;

  for (;;) {
    // take the earliest node from the timer min-heap
    heap_node = heap_min((struct heap*) &loop->timer_heap);
    if (heap_node == NULL)
      break;

    // container_of finds the start of the enclosing struct, i.e. the timer handle
    handle = container_of(heap_node, uv_timer_t, heap_node);
    // not due yet
    if (handle->timeout > loop->time)
      break;

    // uv_timer_stop removes the node from the heap and calls uv__active_handle_rm
    uv_timer_stop(handle);
    uv_timer_again(handle);
    handle->timer_cb(handle);
  }
}

Note that the timer nodes are stored in a min-heap keyed on handle->timeout. Each pass of the loop does the following (a repeating-timer sketch follows the list):

1. Take the node with the smallest timeout from the heap, i.e. the timer that should fire first.
2. If even that node is not due yet, break out of the loop.
3. If it is due, uv_timer_stop removes the node from the heap (heap_remove) and calls uv__active_handle_rm to decrement loop->active_handles; uv_timer_again then re-arms the handle if it has a repeat interval, and finally the timer callback runs.
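
A minimal sketch of a repeating timer (assuming libuv 1.x; names are illustrative). Each expiry goes through exactly the path above: the handle is popped off the heap, stopped, re-armed by uv_timer_again with timeout = loop->time + repeat, and its callback runs.

#include <stdio.h>
#include <uv.h>

static int ticks = 0;

static void on_tick(uv_timer_t* handle) {
  /* uv_now() returns the cached loop->time updated at the top of each iteration. */
  printf("tick %d at %llu ms\n", ++ticks, (unsigned long long) uv_now(handle->loop));
  if (ticks == 3)
    uv_timer_stop(handle);  /* removes the handle from the timer heap for good */
}

int main(void) {
  uv_loop_t* loop = uv_default_loop();
  uv_timer_t timer;

  uv_timer_init(loop, &timer);
  /* first expiry after 100 ms, then every 100 ms (the repeat interval) */
  uv_timer_start(&timer, on_tick, 100, 100);

  uv_run(loop, UV_RUN_DEFAULT);
  return 0;
}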

uv__run_pending

uv__run_pending drains loop->pending_queue and runs each queued I/O callback; the code is as follows:

static int uv__run_pending(uv_loop_t* loop) {
  QUEUE* q;
  QUEUE pq;
  uv__io_t* w;

  if (QUEUE_EMPTY(&loop->pending_queue))
    return 0;

  QUEUE_MOVE(&loop->pending_queue, &pq);

  while (!QUEUE_EMPTY(&pq)) {
    q = QUEUE_HEAD(&pq);
    QUEUE_REMOVE(q);
    QUEUE_INIT(q);
    w = QUEUE_DATA(q, uv__io_t, pending_queue);
    w->cb(loop, w, POLLOUT);
  }

  return 1;
}

uv__run_idle and uv__run_prepare work in much the same way. Note that uv__run_pending first moves the queue into a local copy (QUEUE_MOVE), so callbacks that queue new pending I/O are deferred to the next iteration rather than processed immediately.

poll I/O

poll I/O is the heart of the event loop and is built on I/O multiplexing: all network operations use non-blocking sockets together with the best polling mechanism each platform offers, such as epoll on Linux and kqueue on macOS. All file I/O, on the other hand, goes through the thread pool, and the worker threads signal the loop through the same polling mechanism.

The uv__io_poll below is the Linux implementation, built on epoll; the other platforms are similar. The code is as follows:

void uv__io_poll(uv_loop_t* loop, int timeout) {
  /* A bug in kernels < 2.6.37 makes timeouts larger than ~30 minutes
   * effectively infinite on 32 bits architectures.  To avoid blocking
   * indefinitely, we cap the timeout and poll again if necessary.
   *
   * Note that "30 minutes" is a simplification because it depends on
   * the value of CONFIG_HZ.  The magic constant assumes CONFIG_HZ=1200,
   * that being the largest value I have seen in the wild (and only once.)
   */
  static const int max_safe_timeout = 1789569;
  static int no_epoll_pwait;
  static int no_epoll_wait;
  struct uv__epoll_event events[1024];
  struct uv__epoll_event* pe;
  struct uv__epoll_event e;
  int real_timeout;
  QUEUE* q;
  uv__io_t* w;
  sigset_t sigset;
  uint64_t sigmask;
  uint64_t base;
  int have_signals;
  int nevents;
  int count;
  int nfds;
  int fd;
  int op;
  int i;

  // loop->watchers[w->fd] = w in uv__io_start func
  if (loop->nfds == 0) {
    assert(QUEUE_EMPTY(&loop->watcher_queue));
    return;
  }

  // take each watcher off the watcher queue and register its fd with uv__epoll_ctl
  while (!QUEUE_EMPTY(&loop->watcher_queue)) {
    q = QUEUE_HEAD(&loop->watcher_queue);
    QUEUE_REMOVE(q);
    QUEUE_INIT(q);
	
    // QUEUE_DATA works like container_of
    w = QUEUE_DATA(q, uv__io_t, watcher_queue);
    assert(w->pevents != 0);
    assert(w->fd >= 0);
    assert(w->fd < (int) loop->nwatchers);

    e.events = w->pevents;
    e.data = w->fd;

    if (w->events == 0)
      op = UV__EPOLL_CTL_ADD;
    else
      op = UV__EPOLL_CTL_MOD;

    /* XXX Future optimization: do EPOLL_CTL_MOD lazily if we stop watching
     * events, skip the syscall and squelch the events after epoll_wait().
     */
    // fd = uv__epoll_create1(UV__EPOLL_CLOEXEC); loop->backend_fd = fd;
    if (uv__epoll_ctl(loop->backend_fd, op, w->fd, &e)) {
      if (errno != EEXIST)
        abort();

      assert(op == UV__EPOLL_CTL_ADD);

      /* We've reactivated a file descriptor that's been watched before. */
      if (uv__epoll_ctl(loop->backend_fd, UV__EPOLL_CTL_MOD, w->fd, &e))
        abort();
    }

    w->events = w->pevents;
  }

  sigmask = 0;
  if (loop->flags & UV_LOOP_BLOCK_SIGPROF) {
    sigemptyset(&sigset);
    sigaddset(&sigset, SIGPROF);
    sigmask |= 1 << (SIGPROF - 1);
  }

  assert(timeout >= -1);
  base = loop->time;
  count = 48; /* Benchmarks suggest this gives the best throughput. */
  real_timeout = timeout;

  for (;;) {
    /* See the comment for max_safe_timeout for an explanation of why
     * this is necessary.  Executive summary: kernel bug workaround.
     */
    if (sizeof(int32_t) == sizeof(long) && timeout >= max_safe_timeout)
      timeout = max_safe_timeout;

    if (sigmask != 0 && no_epoll_pwait != 0)
      if (pthread_sigmask(SIG_BLOCK, &sigset, NULL))
        abort();

    if (no_epoll_wait != 0 || (sigmask != 0 && no_epoll_pwait == 0)) {
      // returns the number of ready events
      nfds = uv__epoll_pwait(loop->backend_fd,
                             events,
                             ARRAY_SIZE(events),
                             timeout,
                             sigmask);
      if (nfds == -1 && errno == ENOSYS)
        no_epoll_pwait = 1;
    } else {
      nfds = uv__epoll_wait(loop->backend_fd,
                            events,
                            ARRAY_SIZE(events),
                            timeout);
      if (nfds == -1 && errno == ENOSYS)
        no_epoll_wait = 1;
    }

    if (sigmask != 0 && no_epoll_pwait != 0)
      if (pthread_sigmask(SIG_UNBLOCK, &sigset, NULL))
        abort();

    /* Update loop->time unconditionally. It's tempting to skip the update when
     * timeout == 0 (i.e. non-blocking poll) but there is no guarantee that the
     * operating system didn't reschedule our process while in the syscall.
     */
    SAVE_ERRNO(uv__update_time(loop));

    if (nfds == 0) {
      assert(timeout != -1);

      if (timeout == 0)
        return;

      /* We may have been inside the system call for longer than |timeout|
       * milliseconds so we need to update the timestamp to avoid drift.
       */
      // no events to handle; fall through and update the timeout
      goto update_timeout;
    }

    if (nfds == -1) {
      if (errno == ENOSYS) {
        /* epoll_wait() or epoll_pwait() failed, try the other system call. */
        assert(no_epoll_wait == 0 || no_epoll_pwait == 0);
        continue;
      }

      if (errno != EINTR)
        abort();

      if (timeout == -1)
        continue;

      if (timeout == 0)
        return;

      /* Interrupted by a signal. Update timeout and poll again. */
      goto update_timeout;
    }

    have_signals = 0;
    nevents = 0;

    assert(loop->watchers != NULL);
    loop->watchers[loop->nwatchers] = (void*) events;
    loop->watchers[loop->nwatchers + 1] = (void*) (uintptr_t) nfds;
    for (i = 0; i < nfds; i++) {
      pe = events + i;
      // (*pe).data
      fd = pe->data;

      /* Skip invalidated events, see uv__platform_invalidate_fd */
      if (fd == -1)
        continue;

      assert(fd >= 0);
      assert((unsigned) fd < loop->nwatchers);

      w = loop->watchers[fd];

      if (w == NULL) {
        /* File descriptor that we've stopped watching, disarm it.
         *
         * Ignore all errors because we may be racing with another thread
         * when the file descriptor is closed.
         */
        // remove the fd from the epoll instance (its red-black tree)
        uv__epoll_ctl(loop->backend_fd, UV__EPOLL_CTL_DEL, fd, pe);
        continue;
      }

      /* Give users only events they're interested in. Prevents spurious
       * callbacks when previous callback invocation in this loop has stopped
       * the current watcher. Also, filters out events that users has not
       * requested us to watch.
       */
      pe->events &= w->pevents | POLLERR | POLLHUP;

      /* Work around an epoll quirk where it sometimes reports just the
       * EPOLLERR or EPOLLHUP event.  In order to force the event loop to
       * move forward, we merge in the read/write events that the watcher
       * is interested in; uv__read() and uv__write() will then deal with
       * the error or hangup in the usual fashion.
       *
       * Note to self: happens when epoll reports EPOLLIN|EPOLLHUP, the user
       * reads the available data, calls uv_read_stop(), then sometime later
       * calls uv_read_start() again.  By then, libuv has forgotten about the
       * hangup and the kernel won't report EPOLLIN again because there's
       * nothing left to read.  If anything, libuv is to blame here.  The
       * current hack is just a quick bandaid; to properly fix it, libuv
       * needs to remember the error/hangup event.  We should get that for
       * free when we switch over to edge-triggered I/O.
       */
      if (pe->events == POLLERR || pe->events == POLLHUP)
        pe->events |= w->pevents & (POLLIN | POLLOUT | UV__POLLPRI);

      if (pe->events != 0) {
        /* Run signal watchers last.  This also affects child process watchers
         * because those are implemented in terms of signal watchers.
         */
        if (w == &loop->signal_io_watcher)
          have_signals = 1;
        else
          // e.g. uv__async_io, registered via uv__io_init in uv__async_start
          w->cb(loop, w, pe->events);

        nevents++;
      }
    }

    if (have_signals != 0)
      loop->signal_io_watcher.cb(loop, &loop->signal_io_watcher, POLLIN);

    loop->watchers[loop->nwatchers] = NULL;
    loop->watchers[loop->nwatchers + 1] = NULL;

    if (have_signals != 0)
      return;  /* Event loop should cycle now so don't poll again. */

    if (nevents != 0) {
      if (nfds == ARRAY_SIZE(events) && --count != 0) {
        /* Poll for more events but don't block this time. */
        timeout = 0;
        continue;
      }
      return;
    }

    if (timeout == 0)
      return;

    if (timeout == -1)
      continue;

update_timeout:
    assert(timeout > 0);

    real_timeout -= (loop->time - base);
    if (real_timeout <= 0)
      return;

    timeout = real_timeout;
  }
}

uv__io_poll does the following (a user-level sketch using uv_poll_t follows the list):

1. Take every uv__io_t watcher (w) off loop->watcher_queue and call uv__epoll_ctl to watch w->fd.
2. Call uv__epoll_pwait (or uv__epoll_wait) in a loop, blocking until it returns the number of ready events.
3. If there are no ready events, check whether the timeout has expired.
4. If there are ready events, look up the uv__io_t watcher w in loop->watchers by its fd and invoke its callback w->cb().
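
A user-level sketch of the same path through uv_poll_t (assuming libuv 1.x and that stdin is a TTY or pipe; uv_poll is not meant for regular files). uv_poll_start ends up in uv__io_start, which puts the watcher on loop->watcher_queue; once epoll reports the fd readable, uv__io_poll looks the watcher up in loop->watchers[fd] and its callback forwards the event here.

#include <stdio.h>
#include <unistd.h>
#include <uv.h>

static void on_readable(uv_poll_t* handle, int status, int events) {
  char buf[256];
  ssize_t n;

  if (status < 0) {
    fprintf(stderr, "poll error: %s\n", uv_strerror(status));
    return;
  }
  if (events & UV_READABLE) {
    n = read(STDIN_FILENO, buf, sizeof(buf) - 1);
    if (n > 0) {
      buf[n] = '\0';
      printf("read: %s", buf);
    }
  }
}

int main(void) {
  uv_loop_t* loop = uv_default_loop();
  uv_poll_t poller;

  uv_poll_init(loop, &poller, STDIN_FILENO);        /* wraps the fd in a uv__io_t */
  uv_poll_start(&poller, UV_READABLE, on_readable); /* queues it for uv__epoll_ctl */

  uv_run(loop, UV_RUN_DEFAULT);
  return 0;
}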

A few points deserve attention:

loop->backend_fd

In uv__epoll_ctl(loop->backend_fd, op, w->fd, &e), loop->backend_fd is the file descriptor of the epoll instance itself; the kernel object behind it owns the red-black tree in which all watched fds are stored.

It is assigned in uv__platform_loop_init, as follows:

fd = uv__epoll_create1(UV__EPOLL_CLOEXEC);
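
For reference, a stand-alone sketch of the raw Linux calls that the uv__epoll_* wrappers map to (epoll_create1, epoll_ctl, epoll_wait); loop->backend_fd plays the role of backend_fd below.

#include <stdio.h>
#include <string.h>
#include <sys/epoll.h>
#include <unistd.h>

int main(void) {
  int backend_fd = epoll_create1(EPOLL_CLOEXEC);   /* like uv__platform_loop_init */
  if (backend_fd == -1) { perror("epoll_create1"); return 1; }

  struct epoll_event e;
  memset(&e, 0, sizeof(e));
  e.events = EPOLLIN;          /* like w->pevents */
  e.data.fd = STDIN_FILENO;    /* like e.data = w->fd */

  if (epoll_ctl(backend_fd, EPOLL_CTL_ADD, STDIN_FILENO, &e) == -1) {
    perror("epoll_ctl");
    return 1;
  }

  struct epoll_event events[16];
  int nfds = epoll_wait(backend_fd, events, 16, 1000 /* ms */);
  printf("epoll_wait returned %d ready fd(s)\n", nfds);

  close(backend_fd);
  return 0;
}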

loop->watchers

uv__io_poll obtains the ready events by calling uv__epoll_pwait: the kernel fills the events array with the set of events that are ready. From each event we read the fd, use it to index into loop->watchers, fetch the uv__io_t handle and run its callback. So how is loop->watchers initialized?

void uv__io_start(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
  assert(0 == (events & ~(POLLIN | POLLOUT | UV__POLLRDHUP | UV__POLLPRI)));
  assert(0 != events);
  assert(w->fd >= 0);
  assert(w->fd < INT_MAX);

  w->pevents |= events;
  maybe_resize(loop, w->fd + 1);

#if !defined(__sun)
  /* The event ports backend needs to rearm all file descriptors on each and
   * every tick of the event loop but the other backends allow us to
   * short-circuit here if the event mask is unchanged.
   */
  if (w->events == w->pevents)
    return;
#endif

  if (QUEUE_EMPTY(&w->watcher_queue))
    QUEUE_INSERT_TAIL(&loop->watcher_queue, &w->watcher_queue);

  if (loop->watchers[w->fd] == NULL) {
    loop->watchers[w->fd] = w;
    loop->nfds++;
  }
}

It is initialized in uv__io_start. loop->watchers is an array indexed by the fd stored in the uv__io_t handle, so given an fd we can find its uv__io_t handle directly.

uv__io_start is called in many places: uv__async_start uses it to watch the fd used for inter-thread communication, and the tcp and udp modules use it to watch their fds.

In other words, every I/O event goes through uv__io_start, which appends the watcher for the events to be monitored to the event loop's watcher_queue.

Timeouts

uv__io_poll itself blocks. To keep it from blocking past the next timer, it is called with a timeout argument: the time remaining until the next timer is due. When no events arrive, the loop uses the time at which it entered uv__io_poll to decide whether it should return. The update_timeout code is as follows:

assert(timeout > 0);

real_timeout -= (loop->time - base);
if (real_timeout <= 0)
  return;

timeout = real_timeout;
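
As a concrete example: suppose the next timer is due in 100 ms, so uv__io_poll is entered with timeout = real_timeout = 100, and epoll_pwait is interrupted by a signal after roughly 60 ms. loop->time - base is then about 60, real_timeout drops to about 40, and the loop polls again with timeout = 40 ms instead of waiting another full 100 ms.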

Asynchronous file I/O on the thread pool

libuv's file I/O is built on a thread pool. Roughly: the main thread submits a task to the task queue and signals the pool; a worker picks the task up and runs it; when the work is done, the worker calls uv_async_send, which sets the pending flag of the corresponding uv_async_t handle to 1 and notifies the main thread by writing to an fd (an fd that epoll is also watching). When the main thread sees activity on that fd, it runs the callbacks of the handles whose pending flag is set, and through a chain of callbacks finally reaches the callback the user registered.

Almost every thread pool follows the same model, a task queue plus a pool of worker threads, and libuv's implementation is no exception.

The task queue in libuv is a doubly linked list, and each task is declared with the following struct:

struct uv__work {
  void (*work)(struct uv__work *w);
  void (*done)(struct uv__work *w, int status);
  struct uv_loop_s* loop;
  void* wq[2];
};

Here work is the function the thread pool actually runs, done is the callback invoked after the work completes, and wq is a pair of pointers linking the task to the previous and next nodes in the queue. A public-API sketch of this work/done split follows.
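
The public entry point to this mechanism is uv_queue_work, which wraps uv__work_submit. A minimal sketch (assuming libuv 1.x; the doubling computation is a stand-in for real blocking work):

#include <stdio.h>
#include <uv.h>

/* Runs on a thread-pool worker (the uv__work.work slot). */
static void on_work(uv_work_t* req) {
  int* n = (int*) req->data;
  *n = *n * 2;  /* pretend this is an expensive blocking computation */
}

/* Runs back on the event-loop thread (the uv__work.done slot),
 * after the worker has woken the loop via uv_async_send. */
static void on_after_work(uv_work_t* req, int status) {
  printf("result: %d (status %d)\n", *(int*) req->data, status);
}

int main(void) {
  uv_loop_t* loop = uv_default_loop();
  int value = 21;

  uv_work_t req;
  req.data = &value;

  uv_queue_work(loop, &req, on_work, on_after_work);

  uv_run(loop, UV_RUN_DEFAULT);
  return 0;
}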

Let us first look at how the main thread submits a task to the queue.

In fs.c every file operation goes through the POST macro, whose code is as follows:

#define POST                                                                  \
  do {                                                                        \
    if (cb != NULL) {                                                         \
      uv__work_submit(loop, &req->work_req, uv__fs_work, uv__fs_done);        \
      return 0;                                                               \
    }                                                                         \
    else {                                                                    \
      /* a NULL callback means a synchronous call */                           \
      uv__fs_work(&req->work_req);                                            \
      return req->result;                                                     \
    }                                                                         \
  }                                                                           \
  while (0)

// callback invoked after the operation completes
static void uv__fs_done(struct uv__work* w, int status) {
  uv_fs_t* req;

  req = container_of(w, uv_fs_t, work_req);
  uv__req_unregister(req->loop, req);

  if (status == -ECANCELED) {
    assert(req->result == 0);
    req->result = -ECANCELED;
  }

  req->cb(req);  // invoke the user-registered callback
}
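
A minimal sketch of both branches of POST through the public uv_fs_open API (assuming libuv 1.x; the path is illustrative and fd cleanup is omitted for brevity). Passing a callback routes the request through uv__work_submit and the thread pool; passing NULL runs uv__fs_work inline and blocks.

#include <fcntl.h>
#include <stdio.h>
#include <uv.h>

/* Async path: invoked by uv__fs_done after the worker finishes. */
static void on_open(uv_fs_t* req) {
  printf("async open result: %zd\n", (ssize_t) req->result);
  uv_fs_req_cleanup(req);
}

int main(void) {
  uv_loop_t* loop = uv_default_loop();
  uv_fs_t async_req;
  uv_fs_t sync_req;

  /* cb != NULL: POST submits the request to the thread pool. */
  uv_fs_open(loop, &async_req, "/etc/hostname", O_RDONLY, 0, on_open);

  /* cb == NULL: POST calls uv__fs_work directly and returns the result. */
  int fd = uv_fs_open(loop, &sync_req, "/etc/hostname", O_RDONLY, 0, NULL);
  printf("sync open result: %d\n", fd);
  uv_fs_req_cleanup(&sync_req);

  uv_run(loop, UV_RUN_DEFAULT);
  return 0;
}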

The POST macro calls uv__work_submit to put the task on the queue; the code of uv__work_submit is:

void uv__work_submit(uv_loop_t* loop,
                     struct uv__work* w,
                     void (*work)(struct uv__work* w),
                     void (*done)(struct uv__work* w, int status)) {
  uv_once(&once, init_once);
  w->loop = loop;
  w->work = work;
  w->done = done;
  post(&w->wq);
}

This does two things:

1. Initialize the thread pool. uv_once with &once guarantees the initialization runs only once, which also shows that libuv's thread pool is created lazily, on first use.
2. Call post to submit the task.

The code involved in uv__work_submit is as follows:

static void init_once(void) {
  unsigned int i;
  const char* val;
  uv_sem_t sem;

  // UV_THREADPOOL_SIZE controls the number of threads in the pool
  nthreads = ARRAY_SIZE(default_threads);
  val = getenv("UV_THREADPOOL_SIZE");
  if (val != NULL)
    nthreads = atoi(val);
  if (nthreads == 0)
    nthreads = 1;
  if (nthreads > MAX_THREADPOOL_SIZE)
    nthreads = MAX_THREADPOOL_SIZE;

  threads = default_threads;
  if (nthreads > ARRAY_SIZE(default_threads)) {
    threads = uv__malloc(nthreads * sizeof(threads[0]));
    if (threads == NULL) {
      nthreads = ARRAY_SIZE(default_threads);
      threads = default_threads;
    }
  }

  if (uv_cond_init(&cond))
    abort();

  if (uv_mutex_init(&mutex))
    abort();

  QUEUE_INIT(&wq);

  if (uv_sem_init(&sem, 0))
    abort();

  for (i = 0; i < nthreads; i++)
    if (uv_thread_create(threads + i, worker, &sem))
      abort();

  for (i = 0; i < nthreads; i++)
    uv_sem_wait(&sem);

  uv_sem_destroy(&sem);
}

/* To avoid deadlock with uv_cancel() it's crucial that the worker
 * never holds the global mutex and the loop-local mutex at the same time.
 */
static void worker(void* arg) {
  struct uv__work* w;
  QUEUE* q;

  uv_sem_post((uv_sem_t*) arg);
  arg = NULL;

  for (;;) {
    uv_mutex_lock(&mutex);

    while (QUEUE_EMPTY(&wq)) {
      idle_threads += 1;
      uv_cond_wait(&cond, &mutex);
      idle_threads -= 1;
    }

    q = QUEUE_HEAD(&wq);

    if (q == &exit_message)
      uv_cond_signal(&cond);
    else {
      QUEUE_REMOVE(q);
      QUEUE_INIT(q);  /* Signal uv_cancel() that the work req is
                             executing. */
    }

    uv_mutex_unlock(&mutex);

    if (q == &exit_message)
      break;

    w = QUEUE_DATA(q, struct uv__work, wq);
    w->work(w);

    uv_mutex_lock(&w->loop->wq_mutex);
    w->work = NULL;  /* Signal uv_cancel() that the work req is done
                        executing. */
    QUEUE_INSERT_TAIL(&w->loop->wq, &w->wq);
    uv_async_send(&w->loop->wq_async);
    uv_mutex_unlock(&w->loop->wq_mutex);
  }
}


static void post(QUEUE* q) {
  uv_mutex_lock(&mutex);
  QUEUE_INSERT_TAIL(&wq, q);
  if (idle_threads > 0)
    uv_cond_signal(&cond);
  uv_mutex_unlock(&mutex);
}

The points to note here (a simplified sketch of the pattern follows the list):

1. init_once essentially reads the desired number of threads and creates them; each thread runs the worker function.
2. The thread count comes from the UV_THREADPOOL_SIZE environment variable and defaults to 4.
3. In worker, a thread waits on the condition variable; when signalled it takes a task off the queue, runs it, and then calls uv_async_send to notify the main thread (uv_async_send is described in detail below).
4. post appends the task's wq node to the queue and signals the condition variable if any worker is idle.
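
A stripped-down sketch of the same worker/post pattern, written with libuv's portable threading primitives (uv_thread, uv_mutex, uv_cond); queue size, task type and names are illustrative.

#include <stdio.h>
#include <uv.h>

#define NTASKS 4

static uv_mutex_t mutex;
static uv_cond_t cond;
static int queue[NTASKS];
static int head = 0, tail = 0, done = 0;

static void worker_main(void* arg) {
  (void) arg;
  for (;;) {
    uv_mutex_lock(&mutex);
    while (head == tail && !done)
      uv_cond_wait(&cond, &mutex);   /* like waiting on &cond while wq is empty */
    if (head == tail && done) {      /* like receiving &exit_message */
      uv_mutex_unlock(&mutex);
      return;
    }
    int task = queue[head % NTASKS]; /* like QUEUE_HEAD + QUEUE_REMOVE */
    head++;
    uv_mutex_unlock(&mutex);
    printf("worker ran task %d\n", task);  /* like w->work(w) */
  }
}

static void post_task(int task) {    /* like post(&w->wq) */
  uv_mutex_lock(&mutex);
  queue[tail % NTASKS] = task;
  tail++;
  uv_cond_signal(&cond);
  uv_mutex_unlock(&mutex);
}

int main(void) {
  uv_mutex_init(&mutex);
  uv_cond_init(&cond);

  uv_thread_t thread;
  uv_thread_create(&thread, worker_main, NULL);

  for (int i = 0; i < NTASKS; i++)
    post_task(i);

  uv_mutex_lock(&mutex);
  done = 1;
  uv_cond_signal(&cond);
  uv_mutex_unlock(&mutex);

  uv_thread_join(&thread);
  uv_mutex_destroy(&mutex);
  uv_cond_destroy(&cond);
  return 0;
}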

Now let us look at how a worker notifies the main thread after finishing a task, i.e. the uv_async_send mentioned above:

int uv_async_send(uv_async_t* handle) {
  /* Do a cheap read first. */
  if (ACCESS_ONCE(int, handle->pending) != 0)
    return 0;

  if (cmpxchgi(&handle->pending, 0, 1) == 0)
    uv__async_send(&handle->loop->async_watcher);

  return 0;
}

void uv__async_send(struct uv__async* wa) {
  const void* buf;
  ssize_t len;
  int fd;
  int r;

  buf = "";
  len = 1;
  fd = wa->wfd;

#if defined(__linux__)
  if (fd == -1) {
    static const uint64_t val = 1;
    buf = &val;
    len = sizeof(val);
    fd = wa->io_watcher.fd;  /* eventfd */
  }
#endif

  do
    r = write(fd, buf, len);
  while (r == -1 && errno == EINTR);

  if (r == len)
    return;

  if (r == -1)
    if (errno == EAGAIN || errno == EWOULDBLOCK)
      return;

  abort();
}

uv_async_send does two things (a user-level uv_async_t sketch follows the list):

1. Flip the pending flag of the uv_async_t handle (here &w->loop->wq_async) from 0 to 1 with an atomic compare-and-swap, so that repeated sends before the loop wakes up are coalesced into one.
2. Call uv__async_send, which writes a byte (or an 8-byte value when an eventfd is used) to the fd of handle->loop->async_watcher's io_watcher, an fd the main thread's epoll is watching.
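
A user-level sketch of the wakeup path (assuming libuv 1.x): a second thread calls uv_async_send, which writes to the async fd; the loop thread's epoll sees it, uv__async_io runs and invokes the handle's callback.

#include <stdio.h>
#include <uv.h>

static uv_async_t async;

/* Runs on the loop thread once uv__async_io finds this handle with pending == 1. */
static void on_async(uv_async_t* handle) {
  printf("woken up by the worker thread\n");
  uv_close((uv_handle_t*) handle, NULL);
}

/* Runs on another thread; uv_async_send is safe to call from outside the loop thread. */
static void thread_main(void* arg) {
  (void) arg;
  uv_async_send(&async);  /* flips pending 0 -> 1 and writes to the fd */
}

int main(void) {
  uv_loop_t* loop = uv_default_loop();
  uv_thread_t thread;

  uv_async_init(loop, &async, on_async);
  uv_thread_create(&thread, thread_main, NULL);

  uv_run(loop, UV_RUN_DEFAULT);
  uv_thread_join(&thread);
  return 0;
}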

When the main thread sees activity on async_watcher's io_watcher.fd, a chain of callbacks eventually reaches uv__work's done function, i.e. the callback the user registered. Let us first look at how those callbacks are registered, from front to back:

// async.c
int uv_async_init(uv_loop_t* loop, uv_async_t* handle, uv_async_cb async_cb) {
  int err;

  err = uv__async_start(loop);
  if (err)
    return err;

  uv__handle_init(loop, (uv_handle_t*)handle, UV_ASYNC);
  handle->async_cb = async_cb;
  handle->pending = 0;

  // append the handle to loop->async_handles
  QUEUE_INSERT_TAIL(&loop->async_handles, &handle->queue);
  uv__handle_start(handle);

  return 0;
}

// async.c
// adds loop->async_io_watcher.fd to loop->watcher_queue so that epoll watches it
static int uv__async_start(uv_loop_t* loop) {
  int pipefd[2];
  int err;

  if (loop->async_io_watcher.fd != -1)
    return 0;

  err = uv__async_eventfd();
  if (err >= 0) {
    pipefd[0] = err;
    pipefd[1] = -1;
  }
  else if (err == UV_ENOSYS) {
    err = uv__make_pipe(pipefd, UV__F_NONBLOCK);
#if defined(__linux__)
    /* Save a file descriptor by opening one of the pipe descriptors as
     * read/write through the procfs.  That file descriptor can then
     * function as both ends of the pipe.
     */
    if (err == 0) {
      char buf[32];
      int fd;

      snprintf(buf, sizeof(buf), "/proc/self/fd/%d", pipefd[0]);
      fd = uv__open_cloexec(buf, O_RDWR);
      if (fd >= 0) {
        uv__close(pipefd[0]);
        uv__close(pipefd[1]);
        pipefd[0] = fd;
        pipefd[1] = fd;
      }
    }
#endif
  }

  if (err < 0)
    return err;

  // register uv__async_io as the callback for async I/O events
  // and record the fd on loop->async_io_watcher
  uv__io_init(&loop->async_io_watcher, uv__async_io, pipefd[0]);
  // add the io_watcher to loop->watcher_queue; uv__io_poll will pick it up
  uv__io_start(loop, &loop->async_io_watcher, POLLIN);
  loop->async_wfd = pipefd[1];

  return 0;
}

// core.c
void uv__io_init(uv__io_t* w, uv__io_cb cb, int fd) {
  assert(cb != NULL);
  assert(fd >= -1);
  QUEUE_INIT(&w->pending_queue);
  QUEUE_INIT(&w->watcher_queue);
  w->cb = cb;
  w->fd = fd;
  w->events = 0;
  w->pevents = 0;

#if defined(UV_HAVE_KQUEUE)
  w->rcount = 0;
  w->wcount = 0;
#endif /* defined(UV_HAVE_KQUEUE) */
}

// core.c
void uv__io_start(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
  assert(0 == (events & ~(POLLIN | POLLOUT | UV__POLLRDHUP | UV__POLLPRI)));
  assert(0 != events);
  assert(w->fd >= 0);
  assert(w->fd < INT_MAX);

  w->pevents |= events;
  maybe_resize(loop, w->fd + 1);

#if !defined(__sun)
  /* The event ports backend needs to rearm all file descriptors on each and
   * every tick of the event loop but the other backends allow us to
   * short-circuit here if the event mask is unchanged.
   */
  if (w->events == w->pevents)
    return;
#endif

  if (QUEUE_EMPTY(&w->watcher_queue))
    QUEUE_INSERT_TAIL(&loop->watcher_queue, &w->watcher_queue);

  if (loop->watchers[w->fd] == NULL) {
    loop->watchers[w->fd] = w;
    loop->nfds++;
  }
}

In execution order this does the following:

1. uv_loop_init calls uv_async_init(loop, &loop->wq_async, uv__work_done), which sets up loop->async_io_watcher.fd and appends the uv_async_t handle to loop->async_handles.
2. uv__async_start calls uv__io_init and uv__io_start.
3. uv__io_init registers uv__async_io as the callback for async I/O events and records the fd on loop->async_io_watcher.
4. uv__io_start appends loop->async_io_watcher to loop->watcher_queue so that epoll watches its fd, and registers it in loop->watchers under that fd.

Now let us trace what happens once the main thread receives the event and how the chain of callbacks ends up running uv__work's done, i.e. the callback the user submitted.

In uv__io_poll, once uv__epoll_pwait reports the event, the loop looks up loop->watchers[fd] to find the uv__io_t registered in uv__io_start (here loop->async_io_watcher) and invokes its registered callback, uv__async_io.

The code of uv__async_io is as follows:

static void uv__async_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
  char buf[1024];
  ssize_t r;
  QUEUE queue;
  QUEUE* q;
  uv_async_t* h;

  assert(w == &loop->async_io_watcher);

  // drain everything that uv__async_send() wrote to the fd
  for (;;) {
    r = read(w->fd, buf, sizeof(buf));

    if (r == sizeof(buf))
      continue;

    if (r != -1)
      break;

    if (errno == EAGAIN || errno == EWOULDBLOCK)
      break;

    if (errno == EINTR)
      continue;

    abort();
  }

  // run the callbacks of the handles on loop->async_handles
  QUEUE_MOVE(&loop->async_handles, &queue);
  while (!QUEUE_EMPTY(&queue)) {
    q = QUEUE_HEAD(&queue);
    h = QUEUE_DATA(q, uv_async_t, queue);

    QUEUE_REMOVE(q);
    QUEUE_INSERT_TAIL(&loop->async_handles, q);

    // cmpxchgi returns the old value of h->pending; if it was 0 there is nothing to do
    if (cmpxchgi(&h->pending, 1, 0) == 0)
      continue;

    if (h->async_cb == NULL)
      continue;

    h->async_cb(h);
  }
}

uv__async_io does two things:

1. Drain everything uv__async_send() wrote to the fd.
2. For every handle on loop->async_handles whose pending flag is 1, reset the flag to 0 and run its callback (async_cb). For loop->wq_async that callback is uv__work_done, registered in uv_loop_init via uv_async_init, and uv__work_done is what finally invokes the callback the user registered.

Summary

Node.js delegates asynchronous I/O to libuv, and the core of libuv is the event loop. This article walked through the flow of the event loop and the implementation of the thread pool behind file I/O.
