The NIO library offers a high-performance technique for handling input/output (I/O) operations.
September 01, 2005
URL:http://www.drdobbs.com/mobile/high-performance-io-with-java-nio/184406242
The NIO library, added to Java in JDK 1.4, offers an alternative, high-performance technique for handling input/output (I/O) operations. This library supports working directly with native channels, sockets, and direct memory buffers to increase the performance and throughput of I/O. In addition, it offers the ability to do true nonblocking I/O operations. However, if used incorrectly, NIO can yield little performance gain, increase load, and reduce the stability of an application. In this article, I discuss how Orbitz (the company I work for) used NIO to increase performance and stability by replacing an older standard I/O implementation.
Orbitz communicates with various third-party backend systems to retrieve data for its customers. During the course of handling a single customer request, the Orbitz.com application performs time-sensitive I/O to fetch the customer's response. With a large number of concurrent users, the application can be performing hundreds of I/O operations at the same time.
The Orbitz.com application handles concurrent user requests by performing each request in a separate thread. These threads, called "execute threads," can be configured so that each request is handled either by a new thread or by a thread from a thread pool. In either case, it is vital to ensure that no thread runs for an extended period of time. When threads take too long processing a single user request and a new thread is used for each request, the Java VM can crash once too many threads have been created. Likewise, if a pool is being used and threads from that pool take too long to execute, other users are forced to wait until a thread has finished and can be reused.
The Orbitz.com application was initially implemented with a standard I/O design using Java 1.3. To guarantee timeouts for I/O operations, it employed a separate thread to perform the I/O operations. The execute threads then joined on this new thread using a timeout. This approach prevented the execute thread from hanging when I/O operations encountered problems. Listing One shows the original thread that performed the I/O operations. It has been simplified from its original version.
Listing One
public class IOThread extends Thread {
    private String result;

    public synchronized String getResult() {
        return result;
    }
    public void run() {
        try {
            // Connect to the remote host and read in the data
            URL url = new URL("http://example.orbitz.com");
            URLConnection connection = url.openConnection();
            String localResult = doRead(connection);
            synchronized (this) {
                result = localResult;
            }
        } catch (IOException ioe) {
            System.err.println(ioe.toString());
        }
    }
    private String doRead(URLConnection connection) throws IOException {
        StringBuffer buf = new StringBuffer();
        InputStream is = connection.getInputStream();
        BufferedReader br = new BufferedReader(new InputStreamReader(is));
        char[] cbuf = new char[4096];
        int n;
        do {
            n = br.read(cbuf);
            if (n > 0) {
                buf.append(cbuf, 0, n);
            }
        } while (n >= 0);
        return buf.toString();
    }
}
By calling the join method on the IOThread, one of the execute threads waits until the IOThread has finished running. A timeout is specified when the join method is called to ensure that the execute threads do not wait forever for an IOThread to complete. Listing Two shows how an execute thread used an IOThread.
Listing Two
public class OldIO {
    public String execute() {
        IOThread thread = new IOThread();
        thread.start();
        try {
            thread.join(15000); // Wait for 15 seconds
        } catch (InterruptedException ie) {
        }
        return thread.getResult();
    }
}
The first version of the execute threads created a new IOThread instance for each I/O operation. With this initial implementation, the servers periodically experienced high thread counts. It had initially been assumed that the maximum number of threads that could be created in a Java VM running this code would be 2N, where N was the number of execute threads. After some investigation, I found that an IOThread instance would remain in memory if it experienced network problems and hung while performing an I/O operation. Meanwhile, the execute thread correctly timed out, and the next execute thread attempted the same operation with the same result. The net effect was that the Java VM created an enormous number of threads and eventually ran out of memory.
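The leak is easy to reproduce: Thread.join with a timeout only bounds how long the caller waits; it does nothing to the worker itself. A minimal sketch (hypothetical names, with a sleep standing in for a hung read):

```java
// Sketch: demonstrates that join(timeout) abandons, rather than stops,
// a hung worker thread -- which is how IOThreads accumulated in the VM.
public class JoinTimeoutDemo {
    public static boolean workerSurvivesJoin() throws InterruptedException {
        Thread worker = new Thread(() -> {
            try {
                Thread.sleep(60_000); // stand-in for a hung network read
            } catch (InterruptedException ignored) {
            }
        });
        worker.setDaemon(true); // let the VM exit despite the hung thread
        worker.start();
        worker.join(100);        // the caller gives up after 100 ms...
        return worker.isAlive(); // ...but the worker is still running
    }
    public static void main(String[] args) throws InterruptedException {
        System.out.println("worker still alive: " + workerSurvivesJoin());
    }
}
```

Every timed-out request leaves one such thread behind until its I/O eventually fails or completes, so a burst of network trouble translates directly into thread growth.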
To reduce the thread count, I added a thread pool that stored IOThread instances. I also changed the execute thread code so that it checked out an IOThread instance from the pool, used it, and then checked it back in. This not only reduced the high thread count, but also reduced the overhead of thread creation. Listing Three shows the second version of the execute thread, which used the pool of IOThreads (the implementation of a thread pool is out of the scope of this article).
Listing Three
public class PoolOldIO {
    private static ThreadPool pool = new ThreadPool();

    public String execute() {
        IOThread thread = pool.checkOut();
        String result;
        try {
            // The pooled IOThread variant exposes an execute method that
            // starts the I/O and joins on it with a timeout
            thread.execute(15000); // Wait for 15 seconds
        } catch (InterruptedException ie) {
            // Smother; okay to be interrupted
        } finally {
            result = thread.getResult();
            pool.checkIn(thread); // Must always check in the thread
        }
        return result;
    }
}
This improvement still has an underlying problem. If the network experiences major failures, all the IOThread instances in the pool quickly become stalled, and execute threads begin to fail during check-out because no IOThreads remain in the pool. Additionally, when I began testing this new design under heavy load, I found that it still had problems that reduced overall performance. The issue was excessive context switching.
All the implementations thus far force the Java VM to perform increased context switching because either the execute threads or the IOThreads are all performing I/O operations concurrently. The largest problem with this concurrency is that the Java VM doesn't know which threads are ready to perform I/O and which are not. Therefore, the Java VM might schedule a thread that isn't ready to perform I/O while another runnable thread is ready. Because Java does not provide the ability to "stack the deck" by moving threads that are ready to perform I/O to the front of the queue, a thread that is ready to perform I/O might wait long enough that it times out.
The NIO package added to Java 1.4 offers Java developers the ability to perform true nonblocking I/O and guarantee timeouts for I/O operations. It also works with low-level operating-system constructs to increase performance and reduce latency. In addition, it provides the ability to monitor multiple I/O operations concurrently, also known as "multiplexing."
Multiplexed NIO is a technique that moves all the I/O work into a single thread that watches over many I/O operations executing concurrently. A multiplexed NIO design uses the java.nio.channels.Selector class in conjunction with subclasses of the java.nio.channels.SelectableChannel class. The Selector class lets the application watch many I/O connections (subclasses of the SelectableChannel class, also known as "Channels") and separate these Channels into those that are ready to perform I/O and those that are not.
I decided to redesign the application to use NIO, with a single thread performing all the I/O operations. Listing Four shows the skeleton body of the NIO thread.
Listing Four
public class NIOWorker extends Thread {
    private Selector selector;

    public NIOWorker() throws IOException {
        selector = Selector.open();
    }
    ...
}
As with the threaded standard I/O design, the execute threads need a way to supply work to the NIOWorker in the form of a URL they want to contact. The NIOWorker then connects to the URL, sends an HTTP request to the server, and reads the HTTP response. After it completes that work, it passes the result back to the execute thread. One unit of work is represented by an inner class of the NIOWorker thread that lets a URL be passed in and the contents be passed back. Listing Five shows the inner class of the NIOWorker that represents one unit of work.
Listing Five
public static class Work {
    public URL in;
    public String out;
    public ByteBuffer httpRequest;
}
Supplying work to the NIOWorker is not a simple process and requires two methods. Because the NIOWorker thread is a separate thread that handles all the I/O operations, execute threads must add their work and then wait for the NIOWorker thread to process it. Therefore, the NIOWorker class has one method that forces the execute threads to wait, and a second method that actually adds the work. This design waits and notifies using the Work object as the monitor. Listing Six shows the two methods: doWork, which forces the execute threads to wait, and add, which registers the work with the Selector.
Listing Six
public String doWork(URL url, long timeout) {
    Work work = new Work();
    work.in = url;
    String result = null;
    synchronized (work) {
        work.httpRequest = buildHTTPRequest(work);
        // If the work was added successfully, call wait on the work object,
        // which forces the calling thread (i.e., the execute thread) to wait
        if (add(work, timeout)) {
            try {
                work.wait(timeout);
            } catch (InterruptedException e) {
                System.err.println("NIO operation interrupted");
            }
            result = work.out;
        }
    }
    return result;
}
protected boolean add(Work work, long timeout) {
    SocketChannel channel = null;
    try {
        URL url = work.in;
        InetSocketAddress addr = new InetSocketAddress(url.getHost(), 80);
        channel = SocketChannel.open();
        channel.configureBlocking(false);
        channel.connect(addr);
        WorkState state = new WorkState(System.currentTimeMillis(), timeout, work);
        channel.register(selector, SelectionKey.OP_CONNECT, state);
    } catch (IOException ioe) {
        if (channel != null) {
            try {
                channel.close();
            } catch (IOException ioe2) {
                System.err.println("Unable to close channel: " + ioe2);
            }
        }
        System.err.println("Channel creation or registration failed: " + ioe);
        return false;
    }
    return true;
}
This code creates a SocketChannel, which is a Channel that extends SelectableChannel, and registers it with the Selector. The Channel is set to nonblocking so that operations performed on it do not block until complete, but rather return as quickly as possible. Registering a Channel with a Selector tells the Channel to begin a specific I/O operation and to signal the Selector once the operation is complete. Here, the code sets the Channel to begin an OP_CONNECT operation, which makes the Channel connect to a remote socket.
During the registration, this code also constructs a small class that holds state information for the I/O operation. This is called an "attachment" and can be any object. The attachment is associated with a specific Selector/Channel relationship. This Selector/Channel relationship is called a "SelectionKey" (also known as a "key"). It stores the type of operation the Channel is performing as well as the attachment. The WorkState object is what the NIOWorker uses as an attachment. WorkState lets the results read from the Channel be stored in a StringBuffer. It also provides a mechanism for monitoring timeouts so that the NIOWorker thread does not continue working on something that has timed out. Listing Seven shows the WorkState class.
Listing Seven
public static class WorkState {
    public final StringBuffer buffer = new StringBuffer();
    public final Work work;
    public final long start;
    public final long timeout;
    public boolean success = false;

    public WorkState(long start, long timeout, Work work) {
        this.start = start;
        this.timeout = timeout;
        this.work = work;
    }
    public boolean isTimedOut() {
        return (System.currentTimeMillis() - start >= timeout);
    }
}
To watch over the Channels, the NIOWorker thread performs "selects" by calling the selectNow method on the Selector. Performing a select tells the Selector to update all of its keys to reflect which Channels are ready and which are not, allowing the NIOWorker thread to stack the deck by working only on the Channels that are ready. The number of ready keys is returned from this method. Listing Eight shows the main run method of the NIOWorker thread, which performs the select.
Listing Eight
public void run() {
    while (true) {
        try {
            int num = selector.selectNow();
            if (num > 0) {
                processKeys();
            } else {
                Thread.yield();
            }
        } catch (IOException ioe) {
            System.err.println("Unable to select: " + ioe.toString());
        }
    }
}
The processKeys method handles all the Channels that have signaled the Selector that they are ready to perform I/O. Because the Selector handles multiple Channels, the processKeys method performs the I/O for all the Channels that are ready. Listing Nine shows the processKeys method, which is responsible for iterating over all the SelectionKeys and handling each one.
Listing Nine
protected void processKeys() {
    Set keys = selector.selectedKeys();
    for (Iterator iter = keys.iterator(); iter.hasNext();) {
        SelectionKey key = (SelectionKey) iter.next();
        iter.remove();
        WorkState state = (WorkState) key.attachment();
        SocketChannel channel = (SocketChannel) key.channel();
        try {
            if (state.isTimedOut()) {
                finished(channel, key, state);
                continue;
            }
            boolean connectable = key.isConnectable();
            if (connectable && channel.finishConnect()) {
                // If the Channel is connected, set up the Channel to
                // write the HTTP request to the remote server
                key.interestOps(SelectionKey.OP_WRITE);
            } else if (key.isWritable()) {
                // If the Channel has finished writing, set up the
                // Channel to read the HTTP response
                if (doWrite(channel, state)) {
                    key.interestOps(SelectionKey.OP_READ);
                }
            } else if (key.isReadable()) {
                // If the Channel has finished reading, call finished
                // to complete the work
                if (doRead(channel, state)) {
                    finished(channel, key, state);
                }
            }
        } catch (IOException ioe) {
            System.err.println("Failure during IO operation");
            finished(channel, key, state);
        }
    }
}
The Selector object provides the ability to retrieve all the keys that are ready to perform I/O via the selectedKeys method. This method returns a Set containing all the keys that are ready to perform I/O. This Set is populated with the ready keys whenever a select operation is performed on the Selector.
One important detail about the processKeys method is that during each loop iteration, the WorkState attachment is retrieved from the SelectionKey and used to verify that the work has not timed out. If it has timed out, the finished method is called to signal that the work is complete, which in this case is due to a timeout rather than success.
The doWrite, doRead, and finished methods complete the NIOWorker. The doWrite method writes the HTTP request to the channel. Listing Ten shows the doWrite method.
Listing Ten
protected boolean doWrite(SocketChannel channel, WorkState state)
        throws IOException {
    int rem = state.work.httpRequest.remaining();
    int num = channel.write(state.work.httpRequest);
    // Continue writing until everything has been written
    return (num == rem);
}
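Listing Ten's completion check can also be expressed without tracking byte counts: a nonblocking write advances the ByteBuffer's position, so whether bytes remain tells the same story, and the next OP_WRITE pass resumes automatically where the last one stopped. A minimal sketch (generalized to WritableByteChannel so it is easy to exercise; the class and method names are mine, not the article's):

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;

// Equivalent formulation of Listing Ten's check: write() may send only
// part of the buffer, and the buffer's own position records progress.
public class WriteHelper {
    public static boolean writeRemaining(WritableByteChannel channel,
                                         ByteBuffer request) throws IOException {
        channel.write(request);          // may write fewer bytes than remaining()
        return !request.hasRemaining();  // done only when the buffer is drained
    }
}
```

Because the Work object carries the same ByteBuffer across select passes, no extra bookkeeping is needed for partial writes.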
The doRead method is responsible for performing a read from a Channel and signaling whether everything has been read from the Channel. Listing Eleven shows the doRead method.
Listing Eleven
protected boolean doRead(SocketChannel channel, WorkState state)
        throws IOException {
    // buffer (a ByteBuffer) and decoder (a CharsetDecoder) are instance
    // variables on the NIOWorker so they can be reused across reads
    buffer.clear();
    decoder.reset();
    boolean done = false;
    int num = channel.read(buffer);
    if (num == -1) {
        state.success = true;
        done = true;
    } else if (num > 0) {
        buffer.flip();
        String data = decoder.decode(buffer).toString();
        state.buffer.append(data);
    }
    return done;
}
The doRead method is a standard NIO implementation of reading from a Channel. It reads zero or more bytes from the Channel and also determines whether the Channel has any more to read. Once this code is certain that there is nothing more to read, it returns true to signal the processKeys method that it should call the finished method. This code goes one step further and sets the success flag on the WorkState object; the finished method uses this flag to determine whether the HTTP response should be parsed.
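One subtlety worth noting: decoding each read in isolation can fail if a multi-byte character is split across two reads. A hedged sketch of a chunk-safe alternative, using CharsetDecoder's three-argument decode with endOfInput=false so trailing partial bytes stay in the input buffer for the next round (class and method names are mine, not the article's):

```java
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CodingErrorAction;

// Sketch: stream-safe decoding across chunk boundaries. Incomplete
// multi-byte sequences are left in the input buffer by decode(..., false)
// and carried forward by compact().
public class StreamingDecode {
    private final CharsetDecoder decoder = Charset.forName("UTF-8")
            .newDecoder()
            .onMalformedInput(CodingErrorAction.REPORT);

    public String decodeChunk(ByteBuffer input, boolean endOfInput) {
        CharBuffer out = CharBuffer.allocate(input.remaining() + 16);
        decoder.decode(input, out, endOfInput);
        input.compact(); // keep any undecoded trailing bytes for next time
        out.flip();
        return out.toString();
    }
}
```

With this shape, a UTF-8 character whose bytes arrive in two separate reads is still decoded correctly once its second half shows up.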
The final method, finished, notifies the execute thread that its work is complete, whether successful or not. The success flag set in the doRead method determines whether the HTTP response is parsed. Listing Twelve shows the finished method.
Listing Twelve
protected void finished(SocketChannel channel, SelectionKey key, WorkState state) {
    key.cancel();
    try {
        channel.close();
    } catch (IOException ioe) {
        System.err.println("Failed to close socket: " + ioe.toString());
    } finally {
        Work work = state.work;
        synchronized (work) {
            // Only if the work was successful, parse out the HTTP response
            if (state.success) {
                String result = state.buffer.toString();
                work.out = parseHTTPResponse(result);
            }
            work.notify();
        }
    }
}
NIO does not handle HTTP request generation and HTTP response parsing automatically; that work is left to the NIO implementation or the client. This design handles the generation of HTTP request messages and the parsing of HTTP response messages. The buildHTTPRequest and parseHTTPResponse methods are referenced here for illustration. The work of generating and parsing the HTTP messages is slightly more complex, but is available in the full version of the code, within the HTTPParser class and the classes that implement the HTTPMethod interface.
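For illustration only, a minimal buildHTTPRequest might look like the following. The article's production version lives in its HTTPParser and HTTPMethod classes, which are not shown, so this sketch is an assumption about the general shape, not the real code:

```java
import java.net.URL;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical sketch of a minimal HTTP request builder: a GET with a
// Host header (required by HTTP/1.1) and Connection: close so the server
// ends the response by closing the socket, matching doRead's -1 check.
public class HttpRequestBuilder {
    public static ByteBuffer buildHTTPRequest(URL url) {
        String path = url.getPath().isEmpty() ? "/" : url.getPath();
        String request = "GET " + path + " HTTP/1.1\r\n"
                + "Host: " + url.getHost() + "\r\n"
                + "Connection: close\r\n"
                + "\r\n";
        return ByteBuffer.wrap(request.getBytes(StandardCharsets.US_ASCII));
    }
}
```

Connection: close pairs naturally with this design, since doRead treats a closed channel (read returning -1) as the end of the response.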
Listing Thirteen is an example of an execute thread calling the NIOWorker to perform some work.
Listing Thirteen
public class NewIO {
    // Assume someone else set up the NIOWorker and passed it in
    public String execute(NIOWorker worker) {
        try {
            // Construct the URL and pass it to the worker
            URL url = new URL("http://example.orbitz.com");
            return worker.doWork(url, 15000); // Wait 15 seconds
        } catch (IOException ioe) {
            System.err.println(ioe.toString());
            return null; // No result if the URL was malformed
        }
    }
}
To quantify the benefits of NIO over standard I/O, I collected statistics for each design. These statistics were collected by running each of the actual implementations through a series of load tests in a fixed environment. Table 1 presents the results from those tests. All values are the number of transactions completed by an implementation per minute.
Minute | Standard I/O | Multiplexed NIO
1      | 135          | 287
2      | 137          | 240
3      | 162          | 235
4      | 152          | 270
5      | 144          | 246
6      | 167          | 266
7      | 164          | 390
8      | 164          | 339
9      | 163          | 364
10     | 154          | 390
Avg    | 154.2        | 302.7
In addition to a load test, the standard I/O and the NIO solutions were both used in the Orbitz production environment for some period of time. I was able to monitor each one and collect statistics for load and latency. Table 2 shows the statistics from the Orbitz.com production environment.
Solution | Average Latency | Average Load |
Standard I/O | 1489 | 0.8 |
Multiplexed NIO | 1199 | 0.7 |
The code I present here is only part of the complete solution used in the Orbitz.com application. I simplified some of the code, but have retained the major work required to implement a multiplexed NIO solution.
NIO is not a simple API to use and I encountered many issues during testing that required fine tuning. A few things I found that should be kept in mind are:
- Keys can accumulate in the Selector. This can cause memory leaks that result in a Java VM crash. The Selector can be queried to determine how many keys it currently has, and that number can be compared to a configurable threshold.
- The run method runs in a tight loop, which may be undesirable for low- to medium-volume applications. To loosen the loop, one of the Selector's select methods that takes a timeout can be used; this causes the call to block until a Channel is ready.
- Consider waiting when the Selector is empty inside the run method. Inside the add methods, the NIOWorker thread can then be notified when work is successfully added.
- Move the ByteBuffer and CharsetDecoder from the doRead method to instance variables on the NIOWorker object so they can be reused.
- Check the Selector on each iteration of the main run method's loop to determine whether any work has timed out. This check prevents the Selector from filling up with invalid connections due to network failures.

One major NIO pitfall to be aware of is that you should always use multiplexed NIO. Putting NIO code into multiple threads reduces overall performance and can cause excessive load. I experimented with using NIO in multiple threads and observed that, due to enormous amounts of context switching, the load nearly tripled while latency and throughput remained unchanged.
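Several of the tips above (a blocking select with a timeout, and waking the worker when its state changes) can be sketched roughly as follows; the structure and names are assumptions for illustration, not the article's production code:

```java
import java.io.IOException;
import java.nio.channels.Selector;

// Sketch of the "looser loop": select(timeout) blocks until a channel is
// ready or the timeout expires, instead of spinning with selectNow(), and
// selector.wakeup() lets another thread rouse the worker immediately.
public class LooserLoopWorker extends Thread {
    private final Selector selector;
    private volatile boolean running = true;

    public LooserLoopWorker() throws IOException {
        selector = Selector.open();
    }
    public void run() {
        while (running) {
            try {
                // Block for up to 500 ms rather than spinning
                int ready = selector.select(500);
                if (ready > 0) {
                    // processKeys(); // handle ready channels as in Listing Nine
                }
                // Even with no ready keys, we fall through here periodically,
                // so timed-out registrations can still be cancelled
            } catch (IOException ioe) {
                System.err.println("Unable to select: " + ioe);
            }
        }
    }
    public void shutdown() {
        running = false;
        selector.wakeup(); // unblock select() so the loop can exit promptly
    }
}
```

An add method in this style would call selector.wakeup() after registering a channel, so newly added work is picked up without waiting out the full timeout.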
A multiplexed NIO solution is ideal for the Orbitz application requirements because of the efficiencies of multiplexed I/O operations. Multiplexed I/O means that a single thread performs many I/O operations, leveraging a Selector to do the multiplexing. Using a Selector, multiple Channels executing I/O operations are managed and watched over concurrently. This means that at no point is the Java VM ever waiting on Channels that are not ready to perform I/O, because the Selector knows precisely which Channels are ready and which are not. And because only a single thread performs I/O, the problem of context switching between multiple threads performing I/O operations is eliminated. All of these conditions greatly reduce overhead and latency while increasing total throughput over a standard I/O implementation.
Thanks to David P. Thomas of Orbitz for reviewing this article.
Brian is a senior architect with Orbitz.com. He can be contacted at [email protected].