ioloop分析

咱们先来简单说一下ioloop的源码
while True:
    poll_timeout = 3600.0

    # Prevent IO event starvation by delaying new callbacks
    # to the next iteration of the event loop.
    with self._callback_lock:
        #上次循环的回调列表
        callbacks = self._callbacks
        self._callbacks = []
    for callback in callbacks:
        #执行遗留回调
        self._run_callback(callback)

    if self._timeouts:
        now = self.time()
        while self._timeouts:
            #超时回调
            if self._timeouts[0].callback is None:
                # 最小堆维护超时事件
                heapq.heappop(self._timeouts)
            elif self._timeouts[0].deadline <= now:
                timeout = heapq.heappop(self._timeouts)
                self._run_callback(timeout.callback)
            else:
                seconds = self._timeouts[0].deadline - now
                poll_timeout = min(seconds, poll_timeout)
                break

    if self._callbacks:
        # If any callbacks or timeouts called add_callback,
        # we don't want to wait in poll() before we run them.
        poll_timeout = 0.0

    if not self._running:
        break

    if self._blocking_signal_threshold is not None:
        # clear alarm so it doesn't fire while poll is waiting for
        # events.
        signal.setitimer(signal.ITIMER_REAL, 0, 0)

    try:
        #这里的poll就是epoll,当有事件发生,就会返回,详情参照tornado里e#poll的代码
        event_pairs = self._impl.poll(poll_timeout)
    except Exception as e:
        if (getattr(e, 'errno', None) == errno.EINTR or
            (isinstance(getattr(e, 'args', None), tuple) and
             len(e.args) == 2 and e.args[0] == errno.EINTR)):
            continue
        else:
            raise

    if self._blocking_signal_threshold is not None:
        signal.setitimer(signal.ITIMER_REAL,
                         self._blocking_signal_threshold, 0)

    #如果有事件发生,添加事件,
    self._events.update(event_pairs)
    while self._events:
        fd, events = self._events.popitem()
        try:
            #根据fd找到对应的回调函数,
            self._handlers[fd](fd, events)
        except (OSError, IOError) as e:
            if e.args[0] == errno.EPIPE:
                # Happens when the client closes the connection
                pass
            else:
                app_log.error("Exception in I/O handler for fd %s",
                              fd, exc_info=True)
        except Exception:
            app_log.error("Exception in I/O handler for fd %s",
                          fd, exc_info=True)
简单来说一下流程,首先执行上次循环的回调列表,然后调用epoll等待事件的发生,根据fd取出对应的回调函数,然后执行。IOLoop基本是个事件循环,因此它总是被其它模块所调用。
咱们来看一下ioloop的start方法,start 方法中主要分三个部分:一个部分是对超时的相关处理;一部分是 epoll 事件通知阻塞、接收;一部分是对 epoll 返回I/O事件的处理。

1.超时的处理,是用一个最小堆来维护每个回调函数的超时时间,如果超时,取出对应的回调,如果没有则重新设置poll_timeout的值

2.通过 self._impl.poll(poll_timeout) 进行事件阻塞,当有事件通知或超时时 poll 返回特定的 event_pairs,这个上面也说过了。

results matching ""

    No results matching ""