Eric Bruno

Dr. Dobb's Bloggers

Java Concurrency: Queue Processing, Part 2

April 15, 2012

In Java Concurrency: Queue Processing, Part 1 (the first installment in this series), we explored how to coordinate a pool of worker threads listening on a queue, which was also the monitor object used to signal workers to process the queue. In this installment, we look at the java.util.concurrent package. Specifically, we'll explore how to coordinate the activities of multiple threads using the SynchronousQueue class and other classes that implement the BlockingQueue interface.

The Hand Off

There are times when you need to process different actions in different threads, but they need to be coordinated using a hand-off pattern. In other words, you can have a child thread that does some work, and then sits and waits for more work from a queue. On the other side, you can have a producer thread that chugs along until it needs to hand off a portion of its work to another thread. With a SynchronousQueue, each side blocks until its counterpart arrives: a consumer blocks until a producer offers an item, and a producer blocks until a consumer is ready to take it.

To be precise, a SynchronousQueue isn't really a queue at all, since it doesn't have any capacity. The requirement is that upon insertion into the queue, the producer will block until there is a consumer ready to pull a message off the queue. Similarly, a consumer will block until a producer places a message onto the queue; hence the hand-off pattern.

Fortunately, a SynchronousQueue is very easy to use; all of the complexity is implemented for you within Java's concurrent collection classes. To illustrate, let's look at a simple example. First, the worker (consumer) thread:

import java.util.concurrent.BlockingQueue;

public class MyWorker extends Thread {
    private final BlockingQueue<String> queue;

    public MyWorker(BlockingQueue<String> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            while ( true ) {
                // Blocks until a producer hands off an item
                String s = queue.take();
                doWork(s);
            }
        }
        catch ( InterruptedException ie ) {
            // Interrupted while waiting in take(): just terminate
        }
    }
    …
}

The doWork() method has been omitted, but all it does in this example is output a simple message. With the SynchronousQueue class, consumers call the blocking take() method, which returns only when a producer places something into the queue. The non-blocking alternatives are of little use here: peek() always returns null, and the no-argument remove() throws a NoSuchElementException unless a producer happens to be waiting at that moment.
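To make that behavior concrete, here is a minimal sketch that pokes at an idle SynchronousQueue from a single thread. The class and method names (SyncQueueProbe, probe) are made up for illustration and are not part of the article's example; only the java.util.concurrent calls are real.

```java
import java.util.NoSuchElementException;
import java.util.concurrent.SynchronousQueue;

class SyncQueueProbe {
    // Describes how an idle SynchronousQueue (no producer or consumer
    // waiting) responds to the non-blocking queue methods.
    static String probe() {
        SynchronousQueue<String> queue = new SynchronousQueue<>();
        StringBuilder sb = new StringBuilder();
        sb.append("size=").append(queue.size());        // the queue has no capacity
        sb.append(" peek=").append(queue.peek());       // peek() always returns null
        sb.append(" poll=").append(queue.poll());       // null: no producer is waiting
        sb.append(" offer=").append(queue.offer("x"));  // false: no consumer is waiting
        try {
            queue.remove();
            sb.append(" remove=ok");
        } catch (NoSuchElementException e) {
            sb.append(" remove=NoSuchElementException");
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // prints: size=0 peek=null poll=null offer=false remove=NoSuchElementException
        System.out.println(probe());
    }
}
```

Because nothing is ever stored, only the blocking put()/take() pair (or their timed offer()/poll() variants) can actually move an item between threads.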

The producer code is straightforward as well:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;

public class SyncQueueExample {
    // …    
    public SyncQueueExample() {
        try {
            int workItem = 0;
            // Create a synchronous queue
            BlockingQueue<String> queue = new SynchronousQueue<String>();

            // Create the child worker thread
            MyWorker worker = new MyWorker(queue);
            worker.start();

            // Start sending to the queue; put() blocks until the worker calls take()
            while ( true ) {
                System.out.println("\nPlacing work on queue");
                String work = "Work Item:" + (++workItem);
                queue.put(work);
            }
        }
        catch ( InterruptedException e ) {
            // put() was interrupted while waiting for a consumer
            e.printStackTrace();
        }
    }
}

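When run, the producer's and consumer's output is effectively coordinated by the hand-off. Because the two threads print independently once each hand-off completes, the exact interleaving can vary, but it typically alternates like this: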

  Placing work on queue
  Thread 'Thread-1' processing work: Work Item:1

  Placing work on queue
  Thread 'Thread-1' processing work: Work Item:2

  Placing work on queue
  Thread 'Thread-1' processing work: Work Item:3

  Placing work on queue
  Thread 'Thread-1' processing work: Work Item:4
  …
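The listing above loops forever, which makes it awkward to experiment with. The following is a bounded sketch of the same hand-off, assuming a fixed number of items and an interrupt to stop the worker. HandOffDemo and its run() method are hypothetical names, and the worker is a plain Thread rather than the article's MyWorker so that the block is self-contained.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;

class HandOffDemo {
    // Hands off `count` items to one consumer thread and returns them
    // in the order the consumer received them.
    static List<String> run(int count) throws InterruptedException {
        BlockingQueue<String> queue = new SynchronousQueue<>();
        List<String> received = Collections.synchronizedList(new ArrayList<String>());

        Thread worker = new Thread(() -> {
            try {
                while (true) {
                    received.add(queue.take()); // blocks until the producer arrives
                }
            } catch (InterruptedException ie) {
                // interrupted while idle in take(): terminate
            }
        });
        worker.start();

        for (int i = 1; i <= count; i++) {
            // put() returns only after the worker has taken the item
            queue.put("Work Item:" + i);
        }

        worker.interrupt(); // no more work: wake the worker out of take()
        worker.join();      // wait for it to finish recording the last item
        return received;
    }

    public static void main(String[] args) throws InterruptedException {
        // prints: [Work Item:1, Work Item:2, Work Item:3]
        System.out.println(run(3));
    }
}
```

With a single producer and a single consumer, items arrive in the order they were put, since each put() completes only when its matching take() does.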

To continue on the worker thread pool theme from the previous blog, you can create multiple queue consumers, each of which will block on the call to the SynchronousQueue's take() method until it's chosen to process a message:

            // Create child worker thread pool
            int POOL_SIZE = 5;
            for ( int i = 0; i < POOL_SIZE; i++ ) {
                MyWorker worker = new MyWorker(queue);
                worker.start();
            }

This pattern is useful if processing each queued work request were to involve some lengthy operation, such as file IO. Since there can be many consumers waiting, the work queue producer would be free to continuously generate work even while an individual consumer took considerable time processing a single item. In this case, the producer thread would only be blocked on insertion when all consumer threads were busy processing. Once a worker thread completed its processing and once again called take(), the producer's call to put() would unblock and the entire process would continue.
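As a rough illustration of that scenario, the sketch below starts a small pool of consumers that each simulate slow work with a short sleep. Everything specific here is an assumption made for the example: the names WorkerPoolDemo and run, the 20 ms sleep standing in for file IO, and the CountDownLatch used to know when all items are finished.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;

class WorkerPoolDemo {
    // Feeds `itemCount` items to `poolSize` slow consumers and returns
    // the set of items that were processed.
    static Set<String> run(int poolSize, int itemCount) throws InterruptedException {
        BlockingQueue<String> queue = new SynchronousQueue<>();
        Set<String> processed = ConcurrentHashMap.newKeySet();
        CountDownLatch done = new CountDownLatch(itemCount);

        List<Thread> workers = new ArrayList<>();
        for (int i = 0; i < poolSize; i++) {
            Thread t = new Thread(() -> {
                try {
                    while (true) {
                        String item = queue.take();
                        Thread.sleep(20);      // stand-in for a lengthy operation
                        processed.add(item);
                        done.countDown();
                    }
                } catch (InterruptedException ie) {
                    // shutdown requested: terminate
                }
            });
            t.start();
            workers.add(t);
        }

        for (int i = 1; i <= itemCount; i++) {
            // Blocks only while every worker is busy processing
            queue.put("Work Item:" + i);
        }

        done.await(); // every item has been fully processed
        for (Thread t : workers) t.interrupt();
        for (Thread t : workers) t.join();
        return processed;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run(5, 20).size() + " items processed");
    }
}
```

Workers are interrupted only after the latch reaches zero, so no item is abandoned in the middle of processing.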

Hand Off, with Buffering

A similar scenario can be achieved with the ArrayBlockingQueue. In this implementation of the BlockingQueue interface, a producer can place work on the queue until a certain number of outstanding queued items exist, at which point it will block. Until that number is reached, however, the producer is free to continue producing work. As with a SynchronousQueue, calls to take() by a consumer will block if there are no items on the queue, and will return as soon as a queued item exists.

As for the consumer worker threads, the MyWorker class remains exactly the same as in the previous example. Since both SynchronousQueue and ArrayBlockingQueue implement the BlockingQueue interface, and MyWorker depends only on that interface, no code change is required. This is polymorphism in action.
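A small sketch of that interchangeability follows: one method, written only against BlockingQueue, is run unchanged with both implementations. SameWorkerDemo and roundTrip are illustrative names, and the fixed count of three items is an assumption to keep the example finite.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;

class SameWorkerDemo {
    // Sends three items through whichever BlockingQueue is supplied and
    // returns what the single consumer thread received.
    static List<String> roundTrip(BlockingQueue<String> queue) throws InterruptedException {
        List<String> received = new ArrayList<>();
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < 3; i++) {
                    received.add(queue.take());
                }
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        for (int i = 1; i <= 3; i++) {
            queue.put("Work Item:" + i);
        }
        consumer.join(); // after join(), the consumer's writes are visible here
        return received;
    }

    public static void main(String[] args) throws InterruptedException {
        // Both lines print: [Work Item:1, Work Item:2, Work Item:3]
        System.out.println(roundTrip(new SynchronousQueue<String>()));
        System.out.println(roundTrip(new ArrayBlockingQueue<String>(2)));
    }
}
```

The results are identical; what differs is when put() blocks, which is invisible to the consumer code.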

Regarding the producer, the code from the previous example is mostly unchanged, except the single line where a SynchronousQueue was created. Instead, we need to create an ArrayBlockingQueue, where a number is provided in the constructor:

    // Create a bounded, array-backed blocking queue
    int MAX_Q_SIZE = 5;
    BlockingQueue<String> queue = new ArrayBlockingQueue<String>(MAX_Q_SIZE);

This number indicates the maximum size of the queue, which is the number of items that can be placed on the queue before the producer blocks on calls to put(). Once consumers remove items and the queue size drops below this maximum, the producer's call to put() will return. Depending on OS thread scheduling as well as whether you specify the "fairness" of the queue's consumer processing, you will see varied output in terms of messages being placed on the queue and when they are processed:

  ...
  Placing work on queue
  Placing work on queue
  Placing work on queue
  Thread 'Thread-1' processing work: Work Item:6
  Thread 'Thread-2' processing work: Work Item:7
  Thread 'Thread-3' processing work: Work Item:8
  Thread 'Thread-4' processing work: Work Item:9
  Thread 'Thread-5' processing work: Work Item:10
  Placing work on queue
  Placing work on queue
  ...
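The capacity limit can be observed without any consumer at all by using offer(), the non-blocking counterpart of put(). The sketch below is only an illustration: BoundedQueueDemo and fillUntilFull are invented names, and passing true as the second constructor argument is shown simply to point out where the fairness policy is requested.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class BoundedQueueDemo {
    // Fills a bounded queue with no consumer running and reports how many
    // items were accepted before the queue refused further work.
    static int fillUntilFull(int capacity) {
        // The second argument requests FIFO ("fair") ordering among
        // threads that are blocked on the queue.
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(capacity, true);
        int accepted = 0;
        // offer() returns false at exactly the point where put() would block
        while (queue.offer("Work Item:" + (accepted + 1))) {
            accepted++;
        }
        return accepted;
    }

    public static void main(String[] args) {
        // prints: 5
        System.out.println(fillUntilFull(5));
    }
}
```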

Priority, Delays, and Other Details

There are multiple queue classes in the java.util.concurrent package that implement the BlockingQueue interface (we've examined the first two in detail here):

  • SynchronousQueue: A hand-off pattern. Producer(s) block until there is a consumer available; consumer(s) block until a producer offers a message.

  • ArrayBlockingQueue: Similar to the SynchronousQueue, except the queue can contain a pre-set number of items before the queue producer(s) block on queue insertion.

  • DelayQueue: Elements placed in the queue are not available for removal until their delay time has expired. Items that are furthest past their expiration time are available for removal first. Calls to put() do not block since this queue is unbounded, although calls to take() will block until an expired message is available.

  • LinkedBlockingQueue: Similar to an ArrayBlockingQueue, but where queue growth is allowed. However, the queue constructor does accept an optional parameter to constrain the growth to a maximum number of items.

  • LinkedTransferQueue: An unbounded queue, so calls to put() never block, while calls to take() block until a queued item is available for removal. It resembles SynchronousQueue in that a producer can request a hand-off by calling transfer(), which blocks until a consumer receives the item.

  • PriorityBlockingQueue: An unbounded queue, so calls to put() never block. Calls to take() block until a queued item is available for removal, and items are removed in priority order rather than insertion order. Enqueued objects must either implement the Comparable interface or be ordered by a Comparator supplied to the queue's constructor.
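The queues in the list that were not covered earlier can be exercised with a short sketch such as the one below. All names in it (OtherQueuesDemo, DelayedTask, and the three methods) are hypothetical, as are the sample values and delays; only the java.util.concurrent classes and methods are real.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;

class OtherQueuesDemo {
    // Hypothetical element type: becomes available after a fixed delay.
    static final class DelayedTask implements Delayed {
        final String name;
        final long readyAtNanos;

        DelayedTask(String name, long delayMillis) {
            this.name = name;
            this.readyAtNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(readyAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.NANOSECONDS),
                                other.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    // PriorityBlockingQueue: items come out in comparator order (shortest first).
    static List<String> priorityOrder() {
        Comparator<String> byLength = Comparator.comparingInt(String::length);
        PriorityBlockingQueue<String> queue = new PriorityBlockingQueue<>(11, byLength);
        queue.put("medium");
        queue.put("a-very-long-item");
        queue.put("tiny");
        List<String> out = new ArrayList<>();
        String s;
        while ((s = queue.poll()) != null) {
            out.add(s);
        }
        return out; // [tiny, medium, a-very-long-item]
    }

    // DelayQueue: take() blocks until the earliest delay has expired.
    static List<String> delayOrder() throws InterruptedException {
        DelayQueue<DelayedTask> queue = new DelayQueue<>();
        queue.put(new DelayedTask("later", 150));
        queue.put(new DelayedTask("sooner", 50));
        List<String> out = new ArrayList<>();
        out.add(queue.take().name);
        out.add(queue.take().name);
        return out; // [sooner, later]
    }

    // LinkedTransferQueue: put() buffers, tryTransfer() needs a waiting consumer.
    static String transferProbe() {
        LinkedTransferQueue<String> queue = new LinkedTransferQueue<>();
        boolean handedOff = queue.tryTransfer("x"); // false: nobody is waiting in take()
        queue.put("y");                             // never blocks; the item is buffered
        return "tryTransfer=" + handedOff + " size=" + queue.size();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(priorityOrder());
        System.out.println(delayOrder());
        System.out.println(transferProbe()); // prints: tryTransfer=false size=1
    }
}
```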

In part 3 of this series on Java concurrency, we'll continue with the java.util.concurrent package and explore Executors and the ExecutorService interface.

Happy Coding!

— EJB
