学习 Twisted 的 reactor (3)

   
    今天抽空看了  E 文版电子书,因为 E 文水平有限,看起来颇为困难,但也稍稍有点感觉吧,主要看了第二章 >,下面也贴些使用 reactor 创建 client 和一个 server 端的代码来进一步巩固一下。

"""
    这是一个 client 端的程序
    简单的连接指定的主机及其端口
"""
from twisted.internet import reactor, defer, protocol
class CallbackAndDisconnectProtocol(protocol.Protocol):
    """协议类
    """
    def connectionMade(self):
        """当连接建立成功后调用该方法
        """
        self.factory.deferred.callback("Connected!")
        self.transport.loseConnection()
class ConnectionTestFactory(protocol.ClientFactory):
    """协议工厂类(专门用于生产 “协议” 对象的:))
    而且还是一个 “客户端工厂”,为一些基本的 client 端操作提供了支持,如:
        * startedConnecting(self, connector)
            当连接开始启动后调用该方法,
            其中 connector 为连接器对象(用过 ace 的话会更熟悉一点)
   
        * clientConnectionFailed(self, connector, reason)
            当连接失败时调用该方法,同时把失败的原因作为参数传入
            其中 reason: L{twisted.python.failure.Failure}
        * clientConnectionLost(self, connector, reason)
            当连接丢失的时候调用该方法,同时把原因作为参数传入
    """
    protocol = CallbackAndDisconnectProtocol
   
    def __init__(self):
        """初始化,用于创建一个 Deferred 对象
        """
        self.deferred = defer.Deferred()
    def clientConnectionFailed(self, connector, reason):
        """当客户端连接失败后,
        调用 deferred 中所注册的 errback 中的方法,
        并将 reason 作为参数
        """
        self.deferred.errback(reason)
def testConnect (host, port):
    """接受需要连接的主机和端口号作为参数,
    生成 “协议工厂”,然后进行 TCP 连接
    并返回 deferred 对象
    """
    testFactory = ConnectionTestFactory ()
    ### 因为非阻塞连接,所以可以立即返回
    reactor.connectTCP (host, port, testFactory)
    return testFactory.deferred
def handleSuccess (result, port):
    """连接成功后的处理方法
    """
    print "Connected to port %i" % port
    reactor.stop ()
def handleFailure (failure, port):
    """连接失败后的处理方法
    """
    print "Error connecting to port %i: %s" % (port, failure.getErrorMessage())
    reactor.stop ()
if __name__ == "__main__":
    import sys
    if not len(sys.argv) == 3:
        print "Usage: connectiontest.py host port"
        sys.exit(1)
    host = sys.argv[1]
    port = int(sys.argv[2])
    # 从这里开始创建协议工厂,
    # 并进行 TCP 连接(到指定的 host 和 port)
    connecting = testConnect(host, port)
   
    # 添加 callback 方法 handleSuccess
    connecting.addCallback(handleSuccess, port)
    # 当出错时调用这里指定的 errback 方法 handleFailure
    connecting.addErrback(handleFailure, port)
    reactor.run( )
    代码中添加了一些注释,便于理解该段代码的基本的运行流程:
    * 首先调用方法 testConnect(host, port) 来创建一个协议工厂(协议主要用于交流,这里自然是 client 和 server 之间的交流),之后使用 TCP 来尝试连接指定的主机及其端口,并返回 deferred 对象。
    * 添加 callback 方法 handleSuccess
   
    * 添加 errback 方法 handleFailure
    下面对源代码进行更细的跟踪一下:
    (1) testConnect 中调用的 ConnectionTestFactory() 来构造工厂对象,里面同时创建了 Deferred 对象(Deferred 的英文是延期的意思,可以想象有点 “缓期执行” 的味道):
-- /twisted/internet/defer.py
    class Deferred:
   
        def __init__(self):
            self.callbacks = []
            if self.debug:
                self._debugInfo = DebugInfo()
                self._debugInfo.creator = traceback.format_stack()[:-1]
    这里很简单,只是创建了一个空列表 callbacks,当然这个列表里还是很有文章的了。
    (2) 同时也初始化了 CallbackAndDisconnectProtocol 协议对象,其派生之 protocol.Protocol:
    (3) 下面 reactor 开始通过 tcp 连接指定的主机及其端口:
-- /twisted/internet/posixbase.py
    class PosixReactorBase(ReactorBase):
        
        def connectTCP(self, host, port, factory, timeout=30, bindAddress=None):
            """@see: twisted.internet.interfaces.IReactorTCP.connectTCP
            """
            c = tcp.Connector(host, port, factory, timeout, bindAddress, self)
            c.connect()
            return c
        * 这里首先创建了一个 Connector(连接器)对象:
-- /twisted/internet/tcp.py
    class Connector(base.BaseConnector):
        def __init__(self, host, port, factory, timeout, bindAddress, reactor=None):
            self.host = host
            if isinstance(port, types.StringTypes):
                try:
                    port = socket.getservbyname(port, 'tcp')
                except socket.error, e:
                    raise error.ServiceNameUnknownError(string="%s (%r)" % (e, port))
            self.port = port
            self.bindAddress = bindAddress
            base.BaseConnector.__init__(self, factory, timeout, reactor)
        * Connector(连接器)对象的 __init__ 主要是对一些基本连接参数(如:host, port等等)做初始化工作,另外在这里,可以看到对于端口的处理可以传递服务名作为参数(不过是需要 tcp 协议的服务的)。
        此外,也调用了 base.BaseConnector 的 __init__ 进行初始化:
-- /twisted/internet/base.py
    class BaseConnector(styles.Ephemeral):
   
        implements(IConnector)
   
        timeoutID = None
        factoryStarted = 0
   
        def __init__(self, factory, timeout, reactor):
            self.state = "disconnected"
            self.reactor = reactor
            self.factory = factory
            self.timeout = timeout
   
        def connect(self):
            """Start connection to remote server."""
            if self.state != "disconnected":
                raise RuntimeError, "can't connect in this state"
   
            self.state = "connecting"
            if not self.factoryStarted:
                self.factory.doStart()
                self.factoryStarted = 1
            self.transport = transport = self._makeTransport()
            ... ...
            self.factory.startedConnecting(self)
        * 下面紧接着开始连接指定的主机及其端口了,也就是调用上面的 BaseConnector.connect 方法,根据 BaseConnector 的初始化工作 __init__ 可以知道此时 self.state == "disconnected",并且 self.factoryStarted == 0,所以调用:
            self.factory.doStart()
            self.factoryStarted = 1
          并且该 self.factory 其实就是 ConnectionTestFactory,所以就调用了
-- /twisted/internet/protocol.py
    class Factory:
        """This is a factory which produces protocols.
   
        By default, buildProtocol will create a protocol of the class given in
        self.protocol.
        """
   
        implements(interfaces.IProtocolFactory)
   
        # put a subclass of Protocol here:
        protocol = None
   
        numPorts = 0
        noisy = True
   
        def doStart(self):
            """Make sure startFactory is called.
   
            Users should not call this function themselves!
            """
            if not self.numPorts:
                if self.noisy:
                    log.msg("Starting factory %r" % self)
                self.startFactory()
            self.numPorts = self.numPorts + 1
          下面开始进行数据的传输:self._makeTransport()
    -- /twisted/internet/tcp.py
    class Connector(base.BaseConnector):
        def _makeTransport(self):
            return Client(self.host, self.port, self.bindAddress, self, self.reactor)
    -- /twisted/internet/tcp.py
    class Client(BaseClient):
        """A TCP client."""
   
        def __init__(self, host, port, bindAddress, connector, reactor=None):
            # BaseClient.__init__ is invoked later
            # 即 Connector(连接器)对象
            self.connector = connector
            self.addr = (host, port)
            # 对地址做进一步的解析,可以参见:
            # BaseClient.resolveAddress()
            whenDone = self.resolveAddress
            err = None
            skt = None
   
            # 以非阻塞模式创建 socket,可以参见:
            # BaseClient.createInternetSocket
            try:
                skt = self.createInternetSocket()
            except socket.error, se:
                err = error.ConnectBindError(se[0], se[1])
                whenDone = None
            # 绑定地址
            if whenDone and bindAddress is not None:
                try:
                    skt.bind(bindAddress)
                except socket.error, se:
                    err = error.ConnectBindError(se[0], se[1])
                    whenDone = None
            self._finishInit(whenDone, skt, err, reactor)
    (4) 下面在 Deferred 对象上注册需要 callback 和 errback 的方法了:
