Getting Started With POE


sungo is a professional perl programmer, the author of several CPAN modules and a POE committer. He may be reached at sungo@pobox.com or on irc.perl.org #perl.


Introduction

Enterprise Perl is a phrase that is thrown around a lot in the Perl community these days. Perl is so flexible that corporations must come to terms with how much of Perl they're going to allow in their development environment. Most of these discussions, however, assume that Perl is only a good choice for automation tasks (single run applications) or WWW applications. Perl simply can't handle the amount of data, transactions, users, etc. that other languages can. One would never implement high performance, long running applications in Perl. (Or so the thinking goes.)

In the mid 90s, these thoughts were more or less true. Since then, Perl has grown up. Perl is now capable of handling all but the most speed-thirsty applications and is a prime choice for server-based application design. Several frameworks now exist to make these applications easier to build and easier to maintain.

My framework of choice is the Perl Object Environment, or POE. POE is a single-threaded, event driven, cooperative multitasking environment for Perl. Basically, POE is an application framework in which a single threaded perl process waits for events to occur so it can act accordingly. This event loop comprises the core of a POE process.

If all POE offered was an event loop, there would not be much to talk about, though. Nor would POE be particularly special. Several event loop modules already exist on CPAN. Event, Coro, IO::Events, and IO::Poll all offer similar functionality. However, any worthwhile application demands more than a simple set of actions.

The Lingo

As an application framework, POE offers several abstractions to make design and implementation easier. It is important to understand POE's unique lingo to use it properly.

Kernel

The Kernel is the core of a POE system, living in the POE::Kernel namespace. It is analogous to the kernel of an operating system (hence the name) and it handles all event queuing and dispatching and all manner of other low-level functionality. Nearly three-quarters of the POE API is utilized by operating on the kernel. As such, the kernel object (which is created for you behind the scenes) is always available via the global variable $poe_kernel. The POE system is initialized and launched using the run() method on the kernel. This method blocks and will only return when POE has completely shut down.

Sessions

A Session is the fundamental unit of the POE environment and lives in the POE::Session namespace. It is essentially a worker class and embodies a series of states and events. Sessions are slightly analogous to threads in that they have a unique runtime context and a semi-private data store (called the "heap"). Each session operates independently from other sessions, receiving time-slices from the POE kernel. It is important to remember that, despite the similarity to threads, all POE sessions run in the same single-threaded process and share CPU time.

Wheels

For some tasks, a full session is unnecessary. Sometimes, it makes more sense to alter the abilities of an existing session to provide the desired functionality. Wheels mutate or alter the abilities of a session to provide some new functionality. They live in the POE::Wheel namespace.

Wheels share their entire operating context with the session but do not have the same features as a session. Wheels do not have their own heap and cannot create aliases for themselves. In many ways, they are like a parasite clinging to the side of the user's code.

The upside to using Wheels is the loss of internal POE overhead. Sessions require a certain amount of maintenance to keep running. POE checks sessions to see if they still have work to do, if there are timers or alarms outstanding for them, if they should be garbage collected, etc. The more sessions that exist in a system, the more that overhead grows. Wheels have none of this overhead. They piggyback on top of the user's session so, apart from any events they may trigger as part of their normal operation, there is no inherent internal POE overhead in using a wheel.

POE ships with a few core wheels. POE::Wheel::SocketFactory allows a session to talk to network sockets or listen on network sockets. This is generally used in conjunction with POE::Wheel::ReadWrite which notifies a session of the readability or writability of file descriptors. POE::Wheel::FollowTail allows a session to "tail" a file (like the "tail" command in most unices) and receive notification when new content is available.

Filters

Many wheels handle incoming and outgoing data. For instance, FollowTail catches new content from whatever file it is watching and presents it to the session. When watching a log file, simply getting a blob of text back is perfectly acceptable. In real systems applications, however, POE is generally watching more complex data streams like HTTP, XML-RPC, MP3 streams or proprietary protocols.

Filters provide translation services to allow users to get back sane objects or data structures instead of giant blobs of network data. Basically, filters are very simple data-parsing modules. Most POE filters are self-contained enough to be used outside of a POE environment. They know nothing of POE or of the running POE environment. Most wheels accept filters for both input and output and allow for filters to be hot-plugged at runtime.

Drivers

Drivers provide low-level IO primitives. Generally, wheels use drivers to read and write from file descriptors and the like without needing to know the details of the operation. Currently, POE ships with a single driver, POE::Driver::SysRW, that abstracts "sysread" and "syswrite" semantics. SysRW defaults to a block size of 65536 which can be customized via a constructor parameter.
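To make "sysread and syswrite semantics" a little more concrete, here is a small sketch of the unbuffered calls that a driver such as SysRW wraps. It is not SysRW's implementation. The pipe, and the names $payload and $block_size, are assumptions made purely for the example.

```perl
use strict;
use warnings;

# A pipe stands in for the socket or file a wheel would normally hold.
pipe(my $reader, my $writer) or die "pipe failed: $!";

my $payload = "foo=bar&baz=1\n";

# syswrite bypasses Perl's buffered I/O and reports how many bytes
# actually went out.
my $written = syswrite($writer, $payload);
die "syswrite failed: $!" unless defined $written;
close $writer;

# sysread pulls up to one block per call. 65536 mirrors the default
# block size mentioned above.
my $block_size = 65536;
my $chunk      = '';
my $read       = sysread($reader, $chunk, $block_size);
die "sysread failed: $!" unless defined $read;

print "read $read bytes: $chunk";
```

A wheel never sees these details. It asks the driver for whatever data is available and hands the resulting chunks to its filter.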

Components

Components are sessions that provide services. Unlike a wheel, which plugs into a session and adds functionality, Components run separately in the background and offer some functionality through an abstracted API. They are analogous to system daemons on modern unices. For instance, POE::Component::Client::DNS offers asynchronous DNS resolution.

Standard State Parameters

All POE states receive a series of standard parameters. For reasons of speed, these are passed as part of @_ and accessed via subroutine constant indexes.

my $kernel = $_[KERNEL];
my ($kernel, $session) = @_[KERNEL, SESSION];

These are the fields that make up the standard parameter list:

  • KERNEL
    • A reference to $poe_kernel

  • SESSION
    • A reference to the current session

  • SENDER
    • A reference to the session that sent an event

  • STATE
    • The event name that caused the state to occur

  • HEAP
    • A reference to the session's storage space

  • OBJECT
    • For object states, this contains the object whose method is being invoked. For package states, this contains the name of the package whose method is being invoked. This parameter will always be undefined for inline states.

  • CALLER_FILE / CALLER_LINE / CALLER_STATE
    • The file, line number, and state from which this event was sent

  • ARG0 .. ARG9
    • The first ten custom parameters to a state. They will always be at the end of @_ so it is possible to send more than 10 parameters. Often they are accessed using $#_ like so:

               my @args = @_[ARG0 .. $#_];
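The mechanism behind these names is nothing more than array slicing with constants. The sketch below illustrates it in plain Perl without loading POE. The three constants are stand-ins invented for the example; POE exports the real KERNEL, HEAP, and ARG0, and their actual values differ.

```perl
use strict;
use warnings;

# Stand-in offsets for illustration only. POE exports the real
# constants, and their values are not the ones used here.
use constant { KERNEL => 0, HEAP => 1, ARG0 => 2 };

sub handler {
    # Slice the fixed parameters out of @_ by constant index.
    my ($kernel, $heap) = @_[KERNEL, HEAP];

    # Everything from ARG0 to the end of @_ is a custom parameter.
    my @args = @_[ARG0 .. $#_];

    return "$kernel/$heap->{name}: @args";
}

my $summary = handler('kernel', { name => 'demo' }, 'a', 'b', 'c');
print "$summary\n";
```

Because the constants are resolved at compile time, the lookup costs no more than an ordinary array index, which is the speed argument made above.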

Working with POE

With those concepts in mind, it is time to look at some code. To make life a bit easier, let's say we would like to accept and parse data that resembles CGI query strings. This data will be key value pairs in which the key and value are separated by "=" and the pairs themselves are delimited by "&". An example string is as follows:

        foo=bar&baz=1&bat=2

A Filter

First, we need a parser for our data streams. As discussed earlier, Filters are much easier to deal with because they are unaware of their environment and the POE context in which they are run.

package POE::Filter::SimpleQueryString;

use warnings;
use strict;

use Carp qw(carp croak);

Next we need a constructor.

sub new {
    my $class = shift;

    my $self = bless {
        buffer => '',    # start empty so pattern matches never see undef
    }, $class;

    return $self;
}

This is about the most simplistic constructor possible. This very simple filter requires no parameters to operate. It is perfectly reasonable, however, to demand parameters of the user. For instance, if the filter could decrypt the incoming data before parsing, a parameter could turn that feature on. NOTE: Use constructor parameters sparingly in filters. Some wheels and components only take the name of the filter and do not allow parameter passing.

POE has two possible APIs for filters. The easiest, and the oldest, interface uses get() and put() to hand off the data. get() is passed a large data chunk and it parses the data, returning as many records as the filter can find as an array reference. put() works similarly. It is handed a set of records and translates them back into a raw data stream, returning an array reference of data chunks.

The second, and more complex, API for filters allows for runtime switchout of filters. The second API uses get_one_start() and get_one() to parse data into records and put() to translate records into a data stream. This API version also calls for a get_pending() method that returns the contents of the current internal buffer, allowing wheels to trade out filters without data loss. get_one_start() accepts the initial data buffer as an array reference containing data chunks. It adds this data to an internal buffer and returns. get_one() processes that buffer, returning an array reference of records. Unlike get() in the previous API, get_one() is not greedy and should return either zero or one record.

We will be using the newer get_one_start()/get_one() version of the Filter API. If this code were destined for CPAN, we would also implement the older API for backwards compatibility.

get_one_start / get_one

As stated above, get_one_start()'s job is simply to add data to the internal buffer.

sub get_one_start {
    my $self = shift;
    my $incoming = shift;

    $self->{buffer} .= join '', @$incoming;

    return;
}

Note that we are creating one big string buffer from the incoming data chunks. The filter has no control over how data is chunked. Our parser, however, has specific requirements about what a chunk should look like. Smashing everything back into a string buffer allows the parser inside get_one() to chunk data however it wants.

get_one()'s job is to transform raw data into cooked record sets. The example string above, foo=bar&baz=1&bat=2, will become a hash.

sub get_one {
    my $self = shift;

In our super-easy format, an individual record is terminated by a newline, "\n". Key value pairs are delimited by "&". The key and value themselves are separated by an "=". Note that we aren't dealing with issues like character escaping or data taint.

    my @chunks;

Each parsed line makes up a chunk of data. We want to represent each record as a distinct entity to the user.

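    # Take one complete line off the front of the buffer. Test the
    # substitution itself, because $1 keeps a stale value when it fails.
    my $line = $self->{buffer} =~ s/^(.*?)\r?\n// ? $1 : undef;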

    if(defined $line && length $line) {
        my @pairs = split(/&/, $line);
        my %chunk;

        foreach my $pair (@pairs) {
            my ($key, $value) = split(/=/, $pair, 2);

So what happens if there is more than one instance of a given key in a record? Simple. We make an array reference. The user will need to inspect the value of each key to determine if they have more than one value.

            if(defined $chunk{ $key }) {
                if(ref $chunk{ $key } eq 'ARRAY') {
                    push @{ $chunk{ $key } }, $value;
                } else {
                    $chunk{ $key } = [ $chunk{ $key }, $value ];
                }
            } else {
                $chunk{ $key } = $value;
            }
        }

        push @chunks, \%chunk;
    }

    return \@chunks;
}

A call to get_one() returns an array reference containing hashes of our data.

    $VAR1 = [
        {
            'bat' => '2',
            'baz' => '1',
            'foo' => 'bar'
        }
    ];
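Since filters know nothing about POE, the get_one_start()/get_one() cycle can be exercised by hand. The sketch below does so with a hypothetical stand-in, SketchFilter, which has the same method shape as the filter above but skips the repeated-key handling. The chunk boundaries are made up to show that a record may arrive in pieces.

```perl
use strict;
use warnings;

# Hypothetical stand-in with the same get_one_start()/get_one() shape
# as the filter above. It does not handle repeated keys.
package SketchFilter;

sub new { return bless { buffer => '' }, shift }

sub get_one_start {
    my ($self, $incoming) = @_;
    $self->{buffer} .= join '', @$incoming;
    return;
}

sub get_one {
    my $self = shift;

    # No complete line buffered yet: report zero records.
    return [] unless $self->{buffer} =~ s/^(.*?)\n//;
    my $line = $1;

    my %record;
    for my $pair (split /&/, $line) {
        my ($key, $value) = split /=/, $pair, 2;
        $record{$key} = $value;
    }
    return [ \%record ];
}

package main;

my $filter = SketchFilter->new;
my @records;

# The second record is split across two chunks, as it might be on a
# real socket.
for my $chunk ("foo=bar&baz=1\nbat", "=2\n") {
    $filter->get_one_start([$chunk]);

    # get_one() returns at most one record, so keep asking until the
    # buffer holds no further complete line.
    while (my @found = @{ $filter->get_one }) {
        push @records, @found;
    }
}

printf "%d records\n", scalar @records;
```

This is roughly the loop a wheel runs on our behalf: feed in whatever arrived, then ask for records one at a time until none are left.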

put

put() goes the other direction with our data. It accepts an array reference of records like we created above and creates raw data chunks.

sub put {
    my $self = shift;
    my $records = shift;

    my $buffer = '';

    foreach my $record (@$records) {
        my @pairs;
        foreach my $key (keys %$record) {
            if(ref $record->{$key} eq 'ARRAY') {
                foreach my $value (@{ $record->{ $key } }) {
                    push @pairs, "$key=$value";
                }
            } else {
                my $value = $record->{$key};
                push @pairs, "$key=$value";
            }
        }
        $buffer .= join("&", @pairs) . "\n";
    }

    return [ $buffer ];
}
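As noted earlier, a filter headed for CPAN would also provide the older get() method, and the newer API calls for get_pending(). Both can be layered on top of get_one_start() and get_one(). The following is a minimal sketch using a hypothetical line-oriented filter, SketchLineFilter, rather than our query string parser. It assumes the convention that get_pending() returns undef when nothing is buffered.

```perl
use strict;
use warnings;

# Hypothetical line-oriented filter, used only to show the layering.
package SketchLineFilter;

sub new { return bless { buffer => '' }, shift }

sub get_one_start {
    my ($self, $chunks) = @_;
    $self->{buffer} .= join '', @$chunks;
    return;
}

# Non-greedy: zero or one record per call.
sub get_one {
    my $self = shift;
    return $self->{buffer} =~ s/^(.*?)\n// ? [ $1 ] : [];
}

# Older, greedy API: buffer the new data, then drain every complete
# record in one call.
sub get {
    my ($self, $chunks) = @_;
    $self->get_one_start($chunks);

    my @records;
    while (my @next = @{ $self->get_one }) {
        push @records, @next;
    }
    return \@records;
}

# Hand back whatever is still buffered so that a replacement filter
# can pick up where this one left off.
sub get_pending {
    my $self = shift;
    return length($self->{buffer}) ? [ $self->{buffer} ] : undef;
}

package main;

my $filter  = SketchLineFilter->new;
my $records = $filter->get([ "one\ntwo\nthr" ]);
my $pending = $filter->get_pending;

print "records: @$records\n";    # the two complete lines
print "pending: @$pending\n";    # the unfinished tail
```

Writing get() in terms of get_one() keeps the parsing logic in one place, so the two APIs cannot drift apart.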

A Session

Now that we have the ability to parse data, we need to get data from somewhere. Let's provide a listening socket on the network to catch incoming data. First we need a session.

Construction

The Session constructor defines a small state machine. The "inline_states" parameter creates this by using an explicit name to code mapping. Each key of the hash is a state name and each value is a code reference to execute when that event happens.

POE::Session->create(
    inline_states => {
        _start       => \&start,
        _stop        => sub {},

        mystate      => \&mystate,
        myotherstate => \&myotherstate,
    },
);

The package_states parameter creates states using the name of functions in a given package. In this example, the state name is the same as the function name.

POE::Session->create(
    # package_states takes an array reference of package => states pairs
    package_states => [
        MyPackage => [ 'function1', 'function2', ],
    ],
);

It is also possible to specify different state names.

POE::Session->create(
    # package_states takes an array reference of package => states pairs
    package_states => [
        MyPackage => {
            mystate      => 'function1',
            myotherstate => 'function2',
        },
    ],
);

The object_states parameter creates states using the name of methods on a given object.

POE::Session->create(
    # An array reference, so $some_object is not stringified as a hash key
    object_states => [
        $some_object => [ 'method1', 'method2' ],
    ],
);

As with package states, it is possible to specify different state names.

POE::Session->create(
    # An array reference, so $some_object is not stringified as a hash key
    object_states => [
        $some_object => {
            mystate      => 'method1',
            myotherstate => 'method2',
        },
    ],
);

Two states, _start and _stop, have special meaning. Every session needs a _start state; a session created without one has no way to begin work. POE delivers _start as soon as the session has been created and registered with the kernel. Use this state to set up any events or timers that the session needs. _stop is optional, though it is good practice to provide one. It is called as the final event in the lifespan of a session. Use this to clean up any filehandles or other objects that need to be explicitly closed or otherwise shut down.
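To see the lifecycle in one place, here is a minimal sketch of a complete POE program. It assumes POE is installed. The "tick" state and the @seen array are invented for the example.

```perl
use strict;
use warnings;
use POE;    # loads POE::Kernel and POE::Session, exports KERNEL, HEAP, ...

my @seen;   # records what happened, in order

POE::Session->create(
    inline_states => {
        _start => sub {
            my ($kernel, $heap) = @_[KERNEL, HEAP];
            $heap->{count} = 0;
            $kernel->yield('tick');     # queue our first event
        },
        tick => sub {
            my ($kernel, $heap) = @_[KERNEL, HEAP];
            push @seen, ++$heap->{count};

            # Keep going until we have counted to three. After that
            # the session has nothing left to do and POE stops it.
            $kernel->yield('tick') if $heap->{count} < 3;
        },
        _stop => sub { push @seen, 'stopped' },
    },
);

# Blocks until every session has finished.
POE::Kernel->run();

print "@seen\n";    # prints "1 2 3 stopped"
```

Nothing in the session asks to be shut down. Once no events, timers, or wheels are keeping it alive, the kernel delivers _stop and run() returns.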

The Heap

Sessions also have a place to store internal data. This storage is called the heap and it can be allocated in the Session constructor.

POE::Session->create(
    # ...
    heap => {
        hostname => 'localhost',
    },
);

The heap is unique to the session and should be used to store all data that the session needs to operate. Some wheels will put data in the heap, though that is generally discouraged. Make sure to check the documentation on the wheels you are using and avoid name collisions.

Setup

We are going to plug POE::Wheel::SocketFactory and POE::Wheel::ReadWrite objects into our session, so we need several states to handle their needs.

POE::Session->create(
    inline_states => {
        _start          => \&start,
        factory_success => \&factory_success,

        client_input    => \&client_input,

        fatal_error     => sub { die "A fatal error occurred" },
        _stop           => sub {},
    },
);

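When the session starts up, we launch a POE::Wheel::SocketFactory wheel. Given a bind address and port but no remote address, SocketFactory listens on that address and hands us an event for each client that connects. The Reuse flag marks the address as reusable, so the server can be restarted without waiting for the old socket to time out.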

sub start {
    $_[HEAP]->{factory} = POE::Wheel::SocketFactory->new(
        BindAddress     => '127.0.0.1',
        BindPort        => '31337',
        SuccessEvent    => 'factory_success',
        FailureEvent    => 'fatal_error',
        SocketProtocol  => 'tcp',
        Reuse           => 'on',
    );
}

Notice that wheels take the name of the states we created above as parameters. POE uses the word "Event" to signify the name and the word "State" to signify the code that is run.

When a client makes a connection, the SocketFactory notifies the session via the SuccessEvent. It is our job to figure out what to do with the filehandle that SocketFactory built for us. In this case, we want read/write functionality using the filter we built above. POE::Wheel::ReadWrite provides this functionality, including the ability to plug in our filter. Every wheel has a unique ID, which it includes with each event it sends, so it is possible to handle more than one client per session. We store each client's ReadWrite wheel in the heap under that wheel's own ID.

sub factory_success {
    my $handle = $_[ARG0];

    my $wheel = POE::Wheel::ReadWrite->new(
        Handle      => $handle,
        Driver      => POE::Driver::SysRW->new(),
        Filter      => POE::Filter::SimpleQueryString->new(),
        InputEvent  => 'client_input',
    );

    # Key the client by the ReadWrite wheel's ID. This is the same ID
    # that client_input receives in ARG1.
    $_[HEAP]->{clients}->{ $wheel->ID } = $wheel;
}

POE::Wheel::ReadWrite will take data from the incoming socket, run it through our filter, and then give it to us via the InputEvent we provided in the constructor. What we do with the data is up to us. Let's print it out and echo it back to the client.

sub client_input {
    my ($input, $wheel_id) = @_[ARG0, ARG1];

    use Data::Dumper;
    print Dumper $input;

    $_[HEAP]->{clients}->{ $wheel_id }->put( $input );
}

Data::Dumper handles printing out the structure for us. The put() call sends our data through our filter and back out to the client. If our algorithms are correct, we should get the same data back that we put in.

sungo@nodens% telnet localhost 31337
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
foo=bar

The server prints out:

sungo@nodens% perl -Ilib examples/server.pl
$VAR1 = { 
    'foo' => 'bar'
};

And then echoes back to us:

foo=bar

We're in business!

A Component

That was a lot of code to get a simple TCP server up and running. Surely this can be simplified. POE itself comes to the rescue. POE ships with a component specifically designed to simplify TCP server creation. We can replace all that code above with a simple call to the component's constructor.

POE::Component::Server::TCP->new(
    Address => '127.0.0.1',
    Port    => '31337',

    ClientFilter => "POE::Filter::SimpleQueryString",
    ClientInput => sub {
        my $input = $_[ARG0];
        use Data::Dumper;
        print Dumper $input;

        $_[HEAP]->{client}->put($input);
    },
);

The downside is that Server::TCP doesn't allow for argument passing to the filter's constructor and we lose the flexibility of doing things by hand. For a lot of situations, however, this component does the trick quite nicely.

To release this code to the world, we need to make our own component. For the purpose of this example, we're going to wrap the smaller code above instead of the larger wheel-based example. There is no reason why you couldn't use the wheel-based code in your component, however.

package POE::Component::SimpleQueryString;

use warnings;
use strict;

use vars qw($VERSION);
$VERSION = '0.01';

use POE;
use POE::Component::Server::TCP;

use POE::Filter::SimpleQueryString;

use Carp qw(croak);

sub new {
    my $class = shift;
    my %args = @_;

    my $addr = delete $args{ListenAddr} 
        or croak "ListenAddr required";

    my $port = delete $args{ListenPort} 
        or croak "ListenPort required";

    my $input_event = delete $args{InputEvent} 
        or croak "InputEvent required";

    my $server = POE::Component::Server::TCP->new(
        Address => $addr,
        Port    => $port,

        ClientInput  => $input_event,
        ClientFilter => "POE::Filter::SimpleQueryString",
    );

    return $server;
}

1;

Now our users can just load up the component like so:

POE::Component::SimpleQueryString->new(
    ListenAddr => '127.0.0.1',
    ListenPort => '31337',
    InputEvent => sub {
        my $input = $_[ARG0];
        use Data::Dumper;
        print Dumper $input;

        $_[HEAP]->{client}->put($input);
    },
);

Conclusion

I hope that this introduction to POE has piqued your interest. POE allows you to create powerful, event-driven programs that can be used in many corporate and private environments.

POE is available on the CPAN (http://search.cpan.org/dist/POE) and has a rich, community-maintained website (http://poe.perl.org).

TPJ

