Java Concurrency: Queue Processing, Part 1
This is the first in a series on Java concurrency patterns. Java has many built in classes and functionality to help write concurrent code, with varying ways to synchronize the activities of threads. For instance, there are monitors, locks, latches, atomic operations, delayed tasks, future tasks, barriers, the list goes on. In this installment, we'll implement queue processing across a pool of worker threads using basic Java Thread synchronization and object monitors.
The Queue as Monitor
Every Java class extends the Object class by default. There are several built-in methods on this class, of which we'll focus on wait(), notify(), and notifyAll(). These methods operate on the object's monitor, which is used to synchronize operations across a group of threads. There are some rules to follow, but it's really quite simple: The waiting thread needs to own the object's monitor, which means it must call wait() within the code that synchronizes on the monitor. There are three ways of doing this, reference the Java API for more details. Here's one example:
synchronized (obj) {
while ( condition ) {
obj.wait();
// perform some action
}
}
Conversely, the thread that signals the waiting thread(s) will call notify()or notifyAll() on the same monitor object. It, too, needs to own the monitor to do this, as in this example:
synchronized (obj) {
obj.notify();
}
Calling notify() wakes up a single thread waiting on the monitor, while notifyAll() wakes up all of the threads waiting on the monitor. In general, calling notify()is more efficient as it results in fewer thread context switches, but there are times where notifyAll() may be a better choice. For instance, when work is produced at a high rate, it may be more efficient to wake as many threads as possible each time work is placed on the queue. This is mainly an implementation decision that you need to make.
Multithreaded Queue Example
Let's examine a common scenario in which tasks are executed by worker threads, and where the work is coordinated via a queue to ensure once-and-only-once execution of each task. In this case, we'll use the queue (itself a Java Object) as the monitor as well. Here's the overall design:
- Create a queue to act as a monitor and a way to transfer work
- Create a pool of worker threads (consumers) that each wait on the monitor (the queue)
- Create one or more producers that place items on the queue
- Notify the monitor when an item is placed on the queue, which in turn wakes up a worker to pull the next item off the queue
Let's start by looking at the worker's class, MyWorker, which itself extends Thread:
public class MyWorker extends Thread {
private static int instance = 0;
private final Queue<String> queue;
public MyWorker(Queue queue) {
this.queue = queue;
setName("MyWorker:" + (instance++));
}
@Override
public void run() {
while ( true ) {
try {
Runnable work = null;
synchronized ( queue ) {
while ( queue.isEmpty() )
queue.wait();
// Get the next work item off of the queue
work = queue.remove();
}
// Process the work item
work.run();
}
catch ( InterruptedException ie ) {
break; // Terminate
}
}
}
private void doWork(Runnable work) { ... }
}
The code that creates a pool of MyWorker objects, which is essentially a Thread pool, looks like this:
for ( int i = 0; i < THREAD_POOL_SIZE; i++ ) {
// Create and start the worker thread, which will
// immediately wait on the monitor (the queue in
// this case) to be signaled to begin processing
MyWorker worker = new MyWorker(queue);
worker.start();
}
The code to place work on the queue and signal a worker thread looks like this:
synchronized ( queue ) {
// Add work to the queue
Runnable work = getMoreWork();
queue.add(work);
// Notify the monitor object that all the threads
// are waiting on. This will awaken just one to
// begin processing work from the queue
queue.notify();
}
This code illustrates how straightforward basic Java thread synchronization is, although it is low-level. In Part 2 of this series, we'll look at the java.util.concurrent package and how it contains built-in classes to make synchronized queue and task processing even simpler.
Happy Coding!
EJB

