Multithreading, Java, & OSGi

The ThreadManager class Oliver presents here goes a long way toward easing multithreaded programming with Java and the OSGi platform.


August 02, 2006
URL:http://www.drdobbs.com/java/jvm/multithreading-java-osgi/191601642

Oliver is a software architect at Adobe Systems. He can be reached at [email protected].


The OSGi Alliance Service Platform (http://www.osgi.org/) is a Java-based environment that is multithreaded from the ground up. The OSGi framework consists of libraries and services designed to execute code from various sources. You can also use the Eclipse OSGi runtime as a standalone framework because it is part of the Eclipse Equinox project, the foundation of the Eclipse Rich Client Platform. However, multithreading in OSGi isn't just about improving performance or taking advantage of chip multiprocessors; it's essential to getting something done. To illustrate, in this article I examine issues that can arise when developing applications that must simultaneously manage UI, computation, and network activity, somewhat akin to what iTunes does. In the process, I discuss solutions I developed in terms of a specific class, ThreadManager. The point of ThreadManager isn't necessarily that you should use it (although that would be fine, too) but that a relatively modest amount of reusable code can go a long way toward easing multithreaded programming.

The OSGi unit of composition is a "bundle." Physically, a bundle is a JAR file, but it is not loaded and processed as typical JAR files are. Each bundle is by default isolated from all other bundles; that is, they cannot invoke each other or even share class declarations—even if those declarations are "public." A bundle exports a package it contains via an export declaration in the JAR manifest. Similarly, a bundle declares which packages it requires via an import declaration. OSGi examines these declarations when bundles are loaded to make sure each import is satisfied, exports do not conflict, and so on. This is all managed via a set of delegating classloaders.

Bundles can be started, stopped, loaded, and unloaded all while OSGi is running. This is important for embedded devices that are "always on"—rebooting during "Desperate Housewives" to apply the latest patches just won't do. The OSGi framework tracks dependencies between bundles—for class definitions and services—to coordinate these operations.

Of course, set-top boxes aren't really always on; they tend to get unceremoniously unplugged when being moved or when extension cords get tripped over. Consequently, the state of an OSGi device is generally persistent. When an OSGi device starts, bundles are restored to their previous state, running or stopped. Operations such as installing a bundle are transactional for this same reason.

OSGi has been carefully defined, as an embedded environment, to run on top of reasonably sized J2ME profiles. However, it can also be applied to full J2SE environments. The OSGi bundle model is even being adopted as the basis for Eclipse plug-in management.

While thinking of OSGi as an operating system for running Java programs is a good place to start, it has an important difference from operating systems—OSGi does not provide a thread of execution to each bundle. Events are delivered to bundles through certain interfaces, but no guarantees are made about which thread event delivery occurs on. Generally, event callbacks are required to finish quickly and should not make calls back into the OSGi framework to avoid possible deadlock. It is common, therefore, for bundles to start one or more threads in order to get work done.

Requirements

The prototype application is designed to perform several tasks in parallel: processing events received via the UI, performing required computations (say, to play back or compress audio), and sending and receiving data over the network. This is a wonderful application for a parallel machine because, well, these operations really can be performed in parallel.

Java supports this kind of parallelism via its threading mechanisms, and so its APIs for most operations, such as I/O, are blocking. This isn't the only possible approach; many traditional applications use an event-driven model that can multiplex events from multiple sources, including UI and nonblocking I/O facilities. However, while an event loop can give the illusion of doing several things at once, it is really only one event at a time that's being processed. That didn't matter much when a CPU could only do one thing at a time, but Java's approach is better suited to increasing hardware parallelism.

The essential threading requirement imposed by OSGi is that applications keep track of all of these threads operating in parallel. This arises from OSGi's dynamic nature; OSGi wants to be able to start, stop, load, and unload bundles as necessary or as directed. That implies that a bundle must respond to a command to stop, and stopping must be complete: There cannot be threads that are forgotten still tying up resources the bundle was supposed to have released.

This OSGi design arises from its target set-top environment where bundles from various sources—including possibly different vendors—need to cooperate. When building a desktop application on OSGi (possibly transitively, by building first on Eclipse), this level of dynamism might seem excessive. Even a desktop application, however, might be composed of different bundles. It might support plug-ins built by third parties, and any of these might need to be updated. OSGi's dynamism might be essential in embedded devices but it certainly isn't useless on the desktop.

The Challenge

I quickly discovered that coordinating the lifetimes of even a small number of threads (read: two) was easy to get wrong. There are the usual issues with state shared between threads; those can be solved with appropriate synchronization mechanisms. Stopping the threads turns out to be the tricky part.

To make things concrete, an OSGi bundle can implement an interface called BundleActivator, which contains just two methods: start() and stop(). OSGi calls these methods when the bundle is started or stopped. The general rules for implementing these methods are that they must return quickly, they can be invoked on any thread, and they should avoid doing things that could lead to deadlock. For example, a start() method should not attempt to use the OSGi facility for installing a new bundle because that is a long-running operation. Furthermore, because the level of parallelism in the OSGi implementation is not guaranteed, the installation call might block waiting for the start method to return; because install was called from start, deadlock arises.

The bundle is allowed (assuming security permissions are granted) to spawn new threads. However, the stop() method must guarantee that all of these threads have exited before returning. That's the root of the coordination problem.

What can happen if you get this wrong? Just about anything. The OSGi implementation I was working on was throwing ClassNotFoundException exceptions from classes I'd written when they attempted to access other classes I'd written in the same bundle—which meant it couldn't be a classpath problem. That baffled me until I realized the exception was thrown from a thread in my bundle, which was still running after stop() had returned. The OSGi framework, thinking the bundle was stopped, had disconnected the class loaders for that bundle, causing a later request to load classes to fail. (Remember, class loading in Java is lazy.)

Problem Solving

The application I was prototyping needed several threads. I started off by tracking them in an ad hoc manner: Each time I needed a new thread, I created a member field to keep a reference to it and added code to my stop() method to interrupt it, then waited for it via Thread.join().
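The ad hoc approach described above can be sketched roughly as follows. This is an illustration only, not the prototype's code: the class name AdHocBundle and its two sleeping workers are hypothetical, and start() and stop() merely mimic the shape of a BundleActivator without depending on the OSGi API.

```java
/** Hypothetical stand-in for a bundle that tracks its threads by hand. */
class AdHocBundle {
    // One member field per thread: the bookkeeping grows with every thread.
    private Thread uiThread;
    private Thread networkThread;

    /** A worker that sleeps until Thread.interrupt() wakes it. */
    private static Runnable sleeper() {
        return new Runnable() {
            public void run() {
                try {
                    while (true) Thread.sleep(1000);
                } catch (InterruptedException e) {
                    // Asked to stop: fall through and let run() return.
                }
            }
        };
    }

    public void start() {
        uiThread = new Thread(sleeper(), "ui");
        networkThread = new Thread(sleeper(), "network");
        uiThread.start();
        networkThread.start();
    }

    /** Must not return until every thread started above has exited. */
    public void stop() throws InterruptedException {
        uiThread.interrupt();
        networkThread.interrupt();
        uiThread.join();
        networkThread.join();
    }

    boolean anyAlive() {
        return uiThread.isAlive() || networkThread.isAlive();
    }

    public static void main(String[] args) throws InterruptedException {
        AdHocBundle bundle = new AdHocBundle();
        bundle.start();
        bundle.stop();
        System.out.println("threads alive after stop(): " + bundle.anyAlive());
    }
}
```

Every additional thread needs another field plus another interrupt() and join() pair in stop(), which is where this style starts to strain.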

At first I had only two long-running threads, so this method worked well. Soon, however, I realized I'd need to keep track of a worker thread that was sometimes created and ran only for a short period of time, and a set of threads, the size of which would vary over time, processing I/O. It quickly became apparent just from looking at my stop() method implementation that a more structured approach was needed. The code might actually have been correct, but it was too complicated to read and verify, which is a sure sign of trouble.

Fortunately, since I'd started in an ad hoc fashion, I hadn't yet invented any fancy thread-control patterns to pollute my thinking. Instead, I took a look at what worked for the threads I was managing and tried to look for patterns. If I could find a pattern, I could probably factor out some kind of supporting classes to simplify things.

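It turns out that the basic pattern for handling threads properly in OSGi has three parts:

1. Start the thread.
2. Interrupt the thread when it is time to stop.
3. Wait for the thread to exit.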

Java has Thread.start() for the first item and Thread.join() for the last, but has little to say about how to interrupt a thread. If you're thinking that Thread.interrupt() is the answer, you need to realize that this method only wakes threads blocked in interruptible calls such as Object.wait(), Thread.sleep(), and Thread.join(). It will do nothing for a thread blocked on, say, reading from a socket's InputStream.

I didn't need a concurrency revolution to get this right; I just needed the right decomposition of the problem. The design I came up with centers on a ThreadManager object for handling a set of threads; for instance, as might be created by a particular bundle. It also consists of an abstract class, Interruptible, which extends the standard Java Runnable interface with an interrupt() method. The interrupt() method has to know with certainty how to interrupt each thread.

ThreadManager provides two basic operations: run( Interruptible, name ) and stopAll() (see Listing One). The former starts the Interruptible on a new thread. The latter calls interrupt() on each Interruptible and then join() on its thread. stopAll() doesn't return until every Interruptible passed to run() has exited.

import java.util.*;

/** ThreadManager provides facilities for starting a set of threads and
 * tracking them through completion. The code run on each thread must
 * extend the Interruptible abstract class, which implements Runnable.
 * Typical use is:
 *   ThreadManager tm = new ThreadManager();
 *   // Invoke run any number of times
 *   tm.run( ... );
 *   // Interrupt all threads and wait for their exit
 *   tm.stopAll();
 * No guarantees are made that Interruptible.run() is invoked before
 * Interruptible.interrupt(). They may be called in the other order when, 
 * for example, tm.stopAll() is called in close time proximity to tm.run() 
 * or when the stopping thread has higher priority. However, it is guaranteed
 * that if a call to tm.run() for some Interruptible returns normally then
 * a call to tm.stopAll() will result in a call to Interruptible.interrupt() 
 * for that Interruptible. The implementation of Interruptible must handle 
 * run() and interrupt() being called in either order.
 * @see ThreadManager.Interruptible
 */

class ThreadManager {
    /** An abstract base class for use with ThreadManager. It implements
     * Runnable, and derived classes should implement the run() method. It
     * also defines a method called interrupt() that derived classes must
     * implement such that, when called, it will cause the run() method to
     * return.
     */

    abstract static class Interruptible implements Runnable {
        // The thread on which run() is invoked. This is set by ThreadManager.
        private Thread runThread;
        /** Must cause the run() method to return promptly, e.g., by closing
         * a socket on which run() might be blocked or by calling
         * Thread.interrupt(). If run() has not yet been called, this method
         * must ensure that subsequent calls to run() will also return
         * immediately. Must be callable from any thread.
         * Note: No guarantee that run() is actually running when this
         * method is called; it may be called before, during, or after the
         * run() method. Care must be taken to ensure it works properly in
         * all cases.
         */

        public abstract void interrupt();

        /** Derived classes may use this method to obtain the thread on which
         * the run() method is executing; this is useful if the interrupt()
         * method needs to invoke thread.interrupt(). Note that this method
         * will block until that thread is available.
         */
        protected synchronized Thread getRunThread() 
                                            throws InterruptedException {
            // Read the field while still holding the lock that guards it.
            while( runThread == null ) wait();
            return runThread;
        }
    }
    /* ThreadManager schedules runnables of this type, not of Interruptible. */
    private class ManagedRunnable implements Runnable {
        private Interruptible target;
        ManagedRunnable( Interruptible target ) {
            this.target = target;
        }
        public void run() {
            try {
                synchronized( target ) {
                    target.runThread = Thread.currentThread();
                    target.notifyAll();
                }
                target.run();
            } finally {
                running.remove( this );
            }
        }
        public void stop() throws InterruptedException {
            target.interrupt();
            target.getRunThread().join();
        }
    }
    /** Creates and starts a new thread for the given Interruptible. */
    public void run( final Interruptible target, String name ) {
        run( target, name, 0 );
    }
    /** Creates and starts a new thread for the given Interruptible.
     * The thread priority is adjusted by the relative value given
     * by priorityBoost.
     */
    public void run( final Interruptible target, String name, 
                                                    int priorityBoost ) {
        // We wrap the given Interruptible with a new one that sets the
        // current thread when run starts and removes this thread from
        // the manager when it stops.

        ManagedRunnable mr = new ManagedRunnable( target );

        // Hold the lock on isStopping while adding to running so that
        // stopAll() cannot copy the list between the check and the add.
        // isStopping is final, so every caller locks the same object.

        synchronized( isStopping ) {
            if( isStopping.get()) {
                throw new IllegalStateException( "stopAll() has been called" );
            }
            running.add( mr );
        }
        Thread t = new Thread( mr, name );
        // Clamp to the legal range: setPriority() throws on out-of-range
        // values, which would leave mr registered but never started.
        int priority = t.getPriority() + priorityBoost;
        t.setPriority( Math.max( Thread.MIN_PRIORITY,
                                 Math.min( Thread.MAX_PRIORITY, priority )));
        t.start();
    }
    /** Stops all threads registered with this manager. Does
     * not return until all threads have exited.
     */
    public void stopAll() throws InterruptedException {
        // Set isStopping to block any subsequent calls to run() from
        // scheduling new things.

        synchronized( isStopping ) {
            isStopping.set( true );
        }
        // Holding a lock on 'running' prevents threads from exiting, as well
        // as starting, so we can't call stop() while we have the lock. Make
        // a copy to avoid this.

        List<ManagedRunnable> toNotify;
        synchronized( running ) {
            toNotify = new ArrayList<ManagedRunnable>( running );
        }
        for( ManagedRunnable mr : toNotify ) {
            mr.stop();
        }
    }
    private final List<ManagedRunnable> running = 
        Collections.synchronizedList( new ArrayList<ManagedRunnable>());
    // A mutable flag object that is never replaced, so it is safe to lock on.
    private final java.util.concurrent.atomic.AtomicBoolean isStopping =
        new java.util.concurrent.atomic.AtomicBoolean( false );
}
Listing One

This reduces the code required to properly manage threads within a bundle to just a couple of lines. A call to ThreadManager.run() replaces every call to create, start, and keep a reference to every new thread. A single call to stopAll() guarantees that each is shut down.

ThreadManager Design Points

I ran into many subtle and interesting design points while writing ThreadManager, and I have probably not found all of them.

For instance, the major service provided by the ThreadManager is keeping track of each thread it starts so that they can all be stopped at some later time. This requires nothing more than keeping an accurate list of interruptible objects running on various threads. The list part is easy; the accurate part takes a bit of effort.

Each Interruptible is added to the list of running objects before its assigned Thread is started. This guarantees that once ThreadManager.run() is called for some Interruptible, both run() and interrupt() are called on it. It also makes it possible for interrupt() to be called first.

The second half of this problem is removing each Interruptible from the list when it exits, which happens when the run() method returns. This is handled by scheduling not the Interruptible itself but a private class called ManagedRunnable. ManagedRunnable has a run() method that wraps the call to Interruptible.run() in a try/finally block. The finally clause is invoked when run() returns, whether normally or with an exception, and removes the ManagedRunnable (and with it the Interruptible) from the list.

The kind of interrupt required varies, depending on what the code in the thread is doing. The Thread.interrupt() method isn't always appropriate. For example, while it will interrupt a thread waiting on Object.wait(), it won't interrupt a thread waiting on ServerSocket.accept(). The interrupt() method implementation—which is always called from a thread other than the one on which run() is called—must take appropriate action to interrupt the thread, whatever that might be. My prototype called for three different strategies.

My prototype used SWT for the UI. The application bundle launched a thread on which a loop polled and processed the SWT event queue. The interrupt method for this thread made use of the SWT Display.asyncExec() call to post a "quit" event back to the UI thread, causing that loop to exit. Swing provides similar facilities. Worst case, you could implement your own mechanism for passing the request between threads and check it in the event loop. (In that case, however, you might need a timeout on the call to poll for an event.)
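The "worst case" hand-rolled mechanism might look something like the sketch below. It does not use SWT or Swing; the EventLoop class, its queue, and the 50-millisecond timeout are all assumptions made for illustration. The timeout on poll() is what lets the loop notice the quit flag even when no events arrive.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical event loop that checks a quit flag between events. */
class EventLoop implements Runnable {
    private final BlockingQueue<Runnable> events =
        new LinkedBlockingQueue<Runnable>();
    private volatile boolean quit = false;

    /** Queues an event for the loop thread to execute. */
    public void post(Runnable event) {
        events.add(event);
    }

    /** Callable from any thread, before, during, or after run(). */
    public void interrupt() {
        quit = true;
    }

    public void run() {
        try {
            while (!quit) {
                // Wait a bounded time so the flag is rechecked regularly.
                Runnable event = events.poll(50, TimeUnit.MILLISECONDS);
                if (event != null) event.run();
            }
        } catch (InterruptedException e) {
            // Someone used Thread.interrupt() instead; exit just the same.
        }
    }

    public static void main(String[] args) throws InterruptedException {
        EventLoop loop = new EventLoop();
        Thread t = new Thread(loop, "event-loop");
        t.start();
        loop.post(new Runnable() {
            public void run() { System.out.println("event handled"); }
        });
        Thread.sleep(200);
        loop.interrupt();
        t.join();
        System.out.println("loop exited");
    }
}
```

Events still queued when the flag is set are simply dropped in this sketch; whether that is acceptable depends on the application.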

Threads waiting on I/O are even easier; typically, simply closing the stream or socket on which they are waiting is sufficient. The close will cause the read(), accept(), or other call to return with an error. The thread performing the I/O simply needs to exit when this error occurs.
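As a rough sketch of the close-to-interrupt strategy, the hypothetical Acceptor class below blocks in ServerSocket.accept() and is stopped by closing the listening socket. It follows the run()/interrupt() contract of Interruptible but is written standalone, so it does not depend on Listing One; binding to an arbitrary free loopback port is an assumption made to keep the example self-contained.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;

/** Hypothetical acceptor whose interrupt() closes the listening socket. */
class Acceptor implements Runnable {
    private final ServerSocket server;

    Acceptor() throws IOException {
        // Port 0 asks the system for any free port on the loopback address.
        server = new ServerSocket(0, 50, InetAddress.getLoopbackAddress());
    }

    /** Callable from any thread; closing twice is harmless. */
    public void interrupt() {
        try {
            server.close();
        } catch (IOException e) {
            // Nothing useful to do; the socket is unusable either way.
        }
    }

    public void run() {
        try {
            while (true) {
                Socket client = server.accept();
                client.close();   // a real server would hand this off
            }
        } catch (IOException e) {
            // accept() fails once the socket is closed: time to exit.
        }
    }

    public static void main(String[] args) throws Exception {
        Acceptor acceptor = new Acceptor();
        Thread t = new Thread(acceptor, "acceptor");
        t.start();
        Thread.sleep(100);        // give run() time to block in accept()
        acceptor.interrupt();
        t.join();
        System.out.println("acceptor exited");
    }
}
```

Because the socket is opened in the constructor, interrupt() also works when it is called before run(): accept() on a closed socket fails immediately.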

Worker threads performing computation can be the trickiest because there is not a good mechanism for interrupting CPU-bound work from another thread. The only option here is to have the thread periodically check to see if an interrupt bit has been set. It shouldn't do that too often or the overhead will take a toll, but it must do it frequently enough to keep the interrupt responsive.
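A minimal sketch of the polling approach for CPU-bound work follows. The Cruncher class, its placeholder arithmetic, and the check interval of 65,536 iterations are hypothetical; the right interval depends on how expensive each iteration is.

```java
/** Hypothetical CPU-bound worker that polls a stop flag now and then. */
class Cruncher implements Runnable {
    // Check the flag once per 65,536 iterations to keep overhead low.
    private static final int CHECK_MASK = (1 << 16) - 1;

    private volatile boolean stopRequested = false;
    private volatile long result;

    /** Callable from any thread, before, during, or after run(). */
    public void interrupt() {
        stopRequested = true;
    }

    public long result() {
        return result;
    }

    public void run() {
        long sum = 0;
        for (long i = 0; ; i++) {
            sum += i * i;   // stand-in for real computation
            if ((i & CHECK_MASK) == 0 && stopRequested) break;
        }
        result = sum;
    }

    public static void main(String[] args) throws InterruptedException {
        Cruncher cruncher = new Cruncher();
        Thread t = new Thread(cruncher, "cruncher");
        t.start();
        Thread.sleep(100);
        cruncher.interrupt();
        t.join();
        System.out.println("cruncher exited");
    }
}
```

The flag is volatile so that a write from the interrupting thread is guaranteed to become visible to the worker.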

Multiple Blockers

Interrupting a thread can require more than one technique if a thread can be blocked in multiple places. For example, some threads might sometimes be waiting on a monitor and other times on a socket. The interrupt() method must then interrupt both. The run() method must be written to ensure that interrupts cause the method to exit and not block again.
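To make the two-blocker case concrete, here is a hedged sketch of a thread that first waits on a monitor and then blocks on a socket. The TwoPhase class and its proceed() method are invented for this example; the point is only that interrupt() has to release both places where run() might be waiting, and that run() must not block again afterward.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.ServerSocket;

/** Hypothetical task that can block on a monitor or on a socket. */
class TwoPhase implements Runnable {
    private final ServerSocket server;
    private boolean go = false;        // guarded by this
    private boolean stopped = false;   // guarded by this

    TwoPhase() throws IOException {
        server = new ServerSocket(0, 50, InetAddress.getLoopbackAddress());
    }

    /** Lets run() move from the monitor wait to the socket accept. */
    public synchronized void proceed() {
        go = true;
        notifyAll();
    }

    public void interrupt() {
        synchronized (this) {          // release blocker 1: the monitor
            stopped = true;
            notifyAll();
        }
        try {                          // release blocker 2: the socket
            server.close();
        } catch (IOException e) {
            // The socket is unusable either way.
        }
    }

    public void run() {
        synchronized (this) {
            while (!go && !stopped) {
                try {
                    wait();
                } catch (InterruptedException e) {
                    return;
                }
            }
            if (stopped) return;       // do not go on to block again
        }
        try {
            server.accept().close();
        } catch (IOException e) {
            // Closed by interrupt(): exit.
        }
    }

    public static void main(String[] args) throws Exception {
        TwoPhase task = new TwoPhase();
        Thread t = new Thread(task, "two-phase");
        t.start();
        task.proceed();
        Thread.sleep(100);
        task.interrupt();
        t.join();
        System.out.println("two-phase task exited");
    }
}
```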

The hardest part of writing an interrupt can be inspecting all the code invoked from the thread's run() method and determining whether it can block and, if so, how to interrupt it. This is particularly hard if that method uses third-party code or libraries that don't clearly document their behavior or dependencies. Although I did not encounter this, it is easy to imagine library calls that are written so as to be essentially uninterruptible. Indeed, adding this kind of thread-specific behavioral information to signatures hints at potentially big requirement changes for those writing reusable code.

Interrupt Before Run

Again, an Interruptible scheduled to run via ThreadManager is guaranteed to have both its run() and interrupt() methods called, but not necessarily in that order. It turns out this is often easier to code correctly than you might think. For example, if a socket connection is open before run() is called, then interrupt() can safely close it; the run() method will find it closed when it is invoked and immediately exit.

Other times, however, interrupt() will have to set a special flag to let run() know that interrupt() has already been called and to avoid proceeding to any blocking code. The first time I got this wrong (I didn't have the flag logic), I briefly considered making a guarantee that run() was always invoked first.
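The flag logic can be sketched as below. The Waiter class is hypothetical; it stands for any run() method that would otherwise block on a monitor. Because interrupt() records the request in a field guarded by the same lock, it makes no difference whether run() or interrupt() is called first.

```java
/** Hypothetical task showing the "already interrupted" flag. */
class Waiter implements Runnable {
    private boolean interrupted = false;   // guarded by this

    /** Records the request so that a later run() sees it, too. */
    public synchronized void interrupt() {
        interrupted = true;
        notifyAll();
    }

    public void run() {
        synchronized (this) {
            // If interrupt() came first, the loop body never executes.
            while (!interrupted) {
                try {
                    wait();
                } catch (InterruptedException e) {
                    return;
                }
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // Interrupt first, then run: run() returns immediately.
        Waiter early = new Waiter();
        early.interrupt();
        early.run();

        // Run first, then interrupt: the waiting thread is released.
        Waiter late = new Waiter();
        Thread t = new Thread(late, "waiter");
        t.start();
        Thread.sleep(100);
        late.interrupt();
        t.join();
        System.out.println("both orders handled");
    }
}
```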

However, what kind of guarantee could I really make? I could have interrupt() block until the runThread had started, but that wasn't sufficient. The thread might start but yield before the first instruction in the run() method, so interrupt() could still be called earlier than expected. The only way to guarantee that run() had accomplished any particular amount of work before interrupt() was called is with synchronization code (wait and notify, for example) inside cooperating run() and interrupt() methods. You can write such code if appropriate, but ThreadManager doesn't have this kind of knowledge and won't do it for you.

Interrupt After Run

It is also possible for interrupt() to be called after run() has exited. ThreadManager.stopAll() could check to see if run() has exited before calling interrupt(), but by the time the call to interrupt() actually happens, run() may have exited. In practice, most interrupt() methods perform idempotent actions, such as closing streams. Usually, no special code is required to properly handle this case.

Interrupting the Right Thread

If your interrupt() method does use Thread.interrupt(), you'll need a handle on the thread running the run() method. This is another case where synchronization between run() and interrupt() is necessary—you can't interrupt a thread before it has been created. Here, however, ThreadManager really can help you out.

Interruptible defines a method called getRunThread() that returns the thread on which run() has been (or will be) invoked. This is why Interruptible is an abstract class, and not an interface.

The implementation depends on the private field runThread being assigned exactly once when the thread is created. Callers to getRunThread() block until the assignment occurs. ThreadManager arranges to perform this assignment but the field should not otherwise be visible, even to derived classes implementing Interruptible. This is why Interruptible is a nested class within ThreadManager.

To set the runThread field, ThreadManager leverages the nested ManagedRunnable class, mentioned earlier. ManagedRunnable has its own run() method that sets runThread and calls notifyAll() before invoking Interruptible.run().
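The handshake can be seen in isolation in the following pared-down sketch. MiniInterruptible, its launch() method, and the Sleeper subclass are hypothetical stand-ins for Interruptible, ThreadManager.run(), and a client class; they are not the code from Listing One, only the same idea reduced to its essentials.

```java
/** Hypothetical, pared-down version of the runThread handshake. */
abstract class MiniInterruptible implements Runnable {
    private Thread runThread;   // guarded by this

    /** Called on the new thread just before run() is invoked. */
    final synchronized void setRunThread(Thread t) {
        runThread = t;
        notifyAll();
    }

    /** Blocks until the launcher has recorded the thread. */
    protected final synchronized Thread getRunThread()
            throws InterruptedException {
        while (runThread == null) wait();
        return runThread;
    }

    public abstract void interrupt();

    /** Stand-in for ThreadManager.run(): starts target on a new thread. */
    static Thread launch(final MiniInterruptible target) {
        Thread t = new Thread(new Runnable() {
            public void run() {
                target.setRunThread(Thread.currentThread());
                target.run();
            }
        });
        t.start();
        return t;
    }
}

/** A client whose interrupt() relies on Thread.interrupt(). */
class Sleeper extends MiniInterruptible {
    public void run() {
        try {
            Thread.sleep(60000);
        } catch (InterruptedException e) {
            // Asked to stop: let run() return.
        }
    }

    public void interrupt() {
        try {
            getRunThread().interrupt();
        } catch (InterruptedException e) {
            // The caller is being stopped as well; preserve that request.
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Sleeper sleeper = new Sleeper();
        Thread t = MiniInterruptible.launch(sleeper);
        sleeper.interrupt();   // waits, if needed, for the thread to register
        t.join();
        System.out.println("sleeper exited");
    }
}
```

If the interrupt lands after the thread registers but before it reaches sleep(), the thread's interrupt status is already set and sleep() throws at once, so the order does not matter here.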

Interrupt Me Before I Run Again

Working with multiple threads is, in a way, like writing transactional database code: you need to think carefully about which operations have to be atomic to preserve the system, yet being too conservative in this regard can make your code inefficient. This shouldn't come as a surprise, as a database is typically handling multiple requests from multiple clients in parallel. If you look at ThreadManager as a minuscule database, the same conditions apply: run() and stopAll() can be called in parallel. What guarantees should be made?

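In my prototype application, one worker thread is spawned periodically based on external events, runs for a few hundred milliseconds, and then exits. (Because it takes little time to complete and doesn't block, I even used a null interrupt() method.) Creation of this thread was independent of when my bundle might be stopped; in fact, the two always occurred on different threads. The guarantees I wanted as a client of ThreadManager are:

1. If a call to run() returns normally, a call to stopAll() interrupts that thread and waits for it to exit.
2. Once stopAll() has been called, any later call to run() fails with an exception instead of starting a thread.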

Together, these rules guarantee the necessary transactionality, and thus synchronization, between creating and stopping threads using the same ThreadManager instance. Without them, race conditions could permit new threads created at the same time as stopAll() to continue running after stopAll() returns.

These guarantees are implemented with a boolean flag object in ThreadManager, isStopping, that tracks whether a call to stopAll() has started. Once it is set, subsequent calls to run() fail. It is important that synchronization is performed on this object and that run() and stopAll() are not themselves synchronized methods. If they were, calls to run() would block while stopAll() was executing. Because stopAll() could be stopping a thread that is itself calling run(), deadlock could result.

Interrupted Exceptions

Prior to this multithreading exercise, InterruptedException had always been a bit of a mystery to me. Was it really needed? Why? What should I do with it? Sample code frequently ignores it. Even Doug Lea didn't really seem to know what to do with it in Concurrent Programming in Java (Addison-Wesley, 2000).

ThreadManager gives a clear answer to these questions: InterruptedException must be obeyed—not ignored. The exception is thrown when a thread should exit posthaste. Ignoring it is almost certainly the wrong thing to do. Certainly ThreadManager won't work properly if you do. Proper operation of ThreadManager requires that threads exit when requested. If they don't, calls to stopAll() won't return.
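In code, obeying the exception usually means letting it end the loop. The sketch below shows two variations; the Poller and QuietPoller classes are hypothetical. The second variation uses a common Java idiom that goes beyond what this article describes: when a helper cannot declare InterruptedException, it restores the thread's interrupt status so the caller can still see the request.

```java
/** Hypothetical poller that exits as soon as it is interrupted. */
class Poller implements Runnable {
    public void run() {
        try {
            while (true) {
                Thread.sleep(20);   // stand-in for a blocking wait
            }
        } catch (InterruptedException e) {
            // Obey the request: fall out of the loop and return.
        }
    }

    /** For callers that cannot propagate InterruptedException. */
    static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            // Do not swallow the request; put the interrupt status back.
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Thread a = new Thread(new Poller(), "poller");
        Thread b = new Thread(new QuietPoller(), "quiet-poller");
        a.start();
        b.start();
        a.interrupt();
        b.interrupt();
        a.join();
        b.join();
        System.out.println("both pollers exited");
    }
}

/** Same loop, written against the interrupt status instead. */
class QuietPoller implements Runnable {
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            Poller.pause(20);
        }
    }
}
```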

The same applies to other exceptions, such as IOExceptions, which might indicate interruptions. These cases have always seemed more obvious to me; after all, there is often little one can do after receiving an exception on a socket. InterruptedException has always carried that temptation to try again.

Consistent with this advice, ThreadManager itself will break out of a call to stopAll() if it gets interrupted. Is this the right thing to do? It would seem to depend on the situation—something only the caller can know. When handling exceptions, deferring up the call stack generally seems like the right solution.

Conclusion

My prototyping experience convinced me that, ultimately, getting used to multithreaded programming is going to be a big change. It won't necessarily mean the use of new languages or even new skills. It will require consistent attention to parallelism as a new axis along which things must be made to work and can go wrong.

I suspect the database folks have a real advantage in this concurrency revolution. Unlike most applications, the shared-state nature of databases has meant scaling up to additional users was never as easy as adding racks of additional, yet essentially independent, machines. Databases are already quite good at exploiting true machine parallelism.

To accomplish this, database designers have had to consider issues of concurrency, isolation, and deadlock for every line of code they have written. Soon, this requirement will apply to all of us. It doesn't mean we have to rewrite everything from the ground up, but it could mean we'll have to reconsider every library we use to ask questions about how it will operate (or fail) in a multithreaded world.

Acknowledgments

Thanks go to my colleagues Stan Switzer and Steve Dakin for reviewing drafts of this article and to my employer, Adobe Systems, for the opportunity to perform this work and to write about it.

DDJ
