学习 Twisted 的 reactor (2)

    可能是自己上一次到最后太热,只想着吃冰棍了,以为 reactor 初始化结束了,其实,还有戏,很精彩

... ...

-- /twisted/internet/posixbase.py
    from twisted.internet.interfaces import IReactorUNIX, IReactorUNIXDatagram
    from twisted.internet.interfaces import IReactorTCP, IReactorUDP, IReactorSSL, IReactorArbitrary
    from twisted.internet.interfaces import IReactorProcess, IReactorMulticast
    from twisted.internet.interfaces import IHalfCloseableDescriptor
    from twisted.internet import error
    from twisted.internet import tcp, udp
   
    from twisted.python import log, threadable, failure, util
    from twisted.persisted import styles
    from twisted.python.runtime import platformType, platform
   
    from twisted.internet.base import ReactorBase
   
    try:
        from twisted.internet import ssl
        sslEnabled = True
    except ImportError:
        sslEnabled = False
   
    try:
        from twisted.internet import unix
        unixEnabled = True
    except ImportError:
        unixEnabled = False
   
    processEnabled = False
    if platformType == 'posix':
        from twisted.internet import fdesc
        import process
        processEnabled = True
   
    if platform.isWindows():
        try:
            import win32process
            processEnabled = True
        except ImportError:
            win32process = None
    这里不仅倒入了一些很流行的模块 ssl, unxi,而且还根据操作系统平台的不同来导入适应当前操作平台的 module(win32process, fdesc) 了,如进程,多线程等
-- /twisted/internet/posixbase.py
    if platformType == 'posix':
        _Waker = _UnixWaker
    elif platformType == 'win32':
        _Waker = _Win32Waker
    这里仍然根据不同平台,来创建不同的 waker(唤醒者) -- _UnixWaker, _Win32Waker:
-- /twisted/internet/posixbase.py
    class _Win32Waker(log.Logger, styles.Ephemeral):
        """I am a workaround for the lack of pipes on win32.
   
        I am a pair of connected sockets which can wake up the main loop
        from another thread.
        """
        disconnected = 0
   
        def __init__(self, reactor):
            """Initialize.
            """
            self.reactor = reactor
            # Following select_trigger (from asyncore)'s example;
            server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            client.setsockopt(socket.IPPROTO_TCP, 1, 1)
            server.bind(('127.0.0.1
    对于 win32 平台总是要 “特别” 对待,初始化动作也较多了,创建了一个 server(SOCK_STREAM) 和一个 client (SOCK_STREAM),同时让该 client 连接 server,然后接受连接后的 read socket object 和 client 都设置为非阻塞模式,然后再设置 reader 为读入端,client 为写入端,并且使用了同一个文件描述符。说白了,就是使得他们本地读取 “文件”。
    而对于 unxi 平台则要简洁的多了
-- /twisted/internet/posixbase.py
    class _UnixWaker(log.Logger, styles.Ephemeral):
        """This class provides a simple interface to wake up the event loop.
   
        This is used by threads or signals to wake up the event loop.
        """
        disconnected = 0
   
        i = None
        o = None
   
        def __init__(self, reactor):
            """Initialize.
            """
            self.reactor = reactor
            self.i, self.o = os.pipe()
            fdesc.setNonBlocking(self.i)
            fdesc.setNonBlocking(self.o)
            self.fileno = lambda: self.i
    在 unix 平台下,通过使用创建的管道进行非阻塞的通信显然比本地的 socket 通信性能要好多了。
-- /twisted/internet/base.py
    if platform.supportsThreads():
        classImplements(ReactorBase, IReactorThreads)
    如果操作系统平台支持多线程,则使用多线程来实现 ReactorBase,使得尽可能的提升性能。
    在 “学习 Twisted 的 reactor (1)” 中分析 ReactorBase.__init__(...) 时,还有最后两句代码没有分析呢,现在可以继续了

-- /twisted/internet/base.py
    def __init__(self):
        self.threadCallQueue = []
        self._eventTriggers = {}
        self._pendingTimedCalls = []
        self._newTimedCalls = []
        self._cancellations = 0
        self.running = 0
        self.waker = None
        self.addSystemEventTrigger('during', 'shutdown', self.crash)
        self.addSystemEventTrigger('during', 'shutdown', self.disconnectAll)
        if platform.supportsThreads():
            self._initThreads() // **
    ** 上一次这里还没有深入看下去:
    -- /twisted/internet/base.py
    # IReactorThreads
    if platform.supportsThreads():
        threadpool = None
        def _initThreads(self):
            self.usingThreads = 1
            self.resolver = ThreadedResolver(self) // (1)
            self.installWaker() // (2)
    * 首先置使用多线程标志
    * (1) 找到 ThreadedResolver() -- 线程解析器,该类中的 getHostByName(self, name, timeout = (1, 3, 11, 45)) 方法用于解析域名到 IP 地址的
-- /twisted/internet/base.py
    class ThreadedResolver:
        implements(IResolverSimple)
   
        def __init__(self, reactor):
            self.reactor = reactor
            self._runningQueries = {}
   
    这里也很简单,只是做一些简单的初始化工作:
        * 设置已经做过初始化的 reactor
        * 初始化 “运行时的查询”
    * (2) 找到 installWaker()
-- /twisted/internet/base.py
    def installWaker(self):
        """Install a `waker' to allow threads and signals to wake up the IO thread.
        We use the self-pipe trick (http://cr.yp.to/docs/selfpipe.html) to wake
        the reactor. On Windows we use a pair of sockets.
        """
        if not self.waker:
            self.waker = _Waker(self)
            self.addReader(self.waker)
    创建唤醒者(waker),通过 _Waker(self),我们上面已经看到根据不同的平台创建不同的 waker 代码了,如果为 unix 平台,则为 _UnixWaker; 如果为 win32 平台,则为 _Win32Waker。对于这种两种类型的初始化动作我们在上面也已经分析了,下面再来看一下 addReader(self.waker) 吧,比较简单了:
-- /twisted/internet/selectreactor.py
    def addReader(self, reader):
        """Add a FileDescriptor for notification of data available to read.
        """
        reads[reader] = 1
   
    歇一会 ... ...