-- /twisted/internet/defer.py
    def addCallbacks(self, callback, errback=None,
                     callbackArgs=None, callbackKeywords=None,
                     errbackArgs=None, errbackKeywords=None):
        """Add a pair of callbacks (success and error) to this Deferred.
        These will be executed when the 'master' callback is run.
        """
        assert callable(callback)
        assert errback == None or callable(errback)
        cbs = ((callback, callbackArgs, callbackKeywords),
               (errback or (passthru), errbackArgs, errbackKeywords))
        self.callbacks.append(cbs)
        if self.called:
            self._runCallbacks()
        return self
      可见这里保存了需要调用的 callback, errback 方法的名称,参数列表等信息,之后调用时,有如下顺序:
-- /twisted/internet/defer.py
    def callback(self, result):
        """Run all success callbacks that have been added to this Deferred.
        Each callback will have its result passed as the first
        argument to the next; this way, the callbacks act as a
        'processing chain'. Also, if the success-callback returns a Failure
        or raises an Exception, processing will continue on the *error*-
        callback chain.
        """
        assert not isinstance(result, Deferred)
        self._startRunCallbacks(result) ###
    def _startRunCallbacks(self, result):
        ... ...
        self._runCallbacks() ###
    def _runCallbacks(self):
        """循环遍历 callback list 并进行执行
        """
        if not self.paused:
            cb = self.callbacks
            self.callbacks = []
            while cb:
                item = cb.pop(0)
                callback, args, kw = item[
                    isinstance(self.result, failure.Failure)]
                args = args or ()
                kw = kw or {}
                try:
                    ### 这一行就是调用的 callback 方法
                    self.result = callback(self.result, *args, **kw)
                    ### 从下面两行可以知道一个 callback 方法的返回结果
                    ### 也可以是一个 Deferred 对象
                    ### 并且也对该返回的 Deferred 对象进行 call
                    if isinstance(self.result, Deferred):
                        self.callbacks = cb
                        # note: this will cause _runCallbacks to be called
                        # "recursively" sometimes... this shouldn't cause any
                        # problems, since all the state has been set back to
                        # the way it's supposed to be, but it is useful to know
                        # in case something goes wrong. deferreds really ought
                        # not to return themselves from their callbacks.
                        self.pause()
                        self.result.addBoth(self._continue)
                        break
    (5) 这里面可能还有一个疑点的,就是:
        * 为什么会自动 “当连接建立成功后调用该方法” 的呢?
        * 为什么 CallbackAndDisconnectProtocol 中 self.factory 指的就是 ConnectionTestFactory 呢?
-- /twisted/internet/tcp.py
   
    class BaseClient(Connection):
        def resolveAddress(self):
            if abstract.isIPAddress(self.addr[0]):
                self._setRealAddress(self.addr[0])
                        |
                        V
        def _setRealAddress(self, address):
            self.realAddress = (address, self.addr[1])
            self.doConnect()
        def doConnect(self):
            ... ...
            
            err = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
            ... ...
            
            # doConnect gets called twice. The first time we actually need to
            # start the connection attempt. The second time we don
        ### 1   该处调用了连接器的构建协议的方法,该方法同时调用的是 self.factory.buildProtocol(addr),而这里的 factory 其实就是我们的 ConnectionTestFactory,从该工厂的 buildProtocol 的方法中我们就可以找到上面第二个问题的答案了。
-- /twisted/internet/protocol.py
    def Factory.buildProtocol(self, addr):
        """Create an instance of a subclass of Protocol.
        The returned instance will handle input on an incoming server
        connection, and an attribute \"factory\" pointing to the creating
        factory.
        Override this method to alter how Protocol instances get created.
        @param addr: an object implementing L{twisted.internet.interfaces.IAddress}
        """
        p = self.protocol()
        p.factory = self ### 将该工厂告诉创建的 protocol,我就是创建你的工厂
        return p
        ### 2   协议创建好了,这样可以使用该协议来创建连接了,
-- /twisted/internet/protocol.py
    def Protocal.makeConnection(self, transport):
        self.connected = 1
        self.transport = transport
        self.connectionMade() ### 这里 callback connectionMade 了