Game of Life Distributed: Preamble
Since the Game of Life simulation can be parallelized through domain decomposition of the grid, as I described in my last post, the parts of the decomposed grid can be placed on different nodes of a distributed cluster or network of workstations. Parallelizing code for distributed memory processing presents different opportunities and challenges from doing a similar conversion into a shared memory environment. I'm planning to touch on some of these opportunities and challenges with Game of Life as a model problem. But before I do that, I want to do a rudimentary review of distributed memory parallelism and message passing libraries. (I've only written about shared memory parallelism to this point and I want to make sure we're all using the same vocabulary.)
If you're familiar with the Message-Passing Interface (MPI) library and programming multiple processes on a cluster of nodes, you can probably skip over the rest of this post and look for the next one, which details a distributed memory version of Game of Life. Just for fun, if you're not going to finish this post, you should have a bowl of Life cereal (or even go out and buy yourself a whole box to snack on) between now and then.
As the name implies, in distributed memory parallelism the concurrent computations are distributed across separate nodes in the system, each with its own memory. The MPI specification has a couple hundred different functions. This seems like a lot (and it is), but you really only need to master six different functions to be able to write just about any parallel computation you can imagine. These six functions are MPI_Init(), MPI_Finalize(), MPI_Comm_size(), MPI_Comm_rank(), MPI_Send(), and MPI_Recv().
When you launch an MPI application, a number of processes are started, typically one process per node in the cluster. Before any other MPI function can be called, each process must call MPI_Init() in order to prepare the process for execution within the MPI runtime. At the end of the computation, or at least after all message passing has been completed within the process, a call to MPI_Finalize() will free up any memory allocated by the MPI library.
Part of the MPI initialization is to place each process into the global communicator. MPI uses communicators to define communication patterns between processes. Every process will be part of the global communicator, known as MPI_COMM_WORLD. The function MPI_Comm_size() returns the number of processes that are contained in a given communicator. Calling this function with the MPI_COMM_WORLD communicator allows the calling process to know how many total processes were launched. Within a communicator, each process will have a unique identification number or rank. MPI_Comm_rank() returns the rank of the calling process within the specified communicator. Processes are numbered from 0 to one less than the number of processes in the communicator.
In many of the MPI programs I've written, the first three executable lines were something like the following:
MPI_Init(&argc, &argv);
MPI_Comm_size(MPI_COMM_WORLD, &num_procs);
MPI_Comm_rank(MPI_COMM_WORLD, &myid);
This sets up each process in the MPI_COMM_WORLD communicator and lets it know how many sibling processes there are in the execution (num_procs) and its place within that family of processes (myid).
Where threads simply write a value into a shared location in order to share data between themselves, nodes in the cluster have nothing shared. Each node is connected to one or more other nodes through a network and there are functions to communicate between processes across the network wires. This communication is used to share data and tasks between distributed processes. The rank of a process is used as a destination and a source for sending and receiving messages with the last two functions from the list of six.
The MPI_Send() function performs a blocking send of the specified data to the specified destination. The parameter list includes the address of the local buffer memory containing the message (data) to be sent, the number of elements in the message, the data type of the elements, the rank of the destination process, a user-defined integer tag (to differentiate messages from the same process or to label a message's contents), and the communicator to be used. It is a "blocking" send because the caller is blocked until the message buffer can safely be reused, which means the message has either been delivered or been copied into a buffer by the MPI runtime.
On the other side of the network, a process calls MPI_Recv() to perform a blocking receive of the specified data from the specified source. The parameter list includes the address of the local buffer to store the received message (data), the number of elements in the message, the data type of the elements, the rank of the process that is sending the expected message, a user-defined integer tag, and the communicator to be used. It is a "blocking" receive because the caller is blocked until a message with a matching tag and source is received. Even when there are other messages sitting in the node's message queue, if none of those messages has the right tag and source, the MPI_Recv() continues to block the calling process. This is like waiting for a love letter to arrive in your mail box. Until you find that red envelope (tag) from that special someone (source), you will ignore all other letters (even other letters from the right source in white envelopes or letters in red envelopes from other people). MPI does have a facility to match any source rank or any tag with wildcard parameters to avoid delays when multiple source processes may be sending data.
One favorable consequence of having to share data through an explicit send and receive protocol is the fact that there can never be a storage conflict (data race) between two processes. However, this deliberate need to send and receive messages between processes can be a difficult concept for programmers who are unfamiliar with parallelism or who have done exclusively shared memory parallelism.
While there are no data races, there is the chance that processes can deadlock if you are not careful in how the communication between processes is established. The easiest example of this is when processes need to exchange (send and receive) data with some other process. If each process first attempts to receive data from the sibling process, all processes will be blocked since no process will be sending data to satisfy the blocking receive function call.
One way to avoid such a situation is to use non-blocking, or asynchronous, sends or receives. With an asynchronous send, the sender does not wait for the message to be buffered by the MPI runtime; execution returns immediately to the sending process. To ensure that the correct data is sent, the sending process must not modify the contents of the local message buffer until it has confirmed that the send has completed.
For asynchronous receive, the calling process notifies the system that it has a buffer prepared to receive data from another process. Control is then immediately returned to the calling process, which continues on with processing; however, nothing should be read from the receive buffer until the process guarantees that the message transfer has been successfully completed. Deadlock is avoided by allowing a process to post several receives, regardless of whether the sender has actually sent the message or not, and then block for the messages only when the data is required.
I've glossed over a lot of the nitty gritty details of the functions described, but I just want to give you a simple peek into the modest details of programming for distributed parallelism. Also, there are other MPI functions that make processing messages and coordinating the computations between processes so much easier than the two basic point-to-point communication functions I've just described. Perhaps we can delve into those at another time.
One final consideration that I want to impart to you, before I run out of space here, is the fact that transforming a serial code to a distributed memory version requires the whole program to work with whatever distribution of work and data you implement. If you add threads to a serial code, especially when using something like OpenMP or Microsoft's Task Parallel Library, you can get away with adding parallelism a little bit at a time. Since all memory is shared, if a portion of the code remains serial, that code has access to all the shared data that was touched or computed on by threads. You can modify other portions of the application at a later time if you want to increase the performance. Not so with distributed computations. Since the "shared" data will be divided and stored across the nodes of a cluster, all computations that access such data need to be coded to exchange data through the messaging library if the required data is not resident on the node. Thus, the whole application must be modified, then debugged and tuned. It can be a big undertaking, so be sure to stock up on plenty of snacks, like Life cereal, for when it gets sticky.