Multithreading, Java, & OSGi

Source code accompanies this article.


Problem Solving

The application I was prototyping needed several threads. I started off by tracking them in an ad hoc manner: each time I needed a new thread, I created a member field to keep a reference to it and added code to my stop() method to interrupt it and then wait for it via Thread.join().

At first I had only two long-running threads, so this method worked well. Soon, however, I realized I'd need to keep track of a worker thread that was created only occasionally and ran for a short time, and a set of I/O-processing threads whose size would vary over time. Just looking at my stop() method made it apparent that a more structured approach was needed. The code might actually have been correct, but it was too complicated to read and verify, a sure sign of trouble.

Fortunately, since I'd started in an ad hoc fashion, I hadn't yet invented any fancy thread-control patterns to pollute my thinking. Instead, I took a look at what worked for the threads I was managing and tried to look for patterns. If I could find a pattern, I could probably factor out some kind of supporting classes to simplify things.

It turns out that the basic pattern for handling threads properly in OSGi has three parts:

  • Keep track of each thread when you start it.
  • Know how to interrupt each thread.
  • Wait for each thread to exit when stopping.

Java has Thread.start() for the first item and Thread.join() for the last, but has little to say about how to interrupt a thread. If you're thinking that Thread.interrupt() is the answer, you need to realize that it only wakes threads blocked in interruptible calls such as Object.wait(), Thread.sleep(), or Thread.join(). It will do nothing for a thread blocked on, say, reading from an InputStream.
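As a minimal sketch of that last point, the example below blocks an ordinary (platform) thread in ServerSocket.accept(), tries Thread.interrupt(), and then unblocks the thread by closing the socket from another thread. The class and method names (CloseToInterruptSketch, acceptorExitsAfterClose) are hypothetical and do not come from the article's source code; the loopback socket merely stands in for whatever resource a real thread would block on.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.ServerSocket;

// Hypothetical demo class; not part of the article's listing.
class CloseToInterruptSketch {

    /** Returns true if the blocked thread exits once its socket is closed. */
    static boolean acceptorExitsAfterClose() {
        try {
            // Port 0 asks the OS for any free port on the loopback interface.
            final ServerSocket server =
                new ServerSocket( 0, 1, InetAddress.getLoopbackAddress() );

            // 1. Keep track of the thread when you start it.
            Thread acceptor = new Thread( new Runnable() {
                public void run() {
                    try {
                        server.accept();   // blocks; nobody ever connects
                    } catch( IOException e ) {
                        // close() makes accept() throw, so run() returns.
                    }
                }
            }, "acceptor" );
            acceptor.start();

            // Thread.interrupt() sets the interrupt flag, but it is not
            // expected to wake a platform thread blocked in accept().
            acceptor.interrupt();
            acceptor.join( 200 );

            // 2. Know how to interrupt it: close what it is blocked on.
            server.close();

            // 3. Wait for it to exit.
            acceptor.join( 5000 );
            return !acceptor.isAlive();
        } catch( IOException e ) {
            throw new RuntimeException( e );
        } catch( InterruptedException e ) {
            Thread.currentThread().interrupt();
            throw new RuntimeException( e );
        }
    }

    public static void main( String[] args ) {
        System.out.println( "exited after close: " + acceptorExitsAfterClose() );
    }
}
```

The same idea applies to a thread reading from a socket's InputStream: closing the socket from the stopping thread is what makes the blocked read fail and lets run() return.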

I didn't need a concurrency revolution to get this right; I just needed the right decomposition of the problem. The design I came up with centers on a ThreadManager object that handles a set of threads, for instance all those created by a particular bundle. It also includes an abstract class, Interruptible, which implements the standard Java Runnable interface and adds an interrupt() method. Each implementation of interrupt() has to know with certainty how to make its own run() method return.
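One way a concrete implementation might satisfy that contract is sketched below, under the assumption that the worker blocks only in an interruptible call (BlockingQueue.take()). InterruptibleSketch is a hypothetical stand-in for the article's Interruptible class, and QueueWorkerSketch, submit(), and stopsInEitherOrder() are likewise invented for illustration. The point of the sketch is that run() returns whether interrupt() is called before or after it.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-in for the article's Interruptible class.
abstract class InterruptibleSketch implements Runnable {
    public abstract void interrupt();
}

// Hypothetical worker that drains a queue until it is interrupted.
class QueueWorkerSketch extends InterruptibleSketch {
    private final BlockingQueue<String> queue =
        new LinkedBlockingQueue<String>();
    private final Object lock = new Object();
    private boolean stopped = false;   // guarded by lock
    private Thread runner = null;      // guarded by lock

    void submit( String item ) {
        queue.add( item );
    }

    public void run() {
        synchronized( lock ) {
            if( stopped ) return;      // interrupt() arrived before run()
            runner = Thread.currentThread();
        }
        try {
            while( true ) {
                queue.take();          // blocks; woken by Thread.interrupt()
                // ... process the item here ...
            }
        } catch( InterruptedException e ) {
            // Normal shutdown path: fall through and return.
        } finally {
            // Forget the thread so a late interrupt() cannot touch it.
            synchronized( lock ) { runner = null; }
        }
    }

    public void interrupt() {
        synchronized( lock ) {
            stopped = true;
            if( runner != null ) runner.interrupt();
        }
    }

    /** Demo helper: true if run() returns in both call orders. */
    static boolean stopsInEitherOrder() {
        // Order 1: interrupt() first, then run() on the calling thread.
        QueueWorkerSketch early = new QueueWorkerSketch();
        early.interrupt();
        early.run();                   // returns without blocking

        // Order 2: run() on its own thread, then interrupt().
        QueueWorkerSketch late = new QueueWorkerSketch();
        Thread t = new Thread( late, "queue-worker" );
        t.start();
        late.interrupt();
        try {
            t.join( 5000 );
        } catch( InterruptedException e ) {
            Thread.currentThread().interrupt();
        }
        return !t.isAlive();
    }
}
```

A worker blocked on a socket or stream would keep the same stopped flag but close the resource in interrupt() instead of calling Thread.interrupt().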

ThreadManager provides two basic operations: run( Interruptible, String ) and stopAll() (see Listing One). The former schedules the Interruptible on a new, named thread. The latter works through the registered Interruptibles one after another, calling interrupt() on each and then join() on its thread. stopAll() doesn't return until every Interruptible passed to run() has exited.

import java.util.*;

/** ThreadManager provides facilities for starting a set of threads and
 * tracking them through completion. The threads must implement the
 * Interruptible interface, which is an extension of Runnable.
 * Typical use is:
 *   ThreadManager tm = new ThreadManager();
 *   // Invoke run any number of times
 *   tm.run( ... );
 *   // Interrupt all threads and wait for their exit
 *   tm.stopAll();
 * No guarantees are made that Interruptible.run() is invoked before
 * Interruptible.interrupt(). They may be called in the other order when, 
 * for example, tm.stopAll() is called in close time proximity to tm.run() 
 * or when the stopping thread has higher priority. However, it is guaranteed
 * that if a call to tm.run() for some Interruptible returns normally then
 * a call to tm.stopAll() will result in a call to Interruptible.interrupt() 
 * for that Interruptible. The implementation of Interruptible must handle 
 * run() and interrupt() being called in either order.
 * @see ThreadManager.Interruptible
 */

class ThreadManager {
    /** An abstract base class for use with ThreadManager. It implements
     * Runnable, and derived classes should implement the run() method. It
     * also defines a method called interrupt() that derived classes must
     * implement such that, when called, it will cause the run() method to
     * return.
     */

    abstract static class Interruptible implements Runnable {
        // The thread on which run() is invoked. This is set by ThreadManager.
        private Thread runThread;
        /** Must cause the run() method to return immediately, e.g., by
         * closing the socket on which run() might be blocked or by calling
         * Thread.interrupt(). If run() has not yet been called, this method
         * must ensure that subsequent calls to run() will also return
         * immediately. Must be callable from any thread.
         * Note: No guarantee that run() is actually running when this
         * method is called; it may be called before, during, or after the
         * run() method. Care must be taken to ensure it works properly in
         * all cases.
         */

        public abstract void interrupt();

        /** Derived classes may use this method to obtain the thread on which
         * the run() method is executing; this is useful if the interrupt()
         * method needs to invoke thread.interrupt(). Note that this method
         * will block until that thread is available, that is, until the
         * managed thread has begun running.
         */
        protected Thread getRunThread() throws InterruptedException {
            synchronized( this ) {
                while( runThread == null ) wait();
            }
            return runThread;
        }
    }
   /* ThreadManager schedules runnables of this type, not of Interruptible. */
    private class ManagedRunnable implements Runnable {
        private Interruptible target;
        ManagedRunnable( Interruptible target ) {
            this.target = target;
        }
        public void run() {
            try {
                synchronized( target ) {
                    target.runThread = Thread.currentThread();
                    target.notifyAll();
                }
                target.run();
            } finally {
                running.remove( this );
            }
        }
        public void stop() throws InterruptedException {
            target.interrupt();
            target.getRunThread().join();
        }
    }
    /** Creates and starts a new thread for the given Interruptible. */
    public void run( final Interruptible target, String name ) {
        run( target, name, 0 );
    }
    /** Creates and starts a new thread for the given Interruptible.
     * The thread priority is adjusted by the relative value given
     * by priorityBoost.
     */
    public void run( final Interruptible target, String name, 
                                                    int priorityBoost ) {
        // We wrap the given Interruptible in a ManagedRunnable that records
        // the current thread when run() starts and removes itself from
        // the manager when run() returns.

        ManagedRunnable mr = new ManagedRunnable( target );

        // Hold stateLock when we add to running to be sure stopAll()
        // cannot miss a runnable that is registered concurrently.

        synchronized( stateLock ) {
            if( isStopping ) throw new 
                         IllegalStateException( "stopAll() has been called" );
            running.add( mr );
        }
        Thread t = new Thread( mr, name );

        // Clamp the boosted priority to the range setPriority() accepts, so
        // an out-of-range boost cannot leave a registered runnable unstarted.
        int priority = t.getPriority() + priorityBoost;
        t.setPriority( Math.max( Thread.MIN_PRIORITY,
                                 Math.min( Thread.MAX_PRIORITY, priority )));
        t.start();
    }
    /** Stops all threads registered with this manager. Does
     * not return until all threads have exited.
     */
    public void stopAll() throws InterruptedException {
        // Set isStopping to block any subsequent calls to run() from
        // scheduling new things.

        synchronized( stateLock ) {
            isStopping = true;
        }
        // Holding a lock on 'running' prevents threads from exiting, as well
        // as starting, so we can't call stop() while we have the lock. Make
        // a copy to avoid this.

        List<ManagedRunnable> toNotify;
        synchronized( running ) {
            toNotify = new ArrayList<ManagedRunnable>( running );
        }
        for( ManagedRunnable mr : toNotify ) {
            mr.stop();
        }
    }
    private final List<ManagedRunnable> running =
        Collections.synchronizedList( new ArrayList<ManagedRunnable>());

    // Guards isStopping. The lock is a dedicated final object because the
    // reference being synchronized on must never change.
    private final Object stateLock = new Object();
    private boolean isStopping = false;
}
Listing One

This reduces the code required to properly manage threads within a bundle to just a couple of lines. A call to ThreadManager.run() replaces the code that creates, starts, and keeps a reference to each new thread. A single call to stopAll() guarantees that each one is shut down.
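To illustrate what "a couple of lines" might look like from the caller's side, here is a hedged, self-contained sketch. So that it compiles on its own, it does not use the ThreadManager from Listing One; MiniManagerSketch is a hypothetical, much-simplified stand-in with the same run()/stopAll() shape, and PollerComponentSketch, Poller, and liveThreadsAfterStop() are also invented names. In a real OSGi bundle, the start() and stop() methods shown here would correspond to the methods of the bundle's BundleActivator.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, much-simplified stand-in for Listing One's ThreadManager.
// It keeps only the run()/stopAll() shape: there is no isStopping check,
// and threads that have exited are never removed from the lists.
class MiniManagerSketch {
    interface Stoppable extends Runnable {
        void interrupt();
    }

    private final List<Stoppable> targets = new ArrayList<Stoppable>();
    private final List<Thread> threads = new ArrayList<Thread>();

    synchronized void run( Stoppable target, String name ) {
        Thread t = new Thread( target, name );
        targets.add( target );
        threads.add( t );
        t.start();
    }

    // Interrupts each target and waits for its thread, one after another.
    synchronized void stopAll() throws InterruptedException {
        for( int i = 0; i < targets.size(); i++ ) {
            targets.get( i ).interrupt();
            threads.get( i ).join();
        }
    }

    synchronized int liveCount() {
        int live = 0;
        for( Thread t : threads ) if( t.isAlive()) live++;
        return live;
    }
}

// Hypothetical component with a bundle-like start()/stop() lifecycle.
class PollerComponentSketch {
    // Polls until told to stop; works whichever of interrupt() or run()
    // is called first.
    static class Poller implements MiniManagerSketch.Stoppable {
        private volatile boolean stopped = false;

        public void run() {
            while( !stopped ) {
                try {
                    Thread.sleep( 10 );   // stand-in for periodic work
                } catch( InterruptedException e ) {
                    return;
                }
            }
        }

        public void interrupt() {
            stopped = true;
        }
    }

    final MiniManagerSketch tm = new MiniManagerSketch();

    void start() {
        tm.run( new Poller(), "poller-1" );
        tm.run( new Poller(), "poller-2" );
    }

    void stop() throws InterruptedException {
        tm.stopAll();
    }

    /** Demo helper: threads still alive after start() followed by stop(). */
    static int liveThreadsAfterStop() {
        PollerComponentSketch c = new PollerComponentSketch();
        c.start();
        try {
            c.stop();
        } catch( InterruptedException e ) {
            Thread.currentThread().interrupt();
        }
        return c.tm.liveCount();
    }
}
```

The component itself holds no thread references and no per-thread shutdown code; everything thread-specific lives in the Poller's run() and interrupt() methods.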

