
Building Dynamic Fail-Over Java Servers


Dr. Dobb's Journal, November 2001

Chang is vice president of product engineering for elipva Ltd (http://www.elipva.com/) and coauthor of Beyond the Web Lifestyle (ISBN 0-13-017937-X). He can be contacted at [email protected].


Failures come in many guises, ranging from misplaced CD-ROMs to full-fledged disasters such as fires, earthquakes, or tornadoes that destroy servers (not to mention the buildings they are housed in). Clearly, it is imperative that you have a means of fail-over to ensure the uninterrupted running of mission-critical software. To most programmers, however, fail-over relates to high availability and fault tolerance, both of which are usually a system administrator's responsibility. In reality, developers must face the possibility of a system outage. Data redundancy is often not enough; mission-critical software needs to restore the necessary functionality as quickly as possible. Consequently, the design of the software itself demands robustness and a certain hardiness to survive hardware failure and operate as though little or nothing occurred.

Still, implementing a software fail-over design can be a tedious and unrewarding task. In this article, I present a Java-based fail-over mechanism called "Xander" (available electronically; see "Resource Center," page 5) that is straightforward to implement.

In general, requirements for server software include:

  • Fail-over. Servers must be able to dynamically detect the failure of one server so that another can take over.
  • Multiple networks. Servers must reside in one or more network segments in case a segment fails.

  • Minimal resources. Servers should use a minimal amount of resources for maintenance of the fail-over mechanism.

  • Configurable. Servers must be configurable enough so that the software can run under many conditions.

  • Flexible. Servers must be flexible enough to incorporate whatever processing you want — besides fail-over.

How Xander Works

Unquestionably, the most important requirement is for software servers to continue operating despite failure at any point in time. Failure means that the server cannot be detected because it (or the network) is not operational. Consequently, basic system designs involve multiple servers running on different machines at the same time. The premise is that if one server fails, another one can take over and continue.

A basic design assumption of systems such as Xander, of course, is that only one server — the primary server — can be active at any point in time. The other servers are running, but dormant. This design allows simplicity in fail-over. Each server is started up with a current cluster status that indicates the servers that are available in the cluster.

This mechanism loosely adheres to the Carrier Sense Multiple Access/Collision Detection (CSMA/CD) protocol used by the Ethernet protocol. In the CSMA/CD protocol, each interface waits until there is no signal on the channel before the interface can begin transmitting. If some other interface is transmitting, there is a signal on the channel (the "carrier"). All other interfaces must wait until the carrier ceases before trying to transmit. This process is called "carrier sense." All Ethernet interfaces are equal in their ability to send frames onto the network. One interface doesn't get a higher priority over another. This is what "multiple access" means.

Sometimes it is possible for two interfaces to sense that the network is idle and simultaneously start transmitting frames. When this happens, the Ethernet system can sense the collision of signals, stop the transmission, and resend the frames. This is called "collision detect."

The servers perform some kind of carrier sense by detecting each other's existence. There is no priority in any of the servers (although they can be easily modified to support a priority scheme). Collision detection is implemented via a combination of detection and promotion/demotion mechanisms. The sequence goes like this:

  1. On startup, the server sends out a notification of its existence.

  2. The server adds itself to the current cluster status and starts to monitor the network for other servers.

  3. The server also monitors the network to see if there is a running primary server.

  4. If no primary server is found, the server promotes itself to primary and starts performing server functions.

  5. It then declares itself as the primary server and informs other servers on the network that a primary server exists.
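The promotion decision in steps 4 and 5 can be sketched as follows. This is an illustrative condensation, not Xander's actual code; the class and method names are mine:

```java
import java.util.HashSet;
import java.util.Set;

public class StartupSketch {
    private final Set<String> cluster = new HashSet<String>();
    private boolean primary = false;

    // Step 2: the server adds itself to its current cluster status.
    public void announce(String self) {
        cluster.add(self);
    }

    // Steps 3-5: if no running primary was heard on the network,
    // promote this server to primary.
    public boolean tryPromote(boolean primaryHeard) {
        if (!primaryHeard) {
            primary = true;
        }
        return primary;
    }

    public boolean isPrimary() {
        return primary;
    }

    public static void main(String[] args) {
        StartupSketch s = new StartupSketch();
        s.announce("192.168.0.10:5055");
        System.out.println("promoted: " + s.tryPromote(false));
    }
}
```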

When a secondary/backup server is started, the sequence goes like this:

  1. On startup, the server sends out a notification of its existence. The existing servers notice the existence of an additional server and add it into their current cluster status.

  2. The newly started server discovers the existing servers and adds them to its current cluster status.

  3. The newly started server monitors the network, discovers that there is an existing primary server, and remains dormant.

The following scenarios describe how the cluster reacts to one or more servers failing. Once a nonprimary server goes down, what happens is:

  1. The failed server stops sending out notifications of its existence.

  2. The remaining servers notice that one of their cluster members is down.

  3. The servers try to ascertain the existence of the server by pinging the failed server.

  4. After a number of failed attempts, the remaining servers remove the server from their current cluster status.

Once a primary server goes down, the same actions occur again. Additionally:

  1. The failed primary server stops sending declarations that a primary server exists.

  2. The remaining servers notice the primary server is down.

  3. The servers try to ascertain the existence of the server by pinging the failed primary server.

  4. After a certain number of failed attempts, the remaining servers remove the server from their current cluster status and go into primary contention mode.

  5. Each remaining server promotes itself as the primary server.

  6. After declaring itself, the new primary server checks if there is another primary server in the network.

  7. If another primary server exists, the new primary server demotes itself.

  8. This contention repeats itself until only one primary server exists.
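Step 7's demotion check can be sketched like this. It is a hypothetical condensation of the logic that appears in Listing Nine; the names are mine, not Xander's:

```java
public class ContentionSketch {
    // A freshly promoted server that hears a primary declaration from a
    // *different* server demotes itself; hearing its own declaration is
    // harmless. Returns the server's primary status after resolution.
    static boolean resolve(boolean isPrimary, String self, String declaredPrimary) {
        if (isPrimary && !self.equals(declaredPrimary)) {
            return false; // another primary exists: demote
        }
        return isPrimary;
    }

    public static void main(String[] args) {
        // Two contenders; each hears the other's declaration and demotes.
        System.out.println(resolve(true, "10.0.0.1:5055", "10.0.0.2:5055"));
        // A primary hearing its own declaration stays primary.
        System.out.println(resolve(true, "10.0.0.1:5055", "10.0.0.1:5055"));
    }
}
```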

Xander Implementation Details

Xander depends heavily on Java network programming. Consequently, I'll focus here on the classes that implement the server cluster (see Figure 1), highlighting major parts of the code.

Server is a standalone Java server application that synchronizes with other instances of itself running in the same or different Java Virtual Machines (JVMs) to ensure fail-over. Each server runs on a different machine or IP address. Each server has a current cluster status that lists the servers in the same cluster; see Listing One.

The Server class contains all other functions, starts the Synchronizer to synchronize with other servers, starts the MultiServerThread to start accepting from a server socket, and instantiates and starts the application function if it is the primary server.

The server's default starting port number is 5055 and the default IP address is that of the local host machine. The server starts up by creating a Status object that represents the current cluster status. It then sets the current IP address and port number in the Status object, and checks whether the server was explicitly started as a primary. If so, the server is promoted to primary without checking for an existing primary server. This does not guarantee that the server will remain the primary: If a primary server already exists, the servers go into primary contention and the resolution could result in the eventual demotion of this server; see Listing Two.

Next, the server opens a server-side socket to allow the receiving and processing of socket requests. This is eventually passed on to the MultiServerThread class. Next, the necessary objects are prepared and initialized — a Synchronizer object is created, then a customized implementation of the Application interface. When preparation completes, the server goes into a perpetual loop, first requesting for synchronization, then spawning a thread to accept socket requests. If it is the primary server, Server starts the Application object.

MultiServerThread

MultiServerThread is a Thread that performs the server socket requests. Each request spawns a MultiServerThread object. MultiServerThread lets the server respond to requests from another server or an actual user; see Listing Three.

MultiServerThread is a simple threaded socket reception implementation that reads in the socket request and tokenizes it to extract the command and parameters. It then passes the information to the various private methods to process.
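The tokenizing step can be illustrated in isolation. This hypothetical helper mirrors the StringTokenizer idiom used in Listing Three:

```java
import java.util.StringTokenizer;
import java.util.Vector;

public class CommandParse {
    // Split a request line into tokens: the first is the command,
    // the rest are its parameters.
    static Vector<String> parse(String line) {
        StringTokenizer st = new StringTokenizer(line, " ");
        Vector<String> tokens = new Vector<String>();
        while (st.hasMoreTokens()) {
            tokens.add(st.nextToken());
        }
        return tokens;
    }

    public static void main(String[] args) {
        Vector<String> t = parse("ask isAlive");
        System.out.println("command=" + t.get(0) + " parameter=" + t.get(1));
    }
}
```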

Users can send commands to the server to request a response. One typical way to test the server is to telnet to port 5055. To check whether the server is alive, use the ask isAlive command; if the response is an *, the server is alive. The other prebuilt command is change, which requests a change in the current cluster status. MultiServerThread can be modified to implement other commands along similar lines; see Listing Four.

Synchronizer

Synchronizer is the class where all the action is. This class enables the fail-over capabilities of the server and is the most complex class in the entire mechanism. It consists of a main Synchronizer class (Listing Five) and five inner classes that perform the following tasks:

  • PingThread detects the presence of servers in the current cluster status. If any of the servers are down, it removes the server from the current cluster status.
  • DeclareSelfThread lets Server declare its own presence.

  • CheckServerThread checks for the presence of new servers and adds them to the current cluster status.

  • DeclarePrimaryThread declares the server as a primary.

  • CheckPrimaryThread checks if a primary server exists in the network.

Using the WaitTimer configurable variable, a random number is generated; this is the number of seconds to wait between starting each thread. The number generated is greater than 0 but less than WaitTimer. Each thread is started, and the random amount of time passes before the next thread is executed. Why? Because if a uniform amount of time were chosen between the starting of threads for every loop (zero delay counts as a uniform amount of time), there's a chance that a newly started server might never be detected. More specifically, since each loop waits the same amount of time, if the new server is started in phase with an existing server, it will never be discovered by that server. Randomizing the waiting time purposely throws the timing of the servers out of phase. Because of the phase difference, the servers can detect each other; how long this takes depends on the other configurable variables.
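A minimal sketch of the randomized delay, assuming the same semantics as the WaitTimer variable (the helper name is mine):

```java
import java.util.Random;

public class WaitJitter {
    // Pick a random delay strictly between 0 and waitTimer seconds, so
    // that two servers started in phase drift apart and eventually see
    // each other's discovery packets.
    static double nextDelay(Random rnd, double waitTimer) {
        double d;
        do {
            d = rnd.nextDouble() * waitTimer;
        } while (d == 0.0); // must be greater than 0
        return d;
    }

    public static void main(String[] args) {
        Random rnd = new Random();
        for (int i = 0; i < 5; i++) {
            System.out.println(nextDelay(rnd, 1.5));
        }
    }
}
```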

PingThread is a simple thread class that loops through each server in the current cluster status and tries to open a socket to the server. If it can, it considers the server up and proceeds to the next server; otherwise, it considers the server not reachable.

Because of the randomized nature of the servers, it is possible that the server can sometimes be up while the thread does not open the socket. This is the reason why PingThread notes it down if the server is unreachable. Each time PingThread runs, it checks the number of times the server has been unreachable. After a configurable number of times this has happened, PingThread declares it to be officially down and removes it from the current cluster status; see Listing Six.
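The miss-counting logic might look like this in isolation; a hypothetical condensation, not the actual PingThread:

```java
import java.util.HashMap;
import java.util.Map;

public class RetryCounter {
    private final Map<String, Integer> misses = new HashMap<String, Integer>();
    private final int maxRetry;

    public RetryCounter(int maxRetry) {
        this.maxRetry = maxRetry;
    }

    // A successful ping resets the host's miss count.
    public void recordSuccess(String host) {
        misses.put(host, 0);
    }

    // Returns true when the host has now missed enough pings in a row
    // to be declared down and removed from the cluster status.
    public boolean recordMiss(String host) {
        int count = misses.getOrDefault(host, 0) + 1;
        if (count > maxRetry) {
            misses.remove(host);
            return true;
        }
        misses.put(host, count);
        return false;
    }

    public static void main(String[] args) {
        RetryCounter rc = new RetryCounter(3);
        for (int i = 1; i <= 4; i++) {
            System.out.println("miss " + i + " -> down? " + rc.recordMiss("10.0.0.2"));
        }
    }
}
```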

DeclareSelfThread declares its own presence by multicasting to a discovery port. It sends its own IP address and port number in a datagram to the discovery port. The discovery port is configurable; see Listing Seven.

CheckServerThread does the opposite of DeclareSelfThread — it monitors the discovery port for new servers. This includes itself at first startup. Once another server is discovered, it is added to the current cluster status. The datagram socket waits for results to come until the timeout period is reached. Again, the timeout period is configurable; see Listing Eight.
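The datagram payload is just an "ip:port" string, so packing and parsing it can be shown without any network code. The helper names here are illustrative; the parsing mirrors Listing Eight:

```java
import java.util.StringTokenizer;

public class DiscoveryPayload {
    // What DeclareSelfThread multicasts: the server's own address and port.
    static String pack(String host, int port) {
        return host + ":" + port;
    }

    // What CheckServerThread extracts from a received datagram.
    static String[] unpack(String data) {
        StringTokenizer st = new StringTokenizer(data.trim(), ":");
        return new String[] { st.nextToken(), st.nextToken().trim() };
    }

    public static void main(String[] args) {
        String[] parts = unpack(pack("192.168.0.5", 5055));
        System.out.println(parts[0] + " on port " + parts[1]);
    }
}
```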

DeclarePrimaryThread and CheckPrimaryThread are two inner classes similar to DeclareSelfThread and CheckServerThread. DeclarePrimaryThread is the simpler class that performs an almost identical function as the DeclareSelfThread, except that it multicasts to the primary port to declare it is the primary server. Only the primary server runs this thread.

CheckPrimaryThread performs a function similar to CheckServerThread and also implements the primary contention mechanism. Once the multicast socket times out after a configurable period, a counter is incremented to indicate that the primary server cannot be detected (this could be because the primary server has failed). After the number of times this has occurred exceeds a configurable amount, the server promotes itself.

CheckPrimaryThread also checks if there exists another primary server in the network. If there is, the server immediately demotes itself. In a way, this means the primary contention consists of a race for the counter to exceed the max_primary_retry variable; see Listing Nine.

Status is the class that represents the current cluster status of the server cluster. It is a simple class implemented as a Singleton with these variables (and its respective getter/setter methods):

  • Primary indicator to indicate if it is the primary server.
  • A hashtable of server IP and ports to keep the current cluster status.

  • A string representation of the primary server's IP address.

  • A string representation of the currently running server's IP address.

  • An integer representation of the currently running server's port number.
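A minimal Singleton along these lines might look like this; the field and method names are inferred from the description above, not copied from Xander:

```java
import java.util.Hashtable;

public class StatusSketch {
    private static final StatusSketch INSTANCE = new StatusSketch();

    // Current cluster status: server IP -> port.
    private final Hashtable<String, Integer> servers =
            new Hashtable<String, Integer>();
    private boolean primary = false;

    private StatusSketch() {} // Singleton: no public construction

    public static StatusSketch getInstance() {
        return INSTANCE;
    }

    public void addServer(String ip, int port) {
        servers.put(ip, port);
    }

    public void removeServer(String ip) {
        servers.remove(ip);
    }

    public boolean isPrimary() { return primary; }
    public void promote() { primary = true; }
    public void demote() { primary = false; }
    public int size() { return servers.size(); }

    public static void main(String[] args) {
        StatusSketch s = StatusSketch.getInstance();
        s.addServer("10.0.0.1", 5055);
        System.out.println("servers: " + s.size() + ", primary: " + s.isPrimary());
    }
}
```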

Config is the class that represents the configuration of the server cluster. It reads the XML configuration file using the Java Document Object Model (JDOM) and stores the information within a Singleton implementation. The implementation itself is straightforward.
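For a self-contained illustration, here is a sketch that reads one setting with the JDK's built-in DOM parser rather than JDOM (Xander's Config uses JDOM; this only approximates the idea):

```java
import java.io.ByteArrayInputStream;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;

public class ConfigSketch {
    // Return the text of the first element with the given tag name,
    // or null if the document cannot be parsed.
    static String read(String xml, String tag) {
        try {
            Document doc = DocumentBuilderFactory.newInstance()
                    .newDocumentBuilder()
                    .parse(new ByteArrayInputStream(xml.getBytes("UTF-8")));
            return doc.getElementsByTagName(tag).item(0).getTextContent();
        } catch (Exception e) {
            return null;
        }
    }

    public static void main(String[] args) {
        String xml = "<xander><synchronizer><wait_timer>1.5</wait_timer>"
                   + "</synchronizer></xander>";
        System.out.println("wait_timer = " + read(xml, "wait_timer"));
    }
}
```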

The server does not perform any tasks other than synchronizing with instances of itself in a server cluster and responding to server requests through a specified configurable port. To perform actual processing, you need to implement the Application interface and configure the server cluster to call it. Application (Listing Ten) is an interface that lets you implement your own applications through the server.

Scheduler (Listing Eleven) is the default implementation of Application. The server instantiates the Application object and calls its activate() method; it knows which class to instantiate from the configuration file.
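The instantiate-by-class-name step (see Listing Two) can be demonstrated standalone. The nested interface and class here are stand-ins, and getDeclaredConstructor().newInstance() is the modern spelling of the newInstance() idiom the article uses:

```java
public class AppLoader {
    public interface Application {
        void activate();
    }

    public static class Hello implements Application {
        public void activate() {
            System.out.println("hello");
        }
    }

    // Look up and instantiate the configured Application by class name,
    // returning null if the class cannot be loaded or constructed.
    static Application load(String className) {
        try {
            return (Application) Class.forName(className)
                    .getDeclaredConstructor().newInstance();
        } catch (Exception e) {
            return null;
        }
    }

    public static void main(String[] args) {
        Application app = load(Hello.class.getName());
        if (app != null) {
            app.activate();
        }
    }
}
```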

Using the Xander Server Cluster

The complete source code and related files for the Xander server cluster are available electronically. To build Xander, you also need Jakarta Ant (http://jakarta.apache.org/) and JDOM (http://www.jdom.org/) in your classpath. The class files are built into {XanderRoot}/bin.

You can start the Xander server by entering the command:

java xander.server.Server [-s <ip_address>] [-p <port_number>] [-P]

The -P option indicates that the server is started as the primary server. If no options are given, the default IP address is localhost and port number 5055. To test Xander, you can run it on more than one machine. However, if you do not have access to more machines, you can also bind your network adapter to multiple IP addresses.

To run Xander optimally, tweak the xander.xml (Listing Twelve) configuration file. This is necessary because no two networks are exactly the same. Because the server cluster depends heavily on network bandwidth and availability, Xander needs to be configured and personalized for each network. Of course, the alternative is automatic configuration depending on bandwidth and availability, but this increases the complexity of the mechanism and requires more resources.

The discovery and primary IP addresses and ports are arbitrary, except that the IP addresses must be in the multicast range (that is, within 224.0.0.0 to 239.255.255.255). All addresses in this range have the binary digits 1110 as their first 4 bits. This range is known as "Class D" to distinguish it from the more common Class A, B, and C addresses.
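You can verify that a candidate discovery or primary address falls in this range with java.net.InetAddress:

```java
import java.net.InetAddress;

public class MulticastCheck {
    // True if the dotted-quad address is in the Class D multicast range
    // 224.0.0.0 through 239.255.255.255.
    static boolean isMulticast(String ip) {
        try {
            return InetAddress.getByName(ip).isMulticastAddress();
        } catch (Exception e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(isMulticast("230.0.0.1"));   // discovery address
        System.out.println(isMulticast("192.168.0.1")); // not multicast
    }
}
```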

The max_server_retry and max_primary_retry variables depend on the network traffic and bandwidth. A high number means that it takes longer for the server to react in case of failure. In networks with high latency and instability, however, a low number results in a server being timed out while it is still up. Running a larger number of Xander servers also calls for a higher max_server_retry value. The number of retries should be between 3 and 30.

The wait_timer variable is the multiplier for the randomly generated number that represents the number of seconds each thread waits before the next one starts. A high number means that it takes longer to ping the servers to check whether they are still up, so reaction time will be slower. A low number means faster reaction time, but risks the ping not returning in time if network latency is high. An ideal number is between 0.5 and 5.0.

The timeout variable is the number of seconds the server waits before timing out a multicast datagram packet. A high number means slower reaction time, causing the entire server cluster to react more slowly to a server's failure. A low number means faster reaction time, but likely results in more false alarms, meaning that the max_retry variables need to be higher.

Finally, to implement the Application interface to perform specific processing, specify the full class path of your implementation in the application configuration.

Conclusion

There are many ways of achieving software fail-over; Xander is just one approach. This minimal mechanism does not cater to other related fail-over capabilities — failure recovery, load balancing, parallel processing, and the like. For example, should the server fail halfway through the processing, there is no built-in automatic recovery. Of course, you can extend Xander to provide your own recovery procedures. Also, failure is defined not as processing failure but hardware failure; that is, the server is assumed to be unreachable (see Figure 2). For software failures, you need to implement functions that detect the failure (Java's exception handling comes to mind) and activate the mechanism. Despite these shortcomings, Xander is a simple yet flexible implementation of a fail-over mechanism that can be extended and improved for other purposes.

DDJ

Listing One

public class Server {
 ...
  public static void main(String[] args) throws IOException {
    // use the current cluster status
    Status status = Status.getInstance();
    Config config = Config.getInstance();
 ...
    status.setRunningServer(server_ip);
    status.setRunningPort(server_port);

    if (_isPrimary) {
      status.promote();
    }
    status.addServer(server_ip, server_port);
 ...
    // create server-side TCP socket to listen to port
    ServerSocket serverSocket = null;
    boolean listening = true;

    // create server socket
    try {
      serverSocket =
          new ServerSocket(status.getPort(status.getRunningServer()),
              _SOCKET_BUFFER, InetAddress.getByName(status.getRunningServer()));
    }
    catch (Exception e) {
      ...
      System.exit(-1);
    }


Listing Two

    // create the synchronizer to synchronize between servers
    Synchronizer synchronizer = new Synchronizer();
    String appClassName = config.getAppClassName();
    Application app;
    try {
      app = (Application)Class.forName(appClassName).newInstance();
    }
    ...
    // listen on sockets for incoming connections from clients
    while (listening) {
      synchronizer.sync();
      (new MultiServerThread(serverSocket.accept())).start();
      if (status.isPrimary()) {
        System.out.println("*** PRIMARY ***");
        app.activate();
      }
    }
    // close server socket
    serverSocket.close();
 ...


Listing Three

public void run() {
    try {
      PrintWriter out = new PrintWriter(_socket.getOutputStream(), true);
      BufferedReader in =
          new BufferedReader(new InputStreamReader(_socket.getInputStream()));
      String inputLine;
      while ((inputLine = in.readLine()) != null) {
        StringTokenizer st = new StringTokenizer(inputLine, " ");
        String command = st.nextToken();
        Vector parameters = new Vector();

        while (st.hasMoreTokens()) {
          parameters.add(st.nextElement());
        }
 ...
        // request for information
        else if (command.equalsIgnoreCase("ask")) {
          if (!parameters.isEmpty()) {
            out.println(doAsk(parameters));
          }
          else {
            out.println("##Error: Try ask <parameter>");
          }
        }
        else {
          out.println("##Error: No such command.");
        } // end request for information
      } // end while loop


Listing Four

 ...
  /** Asking for information */
  private String doAsk(Vector parameters) {
    // get the ask command
    String command = (String)parameters.get(0);
    
    if (command.equalsIgnoreCase("primary")) {
      return status.getPrimaryServer();
    }
    else if (command.equalsIgnoreCase("isAlive")) {
      return "*";
    }
    else if (command.equalsIgnoreCase("info")) {
      return "elipva Xander Server version 1.0.";
    }
  ...


Listing Five

public class Synchronizer {
  ...
  public void sync() {
    try {
      double randNo = (new Random((Calendar.getInstance())
          .getTime().getTime())).nextDouble() * config.getWaitTimer();
      double s = (new Double((Double.toString(randNo))
          .substring(0,5))).doubleValue();
      // declare own existence
      (new DeclareSelfThread()).start();
      wait(s);
      // check for other servers in the cluster
      (new CheckServerThread()).start();
      wait(s);
      // if this is a primary, declare itself as primary
      if (status.isPrimary()) {
        (new DeclarePrimaryThread()).start();
        wait(s);
      }
      // check if a primary server exists
      (new CheckPrimaryThread()).start();
      wait(s);
      // check servers in the cluster; if one is down, remove it
      (new PingThread()).start();
      wait(s);
    }
 ...


Listing Six

 ...
  public void run() {
    Hashtable servers = status.getServers();
    if (servers != null) {
      Writer out = null;
      Enumeration hosts = servers.keys();

      while (hosts.hasMoreElements()) {
        String host = (String)hosts.nextElement();
        try {
          _socket = new Socket(host, ((Integer)servers.get(host)).intValue());
          Synchronizer.serverCount.put(host, new Integer(0));
        }
        catch (IOException e) {
          int count = 0;
          try {
            count = ((Integer)Synchronizer.serverCount.get(host)).intValue();
          }
          catch (NullPointerException ne) {}
          count++;
          int maxRetries = config.getMaxServerRetry();
          if (count > maxRetries) {
            status.removeServer(host);
            Synchronizer.serverCount.remove(host);
          }
          else {
            Synchronizer.serverCount.put(host, new Integer(count));
          }
        }
  ...


Listing Seven

 ...
    int portOut = config.getDiscoveryPort();
    byte ttl = (byte) 1; // 1-byte TTL for subnet only
    InetAddress iaOut = null;
    try {
      iaOut = InetAddress.getByName(config.getDiscoveryServer());
    }
    catch (Exception e) {
      System.err.println("### Error : error contacting multicast port. Shutting down server.");
      e.printStackTrace();
      System.exit(-1);
    }
    String clusterStatus = status.getServersAsString();
    System.out.println("status >> " + clusterStatus);

    byte[] dataOut =
        (status.getRunningServer() + ":" + status.getRunningPort()).getBytes();
    DatagramPacket dpOut =
        new DatagramPacket(dataOut, dataOut.length, iaOut, portOut);
    try {
      MulticastSocket msOut = new MulticastSocket(portOut);
      msOut.joinGroup(iaOut);
      msOut.send(dpOut, ttl);
      msOut.leaveGroup(iaOut);
      msOut.close();
    }
    catch (SocketException e) {
      System.err.println(e);
      e.printStackTrace();
    }
    ...


Listing Eight

 ...
    MulticastSocket socket = null;
    try {
      socket = new MulticastSocket(config.getDiscoveryPort());
      InetAddress address=InetAddress.getByName(config.getDiscoveryServer());
      DatagramPacket packet;
      byte[] buf = new byte[256];
      packet = new DatagramPacket(buf, buf.length);
      socket.setSoTimeout(config.getTimeout());
      socket.joinGroup(address);
      socket.receive(packet);
      String data = new String(packet.getData());
  
      StringTokenizer st = new StringTokenizer(data, ":");
      String host = st.nextToken();
      int port = Integer.parseInt((st.nextToken()).trim());

      status.addServer(host, port);

      socket.leaveGroup(address);
      socket.close();    
    }
    catch (InterruptedIOException e) {
      System.out.println("### timed out.");    
      socket.close();  
    }
    ...


Listing Nine

 ...
try {
  msIn.receive(dpIn);
  
  String primary = (new String(dpIn.getData())).trim();
  if (status.isPrimary() && !primary.equals(status.getRunningServer())) {
    System.out.println("### More than 1 primary found. Demoting this server.");
    status.demote();
  }
  Synchronizer.primaryCount = 0;
}
catch (InterruptedIOException e) {
  Synchronizer.primaryCount++;
  System.out.println("### Timing out the primary server : " + 
                                             Synchronizer.primaryCount);  
  if (Synchronizer.primaryCount > config.getMaxPrimaryRetry()) {
    status.promote();
  }
}       
    ...


Listing Ten

package xander;
/** <code>Application</code> is an interface that is used to 
 * allow Xander to call the application that it runs. The default  
 * implementation of an Application is the Scheduler.
 * last modified - 6 April 2001<br> version 1.0<br>
 * author Chang Sau Sheong
 */
public interface Application {
    public void activate();
}


Listing Eleven

public class Scheduler implements Application {
    int _PERIOD = 10; // period in seconds
    Date _start = new Date();
    public void activate() {
        Date now = new Date();
        long start_millis = _start.getTime();
        long now_millis = now.getTime();
        
        if (now_millis > (start_millis + (_PERIOD * 1000))) {
            _start = now;
            System.out.println("DING DONG!!!");     
        }
    }
}


Listing Twelve

<?xml version="1.0"?>
<xander>   
<!--  Configuration for the synchronizer -->
    <synchronizer>
        <discovery>
            <server>230.0.0.1</server>
            <port>4446</port>
        </discovery>    
<primary>
            <server>225.0.0.2</server>
            <port>4000</port>
        </primary>
        <max_server_retry>15</max_server_retry>         
        <max_primary_retry>15</max_primary_retry>   
        <wait_timer>1.5</wait_timer>
        <timeout>5</timeout>
    </synchronizer>
   <!-- configuration for the application  -->
    <app>
        <name>Default Scheduler Application</name>
        <class_name>xander.Scheduler</class_name>
    </app>      
</xander>


