Python 网络编程: 非阻塞模式 的实例一则


                                //----------- 服务器端 -----------//
#!/usr/bin/python
class Server(FORK, TCP):
    pass
class MyRequestHandler(SRH):
    def handle(self):
        print '...connected from :', self.client_address
        cmd = self.get_cmd()
        if not cmd :
            print '... not receive parameters.'
        else:
            self.my_send(cmd)
    def get_cmd(self):
        self.cmd = self.rfile.readline().strip()
        if self.cmd and 1 == int(self.cmd[0]):
            self.cmd = self.cmd[2:]
            self.cmd = self.cmd.split(',')
            self.cmd = 'php /u01/python/testing/spider.php '+self.cmd[0]+' ' +self.cmd[1]+' '+self.cmd[2]
            os.system(self.cmd)
            return self.cmd
        else :
            return 'finished' ;
    def my_send(self, cmd):
            if 'finished' == cmd :
                self.wfile.write('Finished at %s' % ctime())
            else :
                self.wfile.write('[%s] %s' % (ctime(), 'command executed.'))
if __name__ == '__main__' :
    import os
    from SocketServer import TCPServer as TCP, StreamRequestHandler as SRH, ForkingMixIn as FORK
    from time import ctime
    HOST = '192.168.1.182'
    PORT = 31608
    ADDR = (HOST, PORT)
    #tcpServ = TCP(ADDR, MyRequestHandler)
    tcpServ = Server(ADDR, MyRequestHandler)
    print 'waiting for connection...'
    tcpServ.serve_forever()
//----------- 客户端 -----------//
#!d:\python25/python
def get_task(db):   
    '''get spider tasks.'''
   
    if not db:
        db = OCI.connect('spidertk/n0sm0king@spider')   
    cursor = db.cursor()
    thread = []
        
    sql = '''
          select *
              from (select t7.*,
                           row_number() over(partition by t7.st_id order by dbms_random.value) rn2
                      from (select t6.*
                              from (select t5.st_id,
                                           t5.sp_id,
                                           t5.run_param,
                                           t5.sn2,
                                           t5.ss_ip,
                                           t5.create_time,
                                           row_number() over(partition by t5.sp_id, t5.sn2 order by dbms_random.value) rn1
                                      from (select t4.*
                                              from (select t1.st_id,
                                                           t1.sp_id,
                                                           t1.run_param,
                                                           t1.ss_name sn1,
                                                           t1.status,
                                                           t1.create_time,
                                                           t2.ss_name sn2,
                                                           t2.ss_ip,
                                                           t2.ss_thread
                                                      from test_task t1, test_server t2
                                                     where t2.ss_thread < 5) t4
                                             where t4.status = 0
                                               and not EXISTS
                                             (select t3.sp_id, t3.sn2
                                                      from (select t1.sp_id,
                                                                   t1.ss_name sn1,
                                                                   t2.ss_name sn2
                                                              from test_task   t1,
                                                                   test_server t2
                                                             where t2.ss_thread < 5) t3
                                                     where lower(t3.sn2) = lower(t3.sn1)
                                                       and t3.sn2 = t4.sn2
                                                       and t4.sp_id = t3.sp_id)) t5) t6               
                             where t6.rn1 = 1) t7) t8
             where t8.rn2 = 1
             order by t8.create_time
          '''
    cursor.execute(sql)   
    res = cursor.fetchall()
   
    print 'Total tasks is : %d' % cursor.rowcount        
    return res
def send_task(tasks, db):
    HOST = '192.168.1.182'
    PORT = 31608
    ADDR = (HOST, PORT)
    BUFSIZ = 1024
   
    if not db:
        db = OCI.connect('spidertk/n0sm0king@spider')
        
    cursor = db.cursor()
    cmd = '1:'
    for i in tasks:        
        p = cmd+str(i[0])+';'+str(i[1])+';'+i[2]
        
        sql = "update test_task set ss_name = '"+i[3]+"', status = 1, deliver_time = sysdate where st_id="+str(i[0])
        cursor.execute(sql)
        db.commit()
               
        HOST = str(i[2])
        
        tcpClntSock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        #tcpClntSock.connect((HOST, PORT))
        
        tcpClntSock.connect(ADDR)
        tcpClntSock.send('%s \r\n' % p)
        
        data = tcpClntSock.recv(BUFSIZ)
        if not data :
            break
            
        print "buffer: "+data.strip()+" from python buff"
        
        #close socket connection
        tcpClntSock.close()   
  
   
   
if __name__ == '__main__' :   
    import socket
    import cx_Oracle as OCI
        
    # create new global database connection   
    db = OCI.connect('spidertk/n0sm0king@spider')   
   
    res = get_task(db)
    send_task(res, db)
   
    #close global database connection
    db.close()