POE学习笔记

POE学习笔记

在程序需要同时处理大量连接的情况下,比如服务器程序、spider程序等,一般可以采用多进程、多线程和非阻塞IO三种方式。我自己编程只喜欢用非阻塞IO。在C下面有libevent库可以用,相比较POE是一款高端产品,刚开始有一点摸不着边际,熟悉之后感觉还是很贴心的。
    POE主要分以下几个组件Kernel、Session、Wheel、Filter、Driver,还有更高级的Component组件,不过基本上是前面几种组件的组合。Kernel是POE核心,内部实现了IO读写信号回调等处理,简单应用程序与Kernel交互并不多。Session是一个处理线程,比如一个服务器程序每一个客户端连接就应该对应于一个Session,同理Spider程序中对于每一个web服务器的连接也应该对应于一个Session,一个应用程序可以有很多Session。Wheel、Filter、Driver是对底层IO的封装。
    来看一个简单的客户端的例子:

[Copy to clipboard] [ - ]
CODE:
use warnings;
use strict;

use POE;
use POE::Wheel::SocketFactory;
use POE::Wheel::ReadWrite;

POE::Session->create
        ( inline_states =>
                { _start => \&start,
                  connected => \&connected,
                  flushed => \&flushed,
                }
        );

POE::Kernel->run;       
       
sub start {
        print "_start\n";
        my $wheel = POE::Wheel::SocketFactory->new
                ( RemoteAddress => 'localhost',
                  RemotePort => 8000,
                  SuccessEvent => "connected",
                  FailureEvent => "_stop" ,
                );
        $_[HEAP]->{wheel} = $wheel;
}

sub connected {
        print "connected\n";
        my ($kernel, $heap, $socket) = @_[KERNEL, HEAP, ARG0];
        my $wheel = POE::Wheel::ReadWrite->new
                ( Handle => $socket,
               FlushedEvent => 'flushed',
                );
        $heap->{wheel} = $wheel;
        $wheel->put("hello server");
}

sub flushed {
        print "flushed\n";
        delete $_[HEAP]->{wheel};
}

打开两个终端,在其中一个输入nc -l -p 8000,另一个终端中执行上面的程序,可以看到在nc终端中输出了hello server,在客户端终端中输出了_started、connected、flushed三行信息。
        这个程序基本上可以分为三部分
        1) POE::Session->create,创建一个Session。
        2) POE::Kernel->run, 启动框架消息分发。
        3) sub start ... 定义各种状态的回调函数。
在创建一个Session时,最重要的参数是inline_states,指定各种状态的回调函数。其中以_字符开始的状态是POE系统定义状态,其他是应用程序自定义状态。POE启动后就会自动将系统中每一个Session的状态设为_start,因此_start对应的处理函数就会被调用,因此下面的代码:

[Copy to clipboard] [ - ]
CODE:
use POE;

POE::Session->create
        ( inline_states =>
                { _start => sub { print "hello world\n"; }
                }
        );
       
POE::Kernel->run;

会直接打印hello world后退出。
    接着看上面的例子中的_start状态的处理函数,其中的代码
      my $wheel = POE::Wheel::SocketFactory->new
   创建了一个Wheel::SocketFactory,这个Wheel会根据你指定的参数去连接远程的服务,在连接完成后,自动触发SuccessEvent参数指定的状态(上面的程序是connected状态)。
    在connected状态的处理函数中,
      my $wheel = POE::Wheel::ReadWrite->new
    创建了一个Wheel::ReadWrite,在构造函数中你可以指定可读、出错和所有数据已经发送三种情况的对应状态,然后你就可以在对应状态的处理函数中处理各种情况了。
    上面的程序通过指定FlushedEvent,标明了当所有数据被发送后这个Session会被触发到flushed状态。
   (简单总结一下,对应Wheel的参数一般是当某种情况发生时,所属的Session的应该被置为什么状态,而Session的参数主要是各种状态的处理函数。)
    在各个回调函数中,你可以通过@_[KERNEL]得到系统Kernel对象,通过@_[HEAP]得到Session相关数据对象(比如在服务器程序每个Session中就会保留这个客户端的相关信息,地址用户名等),其他不同的处理函数得到的额外参数个数不同,分别可以通过@_[ARG0]、@_[ARG1]、@_[ARG2]得到。比如connected的处理函数就通过@_[ARG0]得到了SocketFactory创建的socket对象。
    下面这一据代码:
      $heap->{wheel} = $wheel;
    有着特殊的意义,当一个Wheel被创建后,应用程序必须把它保存在某处,否则当离开作用域对象被注销后,所有的功能都无法实现了。保存在Session相关数据HEAP中是一个非常自然的选择。类似于flushed处理函数中的代码:
      delete $_[HEAP]->{wheel};
    如果不执行这一句,Wheel就永远存在,程序也就无法退出了。
    下面给出一个简单POE应用程序的框架代码:

