High-Performance I/O with Java NIO

The NIO library offers a high-performance technique for handling input/output (I/O) operations.


September 01, 2005
URL:http://www.drdobbs.com/jvm/high-performance-io-with-java-nio/jvm/high-performance-io-with-java-nio/184406242

The NIO library, a feature added to Java as part of JDK 1.4, offers an alternative, high-performance technique for handling input/output (I/O) operations. This library supports working directly with native channels, sockets, and direct memory buffers to increase the performance and throughput of I/O. In addition, it offers the ability to do true nonblocking I/O operations. However, if used incorrectly, NIO can yield little performance increase and can cause an increase in the load and reduce stability of an application. In this article, I discuss how Orbitz (the company I work for) used NIO to increase performance and stability by replacing an older standard I/O implementation. Orbitz communicates with various third-party backend systems to retrieve data for its customers. During the course of handling a single customer request, the Orbitz.com application performs time-sensitive I/O to fetch the customer's response. With a large number of concurrent users, the application can be performing hundreds of I/O operations at the same time.

The Orbitz.com application handles concurrent user requests by allowing each request to be performed in a separate thread. These threads, called "execute threads," can be configured such that each request is handled by a new thread or by a thread from a thread pool. In either case, it is vital to ensure that no thread is running for an extended period of time. When threads take too long processing a single user request and a new thread is used for each request, the Java VM has the possibility of crashing when excessive threads are created. Likewise, if a pool is being used and threads from that pool take too long while executing, other users are forced to wait until a thread has finished and can be reused.

Standard I/O

The Orbitz.com application was initially implemented with a standard I/O design using Java 1.3. To guarantee timeouts for I/O operations, it employed a separate thread to perform the I/O operations. The execute threads are then joined on this new thread using a timeout. With this approach, it prevented the execute thread from hanging when I/O operations encountered problems. Listing One shows the original thread that performed the I/O operations. It has been simplified from its original version.

Listing One

public class IOThread extends Thread {
    private String result;
    public synchronized String getResult() {
        return result;
    }
    public void run() {
        try {
            // Connect to the remote host and read in the data
            URL url = new URL("http://example.orbitz.com");
            URLConnection connection = url.openConnection();

            String localResult = doRead(connection);
            synchronized (this) {
                result = localResult;
            }
        } catch (IOException ioe) {
            System.err.println(ioe.toString());
        }
    }
    private String doRead(URLConnection connection) throws IOException {
        StringBuffer buf = new StringBuffer();
        InputStream is = connection.getInputStream();
        BufferedReader br = new BufferedReader(new InputStreamReader(is));
        char[] cbuf = new char[4096];
        int n;
        do {
            n = br.read(cbuf);
            if (n > 0) {
                buf.append(cbuf, 0, n);
            }
        } while (n >= 0);
        return buf.toString();
    }
}

By calling the join method on the IOThread, one of the execute threads waits until the IOThread has finished running. A timeout is specified when the join method is called to ensure that the execute threads do not wait forever for an IOThread to complete. Listing Two shows an example of how an execute thread used an IOThread.

Listing Two

public class OldIO {
    public String execute() {
        IOThread thread = new IOThread();
        thread.start();
        try {
            thread.join(15000); // Wait for 15 seconds
        } catch (InterruptedException ie) {}

        return thread.getResult();
    }
}

The first version of the execute threads created new instances of the IOThread class for each I/O operation. Using the initial implementation it was observed that the servers were periodically experiencing high thread counts. It had initially been assumed that the maximum number of threads that could be created in a Java VM running this code would be 2N, where N was the number of execute threads. After some investigation, I found that an instance of the IOThread would remain in memory if it experienced network problems and was hung while performing an I/O operation. In addition, the execute thread correctly timed-out and the next execute thread attempted the same operation, with the same results. The net result was that the Java VM created an enormous number of threads and eventually ran out of memory.

To reduce the thread count, I added a thread pool that stored instances of the IOThread class. I also changed the execute thread code so that it checked out an IOThread instance from the pool, used it, and then checked it back in. This not only reduced the high thread count, but also reduced the overhead of thread creation. Listing Three shows the second version of the execute thread that used the pool of IOThreads (the implementation of a thread pool is out of scope of this article).

Listing Three

public class PoolOldIO {
    private static ThreadPool pool = new ThreadPool();
    public String execute() {
        IOThread thread = pool.checkOut();
        String result;
        try {
            thread.execute(15000); // Wait for 15 seconds
        } catch (InterruptedException ie) {
            // Smother, okay to be interrupted
        } finally {
            result = thread.getResult();
            pool.checkIn(thread); // Must always check-in the thread
        }
        return result;
    }
}

