Channels ▼
RSS

Task Farming & the Message Passing Interface

Source Code Accompanies This Article. Download It Now.


Sep03: Task Farming & the Message Passing Interface

Paulo is a researcher in distributed systems at the University of Coimbra, Portugal. He can be reached at pmarques@dei.uc.pt.


Whenever it is necessary to solve a computationally intensive problem, there are three different approaches that can be undertaken—you can work faster, work smarter, and/or work harder. This article is about working harder. In particular, I explain how you can use the Message Passing Interface (MPI) to parallelize applications using the task-farming paradigm. Task farming is the most simple and commonly used way of parallelizing applications. A master is set up, which takes care of creating tasks and distributing them among workers. The workers perform the tasks and send the results back to the master, which reassembles them.

MPI is the standard for developing parallel applications using message passing (http://www.mpi-forum.org/). It is based on exchanging messages among processes. There are implementations for any size and type of machine. This includes everything from clusters of PCs running Windows and Linux, to supercomputers such as Cray X1 or IBM SP Power3 systems. For this article, I use the MPICH MPI implementation from the Argonne National Laboratory on a Linux cluster (http://www-unix.mcs.anl.gov/mpi/mpich/), and WMPI from CriticalSoftware on a Windows cluster (http://www.criticalsoftware.com/hpc/). If you don't have access to multiple machines, you can still run the program using multiple processes on a single machine.

MPI Concepts

MPI is based on the concept of communicating processes. When users execute an MPI program, they specify the number of processes with which it should run. You can specify the machines to be used or let the MPI run time choose them. The processes are able to communicate by exchanging messages. A community of communicating processes is called a "communicator." In MPI, you can specify different communicators that represent different subcommunities of processes that exchange messages in some order and topology. The default communicator, which lets all processes exchange messages among themselves, is called "MPI_COMM_WORLD." Inside a communicator, each process is attributed a number called "rank." In MPI_COMM_WORLD, the ranks of the processes range from 0 to the total number of processes minus 1.

When programming with MPI, the first call before any other MPI function must be to MPI_Init(). To shut down the MPI library, MPI_Finalize() must be called. After that, no more calls to MPI functionality can be made. At any given moment, a process can find out its rank in a communicator via the MPI_Comm_rank() function, and the size of the communicator using MPI_Comm_size().

The most basic functions for exchanging messages are MPI_Send() (for sending messages) and MPI_Recv() (for receiving them). Listing One is a "Hello World" program where any other process sends a message to process 0, which prints it out. Since MPI uses language-independent data types, you must specify the data type being employed when sending and receiving messages. Listing One uses MPI_CHAR because text messages are being exchanged. Language-independent data types are mandatory so that heterogeneous clusters can be used. If you have to pass complex structures among processes, there are MPI functions for building new user-defined types. Nonetheless, if the environments where the programs are going to be executed are homogeneous, then using MPI_BYTE and sending the structure directly is alright, although portability is hindered. Other important data types include MPI_INT, MPI_LONG, MPI_FLOAT, and MPI_DOUBLE.

When using MPI_Send(), you have to indicate a communicator and a tag for the message. The tag is an integer that you can use to differentiate among different classes of messages. When calling MPI_Recv(), a tag must also be specified. The first message with the corresponding tag is delivered or the process is blocked until one arrives. If any tag is acceptable, then the MPI_ANY_TAG constant can be used. When using MPI_Recv(), it is also possible to specify a detailed error return status. If one is not desired, the constant MPI_STATUS_IGNORE is used.

In terms of calling semantics, MPI_Send() and MPI_Recv() are blocking—the functions will not return until MPI has safely stored the message to be sent, or was able to retrieve a message for delivery. A call to MPI_Send() does not guarantee that the message has been delivered on the other side, just that it is safely underway.

MPI is a complete and somewhat complex standard. It supports different message passing semantics, collective and one-sided communication, and dynamic process creation, among other features.

Task Farming

Different problems must be parallelized in different ways. One of the most common and useful ways of parallelizing an application is using the task-farm model. Basically, the problem is divided into a number of independent tasks (jobs) that must be evaluated. One of the processes (master) is responsible for generating these tasks and distributing them among the available worker processes. Each worker performs a certain task and sends the result back to the master. In turn, the master generates a new job that is sent to that worker. This proceeds until no more jobs are available, at which time the problem is solved.

Figure 1 shows the basic structure of a task farm. Each worker is executing in an infinite loop. In the loop, a worker starts by sending a message to the master requesting a job. It then waits until a message arrives. The message can be either a new job or an order to quit. The order to quit occurs when the master no longer has any jobs to distribute. Upon receiving a new job, the worker performs the task and sends the result back to the master.

The master is also executing in an infinite loop. It starts by waiting for a message from any worker. Two types of messages are possible: a request for a new job or the result of a completed job. If the message type indicates a completed job, that result is saved. Then, if there is still work to be done, the master generates a new job and sends it to the worker. If no more jobs are available, an order to quit is sent. When all the workers have terminated, the master can terminate.

While this is the basic algorithm, in practice, it must be adapted whenever an application is being parallelized. For instance, in many cases, the jobs may be generated prior to receiving a request for work from a worker, or the master can be continuously receiving partial solutions from workers when they are processing the jobs. Each application is specific. Even so, the common structure is the one discussed.

One important consideration when using task farming is the granularity of the jobs. The total performance of a parallel application is dependent on many factors. One especially important factor is the ratio of communication over computation. If the jobs are small, each worker is constantly communicating with the master, which may be a slow operation due to latency and available bandwidth. If the jobs are too large—and especially if they are not of equal size (which is common)—then it is possible to create large asymmetries between the workers. This generates load-balancing problems, where some of the workers are doing much more work than others: Thus, the total speedup of the application suffers. Also, larger jobs normally mean that the master has to spend much more time on their generation, which can become a serious bottleneck. The term "granularity" is normally used to designate the size of the jobs being generated.

Parallelizing the N-Queens Problem

N-Queens is one of the most studied problems in computer science. Although it intrinsically does not bare any special interest, it constitutes a good base for studying algorithms and synthetically benchmarking systems. The problem consists in finding all the solutions for the placement of N-Queens on an N×N chess board. The only condition to abide is that no two queens attack themselves. What this means is that no queen can share a row, a column, or a diagonal with any other queen. Figure 2 shows a solution for a 6×6 board.

The most straightforward way of solving the problem is by using a backtracking algorithm. Each possible configuration is represented by an N-size vector, where each element represents the row where a queen is. For instance, the board in Figure 2 is represented by [1,3,5,0,2,4]. The algorithm works by having a nonattacking configuration up until column i, trying to place a queen in the i+1 column. If a queen is successfully placed in a nonattacking configuration, the algorithm proceeds to the next column. This continues until the last column, finding a solution, or a placement is made in an attacking configuration. When this happens, backtracking occurs so that other placements can be tried. Backtracking guarantees that every possible solution is eventually visited. Listing Two partially shows the algorithm (the complete code is available electronically; see "Resource Center," page 5).

While simple, this is a computationally intensive process. Although it is possible to optimize it and even to find much better algorithms (researchers have been able to solve the problem in almost linear time), for this article, I am not interested in the "working smarter" approach. Instead, the aim is to create a parallel version of N-Queens using the task-farm paradigm. For simplifying the task, I only count the solutions for each problem size. Even so, if you're interested in seeing all the solutions, it is just a matter of saving each one as soon as it is generated.

Parallelizing the algorithm in Listing Two is straightforward. In this case, a job consists in a valid placement of queens up until a certain column. A result consists in the number of solutions found for that particular job. A worker must find all the solutions valid for that board prefix, then send the number back to the master.

In Listing Three, the master, generates valid configurations using the normal algorithm up until column GRANULARITY. Whenever a valid configuration is found, it sends it to a worker using the send_job_worker() function; see Listing Four.

The function send_job_worker() is of crucial importance. When invoked, it blocks until a message is received from a worker. That message contains the number of solutions that a worker has found in the last job that it processed. It also indicates that the worker is ready to perform a new job. After receiving the message, the master sends it the precomputed board prefix, which constitutes the new job. The function also returns the number of solutions found by that worker. This allows the master to keep an accurate count of the number of solutions found at any time, updating the n_solutions variable.

Looking back at Listing Three, you see that master_place_queen() exits as soon as no more jobs exist. Nevertheless, there may still be workers executing tasks. Thus, the master still has to collect any pending results sent by the workers and order them to quit. This is accomplished in wait_remaining_results() (Listing Five). After a quit message has been sent to all the workers, the master can then exit.

The implementation of the worker is in direct correspondence to the master (Listing Six). The whole process starts by having the worker send a message, where it states that the number of solutions found during the last job is 0. This message is innocuous and allows the master to send it the first task, starting up the whole procedure. The main part of the worker consists in a while loop, where it blocks until a new message is received. This message can be either a new job or an order to quit. If it is a new job, it processes the work by calling the worker_place_queen(). This routine is exactly equal to the place_queen() routine of the serial version (Listing Two). One important point is that the worker starts by computing at column GRANULARITY, which is where the master has left off. After the solutions for a particular job are computed, the worker sends the number of solutions found to the master, and blocks waiting for a new message.

Running the Program

The complete program (available electronically) is a bit more complete than the one presented here. The granularity of the jobs is configurable, and timing instructions have also been included for doing performance measurements. Yet, it closely follows the one presented here.

For running the parallel version of the N-Queens problem, you will need an MPI implementation. I have compiled and run this program using both WMPI (Windows) and MPICH (Linux). Included with the source code are instructions on how to compile and run the program using both Windows and Linux.

Because no one in the high-performance computing community that parallelizes applications is allowed to write an article without presenting a speedup graph, Figure 3 shows the results obtained for solving the problem for a 15×15 board. The results were obtained using WMPI II on a cluster of 1-GHz Celeron machines, running Windows 2000.

DDJ

Listing One

#include <stdio.h>
#include <stdlib.h>
#include "mpi.h"

#define BUF_SIZE            80
int main(int argc, char* argv[])
{  
  int  id;                  // The rank of this process
  int  n_processes;         // The size of the world
  char buffer[BUF_SIZE];    // A message buffer

  MPI_Init(&argc, &argv);
  MPI_Comm_rank(MPI_COMM_WORLD, &id);
  MPI_Comm_size(MPI_COMM_WORLD, &n_processes);

  if (id == 0) {
    // Process 0 - Receive messages from all other processes
    for (int i=1; i<=n_processes-1; i++) {
      MPI_Recv(buffer, BUF_SIZE, MPI_CHAR, MPI_ANY_SOURCE, MPI_ANY_TAG, 
               MPI_COMM_WORLD, MPI_STATUS_IGNORE);
      printf("%s", buffer);
    }
  }
  else {
    // All other processes send a message to process 0
    sprintf(buffer, "Hello, I'm process %d\n", id);
    MPI_Send(buffer, strlen(buffer)+1, MPI_CHAR, 0, 0, MPI_COMM_WORLD);
  }
  MPI_Finalize();
  return 0;
}

Back to Article

Listing Two

// Solve the NQueens problem for the <size> NxN board
int n_queens(int size)
{
  int board[size];
  int n_solutions = place_queen(0, board, size);
  return n_solutions;
}
int place_queen(int column, int board[], int size)
{
  int n_solutions = 0;
  // Try to place a queen in each line of <column>
  for (int i=0; i<size; i++) {
    board[column] = i;
    // Check if this board is still a solution
    bool is_sol = true;
    for (int j=column-1; j>=0; j--) {
      if ((board[column] == board[j])               ||
          (board[column] == board[j] - (column-j))  ||
          (board[column] == board[j] + (column-j))) {
        is_sol = false;
        break;
      }
    }
    if (is_sol) {                    // It is a solution!
      if (column == size-1) {
        // If this is the last column, printout the solution
        ++n_solutions;
        print_solution(board, size);
      } else {
        // The board is not complete. Try to place the queens
        // on the next level, using the current board
        n_solutions+= place_queen(column+1, board, size);
      }
    }
  }
  return n_solutions;
}

Back to Article

Listing Three

// The master process (returns the number of solutions found)
int master(int size)
{
  int board[size];
  int n_solutions;
  n_solutions = master_place_queen(0, board, size);
  n_solutions+= wait_remaining_results();
}
int master_place_queen(int column, int board[], size)
{
  int n_solutions = 0;
  for (int i=0; i<size; i++) {
    // Place the queen on the correct line of <column>
    board[column] = i;
    // Check if this board is still a solution
    bool is_sol = true;
    for (int j=column-1; j>=0; j--) {
      if ((board[column] == board[j])               ||
          (board[column] == board[j] - (column-j))  ||
          (board[column] == board[j] + (column-j))) {
        is_sol = false;
        break;
      }
    }
    if (is_sol) {                    // If it is a solution...
      if (column == GRANULARITY-1) {  
        // If we are at the last level (granularity of the job), 
        // this is a job for sending to a worker
        n_solutions+= send_job_worker(board, size);
      }
      else {
        // Not in the last level, try to place queens in the 
        // next one using the current board
        n_solutions+= master_place_queen(column+1, board, size);
      }
    }
  }
  return n_solutions;
}

Back to Article

Listing Four

int send_job_worker(int board[], int size)
{
  int n_solutions = 0;      // The number of solutions found meanwhile
  job to_do;                // The job to do
  // Build the job
  to_do.type = DO_WORK;
  for (int i=0; i<GRANULARITY; i++)
    to_do.board[i] = board[i];
  // Receive the last result from a worker
  worker_msg msg;
  MPI_Recv(&msg, sizeof(msg), MPI_BYTE, MPI_ANY_SOURCE, 
           MPI_ANY_TAG, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
  n_solutions = msg.solutions_found;
  // Send the new job to the worker
  MPI_Send(&to_do, sizeof(to_do), MPI_BYTE, msg.origin, 0, MPI_COMM_WORLD);
  return n_solutions;
}

Back to Article

Listing Five

int wait_remaining_results()
{
  int n_solutions = 0;
  // Wait for remaining results, sending a quit whenever a new result arrives
  job byebye;
  byebye.type = QUIT;
  while (n_workers > 0)
  {
    // Receive a message from a worker
    worker_msg msg;
    MPI_Recv(&msg, sizeof(msg), MPI_BYTE, MPI_ANY_SOURCE, 
             MPI_ANY_TAG, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
    n_solutions+= msg.solutions_found;
    MPI_Send(&byebye, sizeof(byebye),MPI_BYTE,msg.origin, 0, MPI_COMM_WORLD);
    --n_workers;
  }
  return n_solutions;
}

Back to Article

Listing Six

void worker(int size)
{
  int n_solutions;
  // There is a default message named ask_job which lets a worker request a 
  /// job reporting the number of solutions found in the last iteration
  worker_msg ask_job;
  ask_job.origin          = id;
  ask_job.solutions_found = 0;
  // Request initial job
  MPI_Send(&ask_job, sizeof(ask_job), MPI_BYTE, MASTER, 0, MPI_COMM_WORLD);
  while (true) {
    // Wait for a job or a quit message
    job work_to_do;
    MPI_Recv(&work_to_do, sizeof(work_to_do), MPI_BYTE, MPI_ANY_SOURCE, 
             MPI_ANY_TAG, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
    if (work_to_do.type == QUIT)
      break;
    n_solutions = worker_place_queen(GRANULARITY, work_to_do.board, size);
    // Ask for more work
    ask_job.solutions_found = n_solutions;
    MPI_Send(&ask_job, sizeof(ask_job), MPI_BYTE, MASTER, 0, MPI_COMM_WORLD);
  }
}

Back to Article


Related Reading


More Insights






Currently we allow the following HTML tags in comments:

Single tags

These tags can be used alone and don't need an ending tag.

<br> Defines a single line break

<hr> Defines a horizontal line

Matching tags

These require an ending tag - e.g. <i>italic text</i>

<a> Defines an anchor

<b> Defines bold text

<big> Defines big text

<blockquote> Defines a long quotation

<caption> Defines a table caption

<cite> Defines a citation

<code> Defines computer code text

<em> Defines emphasized text

<fieldset> Defines a border around elements in a form

<h1> This is heading 1

<h2> This is heading 2

<h3> This is heading 3

<h4> This is heading 4

<h5> This is heading 5

<h6> This is heading 6

<i> Defines italic text

<p> Defines a paragraph

<pre> Defines preformatted text

<q> Defines a short quotation

<samp> Defines sample computer code text

<small> Defines small text

<span> Defines a section in a document

<s> Defines strikethrough text

<strike> Defines strikethrough text

<strong> Defines strong text

<sub> Defines subscripted text

<sup> Defines superscripted text

<u> Defines underlined text

Dr. Dobb's encourages readers to engage in spirited, healthy debate, including taking us to task. However, Dr. Dobb's moderates all comments posted to our site, and reserves the right to modify or remove any content that it determines to be derogatory, offensive, inflammatory, vulgar, irrelevant/off-topic, racist or obvious marketing or spam. Dr. Dobb's further reserves the right to disable the profile of any commenter participating in said activities.

 
Disqus Tips To upload an avatar photo, first complete your Disqus profile. | View the list of supported HTML tags you can use to style comments. | Please read our commenting policy.