Java Concurrency: The Executor Service
In Part 1 and Part 2 of this Java Concurrency series, we explored different ways to process requests in parallel via queues. In this installment, we'll explore the Java Executor Service, which is useful for managing pools of threads, as well as the scheduling of future events. First, let's look at a simple example.
The Java Executors
The Executors class is a factory for ExecutorService instances, which manage the creation of threads in a pool, their scheduling (if applicable), and their termination. In general, it helps you avoid having to create and manage threads yourself. For example, let's examine the following sample network application, where first a ServerSocket is created to listen for new client connections, and then those connections are managed:
import java.io.EOFException;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class JavaServer_Executor {
    private static final int HANDLERS = 10;
    private static final int SERVER_PORT = 5000;
    private ExecutorService pool = null;

    // Accepts new client connections
    class ListenerService implements Runnable {
        private ServerSocket serverSocket = null;
        public ListenerService() throws IOException { … }
        public void run() { … }
    }

    // Handles communication with one connected client
    class ClientHandler implements Runnable {
        private final Socket socket;
        public ClientHandler(Socket socket) { … }
        public void run() { … }
    }

    public JavaServer_Executor() {
        try {
            pool = Executors.newFixedThreadPool(HANDLERS);
            ListenerService listener = new ListenerService();
            pool.submit(listener);
        }
        catch ( Exception e ) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        JavaServer_Executor javaServer = new JavaServer_Executor();
    }
}
The main class, JavaServer_Executor, contains two nested classes: ListenerService, which accepts new client connections, and ClientHandler, which manages each connection once it is established. We'll look at the nested classes in detail later. First, let's look at how the Executor service is used.
The first step is to create a thread pool via the method newFixedThreadPool(), which takes the maximum number of threads as its argument. The pool starts threads as tasks arrive until it reaches that limit, then reuses them. Alternatively, you can use:
newCachedThreadPool, which creates threads as needed, reusing them when it can.
newScheduledThreadPool, which creates a pool of threads that can run tasks after a delay or periodically.
newSingleThreadExecutor, which creates one thread that executes submitted tasks from a queue, one at a time.
newSingleThreadScheduledExecutor, which creates one thread that can run submitted tasks after a delay or periodically.
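To make the differences between these factory methods a little more concrete, here is a minimal sketch of two of them. The class ExecutorFactoryDemo and its method names are invented for this illustration and are not part of the server example; the lambda syntax assumes Java 8 or later.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the article's server
public class ExecutorFactoryDemo {

    // A single-thread executor runs queued tasks one at a time, in submission order.
    static List<Integer> runInOrder(int taskCount) {
        List<Integer> seen = Collections.synchronizedList(new ArrayList<>());
        ExecutorService single = Executors.newSingleThreadExecutor();
        for (int i = 0; i < taskCount; i++) {
            final int id = i;
            single.execute(() -> seen.add(id));
        }
        single.shutdown(); // accept no new tasks, but finish the queued ones
        try {
            single.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(seen);
    }

    // A scheduled pool repeats a task at a fixed rate until it is shut down.
    // Returns true if the task ran at least 'ticks' times within five seconds.
    static boolean runPeriodically(int ticks) {
        CountDownLatch latch = new CountDownLatch(ticks);
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        scheduler.scheduleAtFixedRate(latch::countDown, 0, 50, TimeUnit.MILLISECONDS);
        try {
            return latch.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            scheduler.shutdownNow(); // stop the repeating task
        }
    }

    public static void main(String[] args) {
        System.out.println("Single-thread order: " + runInOrder(5));
        System.out.println("Periodic task ticked: " + runPeriodically(3));
    }
}
```

The 50-millisecond period and five-second timeouts are arbitrary values chosen to keep the demo short.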
Our example calls newFixedThreadPool(), then instantiates ListenerService:
class ListenerService implements Runnable {
    private ServerSocket serverSocket = null;

    public ListenerService() throws IOException {
        serverSocket = new ServerSocket(SERVER_PORT);
    }

    @Override public void run() {
        try {
            System.out.println("ListenerService: waiting for new clients");
            while ( true ) {
                // Block and wait for a client connection
                Socket client = serverSocket.accept();
                // Assign a thread to handle client network communication
                System.out.println("ListenerService: client connected...");
                ClientHandler handler = new ClientHandler(client);
                pool.execute(handler);
            }
        }
        catch ( Exception e ) {
            e.printStackTrace();
            pool.shutdown();
        }
    }
}
The ListenerService class first creates a ServerSocket on a given port, where it waits for new client connection requests. Since this class is a Runnable, the processing takes place in the run() method. To start this process, the ListenerService Runnable instance is itself submitted to the Executor thread pool as a task, where it runs in one of the pool's threads; there's no need for you to create the thread yourself.
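The same hand-off can be tried without any networking. The sketch below is only an illustration: FixedPoolDemo and workerNames are invented names, and lambdas assume Java 8 or later. It submits several small tasks to a fixed pool and records which worker threads ran them, then shuts the pool down in an orderly way.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the article's server
public class FixedPoolDemo {

    // Runs taskCount small tasks on a fixed pool and returns the names
    // of the worker threads that actually executed them.
    static Set<String> workerNames(int poolSize, int taskCount) {
        Set<String> names = ConcurrentHashMap.newKeySet();
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        for (int i = 0; i < taskCount; i++) {
            // The pool picks the worker thread; we never call new Thread()
            pool.execute(() -> names.add(Thread.currentThread().getName()));
        }
        pool.shutdown(); // no new tasks; already-queued tasks still run
        try {
            if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
                pool.shutdownNow(); // give up on stragglers
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            pool.shutdownNow();
        }
        return names;
    }

    public static void main(String[] args) {
        Set<String> names = workerNames(2, 6);
        System.out.println(names.size() + " worker thread(s) ran 6 tasks: " + names);
    }
}
```

However many tasks are submitted, the set never holds more names than the pool size, because the pool reuses its workers rather than starting a thread per task.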
Handling Incoming Clients
The ListenerService waits for client connections in its own thread. When a client connects, it wraps the new Socket in a ClientHandler and passes that object to the Executor pool, which takes care of the threading. The ClientHandler class (see below) is also a Runnable, and each instance runs in a thread provided by the pool. Keep in mind that the pool is fixed at HANDLERS (10) threads and the listener occupies one of them, so once the remaining threads are busy, additional handlers wait in the pool's queue until a thread becomes free.
class ClientHandler implements Runnable {
    private final Socket socket;

    public ClientHandler(Socket socket) {
        this.socket = socket;
    }

    public void run() {
        // Using Java 7 try-with-resources
        // For more info: http://www.drdobbs.com/blogs/jvm/232600691
        try (
            ObjectInputStream ois =
                new ObjectInputStream(socket.getInputStream() )
        ) {
            while ( true ) {
                // Wait for a message to arrive
                String message = ois.readUTF();
                System.out.println(
                    "ClientHandler: Thread '" +
                    Thread.currentThread().toString() +
                    "' received message: " + message);
            }
        }
        catch ( EOFException e ) {
            // Client disconnected cleanly
        }
        catch ( Exception e ) {
            e.printStackTrace();
        }
    }
}
None of this code is specific to the Executor service, but it's included here to provide a complete working sample application.
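To exercise the server, you need a client that speaks the same stream format. The following is a minimal sketch of one, not code from the original application: the JavaClient class and its sendMessages method are invented here, and the host and port in main() simply assume the server is running locally on SERVER_PORT (5000). Because ClientHandler reads with ObjectInputStream.readUTF(), the client writes with ObjectOutputStream.writeUTF().

```java
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.net.Socket;

// Hypothetical client, written to match the ClientHandler's stream format
public class JavaClient {

    // Connects to the server, sends each message, then disconnects.
    static void sendMessages(String host, int port, String... messages)
            throws IOException {
        try (Socket socket = new Socket(host, port);
             ObjectOutputStream oos =
                 new ObjectOutputStream(socket.getOutputStream())) {
            for (String message : messages) {
                oos.writeUTF(message);
                oos.flush(); // push the buffered bytes to the server now
            }
        } // closing the stream ends the connection; the server sees EOFException
    }

    public static void main(String[] args) throws IOException {
        // Assumes the server from this article is listening on localhost:5000
        sendMessages("localhost", 5000, "hello", "from the client");
    }
}
```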
Controlling Tasks
If you replace the call to pool.execute() in the ListenerService with pool.submit(), the Future object that is returned lets you track and control the task. For example, you can periodically call the isDone() method, which returns true once the Runnable has finished (normally, with an exception, or through cancellation), to check for the completion of an asynchronous network event. Or, if the task is taking too long to complete, you can call cancel(true), which prevents the task from running if it hasn't started and otherwise interrupts its thread. An interrupt is only a request, though: a handler blocked in a socket read generally won't respond until the socket itself is closed. Either way, the Future saves you from writing the boilerplate code required to handle this thread synchronization yourself.
In the next installment, we'll take a look at Java NIO channels, and how NetworkChannel helps you improve the efficiency of this server application example. This blog is by no means a complete discussion of Java Executors, so we'll continue to explore them going forward, specifically how periodic scheduling via the ScheduledExecutorService can be useful.
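As a rough illustration of this pattern, the sketch below submits a task, polls isDone() until a deadline, and cancels the task if it is still running. The FutureDemo class, the cancelIfSlow method, and the timing values are all made up for this example, and a sleeping task stands in for real network work; lambdas assume Java 8 or later.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class, not part of the article's server
public class FutureDemo {

    // Submits a task that sleeps for taskMillis, polls isDone() for up to
    // timeoutMillis, and cancels the task if it is still running.
    // Returns true if the task ended up cancelled.
    static boolean cancelIfSlow(long taskMillis, long timeoutMillis) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Future<?> future = pool.submit(() -> {
                try {
                    Thread.sleep(taskMillis); // stand-in for slow work
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // cancelled: stop early
                }
            });
            long deadline = System.currentTimeMillis() + timeoutMillis;
            while (!future.isDone() && System.currentTimeMillis() < deadline) {
                Thread.sleep(10); // check again shortly
            }
            if (!future.isDone()) {
                future.cancel(true); // true = interrupt the worker thread
            }
            return future.isCancelled();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("Slow task cancelled: " + cancelIfSlow(5000, 100));
        System.out.println("Fast task cancelled: " + cancelIfSlow(10, 2000));
    }
}
```

Cancellation works here because Thread.sleep() responds to interrupts. A task blocked on socket I/O usually needs its socket closed as well before it will stop.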
Happy coding!
EJB