This improvement still has an underlying problem. If the network experiences major failures, all instances of IOThread in the pool quickly become stalled and execute threads begin to fail during check-out because there are no more IOThreads in the pool. Additionally, when I began testing this new design under heavy load, I found that it still had problems that reduced overall performance. The issue was excessive context switching.

All the implementations thus far force the Java VM to perform increased context switching because either the execute threads or the IOThreads are all performing I/O operations concurrently. The largest problem with this concurrency is that the Java VM doesn't know which threads are ready to perform I/O operations and which are not. Therefore, the Java VM might put a thread onto the processor that isn't ready to perform I/O operations. Meanwhile another thread that is waiting to run is ready to perform I/O operations. Because Java does not provide the ability to "stack the deck" by moving threads that are ready to perform I/O to the top of the list, it is possible that a thread, which is ready to perform I/O, might wait long enough that it times out.

NIO

The NIO package added to Java 1.4 offers Java developers the ability to perform true nonblocking I/O and guarantee timeouts for I/O operations. It also works with low-level operating-system constructs to increase performance and reduce latency. In addition, it provides the ability to monitor multiple I/O operations concurrently, also known as "multiplexing."

Multiplexed NIO is a technique that moves all the I/O work into a single thread that watches over many I/O operations executing concurrently. A multiplexed NIO design uses the java.nio.channels.Selector class in conjunction with subclasses of the java.nio.channels.SelectableChannel class. The Selector class lets the application watch many I/O connections (subclasses of the SelectableChannel class and also known as "Channels") and organize these Channels into those that are ready to perform I/O and those that are not.

I decided to redesign to use NIO and a single thread to perform all the I/O operations. Listing Four shows the skeleton body of the NIO thread.

Listing Four

public class NIOWorker extends Thread {
    private Selector selector;
    public NIOWorker () throws IOException {
        selector = Selector.open();
    }
    ...
}

As with the threaded standard I/O design, the execute threads need a way to supply work to the NIOWorker in the form of a URL they want to contact. The NIOWorker then connects to the URL, sends an HTTP message to the server, and then reads the HTTP-response. After it completes that work, it passes back the result to the execute thread. One unit of work is designated using an inner class within the NIOWorker thread that lets a URL be passed in and the contents to be passed back. Listing Five shows the inner class of the NIOWorker that represents one unit of work.

Listing Five

public static class Work {
    public URL in;
    public String out;
    public ByteBuffer httpRequest;
}

Supplying work to the NIOWorker is not a simple process and requires two methods. Because the NIOWorker thread is a separate thread that handles all the I/O operations, execute threads must add their work and then wait for the NIOWorker thread to process it. Therefore, the NIOWorker class has one method that forces the execute threads to wait, and a second method that actually adds the work. This design waits and notifies using the Work object as the monitor. Listing Six shows the two add methods that force the execute threads to wait and add the work to the Selector.

Listing Six

public String doWork(URL url, long timeout) {
    Work work = new Work();
    work.in = url;
    String result = null;
    synchronized (work) {
        work.httpRequest = buildHTTPRequest(work);

        // If the work was added successfully, call wait on the work object
        // which forces the calling thread to wait (i.e. the execute thread)
        if (add(work, timeout)) {
            try {
                work.wait(timeout);
            } catch (InterruptedException e) {
                System.err.println("NIO operation interrupted");
            }
            result = work.out;
        }
    }
    return result;
}
protected boolean add(Work work, long timeout) {
    SocketChannel channel = null;
    try {
       URL url = work.in;
       InetSocketAddress addr = new InetSocketAddress(url.getHost(), 80);

       channel = SocketChannel.open();
       channel.configureBlocking(false);
       channel.connect(addr);

       WorkState state=new WorkState(System.currentTimeMillis(),timeout,work);
       channel.register(selector, SelectionKey.OP_CONNECT, state);
    } catch (IOException ioe) {
        if (channel != null) {
            try {
                channel.close();
            } catch (IOException ioe2) {
                System.err.println("Unable to close channel: " + ioe2);
            }
        }
        System.err.println("Channel creation or registration failed: " + ioe);
        return false;
    }
    return true;
}

This code creates a SocketChannel, which is a Channel that extends SelectableChannel, and registers the Selector with it. This Channel is set to nonblocking so that operations performed on Channel do not block until complete but rather return as quickly as possible. Registering a Selector with a Channel informs the Channel to begin a specific I/O operation and also tells the Channel to signal the Selector once the operation is complete. Here the code is setting the Channel to begin an OP_CONNECT operation, which forces the Channel to connect to a remote socket.

