How Do I Queue Java Threads?


Oct98: Java Q&A

Mike is the vice president of Accretive Technologies Inc., an Atlanta-based firm specializing in client/server and Internet development. He can be contacted at mike@accretive.com.


A recent project of mine required a multithreaded program that could manage multiple mainframe sessions simultaneously. The sessions were grouped by their application affinity -- in this case, by state (Ohio, Illinois, Michigan). Each grouping (state) could have multiple sessions. Requests that came in were tagged by state, so the process had to be able to route requests to the correct session. After the request was processed, it had to be sent back to the requester.

As luck would have it, the mainframe communication package we were using dictated we use Java -- which has an elegant threading model -- for playing scripts and collecting results from mainframe sessions. In this article, I'll present the multithreading approach I implemented. In the process, I'll also examine the differences between centralized- and distributed-queuing models. The complete code for the centralized- and distributed-queuing models (which has been compiled and tested under JDK 1.0.2, 1.1.2, and 1.1.3) is available electronically; see "Resource Center," page 3.

Threads in Java

Multithreading support is built into Java. To use threads, you can either derive your class from Thread or declare that your class implements the Runnable interface. Either way, you supply a run() method, which is where a thread's work is done.

Example 1 illustrates how to create and start a thread (assume that the class derived from Thread is MyThread). After Example 1 runs, the instance of MyThread pointed to by thdHandle will be executing in its own thread, in its overridden run() method. As long as the run() method executes, the thread will live. The thread terminates when run() exits, when an external event occurs (such as the controlling process invoking stop() or destroy() on the thread), or when the Java virtual machine (JVM) exits.
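
The following is a minimal sketch of both ways to create a thread. It is an illustration, not a reproduction of Example 1: MyThread and thdHandle follow the names used above, while MyTask and ThreadStartDemo are hypothetical names introduced here, and the finished flags exist only so the result can be checked.

```java
// Hypothetical stand-in for the MyThread class described in the text.
class MyThread extends Thread {
    volatile boolean finished = false;

    public void run() {
        // The thread's work goes here; the thread ends when run() returns.
        finished = true;
    }
}

// The alternative: implement Runnable and hand the object to a Thread.
class MyTask implements Runnable {
    volatile boolean finished = false;

    public void run() {
        finished = true;
    }
}

class ThreadStartDemo {
    // Starts one thread of each kind, waits for both, and reports whether
    // both run() methods executed.
    static boolean runBoth() {
        MyThread thdHandle = new MyThread();
        thdHandle.start();                  // run() now executes in its own thread

        MyTask task = new MyTask();
        Thread taskThread = new Thread(task);
        taskThread.start();

        try {
            thdHandle.join();               // block until each run() has returned
            taskThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return thdHandle.finished && task.finished;
    }

    public static void main(String[] args) {
        System.out.println("Both threads ran: " + runBoth());
    }
}
```

Note that the code calls start(), not run(): calling run() directly would execute the method in the caller's thread.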

The Queue

The Java Vector class provides a good queuing mechanism. Vector includes the firstElement() method, which returns the first element in the structure. Like a queue, a Vector preserves the order in which items were added. Furthermore, Vector's insertion and retrieval methods are synchronized to prevent simultaneous access by multiple threads.

For starters, look at firstElement()'s declaration in the Java documentation. Notice that the declaration contains the term synchronized, a Java keyword that indicates the method will be protected from simultaneous access by two or more threads. When a thread enters a synchronized method, other threads are kept out of synchronized methods of that instance of the class. When the thread exits the synchronized method, the next thread in line is given access. I'll encapsulate the queue functionality in a class called QueueObject, which has methods for inserting and removing items from the vector (see Listing One).
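
To make the effect of synchronized concrete, here is a small sketch that is separate from the article's listings. SharedCounter and SynchronizedDemo are hypothetical names; the point is only that two threads calling a synchronized method on the same instance take turns, so no increments are lost.

```java
class SharedCounter {
    private int count = 0;

    // Only one thread at a time may run a synchronized method on this instance.
    public synchronized void increment() {
        count++;
    }

    public synchronized int get() {
        return count;
    }
}

class SynchronizedDemo {
    // Two threads each call increment() perThread times on one counter.
    static int countWithTwoThreads(final int perThread) {
        final SharedCounter counter = new SharedCounter();
        Runnable work = new Runnable() {
            public void run() {
                for (int i = 0; i < perThread; i++) {
                    counter.increment();
                }
            }
        };
        Thread a = new Thread(work);
        Thread b = new Thread(work);
        a.start();
        b.start();
        try {
            a.join();
            b.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return counter.get();
    }

    public static void main(String[] args) {
        System.out.println("Total: " + countWithTwoThreads(10000));
    }
}
```

Without the synchronized keyword on increment(), the two threads could interleave their read-modify-write steps and the total could come up short.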

A Multithreaded Request Processor

For purposes of illustration, I'll use a primary thread to represent the single entry point for requests to enter the processor. This thread will start up "inspector" threads, which process the incoming requests. Here, they will merely place a response string into the request and return it.

The requests will be implemented as a class as well. The class RequestObject (see Listing Two) has methods that allow the response text to be set and retrieved, and methods to get and set a time value. This time value will be used to set an interval that the inspector thread should wait before returning the response. This simulates long-running queries.

Processing proceeds thus: PrimaryThread (the main class for the application) creates the response queue by creating a new instance of QueueObject. It then creates an InspectorThread object for as many threads as desired (in the examples, two are created). As each InspectorThread is created, it is handed its ID number (sequential beginning with 1), name, and a reference to the response QueueObject. This lets PrimaryThread collect the responses from all threads for printing or reporting. Then, a loop is entered; RequestObjects are created and handed, alternately, to each thread. When the loop terminates, the response QueueObject is processed, and all items are collected. Finally, the threads are told to clean up and shut down, and the program exits. Figure 1 illustrates the system.

The First Solution

The most object-oriented approach seemed to be to use a distributed-queuing model, where each InspectorThread object manages its own queue. The PrimaryThread object would insert the incoming requests in a round-robin fashion into each InspectorThread's queue. This would ensure that the request load was distributed evenly. Also, there would be little danger of InspectorThread objects getting into each other's way.

This requires the primary thread to keep track of which InspectorThread object (assuming there's more than one) received the last request. The PrimaryThread object holds a reference to each InspectorThread object it creates by placing them in an array. This array of thread handles is also used for process control -- notably, when the program exits and the threads need to be shut down. Listings Three and Four present PrimaryThread and InspectorThread.
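
The round-robin bookkeeping can be sketched on its own, without any threads. In this illustration, RoundRobinDemo and distribute() are hypothetical names, and plain strings stand in for RequestObjects.

```java
import java.util.Vector;

class RoundRobinDemo {
    // Hands each request to the next per-worker queue in turn, wrapping
    // back to the first queue after the last one.
    static Vector<Vector<String>> distribute(String[] requests, int workerCount) {
        Vector<Vector<String>> queues = new Vector<Vector<String>>();
        for (int w = 0; w < workerCount; w++) {
            queues.addElement(new Vector<String>());
        }
        int current = 0;                    // index of the worker that gets the next request
        for (int i = 0; i < requests.length; i++) {
            queues.elementAt(current).addElement(requests[i]);
            current = (current + 1) % workerCount;
        }
        return queues;
    }

    public static void main(String[] args) {
        String[] requests = { "r1", "r2", "r3", "r4", "r5" };
        System.out.println(distribute(requests, 2));
    }
}
```

With five requests and two workers, the first queue receives r1, r3, and r5, and the second receives r2 and r4, regardless of how long each request takes.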

Efficient Waiting

What does an InspectorThread object do while it's waiting for requests to arrive?

The firstElement() method throws a NoSuchElementException if an attempt is made to get an object from an empty vector. The inspector threads could simply catch the exception and keep hammering on their vector, checking for input, but that would consume CPU cycles unnecessarily.

Luckily, the wait() and notify() methods provide a signaling mechanism among threads. A thread can call wait() (as part of a given object's context) and then efficiently yield its slice of the CPU for use by other threads. As soon as another thread calls notify() (relative to the same object the other thread is waiting on), the waiting thread will be released and is free to continue processing.

In this example, an InspectorThread object can call wait() just before trying to get to an item from the vector. Java requires you to handle the InterruptedException when calling the wait() method. When the PrimaryThread object receives an item for processing, it places it onto the InspectorThread's request vector via submitRequest(). In submitRequest(), notify() gets called, releasing the InspectorThread to retrieve the item from the vector and process it.

At this point, it may seem that a deadlock situation should arise. After all, in the InspectorThread class, when the code calls the GetQueueItem() method (which is synchronized) and hangs on the wait() method, how can another thread get into that instance of the QueueObject's AddQueueItem() method to insert an item and call the notify() method if the AddQueueItem() method is also synchronized? It seems that the code in PrimaryThread should hang when it tries to insert an item into the thread's queue. This is where Java's built-in multithreading really shines. When a thread calls wait(), it releases the monitor (the lock) it holds on that object for as long as it waits, allowing other threads to get in and set variables or, in this case, call the notify() method. The waiting thread is then released and, once it has reacquired the monitor, goes on to process the queue.
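
A small sketch can demonstrate this behavior. It is not taken from the article's listings: Mailbox and WaitReleasesLockDemo are hypothetical names, and the sketch assumes a JVM recent enough to provide Thread.getState() (Java 5 or later), which it uses to confirm the consumer is parked in wait() before the producer proceeds.

```java
class Mailbox {
    private Object item = null;

    // Consumer side: blocks inside a synchronized method until an item arrives.
    public synchronized Object take() throws InterruptedException {
        while (item == null) {
            wait();                 // releases this object's monitor while waiting
        }
        Object result = item;
        item = null;
        return result;
    }

    // Producer side: also synchronized, yet it can run while take() is waiting.
    public synchronized void put(Object o) {
        item = o;
        notify();
    }
}

class WaitReleasesLockDemo {
    static Object handOff(final Object message) {
        final Mailbox box = new Mailbox();
        final Object[] received = new Object[1];
        Thread consumer = new Thread(new Runnable() {
            public void run() {
                try {
                    received[0] = box.take();
                } catch (InterruptedException e) {
                    // give up; received[0] stays null
                }
            }
        });
        consumer.start();
        try {
            // Poll until the consumer is parked in wait(), still inside take().
            while (consumer.getState() != Thread.State.WAITING) {
                Thread.sleep(1);
            }
            box.put(message);       // enters a synchronized method on the same object
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received[0];
    }

    public static void main(String[] args) {
        System.out.println("Consumer received: " + handOff("hello"));
    }
}
```

If wait() kept the monitor locked, the call to put() would block forever and the program would never finish.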

Still, you have to be careful when using wait() and notify(). It is possible for these calls to "miss" one another, due to timing considerations, and you can still arrive in a deadlock situation. (For a more thorough treatment of deadlock, see "Java Deadlock," by Allan Vermeulen, DDJ, September 1997; available electronically, see "Resource Center," page 3.) For example, assume that there is a process similar to the sample code, where a primary thread object starts one inspector thread object. In my example, I assume also that the primary thread is receiving requests from an external source, not in a loop as in the sample code. While the inspector thread is processing a request, another arrives at the primary thread, and it gets inserted into the inspector thread's request queue. If precautions are not taken, when the primary thread places the request and calls notify(), no thread will be waiting to receive the signal. The primary thread goes back to waiting for more requests. Meanwhile, the inspector finishes its long process, sends back the result, and tries to call wait() to be signaled to handle the next request. It will wait forever, or until yet another request arrives, in which case the inspector thread's queue can start to back up -- it really has two items in the queue, but it will only pick up the first one. In other words, calls to notify() are not queued up and answered when the other thread calls wait().
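
The fact that notify() calls are not queued can be seen in a few lines. This is a sketch under stated assumptions rather than part of the sample code: LostNotifyDemo is a hypothetical name, and a timed wait() is used so the demonstration ends instead of hanging.

```java
class LostNotifyDemo {
    // Calls notify() before any thread is waiting, then waits with a timeout.
    // Returns roughly how many milliseconds the wait lasted.
    static long waitAfterEarlyNotify(long timeoutMillis) {
        Object lock = new Object();
        long start = System.nanoTime();
        synchronized (lock) {
            lock.notify();                  // no thread is waiting: the signal is dropped
            try {
                lock.wait(timeoutMillis);   // the earlier notify() does not release this
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return (System.nanoTime() - start) / 1000000L;
    }

    public static void main(String[] args) {
        System.out.println("Waited about " + waitAfterEarlyNotify(250) + " ms");
    }
}
```

The wait runs for about the full timeout because the earlier notify() left nothing behind. With an untimed wait(), the thread would stay blocked until some later notify() arrived.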

One way to approach this problem is to use a "clean-after" approach: When the queue is checked, keep reading and processing until the queue is empty. This method has the probable effect of one thread "hogging" the request queue and doing more than its fair share of work.
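
A minimal sketch of the "clean-after" loop follows. DrainingConsumer and drain() are hypothetical names, strings stand in for requests, and appending to a log stands in for real processing.

```java
import java.util.Vector;

class DrainingConsumer {
    // Removes and handles every item currently queued; returns how many.
    static int drain(Vector<String> queue, StringBuilder log) {
        int handled = 0;
        while (true) {
            String item;
            synchronized (queue) {          // check-and-remove must be one atomic step
                if (queue.isEmpty()) {
                    break;
                }
                item = queue.remove(0);
            }
            log.append(item).append(';');   // stand-in for real processing
            handled++;
        }
        return handled;
    }

    public static void main(String[] args) {
        Vector<String> queue = new Vector<String>();
        queue.add("a");
        queue.add("b");
        queue.add("c");
        StringBuilder log = new StringBuilder();
        System.out.println(drain(queue, log) + " items handled: " + log);
    }
}
```

A thread that runs this loop after each wake-up never leaves a request stranded, but it also keeps working for as long as items keep arriving, which is the "hogging" effect described above.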

The second approach -- the one I used -- was to use a counter to indicate if items were present on the queue. In the GetQueueItem() method of the QueueObject class (Listing One), you can see a check of the object variable itemcount. The code only descends into the try/catch block for the wait() call if itemcount equals zero. As requests are added to the queue (in the AddQueueItem() method), itemcount is incremented. This forces the thread to read the queue directly if items are present. As each request is removed from the queue, the itemcount variable is decremented. This ensures that the wait() method will be called only when no requests are queued.
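
The counter idea can be exercised in isolation. The sketch below is a trimmed variant, not Listing One itself: CountedQueue and CountedQueueDemo are hypothetical names, and, unlike Listing One, it re-checks the count in a while loop after waking, a common precaution that would need a different shutdown mechanism than BumpQueue().

```java
import java.util.Vector;

class CountedQueue {
    private final Vector<Object> items = new Vector<Object>();
    private int itemCount = 0;

    public synchronized void add(Object o) {
        items.addElement(o);
        itemCount++;
        notify();                   // wakes a waiting consumer, if there is one
    }

    public synchronized Object get() throws InterruptedException {
        while (itemCount == 0) {    // wait only when nothing is queued
            wait();
        }
        Object first = items.firstElement();
        items.removeElementAt(0);
        itemCount--;
        return first;
    }
}

class CountedQueueDemo {
    // Adds two items before any consumer is waiting, then reads both back.
    static String addThenGet() {
        CountedQueue q = new CountedQueue();
        q.add("first");             // notify() fires with no thread waiting...
        q.add("second");
        try {
            // ...yet get() returns at once, because the count is nonzero.
            return q.get() + "," + q.get();
        } catch (InterruptedException e) {
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        System.out.println(addThenGet());
    }
}
```

Because the consumer consults the count before deciding to wait, it no longer matters that the two notify() calls went unheard.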

You must also be sure that deadlock is not encountered when the system is shut down. Near the end of the PrimaryThread code is a loop that calls the shutdown() method of each of the InspectorThread objects that were started. Notice, in the shutdown() method, the call to the BumpQueue() method (which is only a call to notify()). Remember that the run() method of the InspectorThread object will end up in the wait() call (through GetQueueItem()) only if there are no outstanding requests in the queue. That's exactly the situation -- since the code is about to exit, no more requests will be pending. So a call to notify() (in the BumpQueue() method) is performed to knock the run() method off the wait() call. Because the queue is still empty, firstElement() then throws a NoSuchElementException, which run() catches. Since the shutdown() method has also set the running object variable to false, the loop test fails when processing goes back to the top of the loop, and the code drops out and exits the run() method, terminating the thread.
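
The shutdown handshake can be sketched on its own. This illustration leaves out request processing entirely: StoppableWorker and ShutdownDemo are hypothetical names, and, as a choice made for this sketch, the flag is set while holding the same lock the worker waits on, so the worker cannot miss the change.

```java
class StoppableWorker extends Thread {
    private final Object signal = new Object();
    private boolean running = true;         // guarded by signal

    public void run() {
        synchronized (signal) {
            while (running) {               // re-check the flag after every wake-up
                try {
                    signal.wait();          // idle until notified
                } catch (InterruptedException e) {
                    return;
                }
            }
        }
    }

    // Clear the flag first, then wake the thread so it notices.
    public void shutdown() {
        synchronized (signal) {
            running = false;
            signal.notify();
        }
    }
}

class ShutdownDemo {
    // Returns true if the worker thread ends after shutdown() is called.
    static boolean startAndStop() {
        StoppableWorker worker = new StoppableWorker();
        worker.start();
        worker.shutdown();
        try {
            worker.join(5000);              // wait up to five seconds for run() to return
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !worker.isAlive();
    }

    public static void main(String[] args) {
        System.out.println("Worker stopped: " + startAndStop());
    }
}
```

Without the notify() in shutdown(), a worker already parked in wait() would never look at the flag again.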

Lessons Learned

After running the code, I noticed that the throughput of the overall system was sometimes poor. I examined the distributed-queue mechanism as well as the queries I was submitting (most notably, how long each query took). I found that if two long requests were handed to the same InspectorThread object, there were instances where that InspectorThread object was working, while other InspectorThread objects were idle, waiting for new requests.

I'm sure that there are many books that cover queuing theory, so I won't discuss that subject here. The distributed-queue architecture handles requests well when they all take roughly the same amount of time. However, I had a situation where request processing times varied from short to long. Consequently, I reworked the PrimaryThread object (available electronically). Since it alternates between threads, it's easy to show that one thread can get overloaded while the other sits idle. I needed a more flexible way to handle requests that could vary in processing time.

Consequently, I decided to rework the code and set up the system to use just one queue for incoming requests. The queue would be created and held in the PrimaryThread object, and references to the queue would be passed into the InspectorThread objects on the constructor. This required little code modification in the InspectorThread object, and (to my surprise) let me remove some code in the PrimaryThread object.

The principal change in the InspectorThread object was to have the code in the run() method call the GetQueueItem() method on the passed-in QueueObject reference. Since the method is declared as synchronized, only one thread at a time will be allowed in to get requests. The new InspectorThread object is also available electronically.

The code for the PrimaryThread object actually got simpler. There was no need to track the thread that last received a request. The generated RequestObjects are simply loaded into the QueueObject used for requests, and the InspectorThread objects will wrestle with one another to get requests from the queue. The new code for the PrimaryThread object is available electronically.

The revised processing flow using a centralized request queue is as follows: The PrimaryThread object creates two QueueObjects -- a request queue and a response queue. Then, it creates the two InspectorThread objects, passing the ID number of the thread, its name, and a reference to both the request and response QueueObjects. The threads are started, and the main loop is entered, creating all the RequestObjects and placing them into the request QueueObject. The InspectorThreads call the GetQueueItem() method on the request QueueObject, getting RequestObjects as fast as they can process them. As they complete the requests, they place the responses onto the response QueueObject. When all the responses have been collected, the PrimaryThread object orders the InspectorThread objects to shut down, and the program exits; see Figure 2.
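
The flow just described can be condensed into a short sketch. It is not the revised code that is available electronically: SharedQueue, Worker, and CentralQueueDemo are hypothetical names, strings stand in for RequestObjects, and interrupt() is swapped in for the shutdown()/BumpQueue() mechanism to keep the example small.

```java
import java.util.Vector;

class SharedQueue {
    private final Vector<Object> items = new Vector<Object>();

    public synchronized void add(Object o) {
        items.addElement(o);
        notify();
    }

    public synchronized Object take() throws InterruptedException {
        while (items.isEmpty()) {
            wait();
        }
        Object first = items.firstElement();
        items.removeElementAt(0);
        return first;
    }
}

class Worker extends Thread {
    private final int id;
    private final SharedQueue requests;
    private final SharedQueue responses;

    Worker(int id, SharedQueue requests, SharedQueue responses) {
        this.id = id;
        this.requests = requests;
        this.responses = responses;
    }

    public void run() {
        try {
            while (true) {
                Object request = requests.take();   // compete for the next request
                responses.add(request + " inspected by #" + id);
            }
        } catch (InterruptedException e) {
            // interrupted by the primary thread: fall out of run() and end
        }
    }
}

class CentralQueueDemo {
    // Feeds requestCount requests to workerCount workers through one queue
    // and returns the number of responses collected.
    static int process(int requestCount, int workerCount) {
        SharedQueue requests = new SharedQueue();
        SharedQueue responses = new SharedQueue();
        Worker[] workers = new Worker[workerCount];
        for (int i = 0; i < workerCount; i++) {
            workers[i] = new Worker(i + 1, requests, responses);
            workers[i].start();
        }
        for (int i = 0; i < requestCount; i++) {
            requests.add("item " + (i + 1));
        }
        int collected = 0;
        try {
            while (collected < requestCount) {
                System.out.println(responses.take());
                collected++;
            }
            for (int i = 0; i < workerCount; i++) {
                workers[i].interrupt();             // stand-in for shutdown()
                workers[i].join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return collected;
    }

    public static void main(String[] args) {
        System.out.println(process(5, 2) + " responses collected");
    }
}
```

Which worker handles which item varies from run to run; the primary thread no longer decides, and only the total is fixed.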

To illustrate the differences in how the centralized queue handles requests of varying process times, Listing Five shows a scenario with one long request and four short requests. When the program runs, the first thread will grab the long request, and that will be the only one it processes, as the other thread will go through the rest of the requests before the first one finishes.
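
The difference can also be worked out on paper, or with a small model like the one below. This is a simplified calculation, not a measurement: MakespanDemo and its methods are hypothetical names, and the model ignores thread start-up and queuing overhead.

```java
class MakespanDemo {
    // Alternating assignment: request i always goes to worker i % workers.
    static int roundRobinFinish(int[] seconds, int workers) {
        int[] busy = new int[workers];
        for (int i = 0; i < seconds.length; i++) {
            busy[i % workers] += seconds[i];
        }
        return max(busy);
    }

    // Shared queue: each request goes to whichever worker frees up first.
    static int sharedQueueFinish(int[] seconds, int workers) {
        int[] busy = new int[workers];
        for (int i = 0; i < seconds.length; i++) {
            int earliest = 0;
            for (int w = 1; w < workers; w++) {
                if (busy[w] < busy[earliest]) {
                    earliest = w;
                }
            }
            busy[earliest] += seconds[i];
        }
        return max(busy);
    }

    // Time at which the last worker finishes.
    private static int max(int[] values) {
        int result = 0;
        for (int i = 0; i < values.length; i++) {
            if (values[i] > result) {
                result = values[i];
            }
        }
        return result;
    }

    public static void main(String[] args) {
        int[] seconds = { 10, 2, 2, 2, 2 };     // the request times used in Listing Five
        System.out.println("Alternating: " + roundRobinFinish(seconds, 2) + " s");
        System.out.println("Shared queue: " + sharedQueueFinish(seconds, 2) + " s");
    }
}
```

For the request times in Listing Five and two workers, the model gives 14 seconds with alternating assignment (10 + 2 + 2 on one thread, 2 + 2 on the other) and 10 seconds with a shared queue (10 on one thread, 2 + 2 + 2 + 2 on the other).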

Conclusion

By making the InspectorThread object responsible for reading from the centralized queue, I eliminated the request management that the PrimaryThread object had to perform in the original solution. That management had become an issue in the project I was developing, as the program also had to be able to spawn threads dynamically during its life. The centralized queue approach made this much simpler.

Most importantly, more requests are handled in a timely fashion, since threads are not waiting idly for requests while other threads are overloaded. In a real-world situation, this allows for more responsiveness to an external process that is supplying the requests.

A logical extension of the model I've presented here would be a multithreaded input request handler -- when an external request arrives, it would spawn a thread that is responsible for sending the request to the queue and waiting for the response. In such a model, rather than passing a reference to a central QueueObject, the newly spawned thread would pass a reference to its own QueueObject, in which the InspectorThread would place the response. This would handle the requirement of maintaining correct request context in the case of multiple senders.

DDJ

Listing One

import java.util.*;

/* QueueObject - general class that implements a queue mechanism using a Java
 * Vector. Also implements signaling in GetQueueItem and AddQueueItem methods
 * Author:  Mike Criscolo
 */
public class QueueObject {
    protected Vector    queue;
    protected int       itemcount;
    protected String    queueName;
    public QueueObject(String name) {
        queue = new Vector();
        queueName = name;
        itemcount = 0;
    }
    // Get an item from the vector.  Wait if no items available
    public synchronized Object GetQueueItem() {
        Object   item = null;
        // If no items available, drop into wait() call
        if (itemcount == 0) {
            try {
                wait();
            } catch (InterruptedException e) {
               System.out.println(queueName + ": Hey! Somebody woke me up!");
            }
        }
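        // Get first item from vector, remove it and decrement item count.
        // If BumpQueue() woke us with nothing queued, firstElement() throws
        // NoSuchElementException, which the caller is expected to handle.
        item = (Object)queue.firstElement();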
        queue.removeElement(item);
        itemcount--;
        // Send it back
        return item;
    }
    // Place an item onto vector. Signal threads that an item is available.
    public synchronized void AddQueueItem(Object o) {
        System.out.println(queueName + ": Adding item to queue...");
        itemcount++;
        queue.addElement(o);
        notify();
    }
    // Handy place to put a separate notify call - used during shutdown.
    public synchronized void BumpQueue() {
        notify();
    }
}

Listing Two

/* RequestObject - class used to send requests between threads.
 * Author:  Mike Criscolo
 */
public class RequestObject {
    String  beginText;
    int     iSecs;
    String  responseText;


    public RequestObject() {
        beginText = "";
        responseText = "";
        iSecs = 0;
    }
    public RequestObject(String bt, int time) {
        beginText = bt;
        responseText = "";
        iSecs = time;
    }
    public void SetResponseText(String rt) {
        responseText = rt;
    }
    // Return the delay time (in seconds)
    public int GetDelayTime() {
        return iSecs;
    }
    public void Dump() {
        System.out.println("Begin: "+beginText+"  Response:" + responseText);
    }
}

Listing Three

/* Implement a Distributed queue solution for processing requests
 * with multiple threads
 * Author: Mike Criscolo
 */
public class PrimaryThread extends Thread {


    private int totThreads;
    public PrimaryThread() {
        super("PrimaryThread");
        // We'll do 2 threads
        totThreads = 2;
    }
    public void run() {
        int i;
        InspectorThread  threadArray[];
        RequestObject   reqOb;
        RequestObject   rspOb;
        int curThread = 0;
        // Create the response queue
        QueueObject rspQ = new QueueObject("Response Queue");
        threadArray = new InspectorThread[totThreads];
        // Crank the threads
        System.out.println("Cranking threads...");
        for (i = 0; i < totThreads; i++) {
            threadArray[i] = 
               new InspectorThread("User Thread " + (i + 1), (i + 1), rspQ);
            threadArray[i].start();
        }
        // Give the threads a chance to crank
        try {
            sleep(2000);
        } catch (InterruptedException e) {
            System.out.println("Bumped off the sleep() call!");
        }
        // Create some RequestObjects and hand them to the 
        // InspectorThread objects...
        for (i = 0; i < 5; i++) {
            reqOb = new RequestObject("This is item " + (i + 1), 2);
            threadArray[curThread].submitRequest(reqOb);
            curThread++;
            if (curThread == totThreads) {
                curThread = 0;
            }
        }
        // Now loop, checking the response queue for all responses
        while (i > 0) {
            rspOb = (RequestObject)rspQ.GetQueueItem();
            rspOb.Dump();
            i--;
        }
        // Kill all threads
        for (i=0; i<totThreads; i++) {
            threadArray[i].shutdown();
        }
    }
    public static void main(String argv[]) {
        PrimaryThread  pt;
        pt = new PrimaryThread();
        pt.start();
    }
}

Listing Four

import java.util.*;

/* InspectorThread - class that encapsulates the processing for
 * a thread that inspects request items.
 * Author:  Mike Criscolo
 */
public class InspectorThread extends Thread {
    protected int            threadId;
    protected boolean        running;
    protected QueueObject    requestQ;
    protected QueueObject    responseQ;


    public InspectorThread(String name, int id, QueueObject response) {
        // Call the superclass
        super(name);
        // Store the thread id and response QueueObject
        threadId = id;
        responseQ = response;
        // Create the request QueueObject for the thread to use
        requestQ = new QueueObject("Thread " + threadId + " Queue");
        running = false;
    }
    // Set the running boolean to false and bump the main loop off the wait.
    public void shutdown() {
        running = false;
        requestQ.BumpQueue();
    }
    // Main loop of the thread.
    public void run() {
        RequestObject thing;
        // Set running to true
        running = true;
        // While the thread is supposed to be running...
        while (running) {
            try {
                System.out.println("Thread "+threadId+": waiting for items...");
                // Call the QueueObject's method to get a request
                thing = (RequestObject)requestQ.GetQueueItem();
                try {
                    // Sleep for time (convert to milliseconds)
                    sleep(thing.GetDelayTime()*1000);
                } catch (InterruptedException intExp) {
                    // No problem - try again
                }
                // Put some text in the request to show we handled it
                thing.SetResponseText("Inspected by #" + threadId);
                // Send it back
                responseQ.AddQueueItem(thing);
            } catch (NoSuchElementException e) {
                // Oh well, try again
            }
        }
        System.out.println("Thread " + threadId + " exiting!");
    }
    // Add a RequestObject to the thread's queue
    public void submitRequest(Object request) {
        requestQ.AddQueueItem(request);
    }
}

Listing Five

/* Implement a Centralized queue solution for processing requests with
 * multiple threads, with 1 long query and 4 short ones, to illustrate
 * increased throughput
 * Author:  Mike Criscolo
 */
public class PrimaryThread extends Thread {
    private int totThreads;
    public PrimaryThread() {
        super("PrimaryThread");
        // We'll do 2 threads
        totThreads = 2;
    }
    public void run() {
        int i;


        InspectorThread  threadArray[];
        RequestObject   reqOb;
        RequestObject   rspOb;
        // Create the request and response queues
        QueueObject reqQ = new QueueObject("Request Queue");
        QueueObject rspQ = new QueueObject("Response Queue");
        threadArray = new InspectorThread[totThreads];
        // Crank the threads
        System.out.println("Cranking threads...");
        for (i = 0; i < totThreads; i++) {
            threadArray[i] = new InspectorThread("User Thread " + 
                   (i + 1), (i + 1), reqQ, rspQ);
            threadArray[i].start();
        }
        // Give the threads a chance to crank
        try {
            sleep(2000);
        } catch (InterruptedException e) {
            System.out.println("Bumped off the sleep() call!");
        }
        // Create some RequestObjects and jam them into the request queue
        for (i = 0; i < 5; i++) {
            // Last parm on next line puts a long request in first,
            // followed by short requests
            reqOb = new RequestObject("This is item "+(i+1),((i==0)?10:2));
            reqQ.AddQueueItem(reqOb);
        }
        // Now loop, checking the response queue for all responses
        while (i > 0) {
            rspOb = (RequestObject)rspQ.GetQueueItem();
            rspOb.Dump();
            i--;
        }
        // Kill all threads
        for (i=0; i<totThreads; i++) {
            threadArray[i].shutdown();
        }
    }
    public static void main(String argv[]) {
        PrimaryThread  pt;
        pt = new PrimaryThread();
        pt.start();
    }
}


Copyright © 1998, Dr. Dobb's Journal
