Prefer Using Active Objects Instead of Naked Threads
How to automate best practices for threads and raise the semantic level of our code
An Active Helper
The first thing we want to do is automate the common parts and write them in one reusable place, so that we don't have to write them by hand for each new active class. To do that, let's introduce a helper class Active that encapsulates a thread behind a messaging interface and provides the basic message-sending and automatic pumping facilities. This section shows a straightforward object-oriented implementation that can be written as shown in any of our major languages (even in C, where the class would be written as a struct and the object's methods as plain functions with an explicit opaque "this" pointer). After that, we'll narrow our focus to C++ specifically and look at another version we can write and use even more simply in that language.
The Active helper below provides the private thread and a message queue, as well as a sentinel done message that it will use to signal the private thread to finish. It also defines a nested Message class that will serve as the base of all messages that can be enqueued:
// Example 1: Active helper, the general OO way
//
class Active {
public:
  class Message {        // base of all message types
  public:
    virtual ~Message() { }
    virtual void Execute() { }
  };

private:
  // (suppress copying if in C++)

  // private data
  unique_ptr<Message> done;                   // le sentinel
  message_queue< unique_ptr<Message> > mq;    // le queue
  unique_ptr<thread> thd;                     // le thread
Note that I'll assume message_queue is a thread-safe internally synchronized message queue that doesn't need external synchronization by its caller. If you don't have such a queue type handy, you can use an ordinary queue type and protect it with a mutex.
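As a minimal sketch of that fallback, here is one way such a queue might look: an ordinary std::queue protected by a mutex, with a condition variable added so that receive can block while the queue is empty. The send and receive names match the calls used in this article; everything else (the member names, the blocking behavior, the receive_two_in_order helper) is an assumption for illustration, not a definitive implementation.

```cpp
#include <cassert>
#include <condition_variable>
#include <memory>
#include <mutex>
#include <queue>
#include <utility>

// Hypothetical stand-in for the message_queue assumed in the article:
// an ordinary std::queue protected by a mutex, plus a condition variable
// so that receive() can block while the queue is empty.
template<typename T>
class message_queue {
public:
    // Enqueue one item, then wake one waiting receiver.
    void send( T item ) {
        {
            std::lock_guard<std::mutex> lock( mtx );
            items.push( std::move(item) );
        }
        ready.notify_one();
    }

    // Block until an item is available, then dequeue and return it.
    T receive() {
        std::unique_lock<std::mutex> lock( mtx );
        ready.wait( lock, [this]{ return !items.empty(); } );
        T item = std::move( items.front() );
        items.pop();
        return item;
    }

private:
    std::mutex mtx;
    std::condition_variable ready;
    std::queue<T> items;
};

// Usage sketch: items come back out in the order they went in.
inline int receive_two_in_order() {
    message_queue< std::unique_ptr<int> > q;
    q.send( std::unique_ptr<int>( new int(1) ) );
    q.send( std::unique_ptr<int>( new int(2) ) );
    std::unique_ptr<int> first = q.receive();
    std::unique_ptr<int> second = q.receive();
    assert( *first == 1 && *second == 2 );
    return *first * 10 + *second;   // 12 when FIFO order holds
}
```

Because send takes its argument by value and moves it into the queue, the sketch works with move-only element types such as unique_ptr, which is what Active stores.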
With that much in place, Active can go on to provide the common machinery, starting with the message pump itself:
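private:
  // Address of the sentinel, fixed before the thread starts, so that
  // Run never has to read done after the destructor has moved from it
  const Message* sentinel;

  // The dispatch loop: pump messages until done
  void Run() {
    unique_ptr<Message> msg;
    while( (msg = mq.receive()).get() != sentinel ) {
      msg->Execute();
    }
  }

public:
  // Start everything up, using Run as the thread mainline
  Active() : done( new Message ), sentinel( done.get() ) {
    thd = unique_ptr<thread>(
            new thread( bind(&Active::Run, this) ) );
  }

  // Shut down: send sentinel and wait for queue to drain
  ~Active() {
    Send( std::move(done) );   // unique_ptr must be moved, not copied
    thd->join();
  }

  // Enqueue a message
  void Send( unique_ptr<Message> m ) {
    mq.send( std::move(m) );
  }
};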
The Active Cookbook: A Background Worker
Now let's use the helper and actually write an active class. To write a class with objects that are active, we must add an Active helper and then for each public method:
- Have the public method capture its parameters as a message and call Send to send the call for later execution on the private thread. Note that these functions should not touch any member variables. They should use and store their parameters and locals only.
- Provide a corresponding nonpublic class derived from Message with constructor and data members that will capture the parameters and this pointer, and with an Execute method that will perform the actual work (and can manipulate member variables; these Execute methods should be the only code in the class that does so).
For example, say we want to have an agent that does background work that we want to get off a responsive thread, including the performance of background save and print functions. Here's how we can write it as an active object that can be launched from the responsive thread using asynchronous method calls like b.Save("filename.txt") and b.Print(myData):
class Backgrounder {
public:
  void Save( string filename ) {
    // Send takes a unique_ptr, so wrap the new message explicitly
    a.Send( unique_ptr<Active::Message>( new MSave( this, filename ) ) );
  }

  void Print( Data& data ) {
    a.Send( unique_ptr<Active::Message>( new MPrint( this, data ) ) );
  }

private:
  class MSave : public Active::Message {
    Backgrounder* this_; string filename;
  public:
    MSave( Backgrounder* b, string f ) : this_(b), filename(f) { }
    void Execute() { … } // do the actual work
  };

  class MPrint : public Active::Message {
    Backgrounder* this_; Data& data;
  public:
    MPrint( Backgrounder* b, Data& d ) : this_(b), data(d) { }
    void Execute() { … } // do the actual work
  };

  // Private state if desired
  PrivateData somePrivateStateAcrossCalls;

  // Helper goes last, for ordered destruction
  Active a;
};
Now we can simply enqueue work for the background thread as messages. If the worker's private thread is idle, it can wake up and start processing the message right away. If the worker is already busy, the message waits behind any other waiting messages and all get processed one after the other in FIFO order on the private thread. (Teaser: What if the background thread wants to report progress or output to the caller, or a side effect such as reporting that a Print message is done using the shared data? We'll consider that next month.)
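To make the FIFO behavior concrete, here is a condensed, self-contained sketch that puts the pieces together. It is an illustration under stated assumptions, not the definitive implementation: message_queue is a hypothetical mutex-based stand-in, Active identifies its sentinel by an address captured before the thread starts, and Recorder and record_in_order are invented names used only for this example.

```cpp
#include <cassert>
#include <condition_variable>
#include <memory>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// Assumed stand-in for message_queue: std::queue + mutex + condition variable.
template<typename T>
class message_queue {
public:
    void send( T item ) {
        { std::lock_guard<std::mutex> lock( mtx ); items.push( std::move(item) ); }
        ready.notify_one();
    }
    T receive() {   // blocks until an item is available
        std::unique_lock<std::mutex> lock( mtx );
        ready.wait( lock, [this]{ return !items.empty(); } );
        T item = std::move( items.front() );
        items.pop();
        return item;
    }
private:
    std::mutex mtx;
    std::condition_variable ready;
    std::queue<T> items;
};

// Condensed Active helper. The sentinel is recognized by a raw address that
// is fixed before the thread starts, so the pump never reads `done` itself.
class Active {
public:
    class Message {
    public:
        virtual ~Message() { }
        virtual void Execute() { }
    };
    Active() : done( new Message ), sentinel( done.get() ),
               thd( [this]{ Run(); } ) { }
    ~Active() { Send( std::move(done) ); thd.join(); }
    Active( const Active& ) = delete;
    Active& operator=( const Active& ) = delete;
    void Send( std::unique_ptr<Message> m ) { mq.send( std::move(m) ); }
private:
    void Run() {
        std::unique_ptr<Message> msg;
        while( (msg = mq.receive()).get() != sentinel ) msg->Execute();
    }
    std::unique_ptr<Message> done;
    const Message* sentinel;
    message_queue< std::unique_ptr<Message> > mq;
    std::thread thd;   // declared last: starts after the other members exist
};

// Hypothetical active class: Append() returns at once, and the private
// thread performs the appends one message at a time.
class Recorder {
public:
    explicit Recorder( std::vector<std::string>& sink ) : out( sink ) { }
    void Append( std::string s ) {
        a.Send( std::unique_ptr<Active::Message>(
                    new MAppend( this, std::move(s) ) ) );
    }
private:
    class MAppend : public Active::Message {
        Recorder* this_; std::string text;
    public:
        MAppend( Recorder* r, std::string t ) : this_(r), text( std::move(t) ) { }
        void Execute() override { this_->out.push_back( text ); }
    };
    std::vector<std::string>& out;   // touched only by Execute methods
    Active a;                        // helper goes last, for ordered destruction
};

// Usage sketch: enqueue three calls, let the destructor drain the queue,
// then read the results on the calling thread.
inline std::vector<std::string> record_in_order() {
    std::vector<std::string> out;
    {
        Recorder r( out );
        r.Append( "save" );
        r.Append( "print" );
        r.Append( "save again" );
    }   // ~Recorder sends the sentinel and joins the private thread
    assert( out.size() == 3 );
    return out;
}
```

The caller reads out only after the Recorder has been destroyed. The join in Active's destructor guarantees that every Execute call has finished by then, so no extra locking is needed around the vector in this sketch.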