During the registration, this code also constructs a small class that holds some state information for the I/O operation. This is called an "attachment" and can be any object. The attachment is associated with a specific Selector/Channel relationship. This Selector/Channel relationship is called a "SelectionKey" (also known as "key"). It stores the type of operation the Channel is performing as well as the attachment. The WorkState object is what the NIOWorker uses as an attachment. WorkState lets the results that are read from the Channel be stored in a StringBuffer. It also provides a mechanism to monitor timeouts so that the NIOWorker thread does not continue working on something that has timed out. Listing Seven shows the WorkState class.

Listing Seven

public static class WorkState {
    public final StringBuffer buffer = new StringBuffer();
    public final Work work;
    public final long start;
    public final long timeout;
    public boolean success = false;
    public WorkState(long start, long timeout, Work work) {
        this.start = start;
        this.timeout = timeout;
        this.work = work;
    }
    public boolean isTimedOut() {
        return (System.currentTimeMillis() - start >= timeout);
    }
}

To watch over the Channels, the NIOWorker thread needs to perform "selects" by calling the selectNow method on the Selector. Performing "selects" tells the Selector to update all of its keys to reflect which Channels are ready and which are not, allowing the NIOWorker thread to stack the deck by only working on the Channels that are ready. The number of ready keys is returned from this method. Listing Eight shows the main run method for the NIOWorker thread that performs the select.

Listing Eight

public void run() {
    while (true) {
        try {
            int num = selector.selectNow();
            if (num > 0) {
                processKeys();
            } else {
                Thread.yield();
            }
        } catch (IOException ioe) {
            System.err.println("Unable to select: " + ioe.toString());
        } catch (InterruptedException ie) {
            // Continue processing
        }
    }
}

The processKeys method handles all of the Channels that signal the Selector that they are ready to perform I/O. Because the Selector handles multiple Channels, the processKeys method performs the I/O for all Channels that are ready. Listing Nine shows the processKeys method that is responsible for iterating over all the SelectionKeys and handling each one.

Listing Nine

protected void processKeys() {
    Set keys = selector.selectedKeys();
    for (Iterator iter = keys.iterator(); iter.hasNext();) {
        SelectionKey key = (SelectionKey) iter.next();
        iter.remove();
        WorkState state = (WorkState) key.attachment();
        SocketChannel channel = (SocketChannel) key.channel();
        try {
            if (state.isTimedOut()) {
                finished(channel, key, state);
                continue;
            }
            boolean connectable = key.isConnectable();
            if (connectable && channel.finishConnect()) {
                // If the Channel is connected, setup the Channel to 
                // write the HTTP message to the remote server
                key.interestOps(SelectionKey.OP_WRITE);
            } else if (key.isWritable()) {
                // If the Channel is finished writing, setup the
                // Channel to read the HTTP response
                if (doWrite(channel, state)) {
                    key.interestOps(SelectionKey.OP_READ);
                }
            } else if (key.isReadable()) {
                // If the Channel is finished reading, call finished
                // to complete the work
                if (doRead(channel, state)) {
                    finished(channel, key, state);
                }
            }
        } catch (IOException ioe) {
            System.err.println("Failure during IO operation");
            finished(channel, key, state);
        }
    }
}

The Selector object provides the ability to retrieve all the keys that are ready to perform I/O using the selectedKeys() method. This method returns a Set containing all the keys that are ready to perform I/O. This Set is populated with the ready keys whenever a select operation is performed on the Selector.

One important detail about the processKeys method is that during the loop iteration, the WorkState attachment is retrieved from the SelectionKey and used to verify that the work has not timed out. If it has timed out, the finished method is called to signal that the work is complete, which in this case is due to a timeout rather than success.

The doWrite, doRead, and finished methods are the last methods that complete the NIOWorker. The doWrite method writes the HTTP-request String to the channel. Listing Ten shows the doWrite method.

Listing Ten

protected boolean doWrite(SocketChannel channel, WorkState state)
throws IOException {
    int rem = state.work.httpRequest.remaining();
    int num = channel.write(state.work.httpRequest);
    // Continue writing until everything has been written
    return (num == rem);
}

The doRead method is responsible for performing a read from a Channel and signaling if everything has been read from the Channel or not. Listing Eleven is the doRead method.

Listing Eleven

protected boolean doRead(SocketChannel channel, WorkState state)
throws IOException {
    buffer.clear();
    decoder.reset();
    boolean done = false;
    int num = channel.read(buffer);
    if (num == -1) {
        state.success = true;
        done = true;
    } else if (num > 0) {
        buffer.flip();
        String data = decoder.decode(buffer).toString();
        state.buffer.append(data);
    }
    return done;
}