[Copy to clipboard] [ - ]
CODE:
use warnings;
use strict;

use POE;
use POE::Wheel::Somewheel;

POE::Session->create
        ( inline_states =>
                { _start => sub {
                        # do something initial
                        # new some Wheel
                        # put the wheel in HEAP
                  },
                  state_1 => sub {
                          # handle state 1
                          # new some other Wheel
                  },
                  state_2 => sub {
                    # handle state 2
                  },
                }
        );

POE::Kernel->run;

顶,先占位
其实 POE 不光可以用来作为网络服务器框架。
事实上它是一个包含了很多现成的组件(包括网络组件)的自动机。
凡是可用有穷自动机算法描述的应用,都可以用 POE。
而且 POE 是纯 Perl 实现,因此不用担心会依赖什么。它什么都不依赖。移植性特别好。
另外,用过 PDK 的人都知道,纯 Perl 实现的模块是最好打包的了。

下面给出一个简单的例子,就当是 Hello world 吧。

[Copy to clipboard] [ - ]
CODE:
D:\MoChou\poetut>cat tpoe.pl
#!/usr/bin/perl

use warnings;
use strict;
use POE;

POE::Session->create (
    inline_states =>
    {
        _start => \&session_start,
        _stop  => \&session_stop,
        count  => \&session_count,
    }
);

print "启动 POE 内核...\n";
POE::Kernel->run();
print "POE 内核运行结束。\n";
exit;

sub session_start {
    print "Session 启动。Session ID = ", $_[SESSION]->ID, "\n";
    $_[HEAP]->{count} = 0;
    $_[KERNEL]->yield("count");
}

sub session_stop {
    print "Session 停止。Session ID = ", $_[SESSION]->ID, ".\n";
}

sub session_count {
    my ( $kernel, $heap ) = @_[ KERNEL, HEAP ];
    my $session_id = $_[SESSION]->ID;

    my $count = ++$heap->{count};
    print "数数 $count\n";

    $kernel->yield("count") if $count < 10;
}

D:\MoChou\poetut>tpoe
Session 启动。Session ID = 2
启动 POE 内核...
数数 1
数数 2
数数 3
数数 4
数数 5
数数 6
数数 7
数数 8
数数 9
数数 10
Session 停止。Session ID = 2.
POE 内核运行结束。

D:\MoChou\poetut>

比较讨厌的是这东西无法通过PDK直接进行编译!


QUOTE:
原帖由 战鹰 于 2007-3-14 10:26 发表
比较讨厌的是这东西无法通过PDK直接进行编译!

呵呵。
那是因为你对 PDK 理解不深,没法自行解决这个问题。
POE 是最好打包的模块了。因为它是纯 Perl 实现的(难以想象)。
POE 里有好多模块都是动态加载的,而 PDK 没法检测到动态加载的模块并包含进去,因此需要解决一下这个问题。
我的解决方法就是,PDK 打包以后,运行时如果缺什么模块,就把那个模块显式地 use 一下。这样问题就解决了。
当然也可以通过配置 perlapp 的工程文件,不过我不想过多地研究 PDK。
比如我那个决战文本客户端里,就有这么一段代码:

[Copy to clipboard] [ - ]
CODE:
# 加载 POE 以及本程序中所使用到的组件
# 之所以要写这么多是因为有些动态加载的模块 PerlAPP 识别不到,
# 编译后缺少东西不能正常运行,所以必须得写明了。
use POE;
use POE::Session;
use POE::Loop::Select;
use POE::Wheel::SocketFactory;
use POE::Wheel::ReadWrite;
use POE::Driver::SysRW;
use POE::Filter::Line;
use POE::Filter::Stream;
use POE::Resource::Aliases;
use POE::Resource::Events;
use POE::Resource::Extrefs;
use POE::Resource::FileHandles;
use POE::Resource::SIDs;
use POE::Resource::Sessions;
use POE::Resource::Signals;
use POE::Resource::Statistics;
use POE::Resource::Controls;
use POE::Wheel;

Wheel:数据传送带

