
Pipes, Channels, and Perl-Win32


Dr. Dobb's Journal April 1997: Pipes, Channels, and Perl-Win32

Jean-Louis is an independent consultant and author. He can be reached at [email protected] or via http://ourworld.compuserve.com/homepages/jl_leroy.


Perl is a powerful text-manipulation language that incorporates the best of UNIX utilities such as sed and awk. With Perl 5, the language also became a versatile object-oriented language that supports dynamic multiple inheritance.

There are several implementations of Perl, among them Perl-Win32 -- the Win32 port of the Perl language. Provided by HIP Communications, Perl-Win32 is available as an executable (perl.exe) and DLL (perl100.dll). The DLL contains the interpreter wrapped in an abstract class called IPerlSupport. The class declares numerous pure virtual functions that the interpreter calls to open, read, and write files, create processes, and the like. It's easy to tell what each function does: Each is named after a standard C library function, with the first letter capitalized.

Perl.exe derives a CPerlExe class from IPerlSupport and implements the pure virtuals by calling the C library. Its main function initializes a CPerlExe object and passes it the command-line arguments. Rather than launching perl.exe in a separate process, you can allocate a CPerlExe object and talk to the DLL directly.

Perl-Win32 is available with source code directly from HIP (http://www.perl.hip.com) or with the Microsoft Windows NT Resource Kit. (Microsoft supported HIP's effort, on the condition that the package could be recompiled using Microsoft's compiler.) In addition to extensive Win32 API support, HIP has repackaged the interpreter in a class. This makes it possible to run several instances of Perl concurrently.

I recently wrote a Perl-based multifile search-and-replace extension using Perl-Win32. Internally, it would run a Perl script in a separate thread, send the replace parameters and files down the interpreter's standard input stream, and collect the locations of the matches and the replace strings from the output stream. It seemed anonymous pipes would be perfect for the job. I wrote a program to test the idea, and it worked beautifully. But I couldn't get my mind off those two threads talking together. I worried about what would happen if one of the pipes became full or empty. Later, when I reduced the size of the pipes to one byte, the program deadlocked.

The Trouble with Pipes

When you create an anonymous pipe, you must specify the size of the underlying buffer. This size remains fixed as long as the pipe exists.

There are two situations in which you can block on pipe I/O -- when you attempt to read from an empty pipe, and when you overflow the pipe buffer. In both cases, the thread is suspended until more data (for reads) or room (for writes) becomes available, or until the pipe is closed.

If the pipe is not empty but becomes empty during a read, the call will return. You can tell that the operation was not completed by checking the return value: It indicates how many bytes were actually read. This is your one and only chance of learning the pipe's status. If you get fewer bytes than you attempted to read, you'd better be sure that some other thread or process will send in more data before attempting to read from the pipe again. write, on the other hand, always blocks until all the data has been sent.

Pipes work well in situations where one process speaks and the other listens. The business becomes riskier when a dialog must take place. Imagine two threads, "client" and "server," and two pipes, client-to-server (CS) and server-to-client (SC). The client sends data in the CS pipe, and the server reads the data, performs computations, then writes results to the SC pipe, where the client collects them.

The client side cannot simply send all the data down the CS pipe and read the results from the SC pipe afterwards. Suppose the server produces more results than the SC pipe can hold. It blocks on the pipe, waiting for the client thread to read some data from it. The client thread is now in danger. If it overflows the CS pipe, it will also block because the server is not reading CS anymore.
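The stall described above can be reproduced without any threads. The following is a minimal sketch, not the test program mentioned earlier: pipe_model and write_all_then_read_deadlocks are hypothetical names, the pipes are modeled as byte counters, and the server is assumed to emit a fixed number of result bytes for every byte it reads.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical model of a fixed-size pipe: it only counts the bytes it holds.
struct pipe_model {
    std::size_t capacity;
    std::size_t used;
    bool full() const { return used == capacity; }
    bool empty() const { return used == 0; }
};

// Simulates a client that writes all `input` bytes to CS before reading SC,
// against a server that writes `ratio` result bytes to SC per byte read from CS.
// Returns true if both sides end up blocked before the work is finished.
bool write_all_then_read_deadlocks(std::size_t capacity,
                                   std::size_t input,
                                   std::size_t ratio) {
    assert(capacity > 0);
    pipe_model cs = { capacity, 0 };
    pipe_model sc = { capacity, 0 };
    std::size_t to_send = input;   // bytes the client still has to write
    std::size_t pending_out = 0;   // result bytes the server still has to write
    for (;;) {
        bool progress = false;
        // Server: finish writing pending results before reading more input.
        if (pending_out > 0) {
            if (!sc.full()) { ++sc.used; --pending_out; progress = true; }
        } else if (!cs.empty()) {
            --cs.used; pending_out = ratio; progress = true;
        }
        // Client: write phase first, read phase only once everything is sent.
        if (to_send > 0) {
            if (!cs.full()) { ++cs.used; --to_send; progress = true; }
        } else if (!sc.empty()) {
            --sc.used; progress = true;
        }
        if (to_send == 0 && pending_out == 0 && cs.empty() && sc.empty())
            return false;          // all data sent, processed, and collected
        if (!progress)
            return true;           // neither side can move: deadlock
    }
}
```

With one-byte pipes, ten input bytes, and two result bytes per input byte, the function reports a deadlock; with 64-byte pipes the same exchange completes, which is why a generous buffer size can hide the problem during testing.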

Of course, the client could periodically read the SC pipe, in an attempt to prevent the server from blocking on a full SC pipe. Unfortunately, there is no guarantee that there's anything there to read. Perhaps the server needs to receive more data before sending results in the SC pipe. This time, both processes are blocked on empty pipes. This is a dilemma -- the right move in one scenario is lethal in the other. Neither is there a general way for the client to decide which situation it's in, since there's no way of finding whether a pipe is empty or full without starting a potentially blocking I/O on it.

How can you work around this difficulty? One answer would be to make the data stream self-describing; for example, you could prefix each packet of data with its size. The problem with this solution is that it requires both ends to cooperate. Or you could try to predict the amount of data the server will generate. But this requires an intimate knowledge of the server's behavior -- not an option for a Perl pattern-substitution server.
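To make the self-describing option concrete, here is a minimal sketch of size-prefixed packets. The helper names frame and unframe and the 4-byte big-endian length are assumptions chosen for illustration; the article does not prescribe a wire format.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>
#include <vector>

// Hypothetical helper: prefixes a payload with its size as 4 big-endian bytes.
std::string frame(const std::string& payload) {
    assert(payload.size() <= 0xFFFFFFFFu);
    const std::uint32_t n = static_cast<std::uint32_t>(payload.size());
    std::string out;
    out.push_back(static_cast<char>((n >> 24) & 0xFF));
    out.push_back(static_cast<char>((n >> 16) & 0xFF));
    out.push_back(static_cast<char>((n >> 8) & 0xFF));
    out.push_back(static_cast<char>(n & 0xFF));
    out += payload;
    return out;
}

// Hypothetical helper: extracts every complete packet from `buffer` and
// leaves any incomplete tail in place for the next call.
std::vector<std::string> unframe(std::string& buffer) {
    std::vector<std::string> packets;
    std::size_t pos = 0;
    while (buffer.size() - pos >= 4) {
        std::uint32_t n = 0;
        for (int i = 0; i < 4; ++i)
            n = (n << 8) | static_cast<unsigned char>(buffer[pos + i]);
        if (buffer.size() - pos - 4 < n)
            break;                         // packet body not fully received yet
        packets.push_back(buffer.substr(pos + 4, n));
        pos += 4 + n;
    }
    buffer.erase(0, pos);
    return packets;
}
```

The receiver always knows how many bytes to expect next, so it never has to guess whether a read will block. The cost is the one noted above: the sender has to emit the prefix, which is not something an arbitrary Perl script writing to standard output will do.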

This is Not a Pipe

Alas, it seemed that I couldn't use anonymous pipes to communicate with Perl. Not all was lost, though. Since HIP has virtualized the interpreter's I/O calls, I could monitor the calls and take control whenever they involved the standard input, output, and error streams. All I needed was a safe way of passing information between the application's thread and the interpreter's thread. Consequently, I created a better pipe, which I call a "channel."

Channels have several advantages over anonymous pipes. First, they're dynamically sized, which means you can't block while writing to a channel. Secondly, you can read it without implicitly blocking. The read member function returns the number of elements read, which can be zero. There's also a blocking version called blocking_read. Finally, channels are type safe -- they are implemented as a class template parameterized by the element's type. If you want to exchange integers between two threads, you use a channel<int>. The template's member functions will get the word and operate on whole integers. No need for casts or sizeofs.

Inside channel<>

A channel must be open before you can write to it. You can read from a channel even after it's been closed, though, just like you can read from a pipe after its write handle has been closed. A channel that contains data or is still open is said to be "alive." In other words, a channel is alive if it contains data or if it might contain data in the future. A channel is "dead" when it is both empty and closed, and thus will never contain data again (unless it's reopened). All blocking functions return when a channel dies, making it possible for a thread to send data into a channel then close it immediately, without waiting for other threads to process all of the channel's content.

Internally, a channel is a thread-safe standard queue; an example of this is provided electronically (see "Availability," page 3). It uses two Win32 synchronization objects -- a manual-reset event and a critical section. Both are initialized by the channel's constructor. The event is in a signaled state when the channel either contains data or is dead.

The CRITICAL_SECTION member variable is used to serialize access to the channel's internal data structures. Since it will be used in const member functions, it is declared as mutable. Instead of using an Enter/LeaveCriticalSection "sandwich," I use a lock object. Its constructor enters the critical section, and its destructor leaves it. This approach is cleaner, especially for functions that contain multiple return paths. It is also robust in the presence of exceptions.
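The lock-object idiom can be sketched in portable C++ as follows. This is an illustration under stated assumptions, not the channel's actual code: std::mutex stands in for the Win32 CRITICAL_SECTION, and scoped_lock_sketch, counter, and lock_demo are hypothetical names.

```cpp
#include <cassert>
#include <mutex>
#include <stdexcept>

// Hypothetical lock object: the constructor enters, the destructor leaves.
// std::mutex replaces CRITICAL_SECTION so the sketch compiles anywhere.
class scoped_lock_sketch {
public:
    explicit scoped_lock_sketch(std::mutex& m) : m_(m) { m_.lock(); }
    ~scoped_lock_sketch() { m_.unlock(); }
private:
    scoped_lock_sketch(const scoped_lock_sketch&);             // not copyable
    scoped_lock_sketch& operator=(const scoped_lock_sketch&);  // not assignable
    std::mutex& m_;
};

class counter {
public:
    counter() : value_(0) {}
    // The mutex is mutable, so a const member function can still lock it.
    int get() const {
        scoped_lock_sketch guard(mutex_);
        return value_;
    }
    // Two exit paths (return and throw); the destructor unlocks on both.
    void add(int n) {
        scoped_lock_sketch guard(mutex_);
        if (n < 0)
            throw std::invalid_argument("negative increment");
        value_ += n;
    }
private:
    mutable std::mutex mutex_;
    int value_;
};

// Usage: the lock is released even when add() throws.
void lock_demo() {
    counter c;
    c.add(2);
    try { c.add(-1); } catch (const std::invalid_argument&) {}
    assert(c.get() == 2);  // get() could not lock if the throw had leaked the lock
}
```

In current C++ the standard std::lock_guard does the same job; the hand-written class is shown only to make the constructor/destructor pairing visible.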

Following is a list of other channel functions. The channel functions write, read, and blocking_read each have three overloads: one to transfer a single element, one to transfer N elements starting at an address, and one to transfer elements in a range delimited by two pointers. The latter two functions will become member templates parameterized by an iterator (a generalized pointer) when support for nested templates becomes available. Here, I'll discuss only the transfer-N-elements version.

open empties the queue, resets the event, and puts the channel in the open state.

write puts the elements in the queue and sets the event to a signaled state if the queue was previously empty. If it contained data, the event is signaled already, and there's no point in making a potentially expensive call to the operating system.

read attempts to transfer the requested number of elements from the queue and returns the number actually read. If the queue becomes empty during the transfer and the channel still is open, read sets the event to nonsignaled. From then until either write or close is called, threads calling wait or blocking_read on this channel will be suspended.

wait waits for the event to become signaled; that is, for the channel to contain data or die. Since the event is manually reset, it remains signaled even after the call to WaitForSingleObject returns; this is appropriate because the queue size hasn't changed. It then returns a Boolean value that is True if the channel is still alive.

If only one thread is reading from the channel, you can be certain that "something" will be available after wait returns True. That "something" can be elements to be read or a report that the channel has been closed. If multiple threads are reading the channel, you don't have that guarantee. In short, wait is just a way of doing nothing efficiently, without wasting CPU cycles -- it is not a guarantee that subsequent reads will actually find data to transfer.

blocking_read calls read, then wait in a loop until the required count is reached or until wait returns False. In the latter case, nothing more will come through the channel.

close puts the channel in the closed state, exits the critical section, and signals the event. It is not a good idea to signal the event inside the critical section, because it would waste two thread switches under NT. Why? Currently, the NT scheduler switches from the thread signaling the event to the thread waiting for it -- provided the waiting thread doesn't have a lower priority. If the closing thread owns the critical section at that moment, the waiting thread won't run for long, as it will almost immediately try to grab the critical section. As a result, execution will bounce back to the closing thread. You cannot perform this optimization in write because another thread could preempt the writer just before the call to SetEvent and read all the data. When the writing thread regained control, it would set the event to the signaled state even though the queue was now empty.
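The functions described above can be sketched in standard C++. This is not the channel class that accompanies the article: it swaps the Win32 critical section and manual-reset event for std::mutex and std::condition_variable, the name channel_sketch is hypothetical, and only the transfer-N-elements overloads are shown. The condition "queue not empty, or channel closed" plays the role of the signaled event.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>

template <class T>
class channel_sketch {
public:
    channel_sketch() : open_(false) {}

    // Empties the queue and puts the channel in the open state.
    void open() {
        std::lock_guard<std::mutex> guard(mutex_);
        queue_.clear();
        open_ = true;
    }

    // Marks the channel closed, then wakes waiters after releasing the lock.
    void close() {
        {
            std::lock_guard<std::mutex> guard(mutex_);
            open_ = false;
        }
        ready_.notify_all();
    }

    // Alive: still open, or closed but not yet drained.
    bool is_alive() const {
        std::lock_guard<std::mutex> guard(mutex_);
        return open_ || !queue_.empty();
    }

    // Never blocks: the queue grows as needed.
    void write(const T* p, std::size_t n) {
        {
            std::lock_guard<std::mutex> guard(mutex_);
            assert(open_);
            queue_.insert(queue_.end(), p, p + n);
        }
        ready_.notify_all();
    }

    // Never blocks: returns the number of elements transferred, possibly zero.
    std::size_t read(T* p, std::size_t n) {
        std::lock_guard<std::mutex> guard(mutex_);
        std::size_t count = 0;
        while (count < n && !queue_.empty()) {
            p[count++] = queue_.front();
            queue_.pop_front();
        }
        return count;
    }

    // Blocks until the channel holds data or dies; returns true if still alive.
    bool wait() const {
        std::unique_lock<std::mutex> guard(mutex_);
        ready_.wait(guard, [this] { return !queue_.empty() || !open_; });
        return open_ || !queue_.empty();
    }

    // Calls read, then wait, until n elements arrive or the channel dies.
    std::size_t blocking_read(T* p, std::size_t n) {
        std::size_t done = read(p, n);
        while (done < n && wait())
            done += read(p + done, n - done);
        return done;
    }

private:
    mutable std::mutex mutex_;
    mutable std::condition_variable ready_;
    std::deque<T> queue_;
    bool open_;
};
```

Because a condition variable rechecks its predicate under the lock, the sketch can notify outside the lock in write as well as in close; the ordering concern described above is specific to the manual-reset event. A writer thread would typically call open, write repeatedly, then close, while a reader loops on blocking_read until it returns fewer elements than requested.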

Channel Streams

The channel class, however, ended up being too low level for my needs. I needed to exchange numerical data with Perl -- character counts, line numbers, and the like. Consequently, my next step was to create stream classes for channels of characters.

In the spirit of the standard iostream library, I created a templatized stream class that can be instantiated either for ANSI or Unicode characters. (To be honest, I had another reason, but it's less politically correct -- most implementations of templates require you to put all the code in the header file. This eliminates an implementation file and makes it very simple for an application to incorporate the code, as there is no library to link to.)

Listing One presents the channel stream classes. As you can see, supporting both character sets leads to some verbosity. You can't simply return EOF to report failure or end-of-file; rather, you must obtain the correct value (and its type) from the corresponding char_traits class. Likewise, you don't cast between ints and chars; you ask char_traits to do it for you.
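A short sketch shows why the conversions go through char_traits. It uses the std::char_traits of today's standard library, which postdates the draft this article was written against; peek_at, is_eof, and traits_demo are hypothetical helpers.

```cpp
#include <cassert>
#include <cstddef>
#include <string>

// Hypothetical helper: returns the character at `pos` as an int_type,
// or the traits' end-of-file value when `pos` is past the end.
template <class Char>
typename std::char_traits<Char>::int_type
peek_at(const std::basic_string<Char>& s, std::size_t pos) {
    typedef std::char_traits<Char> traits;
    if (pos >= s.size())
        return traits::eof();
    return traits::to_int_type(s[pos]);
}

// Hypothetical helper: compares against eof() using the traits' own equality.
template <class Char>
bool is_eof(typename std::char_traits<Char>::int_type c) {
    typedef std::char_traits<Char> traits;
    return traits::eq_int_type(c, traits::eof());
}

// Usage: a char with all bits set is data, not end-of-file.
void traits_demo() {
    const std::string s("\xFF");
    assert(!is_eof<char>(peek_at(s, 0)));  // to_int_type yields 255, not EOF
    assert(is_eof<char>(peek_at(s, 1)));
}
```

A plain cast from char to int would turn the byte 0xFF into -1 on platforms where char is signed, which is indistinguishable from EOF. The same two functions work unchanged for wchar_t, where the end-of-file value is WEOF rather than EOF.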

Class channelbuf is straightforward. underflow() reads one character from the channel in blocking mode, because the semantics of underflow demand that one character be returned -- be it the EOF indicator. It then tries to read in the remainder of the buffer. overflow and sync are trivial.
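The division of labor among underflow, overflow, and sync can be shown with a stand-in buffer. The sketch below is written against the standard std::streambuf and is not Listing One: a plain std::deque replaces the channel, everything runs in one thread, and queuebuf and round_trip are hypothetical names. Because nothing else can refill the queue, this underflow reports end-of-file where the real one would block.

```cpp
#include <cassert>
#include <deque>
#include <istream>
#include <ostream>
#include <streambuf>

class queuebuf : public std::streambuf {
public:
    explicit queuebuf(std::deque<char>& q) : q_(q) {
        setp(obuf_, obuf_ + bufsize);      // empty put area; get area starts empty
    }
    ~queuebuf() { sync(); }

protected:
    // Refills the get area from the queue; eof() when nothing is available.
    int_type underflow() {
        if (gptr() < egptr())
            return traits_type::to_int_type(*gptr());
        int n = 0;
        while (n < bufsize && !q_.empty()) {
            ibuf_[n++] = q_.front();
            q_.pop_front();
        }
        if (n == 0)
            return traits_type::eof();
        setg(ibuf_, ibuf_, ibuf_ + n);
        return traits_type::to_int_type(*gptr());
    }

    // Flushes the put area, plus the extra character if there is one.
    int_type overflow(int_type c = traits_type::eof()) {
        q_.insert(q_.end(), pbase(), pptr());
        if (!traits_type::eq_int_type(c, traits_type::eof()))
            q_.push_back(traits_type::to_char_type(c));
        setp(obuf_, obuf_ + bufsize);
        return traits_type::not_eof(c);
    }

    // Flushing is all there is to synchronizing.
    int sync() {
        overflow();
        return 0;
    }

private:
    enum { bufsize = 16 };
    std::deque<char>& q_;
    char ibuf_[bufsize];
    char obuf_[bufsize];
};

// Usage: a number written through an ostream comes back through an istream.
int round_trip(int value) {
    std::deque<char> q;
    queuebuf buf(q);
    std::ostream out(&buf);
    std::istream in(&buf);
    out << value << std::flush;
    int result = 0;
    in >> result;
    assert(q.empty());   // underflow drained the queue into the get area
    return result;
}
```

This is what makes the stream layer worthwhile: character counts and line numbers can be exchanged with ordinary << and >> operators instead of hand-written conversions.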

The open function automatically opens its associated channel if it's not already open. It can optionally close the channel when the buffer itself is closed. After experimentation, I decided that requiring a separate opening and closing of the channel and its stream was tedious and error prone. Typically, you will either use the channel directly or do everything through stream objects. In the latter case, the less you deal with the channel, the better.

The stream classes do little more than allocate a buffer object and forward a few channel-specific operations to it. Since the istream, ostream, and iostream variants (and their wide-character counterparts) are similar, I factored the common behavior into a template and used typedefs to create the familiar streams. Again, experimentation showed that it's convenient for output streams to automatically close the underlying channel, at least in situations where only one thread writes to the channel. When the ostream is destroyed, it almost certainly means that no more data will travel down the channel. This small variation between ostream and the other two stream types is expressed by an additional template parameter.

Limitations of Channels

Channels are currently limited to interthread communications; in other words, they can't cross process boundaries. It would be possible to create an interprocess channel class by combining a channel, a pipe, and a thread. The thread would simply copy data between the channel and the pipe and would frequently block; the class's public member functions would write to the channel, though. Such a class could also be used in situations where the controlled program works with I/O handles and nothing else. In those cases, you could pass it the pipe's read or write handle.

Finally, the queue can grow quite large under certain circumstances. Under operating systems where the signaled thread immediately preempts the signaling thread, the queue will never get larger than the number of elements in the longest write -- if a thread is continuously reading from the channel. Under Windows 95, however, it is possible for the writing thread to queue the elements from several consecutive writes before the reading thread preempts. This behavior is acceptable if the writing thread just wants to get rid of the data as fast as possible; it also reduces the number of thread switches. But it can become a nuisance. For that reason, you may want to design a mechanism that forces a thread switch when the queue exceeds a preferred size and threads exist that are ready to read it.
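One possible shape for such a mechanism is sketched below. It is an assumption-laden illustration, not part of the channel class: yielding_queue is a hypothetical name, std::this_thread::yield is only a hint to the scheduler rather than a forced switch, and the sketch does not check whether any reader is actually ready.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <mutex>
#include <thread>

template <class T>
class yielding_queue {
public:
    explicit yielding_queue(std::size_t preferred)
        : preferred_(preferred), yields_(0) {
        assert(preferred > 0);
    }

    // Appends n elements; gives up the time slice if the backlog is too large.
    void write(const T* p, std::size_t n) {
        bool over = false;
        {
            std::lock_guard<std::mutex> guard(mutex_);
            queue_.insert(queue_.end(), p, p + n);
            over = queue_.size() > preferred_;
            if (over)
                ++yields_;
        }
        if (over)
            std::this_thread::yield();   // outside the lock, so a reader can proceed
    }

    // Transfers up to n elements and returns the number actually read.
    std::size_t read(T* p, std::size_t n) {
        std::lock_guard<std::mutex> guard(mutex_);
        std::size_t count = 0;
        while (count < n && !queue_.empty()) {
            p[count++] = queue_.front();
            queue_.pop_front();
        }
        return count;
    }

    // Number of times write() decided the backlog was too large.
    std::size_t yields() const {
        std::lock_guard<std::mutex> guard(mutex_);
        return yields_;
    }

private:
    mutable std::mutex mutex_;
    std::deque<T> queue_;
    std::size_t preferred_;
    std::size_t yields_;
};
```

The writer still never blocks, so the dynamic-sizing guarantee is preserved; the preferred size only influences when the writer offers the processor to other threads.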

Conclusion

With all the control I had over HIP's Perl interpreter, I had several ad hoc solutions available to me. Nonetheless, I chose to create a reusable component. This required some extra work, especially in the design of the interface's details. But I expect it to pay for itself the next time I encounter a similar problem.

For More Information

HIP Communications
1122 Mainland, Suite 350
Vancouver, B.C.
Canada V6B 5L1
604-606-4600
http://www.perl.hip.com/

DDJ

Listing One

#ifndef channelstream_defined
#define channelstream_defined

#include <iostream>
#include <streambuf>
#include "channel.h"

template<class Char>
class basic_channelbuf : public basic_streambuf<Char, char_traits<Char> >
{
public:
    basic_channelbuf();
    ~basic_channelbuf();
    void open(channel<Char>& c, BOOL autoclose);
    void close();
    void close_channel();
    BOOL is_open() const { return pc && pc->is_open(); }
protected:
    virtual int_type underflow();
    virtual int_type overflow(int_type c = EOF);
    virtual int_type sync();
private:
    channel<Char>* pc;
    enum { ibufsize = 1024, obufsize = 1024 };
    Char ibuf[ibufsize];
    Char obuf[obufsize];
    BOOL autoclose;
};

typedef basic_channelbuf<char> channelbuf;
typedef basic_channelbuf<wchar_t> wchannelbuf;

template<class Char, class Base, BOOL Autoclose>
class channelstream_ : public Base
{
public:
    channelstream_() : Base(&buf) { }
    channelstream_(channel<Char>& c, BOOL autoclose = Autoclose) : Base(&buf)
        { open(c, autoclose); }
    void open(channel<Char>& c, BOOL autoclose = Autoclose)
        { buf.open(c, autoclose); }
    BOOL is_open() const { return buf.is_open(); }
    void close() { buf.close(); }
    void close_channel() { buf.close_channel(); }
private:
    basic_channelbuf<Char> buf;
};

typedef channelstream_<char, ostream, TRUE> ochannelstream;
typedef channelstream_<char, istream, FALSE> ichannelstream;
typedef channelstream_<char, iostream, TRUE> channelstream;

typedef channelstream_<wchar_t, wostream, TRUE> wochannelstream;
typedef channelstream_<wchar_t, wistream, FALSE> wichannelstream;
typedef channelstream_<wchar_t, wiostream, TRUE> wchannelstream;

template<class Char>
basic_channelbuf<Char>::basic_channelbuf() : pc(NULL)
{
}

template<class Char>
basic_channelbuf<Char>::~basic_channelbuf()
{
    if (pc)
        close();
}

template<class Char>
void basic_channelbuf<Char>::open(channel<Char>& c, BOOL ac)
{
    pc = &c;
    if (!c.is_open())
        c.open();
    autoclose = ac;
    setp(obuf, obuf + obufsize);
}

template<class Char>
void basic_channelbuf<Char>::close()
{
    sync();
    if (autoclose && pc->is_open())
        pc->close();
    pc = NULL;
}

template<class Char>
void basic_channelbuf<Char>::close_channel()
{
    sync();
    if (pc->is_open())
        pc->close();
    pc = NULL;
}

template<class Char>
basic_channelbuf<Char>::int_type basic_channelbuf<Char>::underflow()
{
    if (!pc->blocking_read(ibuf, 1))
        return traits_type::eof();
    setg(ibuf, ibuf, pc->read(ibuf + 1, ibuf + ibufsize));
    return traits_type::to_int_type(*gptr());
}

template<class Char>
basic_channelbuf<Char>::int_type basic_channelbuf<Char>::sync()
{
    return overflow();
}

template<class Char>
basic_channelbuf<Char>::int_type basic_channelbuf<Char>::overflow(int_type c)
{
    if (!is_open())
        return traits_type::eof();
    pc->write(pbase(), pptr());
    if (c != traits_type::eof()) {
        Char tmp = traits_type::to_char_type(c);
        pc->write(tmp);
    }
    setp(obuf, obuf + obufsize);
    return 1;
}

#endif



Copyright © 1997, Dr. Dobb's Journal