The doRead method is a standard NIO implementation of reading from a Channel. It reads zero or more bytes from the Channel and also determines if the Channel has any more to read. Once this code is certain that there is nothing more to read, it returns True to signal to the processKeys method that it should call the finished method. This code goes one step further and sets the success flag on the WorkState object. This is used in the finished method to determine whether the HTTP-response should be parsed.

The final finished method notifies the execute thread that its work was complete, whether successfully or not. The success flag set in the doRead method is used to determine if the HTTP-response is parsed out. Listing Twelve is the finished method.

Listing Twelve

protected void finished(SocketChannel channel, 
                                  SelectionKey key, WorkState state) {
    key.cancel();
    try {
        channel.close();
    } catch (IOException ioe) {
        System.err.println("Failed to close socket: " + ioe.toString());
    } finally {
        Work work = state.work;
        synchronized (work) {
            // Only if the Work was successful, parse out the HTTP response
            if (state.success) {
                String result = state.buffer.toString();
                work.out = parseHTTPResponse(result);
            }
            work.notify();
        }
    }
}

NIO does not handle the HTTP-request generation and HTTP-response parsing automatically. That work is left up to the NIO implementation or the client. This design handles the generation of the HTTP-request and parsing of the HTTP-response messages. The buildHTTPRequest and parseHTTPResponse methods were added here for illustration. The work of generating and parsing the HTTP messages is slightly more complex but is available in the full version of the code, within the HTTPParser class and the classes that implement the HTTPMethod interface.

Listing Thirteen is an example of an execute thread calling the NIOWorker to perform some work.

Listing Thirteen

public class NewIO {
    // Assume someone else setup the NIOWorker and passed it in
    public String execute(NIOWorker worker) {
        try {
            // Construct the URL and pass it to the worker
            URL url = new URL("http://example.orbitz.com");
            return worker.doWork(url, 15000); // Wait 15 seconds
        } catch (IOException ioe) {
            System.err.println(ioe.toString());
        }
    }
}

Performance

To fully appreciate the benefits gained between standard I/O and NIO, I collected statistics using each design employed. These statistics were collected by running each of the actual implementations through a series of load tests in a fixed environment. Table 1 presents the results from those tests. All values are the number of transactions completed by an implementation per minute.

Minute Standard IOMultiplexed NIO
1 135 287
2 137 240
3 162 235
4 152 270
5 144 246
6 167 266
7 164 390
8 164 339
9 163 364
10 154 390
Avg 154.2 302.7

Table 1: Load test results.

In addition to a load test, the standard I/O and the NIO solutions were both used in the Orbitz production environment for some period of time. I was able to monitor each one and collect statistics for load and latency. Table 2 shows the statistics from the Orbitz.com production environment.

Solution Average Latency Average Load
Standard I/O 1489 0.8
Multiplexed NIO 1199 0.7

Table 2: Statistics from the Orbitz.com production environment.

Additional Thoughts

The code I present here is only part of the complete solution used in the Orbitz.com application. I simplified some of the code, but have retained the major work required to implement a multiplexed NIO solution.

NIO is not a simple API to use and I encountered many issues during testing that required fine tuning. A few things I found that should be kept in mind are:

Pitfalls

One major NIO pitfall to be aware of is that you should always use multiplexed NIO. Putting NIO code into multiple threads reduces overall performance and can cause excessive load. I experimented with using NIO in multiple threads and observed that due to enormous amounts of context switching the load nearly tripled, while latency and throughput remained unchanged.

Conclusion

A multiplexed NIO solution is an ideal solution for the Orbitz application requirements because of the efficiencies of multiplexed I/O operations. Multiplexed I/O means that there is a single thread doing many I/O operations by leveraging a Selector to perform the multiplexing. Using a Selector, multiple Channels executing I/O operations are managed and watched over concurrently. This means that at no point is the Java VM ever waiting on Channels that are not ready to perform I/O; because the Selector knows precisely the Channels that are ready to perform I/O and those that are not. Because there is only a single thread performing I/O, the problem of context switching between multiple threads performing I/O operations has been eliminated. All of these conditions greatly reduce overhead and latency while increasing total throughput over a standard I/O implementation.

Acknowledgments

Thanks to David P. Thomas of Orbitz for reviewing this article.


Brian is a senior architect with Orbitz.com. He can be contacted at [email protected].

September, 2005: High-Performance I/O With Java NIO

Solution Average Latency Average Load
Standard I/O 1489 0.8
Multiplexed NIO 1199 0.7

Table 2: Statistics from the Orbitz.com production environment.

Terms of Service | Privacy Statement | Copyright © 2024 UBM Tech, All rights reserved.