对于应用程序而言,输入输出是一个非常重要而且耗时的部分。通过使用Wheel,应用程序可以方便地监控IO事件并简化对输入输出操作的编写。对于Socket通讯程序来说,ListenAccept、SocketFactory、ReadWrite三种Wheel分别对应监听端口、建立连接和传送数据三种Socket操作。
1. ListenAccept
功用:处理一个监听端口的连接事件。
事件参数:AcceptEvent,其值在有客户连接时被触发。新建立的socket通过ARG0传送给处理函数。
2. SocketFactory
功用:建立到远程的连接。
事件参数:SuccessEvent,其值在连接完成时被触发。新建立的socket通过ARG0传送给处理函数。
3. ReadWrite
功用:读写数据。
事件参数:InputEvent,其值在当有数据到达时被触发。数据通过ARG0传送给处理函数。
        FlushedEvent,其值在当所有缓冲的数据被发送出去后被触发。你可以在这个事件的处理函数中发送新数据、关闭连接等。

下面通过一个具体的聊天室server/client程序来说明Wheel的使用。客户端接收从服务端传送过来的数据,同时监控标准输入,如果有用户输入则把内容传送给服务端;服务端监听新的连接,并把每一个客户端传送过来的数据广播到所有的客户端。客户端这儿使用了一个Wheel ReadLine,它可以监控终端输入。
chats服务端

[Copy to clipboard] [ - ]
CODE:
use warnings;
use strict;

use IO::Socket;
use POE qw /Wheel::ListenAccept Wheel::ReadWrite/;

# 创建监听Socket及处理Session
POE::Session->create
        ( inline_states =>
                { _start => \&start_server,
                  new_connected => \&new_connected,
                  client_input => \&client_input,
                }
        );

POE::Kernel->run;       

sub start_server {
        my ($kernel, $heap) = @_[KERNEL, HEAP];
        my $server = IO::Socket::INET->new
                ( LocalPort => 8000,
          Listen => 16,
          Reuse  => "yes",
        ) or die "can't make server socket: $@\n";
        
        $heap->{server} = POE::Wheel::ListenAccept->new
                ( Handle => $server,
                  AcceptEvent => 'new_connected',
                );
}

sub new_connected {
        my ($heap, $client) = @_[HEAP, ARG0];
        my $wheel = POE::Wheel::ReadWrite->new
                ( Handle => $client,
                  InputEvent => 'client_input',
                );
        # 系统中每个wheel的ID是唯一的
        $heap->{client}->{ $wheel->ID } = $wheel;
}

sub client_input {
        my ($heap, $input, $wid) = @_[HEAP, ARG0, ARG1];
        # 广播数据。如果愿意,可以屏蔽掉$wid,即发送消息的客户端
        map { $heap->{client}->{$_}->put( $input ) } keys %{$heap->{client}};
}

chatc客户端

[Copy to clipboard] [ - ]
CODE:
use warnings;
use strict;

use IO::Socket;
use POE qw /Wheel::SocketFactory Wheel::ReadWrite Wheel::ReadLine/;

POE::Session->create
        ( inline_states =>
                { _start => \&start_chat,
                  connected => \&connected,
                  connect_fail => \&connect_fail,
                  server_input => \&server_input,
                  user_input => \&user_input,
                }
        );
       
POE::Kernel->run;

sub start_chat {
        my $wheel = POE::Wheel::SocketFactory->new
                ( RemoteAddress => 'localhost',
                  RemotePort => 8000,
                  SuccessEvent => "connected",
                  FailureEvent => "connect_fail",
                );
        $_[HEAP]->{server} = $wheel;
}       

sub connected {
        my ($kernel, $heap, $socket) = @_[KERNEL, HEAP, ARG0];
        my $wheel = POE::Wheel::ReadWrite->new
                ( Handle => $socket,
                  InputEvent => "server_input",
                  ErrorEvent => "error_happened",
                );
        $heap->{server} = $wheel;
       
        my $console = POE::Wheel::ReadLine->new
                ( InputEvent => 'user_input'
                );
        # 告诉ReadLine监控终端
        $console->get( 'input your message, bye to quit: ');
        $heap->{console} = $console;
}

sub connect_fail {
        delete $_[HEAP]->{server};
}

sub server_input {
        my ($heap, $input) = @_[HEAP, ARG0];
        # 如果使用print "$input\n"会搞乱终端
        $heap->{console}->put( $input );
}

sub user_input {
        my ($heap, $input) = @_[HEAP, ARG0];
        if ($input =~ /(quit)|(exit)|(bye)/i) {
                delete $heap->{server};
                delete $heap->{console};
                return;
        }
        # 发送到服务端
        $heap->{server}->put( $input );
        # 继续监控终端
        $heap->{console}->get( 'input your message, bye to quit: ');
}

Wheel::ReadLine 在 windows 下好用不?


QUOTE:
原帖由 flw 于 2007-3-14 13:25 发表
Wheel::ReadLine 在 windows 下好用不?

查了资料好像是不支持,它使用select监控终端。而win32的select只能用在socket上。