This two-part article covers the basics of parallel computing. Part 1 began with an overview, including concepts and terminology associated with parallel computing. The topics of parallel memory architectures and programming models were then explored. This installment presents a discussion on a number of issues related to designing parallel programs, concluding with several examples of how to parallelize simple serial programs. Provided courtesy of Lawrence Livermore National Laboratory.
Designing Parallel Programs
Automatic vs. Manual Parallelization
Designing and developing parallel programs has characteristically been a very manual process. The programmer is typically responsible for both identifying and actually implementing parallelism.
Very often, manually developing parallel codes is a time consuming, complex, error-prone and iterative process.
For a number of years now, various tools have been available to assist the programmer with converting serial programs into parallel programs. The most common type of tool used to automatically parallelize a serial program is a parallelizing compiler or pre-processor.
A parallelizing compiler generally works in two different ways:
Fully Automatic
The compiler analyzes the source code and identifies opportunities for parallelism.
The analysis includes identifying inhibitors to parallelism and possibly a cost weighting on whether or not the parallelism would actually improve performance.
Loops (do, for) are the most frequent target for automatic parallelization.
Programmer Directed
Using "compiler directives" or possibly compiler flags, the programmer explicitly tells the compiler how to parallelize the code.
May be used in conjunction with some degree of automatic parallelization. A directive-based sketch follows below.
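For instance, a minimal sketch of the directive-based approach using OpenMP, one widely used directive set; the routine and its arguments here are illustrative, not taken from this tutorial:

#include <omp.h>

/* A loop the programmer has determined to be safe to parallelize.
   The directive tells the compiler to split the iterations among threads. */
void scale(int n, float *a, float s)
{
    int i;
    #pragma omp parallel for
    for (i = 0; i < n; i++)
        a[i] = a[i] * s;     /* iterations are independent of one another */
}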
If you are beginning with an existing serial code and have time or budget constraints, then automatic parallelization may be the answer. However, there are several important caveats that apply to automatic parallelization:
Wrong results may be produced
Performance may actually degrade
Much less flexible than manual parallelization
Limited to a subset (mostly loops) of code
May actually not parallelize code if the analysis suggests there are inhibitors or the code is too complex
Most automatic parallelization tools are for Fortran
The remainder of this section applies to the manual method of developing parallel codes.
Understand the Problem and the Program
Undoubtedly, the first step in developing parallel software is to understand the problem that you wish to solve in parallel. If you are starting with a serial program, this also necessitates understanding the existing code.
Before spending time in an attempt to develop a parallel solution for a problem, determine whether or not the problem is one that can actually be parallelized.
Example of Parallelizable Problem:
Calculate the potential energy for each of several thousand independent conformations of a molecule. When done, find the minimum energy conformation.
This problem is able to be solved in parallel. Each of the molecular conformations is independently determinable. The calculation of the minimum energy conformation is also a parallelizable problem.
Example of a Non-parallelizable Problem:
Calculation of the Fibonacci series (1,1,2,3,5,8,13,21,...) by use of the formula:
F(k + 2) = F(k + 1) + F(k)
This is a non-parallelizable problem because the calculation of the Fibonacci sequence as shown entails dependent rather than independent calculations. The calculation of the k + 2 value uses those of both k + 1 and k. These three terms cannot be calculated independently and therefore cannot be calculated in parallel.
Identify the program's hotspots:
Know where most of the real work is being done. The majority of scientific and technical programs usually accomplish most of their work in a few places.
Profilers and performance analysis tools can help here
Focus on parallelizing the hotspots and ignore those sections of the program that account for little CPU usage.
Identify bottlenecks in the program
Are there areas that are disproportionately slow, or cause parallelizable work to halt or be deferred? For example, I/O is usually something that slows a program down.
May be possible to restructure the program or use a different algorithm to reduce or eliminate unnecessary slow areas
Identify inhibitors to parallelism. One common class of inhibitor is data dependence, as demonstrated by the Fibonacci sequence
above.
Investigate other algorithms if possible. This may be the single most important consideration when designing a parallel application.
Partitioning
One of the first steps in designing a parallel program is to break the problem into discrete "chunks" of work that can be distributed to multiple tasks. This is known as decomposition or partitioning.
There are two basic ways to partition computational work among parallel tasks: domain decomposition and functional decomposition.
Domain Decomposition:
In this type of partitioning, the data associated with a problem is decomposed. Each parallel task then works on a portion of the data.
There are different ways to partition data; common schemes distribute array elements among tasks in block or cyclic fashion, in one or more dimensions.
Functional Decomposition:
In this approach, the focus is on the computation that is to be performed rather than on the data manipulated by the computation. The problem is decomposed according to the work that must be done. Each task then performs a portion of the overall work.
Functional decomposition lends itself well to problems that can be
split into different tasks. For example:
Ecosystem Modeling
Each program calculates the population of a given group, where each group's growth depends on that of its neighbors. As time progresses, each process calculates its current state, then exchanges information with the neighbor populations. All tasks then progress to calculate the state at the next time step.
Signal Processing
An audio signal data set is passed through four distinct computational filters. Each filter is a separate process. The first segment of data must pass through the first filter before progressing to the second. When it does, the second segment of data passes through the first filter. By the time the fourth segment of data is in the first filter, all four tasks are busy.
Climate Modeling
Each model component can be thought of as a separate task. The components exchange data during the computation: the atmosphere model generates wind velocity data that are used by the ocean model, the ocean model generates sea surface temperature data that are used by the atmosphere model, and so on.
Combining these two types of problem decomposition is common and natural.
Communications
Who Needs Communications?
The need for communications between tasks depends upon your problem:
You DON'T need communications
Some types of problems can be decomposed and executed in parallel with virtually no need for tasks to share data. For example, imagine an image processing operation where every pixel in a black and white image needs to have its color reversed. The image data can easily be distributed to multiple tasks that then act independently of each other to do their portion of the work.
These types of problems are often called embarrassingly parallel because they are so straightforward. Very little inter-task communication is required.
You DO need communications
Most parallel applications are not quite so simple, and do require tasks to share data with each other. For example, a 3-D heat diffusion problem requires a task to know the temperatures calculated by the tasks that have neighboring data. Changes to neighboring data have a direct effect on that task's data.
Factors to Consider:
There are a number of important factors to consider when designing your program's inter-task communications:
Cost of communications
Inter-task communication virtually always implies overhead.
Machine cycles and resources that could be used for computation are instead used to package and transmit data.
Communications frequently require some type of synchronization between tasks, which can result in tasks spending time "waiting" instead of doing work.
Competing communication traffic can saturate the available network bandwidth, further aggravating performance problems.
Latency vs. Bandwidth
Latency is the time it takes to send a minimal (0 byte) message from point A to point B. Commonly expressed as microseconds.
Bandwidth is the amount of data that can be communicated per unit of time. Commonly expressed as megabytes/sec.
Sending many small messages can cause latency to dominate communication overheads. Often it is more efficient to package small messages into a larger message, thus increasing the effective communications bandwidth.
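A sketch of this packing idea, assuming MPI; the buffer and routine names are illustrative:

#include <mpi.h>

void send_one_at_a_time(float *v, int n, int dest)
{
    /* n messages: pays the latency cost n times */
    for (int i = 0; i < n; i++)
        MPI_Send(&v[i], 1, MPI_FLOAT, dest, 0, MPI_COMM_WORLD);
}

void send_packed(float *v, int n, int dest)
{
    /* one larger message: pays the latency cost once, so the
       effective bandwidth is much closer to the network's peak */
    MPI_Send(v, n, MPI_FLOAT, dest, 0, MPI_COMM_WORLD);
}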
Visibility of communications
With the Message Passing Model, communications are explicit and generally quite visible and under the control of the programmer.
With the Data Parallel Model, communications often occur transparently to the programmer, particularly on distributed memory architectures. The programmer may not even know exactly how inter-task communications are being accomplished.
Synchronous vs. asynchronous communications
Synchronous communications require some type of "handshaking" between tasks that are sharing data. This can be explicitly structured in code by the programmer, or it may happen at a lower level unknown to the programmer.
Synchronous communications are often referred to as blocking communications since other work must wait until the communications have completed.
Asynchronous communications allow tasks to transfer data independently from one another. For example, task 1 can prepare and send a message to task 2, and then immediately begin doing other work. When task 2 actually receives the data does not matter.
Asynchronous communications are often referred to as non-blocking communications since other work can be done while the communications are taking place.
Interleaving computation with communication is the single greatest benefit for using asynchronous communications.
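A minimal sketch of this pattern using MPI's non-blocking calls; the routine and buffer names are illustrative:

#include <mpi.h>

/* Sketch: a task posts a non-blocking send, computes on unrelated data
   while the message is in flight, then waits before reusing the buffer. */
void overlap_example(float *msg, int n, int dest, float *work, int wn)
{
    MPI_Request req;
    MPI_Isend(msg, n, MPI_FLOAT, dest, 0, MPI_COMM_WORLD, &req);

    for (int i = 0; i < wn; i++)        /* useful computation overlapped */
        work[i] = work[i] * 2.0f;       /* with the communication        */

    MPI_Wait(&req, MPI_STATUS_IGNORE);  /* safe to modify msg after this */
}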
Scope of communications
Knowing which tasks must communicate with each other is critical during the design stage of a parallel code. Both scopings described below can be implemented synchronously or asynchronously.
Point-to-point - involves two tasks with one task acting as the sender/producer of data, and the other acting as the receiver/consumer.
Collective - involves data sharing between more than two tasks, which are often specified as being members in a common group, or collective. Common variations include broadcast, scatter, gather and reduction (there are more).
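A minimal sketch of two of these collective operations, assuming MPI; the values involved are illustrative:

#include <mpi.h>

/* The root broadcasts a parameter to every task in the group, then all
   tasks contribute a partial result that is summed onto the root. */
void collective_example(int root, MPI_Comm comm)
{
    float param = 0.0f, partial, total;
    int rank;

    MPI_Comm_rank(comm, &rank);
    if (rank == root) param = 3.14f;               /* value only the root knows */

    MPI_Bcast(&param, 1, MPI_FLOAT, root, comm);   /* now every task has it */

    partial = param * rank;                        /* each task's contribution */
    MPI_Reduce(&partial, &total, 1, MPI_FLOAT, MPI_SUM, root, comm);
}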
Efficiency of communications
Very often, the programmer will have a choice with regard to factors that can affect communications performance. Only a few are mentioned here.
Which implementation for a given model should be used? Using the Message Passing Model as an
example, one MPI implementation may be faster on a given hardware platform than another.
What type of communication operations should be used? As mentioned previously, asynchronous communication operations can improve overall program performance.
Network media - some platforms may offer more than one network for communications. Which one is best?
Overhead and Complexity
Finally, realize that this is only a partial list of things to consider!!!
Synchronization
Types of Synchronization:
Barrier
Usually implies that all tasks are involved
Each task performs its work until it reaches the barrier. It then stops, or "blocks".
When the last task reaches the barrier, all tasks are synchronized.
What happens from here varies. Often, a serial section of work must be done. In other cases, the tasks are automatically released to continue their work.
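A minimal sketch, assuming MPI's barrier routine; the work shown is illustrative:

#include <mpi.h>

/* Each task works on its own chunk; no task proceeds past the barrier
   until the last one arrives. */
void barrier_example(float *mydata, int n)
{
    for (int i = 0; i < n; i++)
        mydata[i] = mydata[i] + 1.0f;   /* independent work */

    MPI_Barrier(MPI_COMM_WORLD);        /* all tasks synchronize here */

    /* work that requires every task to have finished the step above */
}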
Lock / semaphore
Can involve any number of tasks
Typically used to serialize (protect) access to global data or a section of code. Only one task at a time may use (own) the lock / semaphore / flag.
The first task to acquire the lock "sets" it. This task can then safely (serially) access the protected data or code.
Other tasks can attempt to acquire the lock but must wait until the task that owns the lock releases it.
Can be blocking or non-blocking
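A minimal sketch using a POSIX threads mutex as the lock; the shared variable is illustrative:

#include <pthread.h>

long global_sum = 0;                               /* shared (global) data */
pthread_mutex_t sum_lock = PTHREAD_MUTEX_INITIALIZER;

/* Each thread adds its contribution; the lock serializes the update so
   only one thread at a time touches global_sum. */
void add_contribution(long my_value)
{
    pthread_mutex_lock(&sum_lock);    /* blocks until the lock is acquired */
    global_sum += my_value;           /* protected (serial) section        */
    pthread_mutex_unlock(&sum_lock);  /* release so other threads proceed  */
}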
Synchronous communication operations
Involves only those tasks executing a communication operation
When a task performs a communication operation, some form of coordination is required with the other task(s) participating in the communication. For example, before a task can perform a send operation, it must first receive an acknowledgment from the receiving task that it is OK to send.
Discussed previously in the Communications section.
Data Dependencies
Definition:
A dependence exists between program statements when the order of statement execution affects the results of the program.
A data dependence results from multiple use of the same location(s) in storage by different tasks.
Dependencies are important to parallel programming because they are one of the primary inhibitors to parallelism.
Loop carried data dependence
Consider a loop in which each iteration computes A(J) = A(J-1) * 2.0, with the iterations divided among tasks. The value of A(J-1) must be computed before the value of A(J); therefore, A(J) exhibits a data dependency on A(J-1) and parallelism is inhibited.
If task 2 has A(J) and task 1 has A(J-1), computing the correct value of A(J) necessitates:
Distributed memory architecture - task 2 must obtain the value of A(J-1) from task 1 after task 1 finishes its computation
Shared memory architecture - task 2 must read A(J-1) after task 1 updates it
Loop independent data dependence
task 1        task 2
------        ------
X = 2         X = 4
  .             .
  .             .
Y = X**2      Y = X**3
As with the previous example, parallelism is inhibited. The value of Y is dependent on:
Distributed memory architecture - if or when the value of X is communicated between the tasks.
Shared memory architecture - which task last stores the value of X.
Although all data dependencies are important to identify when designing parallel programs, loop carried dependencies are particularly important since loops are possibly the most common target of parallelization efforts.
How to Handle Data Dependencies:
Distributed memory architectures - communicate required data at synchronization points.
Shared memory architectures - synchronize read/write operations between tasks.
Load Balancing
Load balancing refers to the practice of distributing work among tasks so that all tasks are kept busy all of the time. It can be considered a minimization of task idle time.
Load balancing is important to parallel programs for performance reasons. For example, if all tasks are subject to a barrier synchronization point, the slowest task will determine the overall performance.
How to Achieve Load Balance:
Equally partition the work each task receives
For array/matrix operations where each task performs similar work, evenly distribute the data set among the tasks.
For loop iterations where the work done in each iteration is similar, evenly distribute the iterations across the tasks (a sketch of computing each task's iteration range follows below).
If a heterogeneous mix of machines with varying performance characteristics is being used, be sure to use some type of performance analysis tool to detect any load imbalances. Adjust work accordingly.
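The sketch referred to above, computing the range of loop iterations each task owns; the routine name and half-open range convention are illustrative assumptions:

/* Compute the half-open range [first, last) of iterations owned by
   task `taskid` out of `numtasks`, for a loop of N iterations.
   Handles an N that is not evenly divisible by the number of tasks. */
void my_range(int N, int numtasks, int taskid, int *first, int *last)
{
    int base = N / numtasks;          /* iterations every task gets */
    int rem  = N % numtasks;          /* leftover iterations        */

    /* the first `rem` tasks take one extra iteration each */
    *first = taskid * base + (taskid < rem ? taskid : rem);
    *last  = *first + base + (taskid < rem ? 1 : 0);
}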
Use dynamic work assignment
Certain classes of problems result in load imbalances even if data is evenly distributed among tasks:
Sparse arrays - some tasks will have actual data to work on while others have mostly "zeros".
Adaptive grid methods - some tasks may need to refine their mesh while others don't.
N-body simulations - where some particles may migrate to/from their original task domain to another task's; where the particles owned by some tasks require more work than those owned by other tasks.
When the amount of work each task will perform is intentionally variable, or is unable to be predicted, it may be helpful to use a scheduler - task pool approach. As each task finishes its work, it queues to get a new piece of work.
It may become necessary to design an algorithm which detects and handles load imbalances as they occur dynamically within the code.
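As one illustration of dynamic work assignment on a shared memory machine, a sketch using OpenMP's dynamic scheduler; the chunk size and routine are illustrative:

#include <omp.h>

/* Iterations vary widely in cost (e.g. sparse rows), so a fixed split
   would leave some threads idle. schedule(dynamic, 8) hands out work
   in chunks of 8 iterations to whichever thread finishes first. */
void process_rows(int nrows, void (*do_row)(int))
{
    int i;
    #pragma omp parallel for schedule(dynamic, 8)
    for (i = 0; i < nrows; i++)
        do_row(i);               /* cost of each row may differ greatly */
}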
Granularity
Computation / Communication Ratio:
In parallel computing, granularity is a qualitative measure of the ratio of computation to communication.
Periods of computation are typically separated from periods of communication by synchronization events.
Fine-grain Parallelism:
Relatively small amounts of computational work are done between communication events
Low computation to communication ratio
Facilitates load balancing
Implies high communication overhead and less opportunity for performance enhancement
If granularity is too fine it is possible that the overhead required for communications and synchronization between tasks takes longer than the computation.
Coarse-grain Parallelism:
Relatively large amounts of computational work are done between communication/synchronization events
High computation to communication ratio
Implies more opportunity for performance increase
Harder to load balance efficiently
Which is Best?
The most efficient granularity is dependent on the algorithm and the hardware environment in which it runs.
In most cases the overhead associated with communications and synchronization is high relative to execution speed so it is advantageous to have coarse granularity.
Fine-grain parallelism can help reduce overheads due to load imbalance.
I/O
The Bad News:
I/O operations are generally regarded as inhibitors to parallelism
Parallel I/O systems are immature or not available for all platforms
In an environment where all tasks see the same filespace, write operations will result in file overwriting
Read operations will be affected by the fileserver's ability to handle multiple read requests at the same time
I/O that must be conducted over the network (NFS, non-local) can cause severe bottlenecks
The Good News:
Some parallel file systems are available. For example:
GPFS: General Parallel File System for AIX (IBM)
Lustre: for Linux clusters (Cluster File Systems, Inc.)
PVFS/PVFS2: Parallel Virtual File System for Linux clusters (Clemson/Argonne/Ohio State/others)
PanFS: Panasas ActiveScale File System for Linux clusters (Panasas, Inc.)
HP SFS: HP StorageWorks Scalable File Share. Lustre based parallel file system (Global File System for Linux) product from HP
The parallel I/O programming interface specification for MPI has been available since 1996 as part of MPI-2. Vendor and "free" implementations are now commonly available.
Some options:
If you have access to a parallel file system, investigate using it. If you don't, keep reading...
Rule #1: Reduce overall I/O as much as possible
Confine I/O to specific serial portions of the job, and then use parallel communications to distribute data to parallel tasks. For example, Task 1 could read an input file and then communicate required data to other tasks. Likewise, Task 1 could perform the write operation after receiving required data from all other tasks.
For distributed memory systems with shared filespace, perform I/O in local, non-shared filespace.
For example, each processor may have /tmp filespace which can be used. This is usually much more efficient than performing I/O over the network to one's home directory.
Create unique filenames for each task's input/output file(s)
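A minimal sketch of building such filenames; the path and naming pattern are illustrative:

#include <stdio.h>

/* Build a per-task output filename such as "/tmp/out.0007" so that
   tasks never write to the same file. */
FILE *open_my_output(int taskid)
{
    char fname[64];
    snprintf(fname, sizeof(fname), "/tmp/out.%04d", taskid);
    return fopen(fname, "w");
}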
Limits and Costs of Parallel Programming
Amdahl's Law:
Amdahl's Law states that potential program speedup is defined by the fraction of code (P) that can be parallelized:

                      1
    speedup  =  -------------
                   1 - P
If none of the code can be parallelized, P = 0 and the speedup = 1 (no speedup). If all of the code is parallelized, P = 1 and the speedup is infinite (in theory).
If 50% of the code can be parallelized, maximum speedup = 2, meaning the code will run twice as fast.
Introducing the number of processors performing the parallel fraction of work, the relationship can be modeled by:

                      1
    speedup  =  -------------
                  P/N  +  S

where P = parallel fraction, N = number of processors and S = serial fraction.
It soon becomes obvious that there are limits to the scalability of parallelism. For example, at P = .50, .90 and .99 (50%, 90% and 99% of the code is parallelizable):
                         speedup
             --------------------------------
       N       P = .50    P = .90    P = .99
     -----     -------    -------    -------
        10       1.82       5.26       9.17
       100       1.98       9.17      50.25
      1000       1.99       9.91      90.99
     10000       1.99       9.91      99.02
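The table entries follow directly from the formula; a small sketch of the calculation (the sample values are illustrative):

#include <stdio.h>

/* speedup = 1 / (P/N + S), where S = 1 - P is the serial fraction */
double amdahl_speedup(double P, int N)
{
    return 1.0 / (P / (double)N + (1.0 - P));
}

int main(void)
{
    /* e.g. 99% parallelizable code on 100 processors */
    printf("speedup = %.2f\n", amdahl_speedup(0.99, 100));  /* ~50.25 */
    return 0;
}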
However, certain problems demonstrate increased performance by increasing the problem size. For example:
We can increase the problem size by doubling the grid dimensions and halving the time step. This results in four times the number of grid points and twice the number of time steps, so the parallelizable work grows much faster than the serial portion, which remains essentially unchanged.
Problems that increase the percentage of parallel time with their size are more scalable than problems with a fixed percentage of parallel time.
Complexity:
In general, parallel applications are much more complex than corresponding serial applications, perhaps an order of magnitude. Not only do you have multiple instruction streams executing at the same time, but you also have data flowing between them.
The costs of complexity are measured in programmer time in virtually every aspect of the software development cycle:
Design
Coding
Debugging
Tuning
Maintenance
Adhering to "good" software development practices is essential when when working with parallel applications - especially if somebody besides you will have to work with the software.
Portability:
Thanks to standardization in several APIs, such as MPI, POSIX threads, HPF and OpenMP, portability issues with parallel programs are not as serious as in years past. However...
All of the usual portability issues associated with serial programs apply to parallel programs. For example, if you use vendor "enhancements" to Fortran, C or C++, portability will be a problem.
Even though standards exist for several APIs, implementations will differ in a number of details, sometimes to the point of requiring code modifications in order to effect portability.
Operating systems can play a key role in code portability issues.
Hardware architectures are characteristically highly variable and can affect portability.
Resource Requirements:
The primary intent of parallel programming is to decrease execution wall clock time, however in order to accomplish this, more CPU time is required. For example, a parallel code that runs in 1 hour on 8 processors actually uses 8 hours of CPU time.
The amount of memory required can be greater for parallel codes than serial codes, due to the need to replicate data and for overheads associated with parallel support libraries and subsystems.
For short running parallel programs, there can actually be a decrease in performance compared to a similar serial implementation. The overhead costs associated with setting up the parallel environment, task creation, communications and task termination can comprise a significant portion of the total execution time for short runs.
Scalability:
The ability of a parallel program's performance to scale is a result of a number of interrelated factors. Simply adding more machines is rarely the answer.
The algorithm may have inherent limits to scalability. At some point, adding more resources causes performance to decrease. Most parallel solutions demonstrate this characteristic at some point.
Hardware factors play a significant role in scalability. Examples:
Memory-cpu bus bandwidth on an SMP machine
Communications network bandwidth
Amount of memory available on any given machine or set of machines
Processor clock speed
Parallel support libraries and subsystems software can limit scalability independent of your application.
Performance Analysis and Tuning
As with debugging, monitoring and analyzing parallel program execution is significantly more of a challenge than for serial programs.
A number of parallel tools for execution monitoring and program analysis are available.
Some are quite useful; some are cross-platform also.
One starting point: Performance Analysis Tools Tutorial
Work remains to be done, particularly in the area of scalability.
Parallel Examples
Array Processing
This example demonstrates calculations on 2-dimensional array elements, with the computation on each array element being independent from other array elements.
The serial program calculates one element at a time in sequential order.
Serial code could be of the form:
do j = 1,n
  do i = 1,n
    a(i,j) = fcn(i,j)
  end do
end do
The calculation of elements is independent of one another - leads to an embarrassingly parallel situation.
The problem should be computationally intensive.
Array Processing Parallel Solution 1
Array elements are distributed so that each processor owns a portion of an array (subarray).
Independent calculation of array elements ensures there is no need for communication between tasks.
Distribution scheme is chosen by other criteria, e.g. unit stride (stride of 1) through the subarrays. Unit stride maximizes cache/memory usage.
Since it is desirable to have unit stride through the subarrays, the choice of a distribution scheme depends on the programming language.
After the array is distributed, each task executes the portion of the loop corresponding to the data it owns. For example, with Fortran block distribution:
do j = mystart, myend
  do i = 1,n
    a(i,j) = fcn(i,j)
  end do
end do
Notice that only the outer loop variables are different from the serial solution.
One Possible Solution:
Implement as SPMD model.
Master process initializes array, sends info to worker processes and receives results.
Worker process receives info, performs its share of computation and sends results to master.
Using the Fortran storage scheme, perform block distribution of the array.
Pseudo code solution:
find out if I am MASTER or WORKER

if I am MASTER
  initialize the array
  send each WORKER info on part of array it owns
  send each WORKER its portion of initial array

  receive from each WORKER results

else if I am WORKER
  receive from MASTER info on part of array I own
  receive from MASTER my portion of initial array

  # calculate my portion of array
  do j = my first column,my last column
    do i = 1,n
      a(i,j) = fcn(i,j)
    end do
  end do

  send MASTER results

endif
Example: MPI Program in C
/******************************************************************************
* FILE: mpi_array.c
* DESCRIPTION:
* MPI Example - Array Assignment - C Version
* This program demonstrates a simple data decomposition. The master task
* first initializes an array and then distributes an equal portion of that
* array to the other tasks. After the other tasks receive their portion
* of the array, they perform an addition operation to each array element.
* They also maintain a sum for their portion of the array. The master task
* does likewise with its portion of the array. As each of the non-master
* tasks finish, they send their updated portion of the array to the master.
* An MPI collective communication call is used to collect the sums
* maintained by each task. Finally, the master task displays selected
* parts of the final array and the global sum of all array elements.
* NOTE: the number of MPI tasks must be evenly divisible by 4.
* AUTHOR: Blaise Barney
* LAST REVISED: 04/13/05
****************************************************************************/
#include "mpi.h"
#include <stdio.h>
#include <stdlib.h>
#define ARRAYSIZE 16000000
#define MASTER 0
float data[ARRAYSIZE];
int main (int argc, char *argv[])
{
int numtasks, taskid, rc, dest, offset, i, j, tag1,
tag2, source, chunksize;
float mysum, sum;
float update(int myoffset, int chunk, int myid);
MPI_Status status;
/***** Initializations *****/
MPI_Init(&argc, &argv);
MPI_Comm_size(MPI_COMM_WORLD, &numtasks);
if (numtasks % 4 != 0) {
  printf("Quitting. Number of MPI tasks must be divisible by 4.\n");
  MPI_Abort(MPI_COMM_WORLD, rc);
  exit(0);
  }
MPI_Comm_rank(MPI_COMM_WORLD,&taskid);
printf ("MPI task %d has started...\n", taskid);
chunksize = (ARRAYSIZE / numtasks);
tag2 = 1;
tag1 = 2;
/***** Master task only ******/
if (taskid == MASTER){

  /* Initialize the array */
  sum = 0;
  for (i=0; i<ARRAYSIZE; i++) {
    data[i] = i * 1.0;
    sum = sum + data[i];
    }
  printf("Initialized array sum = %e\n", sum);

  /* Send each task its portion of the array - master keeps 1st part */
  offset = chunksize;
  for (dest=1; dest<numtasks; dest++) {
    MPI_Send(&offset, 1, MPI_INT, dest, tag1, MPI_COMM_WORLD);
    MPI_Send(&data[offset], chunksize, MPI_FLOAT, dest, tag2, MPI_COMM_WORLD);
    printf("Sent %d elements to task %d offset= %d\n", chunksize, dest, offset);
    offset = offset + chunksize;
    }

  /* Master does its part of the work */
  offset = 0;
  mysum = update(offset, chunksize, taskid);

  /* Wait to receive results from each task */
  for (i=1; i<numtasks; i++) {
    source = i;
    MPI_Recv(&offset, 1, MPI_INT, source, tag1, MPI_COMM_WORLD, &status);
    MPI_Recv(&data[offset], chunksize, MPI_FLOAT, source, tag2,
             MPI_COMM_WORLD, &status);
    }

  /* Get final sum and print sample results */
  MPI_Reduce(&mysum, &sum, 1, MPI_FLOAT, MPI_SUM, MASTER, MPI_COMM_WORLD);
  printf("Sample results: \n");
  offset = 0;
  for (i=0; i<numtasks; i++) {
    for (j=0; j<5; j++)
      printf("  %e", data[offset+j]);
    printf("\n");
    offset = offset + chunksize;
    }
  printf("*** Final sum= %e ***\n", sum);

  } /* end of master section */
/***** Non-master tasks only *****/
if (taskid > MASTER) {

  /* Receive my portion of array from the master task */
  source = MASTER;
  MPI_Recv(&offset, 1, MPI_INT, source, tag1, MPI_COMM_WORLD, &status);
  MPI_Recv(&data[offset], chunksize, MPI_FLOAT, source, tag2,
           MPI_COMM_WORLD, &status);
  mysum = update(offset, chunksize, taskid);

  /* Send my results back to the master task */
  dest = MASTER;
  MPI_Send(&offset, 1, MPI_INT, dest, tag1, MPI_COMM_WORLD);
  MPI_Send(&data[offset], chunksize, MPI_FLOAT, MASTER, tag2, MPI_COMM_WORLD);

  MPI_Reduce(&mysum, &sum, 1, MPI_FLOAT, MPI_SUM, MASTER, MPI_COMM_WORLD);

  } /* end of non-master */

MPI_Finalize();

}   /* end of main */


float update(int myoffset, int chunk, int myid) {
  int i;
  float mysum;

  /* Perform addition to each of my array elements and keep my sum */
  mysum = 0;
  for(i=myoffset; i < myoffset + chunk; i++) {
    data[i] = data[i] + i * 1.0;
    mysum = mysum + data[i];
    }
  printf("Task %d mysum = %e\n",myid,mysum);
  return(mysum);
  }
Example: MPI Program in Fortran
C *****************************************************************************
C FILE: mpi_array.f
C DESCRIPTION:
C MPI Example - Array Assignment - Fortran Version
C This program demonstrates a simple data decomposition. The master task
C first initializes an array and then distributes an equal portion of that
C array to the other tasks. After the other tasks receive their portion
C of the array, they perform an addition operation to each array element.
C They also maintain a sum for their portion of the array. The master task
C does likewise with its portion of the array. As each of the non-master
C tasks finish, they send their updated portion of the array to the master.
C An MPI collective communication call is used to collect the sums
C maintained by each task. Finally, the master task displays selected
C parts of the final array and the global sum of all array elements.
C NOTE: the number of MPI tasks must be evenly divisible by 4.
C AUTHOR: Blaise Barney
C LAST REVISED: 01/24/09
C **************************************************************************
      program array
      include 'mpif.h'

      integer   ARRAYSIZE, MASTER
      parameter (ARRAYSIZE = 16000000)
      parameter (MASTER = 0)

      integer  numtasks, taskid, ierr, dest, offset, i, tag1,
     &         tag2, source, chunksize
      real*4   mysum, sum, data(ARRAYSIZE)
      integer  status(MPI_STATUS_SIZE)
      common   /a/ data

C ***** Initializations *****
      call MPI_INIT(ierr)
      call MPI_COMM_SIZE(MPI_COMM_WORLD, numtasks, ierr)
      i = MOD(numtasks, 4)
      if (i .ne. 0) then
         call MPI_Abort(MPI_COMM_WORLD, ierr)
         stop
      end if
      call MPI_COMM_RANK(MPI_COMM_WORLD, taskid, ierr)
      write(*,*)'MPI task',taskid,'has started...'
      chunksize = (ARRAYSIZE / numtasks)
      tag2 = 1
      tag1 = 2

C***** Master task only ******
      if (taskid .eq. MASTER) then

C       Initialize the array
        sum = 0.0
        do i=1, ARRAYSIZE
          data(i) = i * 1.0
          sum = sum + data(i)
        end do
        write(*,20) sum

C       Send each task its portion of the array - master keeps 1st part
        offset = chunksize + 1
        do dest=1, numtasks-1
          call MPI_SEND(offset, 1, MPI_INTEGER, dest, tag1,
     &                  MPI_COMM_WORLD, ierr)
          call MPI_SEND(data(offset), chunksize, MPI_REAL, dest,
     &                  tag2, MPI_COMM_WORLD, ierr)
          write(*,*) 'Sent',chunksize,'elements to task',dest,
     &               'offset=',offset
          offset = offset + chunksize
        end do

C       Master does its part of the work
        offset = 1
        call update(offset, chunksize, taskid, mysum)

C       Wait to receive results from each task
        do i=1, numtasks-1
          source = i
          call MPI_RECV(offset, 1, MPI_INTEGER, source, tag1,
     &                  MPI_COMM_WORLD, status, ierr)
          call MPI_RECV(data(offset), chunksize, MPI_REAL,
     &                  source, tag2, MPI_COMM_WORLD, status, ierr)
        end do

C       Get final sum and print sample results
        call MPI_Reduce(mysum, sum, 1, MPI_REAL, MPI_SUM, MASTER,
     &                  MPI_COMM_WORLD, ierr)
        print *, 'Sample results:'
        offset = 1
        do i=1, numtasks
          write (*,30) data(offset:offset+4)
          offset = offset + chunksize
        end do
        write(*,40) sum

      end if

C***** Non-master tasks only *****
      if (taskid .gt. MASTER) then

C       Receive my portion of array from the master task
        call MPI_RECV(offset, 1, MPI_INTEGER, MASTER, tag1,
     &                MPI_COMM_WORLD, status, ierr)
        call MPI_RECV(data(offset), chunksize, MPI_REAL, MASTER,
     &                tag2, MPI_COMM_WORLD, status, ierr)
        call update(offset, chunksize, taskid, mysum)

C       Send my results back to the master
        call MPI_SEND(offset, 1, MPI_INTEGER, MASTER, tag1,
     &                MPI_COMM_WORLD, ierr)
        call MPI_SEND(data(offset), chunksize, MPI_REAL, MASTER,
     &                tag2, MPI_COMM_WORLD, ierr)
        call MPI_Reduce(mysum, sum, 1, MPI_REAL, MPI_SUM, MASTER,
     &                  MPI_COMM_WORLD, ierr)
      endif

      call MPI_FINALIZE(ierr)

  20  format('Initialized array sum = ',E12.6)
  30  format(5E14.6)
  40  format('*** Final sum= ',E12.6,' ***')

      end

      subroutine update(myoffset, chunksize, myid, mysum)
      integer   ARRAYSIZE, myoffset, chunksize, myid, i
      parameter (ARRAYSIZE = 16000000)
      real*4    mysum, data(ARRAYSIZE)
      common    /a/ data

C     Perform addition to each of my array elements and keep my sum
      mysum = 0
      do i=myoffset, myoffset + chunksize-1
        data(i) = data(i) + i * 1.0
        mysum = mysum + data(i)
      end do
      write(*,50) myid,mysum
  50  format('Task',I4,' mysum = ',E12.6)

      end subroutine update
Array Processing Parallel Solution 2: Pool of Tasks
The previous array solution demonstrated static load balancing:
Each task has a fixed amount of work to do
May be significant idle time for faster or more lightly loaded processors - the slowest task determines the overall performance.
Static load balancing is not usually a major concern if all tasks are performing the same amount of work on identical machines.
If you have a load balance problem (some tasks work faster than others), you may benefit by using a "pool of tasks" scheme.
Pool of Tasks Scheme:
Two processes are employed
Master Process:
Holds pool of tasks for worker processes to do
Sends worker a task when requested
Collects results from workers
Worker Process: repeatedly does the following
Gets task from master process
Performs computation
Sends results to master
Worker processes do not know before runtime which portion of array they will handle or how many tasks they will perform.
Dynamic load balancing occurs at run time: the faster tasks will get more work to do.
Pseudo code solution:
find out if I am MASTER or WORKER

if I am MASTER
  do until no more jobs
    send to WORKER next job
    receive results from WORKER
  end do
  tell WORKER no more jobs

else if I am WORKER
  do until no more jobs
    receive from MASTER next job
    calculate array element: a(i,j) = fcn(i,j)
    send results to MASTER
  end do

endif
Discussion:
In the above pool of tasks example, each task calculated an individual array element as a job, making the computation to communication ratio very fine-grained.
Such finely granular solutions incur more communication overhead in order to reduce task idle time.
A better solution might be to distribute more work with each job. The "right" amount of work is problem dependent.
PI Calculation
The value of PI can be calculated in a number of ways. Consider the following method of approximating PI:
Inscribe a circle in a square
Randomly generate points in the square
Determine the number of points in the square that are also in the circle
Let r be the number of points in the circle divided by the number of points in the square
PI ~ 4 r
Note that the more points generated, the better the approximation
Serial pseudo code for this procedure:
npoints = 10000
circle_count = 0

do j = 1,npoints
  generate 2 random numbers between 0 and 1
  xcoordinate = random1 ; ycoordinate = random2
  if (xcoordinate, ycoordinate) inside circle
  then circle_count = circle_count + 1
end do

PI = 4.0*circle_count/npoints
Note that most of the time in running this program would be spent executing the loop
Leads to an embarrassingly parallel solution
Computationally intensive
Minimal communication
Minimal I/O
PI Calculation Parallel Solution
Parallel strategy: break the loop into portions that can be executed by the tasks.
For the task of approximating PI:
Each task executes its portion of the loop a number of times.
Each task can do its work without requiring any information from the other tasks (there are no data dependencies).
Uses the SPMD model. One task acts as master and collects the results.
Pseudo code solution:
npoints = 10000
circle_count = 0

p = number of tasks
num = npoints/p

find out if I am MASTER or WORKER

do j = 1,num
  generate 2 random numbers between 0 and 1
  xcoordinate = random1 ; ycoordinate = random2
  if (xcoordinate, ycoordinate) inside circle
  then circle_count = circle_count + 1
end do

if I am MASTER
  receive from WORKERS their circle_counts
  compute PI (use MASTER and WORKER calculations)
else if I am WORKER
  send to MASTER circle_count
endif
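One way this pseudocode might be realized as an MPI program; this is a sketch, not the tutorial's own example. It combines the circle counts with an MPI_Reduce collective rather than individual sends to the master, and the random-number generation is deliberately simplified:

#include <mpi.h>
#include <stdio.h>
#include <stdlib.h>

#define MASTER 0

int main(int argc, char *argv[])
{
    long npoints = 1000000, num, j, circle_count = 0, total_count = 0;
    int  numtasks, taskid;
    double x, y, pi;

    MPI_Init(&argc, &argv);
    MPI_Comm_size(MPI_COMM_WORLD, &numtasks);
    MPI_Comm_rank(MPI_COMM_WORLD, &taskid);

    num = npoints / numtasks;          /* each task does its share of the loop */
    srand(taskid + 1);                 /* a different random stream per task   */

    for (j = 0; j < num; j++) {
        x = (double)rand() / RAND_MAX; /* random point in the unit square */
        y = (double)rand() / RAND_MAX;
        if (x * x + y * y <= 1.0)      /* inside the quarter circle of radius 1 */
            circle_count++;
    }

    /* Collect all circle counts onto the master task */
    MPI_Reduce(&circle_count, &total_count, 1, MPI_LONG, MPI_SUM,
               MASTER, MPI_COMM_WORLD);

    if (taskid == MASTER) {
        pi = 4.0 * (double)total_count / (double)(num * numtasks);
        printf("Estimated PI = %f\n", pi);
    }

    MPI_Finalize();
    return 0;
}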
Simple Heat Equation
Most problems in parallel computing require communication among the tasks. A number of common problems require communication with "neighbor" tasks.
The heat equation describes the temperature change over time, given initial temperature distribution and boundary conditions.
A finite differencing scheme is employed to solve the heat equation numerically on a square region.
The initial temperature is zero on the boundaries and high in the middle.
The boundary temperature is held at zero.
For the fully explicit problem, a time stepping algorithm is used. The elements of a 2-dimensional array represent the temperature at points on the square.
The calculation of an element is dependent upon neighbor element values.
A serial program would contain code like:
do iy = 2, ny - 1
  do ix = 2, nx - 1
    u2(ix, iy) =  u1(ix, iy) +
        cx * (u1(ix+1,iy) + u1(ix-1,iy) - 2.*u1(ix,iy)) +
        cy * (u1(ix,iy+1) + u1(ix,iy-1) - 2.*u1(ix,iy))
  end do
end do
Simple Heat Equation Parallel Solution 1
Implement as an SPMD model
The entire array is partitioned and distributed as subarrays to all
tasks. Each task owns a portion of the total array.
Determine data dependencies
Master process sends initial info to workers, checks for convergence and collects results
Worker process calculates solution, communicating as necessary with neighbor processes
Pseudo code solution:
find out if I am MASTER or WORKER

if I am MASTER
  initialize array
  send each WORKER starting info and subarray

  do until all WORKERS converge
    gather from all WORKERS convergence data
    broadcast to all WORKERS convergence signal
  end do

  receive results from each WORKER

else if I am WORKER
  receive from MASTER starting info and subarray

  do until solution converged
    update time
    send neighbors my border info
    receive from neighbors their border info

    update my portion of solution array
    determine if my solution has converged
      send MASTER convergence data
      receive from MASTER convergence signal
  end do

  send MASTER results

endif
Simple Heat Equation Parallel Solution 2: Overlapping Communication and Computation
In the previous solution, it was assumed that blocking communications were used by the worker tasks. Blocking communications wait for the communication process to complete before continuing to the next program instruction.
In the previous solution, neighbor tasks communicated border data, then each process updated its portion of the array.
Computing times can often be reduced by using non-blocking communication. Non-blocking communications allow work to be performed while communication is in progress.
Each task could update the interior of its part of the solution array while the communication of border data is occurring, and update its border after communication has completed.
Pseudo code for the second solution:
find out if I am MASTER or WORKER

if I am MASTER
  initialize array
  send each WORKER starting info and subarray

  do until all WORKERS converge
    gather from all WORKERS convergence data
    broadcast to all WORKERS convergence signal
  end do

  receive results from each WORKER

else if I am WORKER
  receive from MASTER starting info and subarray

  do until solution converged
    update time

    non-blocking send neighbors my border info
    non-blocking receive neighbors border info

    update interior of my portion of solution array
    wait for non-blocking communication complete
    update border of my portion of solution array

    determine if my solution has converged
      send MASTER convergence data
      receive from MASTER convergence signal
  end do

  send MASTER results

endif
1-D Wave Equation
In this example, the amplitude along a uniform, vibrating string is calculated after a specified amount of time has elapsed.
The calculation involves:
the amplitude on the y axis
i as the position index along the x axis
node points imposed along the string
update of the amplitude at discrete time steps.
The equation to be solved is the one-dimensional wave equation:

    A(i,t+1) = (2.0 * A(i,t)) - A(i,t-1)
               + (c * (A(i-1,t) - (2.0 * A(i,t)) + A(i+1,t)))

where c is a constant.

Note that amplitude will depend on previous timesteps (t, t-1) and neighboring points (i-1, i+1). Data dependence will mean that a parallel solution will involve communications.
1-D Wave Equation Parallel Solution
Implement as an SPMD model
The entire amplitude array is partitioned and distributed as subarrays to all tasks. Each task owns a portion of the total array.
Load balancing: all points require equal work, so the points should be divided equally
A block decomposition would have the work partitioned into the number of tasks as chunks, allowing each task to own mostly contiguous data points.
Communication need only occur on data borders. The larger the block size the less the communication.
Pseudo code solution:
find out number of tasks and task identities

#Identify left and right neighbors
left_neighbor = mytaskid - 1
right_neighbor = mytaskid + 1
if mytaskid = first then left_neighbor = last
if mytaskid = last then right_neighbor = first

find out if I am MASTER or WORKER
if I am MASTER
  initialize array
  send each WORKER starting info and subarray
else if I am WORKER
  receive starting info and subarray from MASTER
endif

#Update values for each point along string
#In this example the master participates in calculations
do t = 1, nsteps
  send left endpoint to left neighbor
  receive left endpoint from right neighbor
  send right endpoint to right neighbor
  receive right endpoint from left neighbor

  #Update points along line
  do i = 1, npoints
    newval(i) = (2.0 * values(i)) - oldval(i)
                + (sqtau * (values(i-1) - (2.0 * values(i)) + values(i+1)))
  end do
end do

#Collect results and write to file
if I am MASTER
  receive results from each WORKER
  write results to file
else if I am WORKER
  send results to MASTER
endif