Python NetWorkSpaces and Parallel Programs

Python and NetWorkSpaces make it easy to create and experiment with parallel programs without requiring specialized tools or hardware.


July 02, 2007
URL:http://www.drdobbs.com/architecture-and-design/python-networkspaces-and-parallel-progra/200001971

Robert and Nicholas are members of the Department of Computer Science and W.M. Keck Foundation Biotechnology Resource Laboratory at Yale University. Nicholas and Stephen are researchers at Scientific Computing Associates.


Parallel programming has the reputation of being an exotic field, pursued by experts using extremely large and expensive machines. Unfortunately, due in part to its history, parallel programming languages and tools still mostly focus on "big iron" and older languages such as C and Fortran. Performance improvement via parallelism should be of interest to anyone whose codes run too slowly. This requires a shift in focus.

Today, many new computers are multicore and most users have access to multiple computers. Many developers work with newer dynamic languages like Python and R. To meet the needs of these users, we've developed a Python-based coordination system called "NetWorkSpaces" (NWS) that is easy to learn, accessible via almost all development environments (including R, Java, Octave, Python, Perl, and Ruby), and deployable on ad hoc collections of spare CPUs. But while its simplicity makes it a good choice for pedagogical examples, it's not a toy system. We've used NWS to run parallel programs on hundreds of processors, producing many CPU years of useful computation.

NetWorkSpaces

NetWorkSpaces (www.lindaspaces.com/products/NWS_overview.html) was developed at Scientific Computing Associates and is available at SourceForge (nws-py.sourceforge.net/).

You must install both the server (on one machine) and a client (on all machines involved in the computation). The server is implemented using Python and Twisted, which are required. Even though NWS is implemented in Python, we have NetWorkSpace client APIs for a variety of languages. While we describe the Python client here, the ideas transfer to other language clients.

NWS is based on the concept of a set of bindings, which in programming languages are sometimes known as "environments," "namespaces," "workspaces," and the like. Generally in programming languages, a binding maps a name to a value. Because this is a concept familiar to programmers, it is a good foundation for building a coordination system.

A given language has rules about allowable names, allowable values for a given name, and the context in which the binding is valid (the binding's scope). The language also provides operations for establishing a binding and for retrieving the value of a bound name. Often these operations are implied by the lexical structure of code, as is the intended binding set. So, for example, in x = y the y implies a look-up of the value bound to y, while the x is the target of the assignment. Scoping rules determine which x and y are meant.

NWS provides a particular encapsulation of binding semantics. Using this encapsulation, we explicitly specify the look-up (fetch), the association of the name x with the retrieved value (store), and the intended binding set (indicated by the ws object). Thus, a simple assignment looks like this in NWS:


ws.store('x', ws.fetch('y'))

So far, we've succeeded in making a fairly routine construct more verbose. The key point is that the NWS encapsulation is amenable to a network-based implementation, which lets different processes exchange data and synchronize via NWS bindings.

In many languages, including Python, we could have used syntax similar to that of normal bindings:

ws.x = ws.y

However, the semantics of these NWS variables differ in important ways from that of normal variables. In our opinion, it's a mistake to create a false illusion of similarity when, in fact, there are important differences.

NWS is designed to be a language-neutral coordination facility. To facilitate interlanguage coordination, NWS variable names are ASCII strings and don't need to conform to the variable naming rules of any given language. The values can be any native type in the client language for which that language has a workable serialization. Most values in most of the languages mentioned can be automatically serialized (the serialization is done behind the scenes by NWS, and is not of direct concern to programmers).

For example, Python NWS can automatically handle composite data structures:


>>> from nws.client import NetWorkSpace
>>> ws=NetWorkSpace('test')
>>> l=['a','b','c']
>>> t=(1,2,3)
>>> d={'list':l, 'tuple':t}
>>> ws.store('dict example', d)
>>> ws.fetch('dict example')
{'list': ['a', 'b', 'c'], 'tuple': (1, 2, 3)}

Finally, ASCII strings used as values are treated in a special way (they are not subject to the client language serialization protocol) that makes it possible for them to be exchanged across client languages. In Example 1, for instance, you can use NWS to move data from Python to R encoded as an ASCII string.

Python
>>> from nws.client import NetWorkSpace
>>> ws=NetWorkSpace('tickets')
>>> ws.store('ticket', 'ticket string')

R
> library(nws)
> ws<-netWorkSpace('tickets')
> nwsFetch(ws, 'ticket')
[1] "ticket string"

Example 1: Using NWS to move data from Python to R encoded as an ASCII string.
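
The two paths a value can take, plain ASCII string or language-specific serialization, can be sketched in a few lines. This is only an illustration under stated assumptions: the encode and decode helpers and the 'string'/'python' tags are hypothetical, and pickle stands in for whatever serializer the Python client actually uses.

```python
import pickle

def encode(value):
    # Assumption for illustration: strings travel as plain ASCII bytes,
    # while every other value goes through a language-specific serializer.
    if isinstance(value, str):
        return ('string', value.encode('ascii'))
    return ('python', pickle.dumps(value))

def decode(kind, payload):
    # A 'string' payload is readable by any client language; a 'python'
    # payload can only be rebuilt by a client that understands pickle.
    if kind == 'string':
        return payload.decode('ascii')
    return pickle.loads(payload)

d = {'list': ['a', 'b', 'c'], 'tuple': (1, 2, 3)}
kind_d, wire_d = encode(d)
kind_s, wire_s = encode('ticket string')

print(kind_d, decode(kind_d, wire_d))
print(kind_s, wire_s)
```

The string payload is the literal text, which is why a client in another language can read it without knowing anything about Python's object format.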

Coordinated Binding Behavior

Looking again at x = y, what if y has not yet been defined? With conventional sequential programs, we could get junk data. A more helpful language might signal an error or raise an exception; after all, nobody else is going to come along and define y for us.

But in an ensemble setting, somebody might well do just that. In other words, in the context of coordination, an unbound name has a perfectly valid (and useful) interpretation: "Please hold."

Now consider:

x = 123
x = 456

In the setting of conventional sequential programs, it would be unusual to see such code (and a compiler might warn about or eliminate the dead code), but it's reasonable to let the second assignment simply overwrite 123 with 456. We're the only process in town, so who else is going to care about the old value?

But in ensemble settings, other processes may be interested in the sequence of values bound to x. If so, how do we know a particular value of x has been put to good use?

Enter generative communication. Some coordination events generate data that exist independent of any process, others consume such data. We interpret variable binding as adding a value to a list of values, rather than overwriting a single value. We do so by maintaining a FIFO queue of values. But how do we shed values? To complete the picture, retrieval of a value bound to a name removes one value from the queue. In short, an assignment records a value of interest, a retrieval consumes one value, and an empty list of values triggers "Please hold" for a retrieval.
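
These semantics are easy to model in a single process. The following is a minimal sketch, not the NWS implementation: ToyWorkspace is a hypothetical in-memory stand-in that uses threads instead of a network server, and it shows only the queue-per-name and "Please hold" behavior just described.

```python
import threading
from collections import defaultdict, deque

class ToyWorkspace:
    """In-process stand-in for a workspace (hypothetical, not the real client)."""

    def __init__(self):
        self._values = defaultdict(deque)   # name -> FIFO queue of values
        self._cond = threading.Condition()

    def store(self, name, value):
        # Binding a name appends to its queue instead of overwriting.
        with self._cond:
            self._values[name].append(value)
            self._cond.notify_all()

    def fetch(self, name):
        # Retrieval consumes the oldest value; an empty queue means "please hold".
        with self._cond:
            while not self._values[name]:
                self._cond.wait()
            return self._values[name].popleft()

ws = ToyWorkspace()
ws.store('x', 123)
ws.store('x', 456)
first, second = ws.fetch('x'), ws.fetch('x')   # both values survive, in order

# A fetch on an unbound name waits until some other thread stores a value.
result = []
waiter = threading.Thread(target=lambda: result.append(ws.fetch('y')))
waiter.start()
ws.store('y', 'late binding')
waiter.join()
print(first, second, result)
```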

To see how well these play together in a Python session, run this "worker" code:

from nws.client import NetWorkSpace
def f(x): return x*x*x
ws = NetWorkSpace('table')
while True: 
  ws.store('r', f(ws.fetch('x')))

Then, in another Python session, run the "master":

from nws.client import NetWorkSpace
ws = NetWorkSpace('table')
for x in range(100): ws.store('x', x)
for x in range(100): print 'f(%d) = %d'%(x, ws.fetch('r'))

The master prints a list of numbers and their cubes, computed by the worker.
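
With a single worker, results come back in the order the inputs were stored, so the master can pair them up by position. With several workers that ordering is no longer guaranteed. One common remedy, sketched here with standard-library queues and threads standing in for the 'x' and 'r' variables, is to tag each result with its input. The STOP sentinel and the choice of three workers are assumptions made for the illustration.

```python
import queue
import threading

def f(x):
    return x * x * x

tasks = queue.Queue()     # plays the role of the 'x' variable
results = queue.Queue()   # plays the role of the 'r' variable
STOP = None               # hypothetical sentinel telling a worker to exit

def worker():
    while True:
        x = tasks.get()
        if x is STOP:
            break
        results.put((x, f(x)))   # tag each result with its input

workers = [threading.Thread(target=worker) for _ in range(3)]
for w in workers:
    w.start()
for x in range(100):
    tasks.put(x)
for _ in workers:
    tasks.put(STOP)

# Results may arrive in any order, so rebuild the table by key.
table = dict(results.get() for _ in range(100))
for w in workers:
    w.join()
print(table[2], table[3], table[4])
```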

Other Coordination Patterns

Suppose processes are cooperating in a search to find a value, xmax, that maximizes a function f; that is, f(xmax) is the greatest value f attains. Each process has its own list of x values to test. We would like to do something like:

for x in MyCandidateList:
    currentMax = ws.fetch('max')
    y = f(x)
    if y > currentMax: ws.store('max', y)

But that would be wrong, since fetch consumes a value that may not be replaced.

We would like a way to consult a variable without destroying the value. In NWS, find returns a value without destroying it. So, we could try:

for x in MyCandidateList:
    currentMax = ws.find('max')
    y = f(x)
    if y > currentMax: ws.store('max', y)

But that would be wrong too—we are no longer maintaining a single maximum value, and currentMax might not really be current. How about:

for x in MyCandidateList:
    currentMax = ws.find('max')
    y = f(x)
    if y > currentMax:
        currentMax = ws.fetch('max')
        if y > currentMax: currentMax = y
        ws.store('max', currentMax)

Now we're only updating max when our currentMax really is larger. Because of the fetch/store pair, we have just one value associated with max (this pair of operations also ensures the atomicity of the update). But suppose it is unlikely that a given x will result in a new maximum. We could further reduce the coordination traffic by invoking find only every other iteration or every tenth or... Because we always consult the true current value of max before committing to a change, stale values won't lead to incorrect results. They will, however, increase the number of failed update attempts.
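
The last pattern can be exercised without a server. In this sketch, ToyVariable is a hypothetical one-variable stand-in with store, fetch, and find, the objective f is invented for the example, and four threads play the cooperating processes. The point to observe is that the variable ends up holding exactly one value, the true maximum.

```python
import threading
from collections import deque

class ToyVariable:
    """Hypothetical single-variable stand-in with store/fetch/find semantics."""
    def __init__(self):
        self._q = deque()
        self._cond = threading.Condition()
    def store(self, value):
        with self._cond:
            self._q.append(value)
            self._cond.notify_all()
    def fetch(self):            # consume the oldest value, waiting if none
        with self._cond:
            while not self._q:
                self._cond.wait()
            return self._q.popleft()
    def find(self):             # read the oldest value without consuming it
        with self._cond:
            while not self._q:
                self._cond.wait()
            return self._q[0]
    def count(self):
        with self._cond:
            return len(self._q)

def f(x):
    return -(x - 37) ** 2       # illustrative objective, largest at x = 37

def search(var, candidates):
    for x in candidates:
        y = f(x)
        if y > var.find():                # cheap check, possibly stale
            current = var.fetch()         # take the value; others now wait
            var.store(max(current, y))    # put exactly one value back

best_so_far = ToyVariable()
best_so_far.store(float('-inf'))
threads = [threading.Thread(target=search, args=(best_so_far, range(k, 100, 4)))
           for k in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
best = best_so_far.find()
print(best, best_so_far.count())
```

While one thread holds the value between fetch and store, every other find or fetch waits, which is what makes the update atomic.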

There are other uses for find, the most common being "write-once" variables such as initialization data that are set at the beginning of a computation and needed by two or more ensemble members.

find alters the way value queues are referenced. What about variations in the queue itself? NWS supports four modes for values: FIFO (the default queue behavior described earlier), LIFO, multi, and single.

The single mode works well with the find operation and is useful for status values; you can update the status without bothering to remove (fetch) the previous status.

import time
from nws.client import SINGLE
ws.declare('status', SINGLE)
while True:
  time.sleep(60)
  status=...
  ws.store('status', status)
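
The difference between the default FIFO behavior and single mode can be modeled locally. ToyVariable below is a hypothetical class written for this illustration, and it assumes, as the text implies, that a single-mode variable retains only the most recently stored value.

```python
from collections import deque

class ToyVariable:
    """Hypothetical model of one variable in 'fifo' or 'single' mode."""
    def __init__(self, mode='fifo'):
        # A single-mode variable keeps only the most recent value.
        self._values = deque(maxlen=1 if mode == 'single' else None)
    def store(self, value):
        self._values.append(value)
    def find(self):
        return self._values[0]
    def pending(self):
        return len(self._values)

status = ToyVariable('single')
log = ToyVariable('fifo')
for message in ['starting', 'running', 'done']:
    status.store(message)
    log.store(message)

print(status.find(), status.pending())
print(log.find(), log.pending())
```

A reader polling the single-mode variable always sees the latest status, while the FIFO variable accumulates every message until something fetches it.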

NWS Iterators

It's also possible to iterate through values in NWS. ifind returns an iterator that returns each value bound to the specified variable:

for v in range(5): ws.store('v', v)
for v in ws.ifind('v'): print v

If you try this, notice that the second loop hangs after printing five values. This is because ifind is waiting for more values. If you use another process to store more values to v, it wakes up and prints them. To signal it to stop, you can delete the binding:

ws.deleteVar('v')

Alternatively, you can use ifindTry, which creates an iterator that terminates as soon as there are no more values. Finally, ifetch and ifetchTry are similar methods that consume the values as they iterate. NWS iterators can only be used on FIFO- or Single-mode variables.
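
The distinction between the non-consuming and consuming "Try" iterators can be pictured with two generators over a local queue. This is a simplified model only: ifind_try and ifetch_try are hypothetical helpers, the deque stands in for the values bound to v, and the real iterators talk to the server rather than walking a local snapshot.

```python
from collections import deque

values = deque(range(5))     # stand-in for the values bound to 'v'

def ifind_try(q):
    # Non-destructive: walk a snapshot and stop when it is exhausted.
    for v in list(q):
        yield v

def ifetch_try(q):
    # Destructive: remove each value as it is yielded.
    while q:
        yield q.popleft()

seen = list(ifind_try(values))
left_after_find = len(values)
taken = list(ifetch_try(values))
left_after_fetch = len(values)
print(seen, left_after_find, taken, left_after_fetch)
```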

Managing Workspaces

NWS supports multiple workspaces to help organize variables into related groups. We've seen that the constructor takes the name of a workspace as its first argument. It could also take a hostname and a port number to specify the server that hosts the workspace. (Something like this had to be coming. Without it how could we form ensembles with parts running on distinct computers?)

So if multiple processes each execute:

ws = NetWorkSpace('example', serverHost='myhost.mycorp.com')

they would all have access to the same workspace, which is managed by the NWS server accessible through a default port on myhost.

But who would own it? And why would that matter? By default, the process that first mentions the workspace to the server owns it. This matters because we eventually need to clean up. Workspaces and the data in them can be persistent, meaning that they exist until explicitly destroyed. In practice this can lead to a mess, so they are by default transitory—when the process that owns them exits, they are destroyed. Persistence is useful in certain settings and is available via an option to the workspace constructor.

Usually the default ownership policy does the right thing; a master process mentions the NWS first and becomes the owner. Then other processes that use it can come and go without destroying the NWS. Finally, when the master exits, the NWS is cleaned up.

But sometimes you want to be able to mention an NWS without inadvertently becoming the owner, as might happen if an auxiliary process beat the master to the first mention. Creating the workspace object with useUse=True indicates that the process doesn't want to own the NWS:

ws = NetWorkSpace('not mine', useUse=True)

Workspace naming presents a bit of a challenge: How can we avoid name collisions? Consider, for example, multiple instances of an ensemble application using the same NWS server. Each NWS workspace object has associated with it a server object (encapsulating the connection to the server managing the workspace). This object provides a mktempWs method that works something like Linux's mkdtemp. It returns a reference to a workspace whose name is guaranteed to be unique on the server. Once we've got our own workspace, we can use variable names within it without worrying about those colliding with some other ensemble program.

The Web Interface

NetWorkSpaces and the variables they contain exist independently of particular client programs. Examining the contents can be helpful when debugging or understanding a program, or monitoring the progress of an application. NWS includes a web interface that displays the workspaces and their contents. Figure 1 is a list of NetWorkSpaces; if you clicked on "test," you'd see something like Figure 2. By clicking on a variable, you can see a list of the current values (Figure 3).


Figure 1: List of NetWorkSpaces.

Figure 2: Results of clicking "test".

Figure 3: Clicking on variables displays a list of current values.

Since values are serialized objects not easily interpreted by Python, how can the web interface display values that come from other languages (R vectors, Octave matrices, and so on)? The translation is performed by language-specific "babelfish" services that understand how to deserialize and display their own datatypes. Babelfish are just ordinary clients that get translation tasks and return results via NWS.

As you can see, the interface provides a limited capability to alter the NWS state by deleting workspaces and variables. Appropriately designed code can make good use of this as a signaling mechanism. We use it, for example, to provide a web-intermediated process initialization for certain deployments of the Sleigh system.

Sleigh

To this point we have taken the coordinating processes for granted; we haven't described how they are created and controlled. We could make use of an external mechanism for launching and managing processes, but because this is so fundamental, NWS includes its own with Sleigh. Sleigh also provides functionality to handle certain kinds of "embarrassingly parallel" master/worker-style problems directly, absolving users from the need for direct engagement with NWS.

Sleigh launches generic "worker" processes on machines, which then wait for work to do. Here is the simplest scenario:

>>> from nws.sleigh import Sleigh
>>> s = Sleigh()

By default, Sleigh creates three workers. Let's see where the processes are actually running by asking each worker pulling the sleigh for its hostname:

>>> from socket import gethostname
>>> s.eachWorker(gethostname)
['newt', 'newt', 'newt']

The Sleigh method eachWorker runs the specified function once on each worker process, and returns the values in a list. We see that, by default, Sleigh starts three worker processes on the local machine (newt). Running our workers on the local machine is useful for testing, or if we happen to have a multicore computer, but we would like to be able to use other machines, too. So let's shut this down, and try something different:

>>> s.stop()
>>> from nws.sleigh import sshcmd
>>> s = Sleigh(nodeList=['hippo', 'newt', 'python', 'rhino'], launch=sshcmd)
>>> s.eachWorker(gethostname)
['rhino', 'hippo', 'newt', 'python']

This example assumes that users have correctly configured ssh to permit password-less login to the computers in the node list.

eachWorker is typically used to initialize workers (for example, by importing packages or loading common data from a file), but the real work in Sleigh is usually done by eachElem. This method takes as input a function and a list, and returns a list of the results of applying the function to each element in the input list. The number of tasks in the list need not equal the number of workers. Normally there are more tasks than workers, and the workers cooperate to compute all the tasks. We use eachElem to compute the same table of cubes as above:

>>> r = s.eachElem(lambda x: x*x*x, range(100))
>>> len(r)
100
>>> r[2:5]
[8, 27, 64]

We see that the results are returned in order. In contrast to the original program, Sleigh takes care of the details of handing out tasks, collecting results, and starting/stopping the workers. eachElem can handle general functions with multiple fixed and varying parameters. For example:

s.eachElem(f, [[1,2,3],[11,12,13]],[99])

would invoke f(1,11,99), f(2,12,99), and f(3,13,99). This functionality (which also extends to permutations of the argument list) can accommodate a variety of function prototypes without needing to write wrapper codes or reworking the functions themselves.
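
The argument handling in that call can be mimicked sequentially. The helper below, each_elem_sketch, is a hypothetical stand-in written for this illustration. It is not Sleigh's implementation and covers only the basic case of varying lists followed by fixed values, without the permutation option.

```python
def each_elem_sketch(func, elem_args, fixed_args=()):
    """Sequential stand-in for the call pattern described above (not Sleigh itself).

    elem_args:  list of equal-length lists; task i takes item i of each list
    fixed_args: values appended unchanged to every call
    """
    return [func(*(tuple(varying) + tuple(fixed_args)))
            for varying in zip(*elem_args)]

def f(a, b, c):
    # Return the arguments so the pairing is visible in the output.
    return (a, b, c)

calls = each_elem_sketch(f, [[1, 2, 3], [11, 12, 13]], [99])
print(calls)
```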

Each task wakes up in a generic worker, whichever happens to be free. The worker is given the definitions from the module that contains the function being eachElem'd, plus any definitions built up by previous invocations of eachWorker (Listing One, for instance, uses eachWorker to initialize some global variables). This implies that workers maintain their state across different tasks, which can be useful, but also a potential source of bugs.

By default, eachElem is blocking—that is, the invocation will not return until all results return from the workers. As an option, eachElem can be invoked in asynchronous mode in which it immediately returns not the list of results, but a SleighPending object. This object includes methods for querying the progress of the computation and for obtaining the final results.
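
Readers who know the Python standard library may find an analogy helpful. The sketch below does not use Sleigh at all. It uses concurrent.futures, where submit likewise returns at once with a handle (a Future) that can be polled for progress and later asked for its result. The Sleigh object's own method names are not shown here.

```python
from concurrent.futures import ThreadPoolExecutor

def cube(x):
    return x * x * x

with ThreadPoolExecutor(max_workers=3) as pool:
    # submit() returns immediately with a handle instead of a result.
    pending = [pool.submit(cube, x) for x in range(100)]
    # Progress query: how many tasks have finished so far?
    finished_so_far = sum(1 for p in pending if p.done())
    # Collecting the results blocks until each one is ready.
    results = [p.result() for p in pending]

print(finished_so_far, results[2:5])
```

The asynchronous style lets the master overlap its own work with the workers' computation and pick up the results only when it needs them.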

Putting It All Together

Now let's put everything together in a semirealistic program that calculates the primes up to, but not including, N. There are almost as many different approaches to parallel prime finders as there are primes. In this one, we use eachElem to create a number of tasks, where each task represents a range of odd integers that should be combed for primes. To test for primality, we'll try to divide each number, n, by all primes less than or equal to sqrt(n), which implies that tasks are dependent on the results of previous tasks.

Listing One is the entire code. The master instantiates a Sleigh, initializes its workers via eachWorker, creates a list of tasks, and then runs the tasks via eachElem. The two general parameters, N and the chunk length (which must be even), are bound to a variable in an NWS and are retrieved by the function invoked via eachWorker. Each task is represented by an integer: the beginning of a chunk of integers to check for primality. Each task returns a list of the primes found in that chunk. Checking a particular number may require candidate factor primes that the worker doesn't already know, in which case the worker gets them via find.

import sys
from nws.sleigh import Sleigh

def initPrimes():
    # Runs once per worker: read the shared parameters, start an empty cache.
    global chunk, chunks, limit
    limit, chunk = SleighUserNws.find('prime parameters')
    chunks = {}

def findPrimes(first):
    last = min(first+chunk, limit)
    # We need to know all the primes up to the smaller of the start of
    # this chunk or the square root of its last element. Taking the floor
    # of the square root guarantees c**2 < limit for every chunk c requested
    # below, so each one is published by the store at the end of this function.
    need = min(first-2, int((last-1)**.5))

    myPrimes = []
    for c in xrange(3, need+1, chunk):
        if c not in chunks: chunks[c] = SleighUserNws.find('primes%d'%c)
        myPrimes += chunks[c]
    newx = len(myPrimes)
    for x in xrange(first, last, 2):
        for p in myPrimes:
            if x%p == 0: break
        else: myPrimes.append(x)
    if first**2 < limit: SleighUserNws.store('primes%d'%first, myPrimes[newx:])

    return myPrimes[newx:]

def master(workers, limit, chunk):
    s = Sleigh(workerCount=workers)
    s.userNws.store('prime parameters', (limit, chunk))
    s.eachWorker(initPrimes)
    primes = [2]
    # eachElem returns one list of primes per chunk, in task order.
    for found in s.eachElem(findPrimes, range(3, limit, chunk)):
        primes.extend(found)

    return primes

if __name__=='__main__':
    primes = master(int(sys.argv[1]), int(sys.argv[2]), int(sys.argv[3]))
    print len(primes), primes[:3], '...', primes[-3:]
Listing One

The workers use SleighUserNws as their NWS; this is a workspace object corresponding to a uniquely named NWS that is created by Sleigh. It is a convenient place to store variables related to the Sleigh run. Also, the workers remember which primes they've seen so that they don't have to get them for subsequent tasks.
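
To see the chunking logic in isolation, it can help to run it sequentially. The sketch below is an assumption-laden restatement, not the listing itself: find_primes_chunked is a hypothetical function, a plain dictionary named published replaces the workspace, and the chunks are processed one after another instead of by parallel workers.

```python
def find_primes_chunked(limit, chunk):
    """Sequential sketch of the chunking scheme (no Sleigh, no NWS)."""
    assert chunk % 2 == 0, "chunk must be even so every chunk starts on an odd number"
    published = {}                      # chunk start -> primes found in that chunk
    primes = [2]
    for first in range(3, limit, chunk):
        last = min(first + chunk, limit)
        # Divisors are needed only up to the square root of the largest
        # candidate, and only from chunks that precede this one.
        need = min(first - 2, int((last - 1) ** 0.5))
        known = []
        for c in range(3, need + 1, chunk):
            known += published[c]       # a worker would obtain these via find
        start = len(known)
        for x in range(first, last, 2):
            if all(x % p for p in known):
                known.append(x)         # primes in this chunk also serve as divisors
        published[first] = known[start:]
        primes.extend(known[start:])
    return primes

print(find_primes_chunked(50, 10))
```

Because each chunk only needs primes from chunks that start at or below the square root of its last element, most tasks in a large run depend on just the first few chunks, which is what leaves room for parallelism.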

Conclusion

Python and NetWorkSpaces make it easy to create and experiment with parallel programs without requiring specialized tools or hardware. Communication and synchronization are greatly simplified by reduction to manipulation of variables in a shared workspace.

The programs can be tested on a single CPU using multiple processes (or threads), or for actual speedup, on multicore CPUs or a collection of computers. The state of the parallel computation is captured by the variables stored in NetWorkSpace. These can be visualized via the web interface, which makes it easy to understand and debug the program, and monitor progress. Because NetWorkSpaces is language-neutral, code and idioms can be transferred to different environments, and it can even be used to create ensembles from programs written in different languages.

Acknowledgments

Thanks to Scientific Computing Associates (www.lindaspaces.com) for supporting the development of NetWorkSpaces, and Twisted Matrix Labs for supporting Twisted (twistedmatrix.com). Sleigh was inspired in part by the R Project's SNOW package (short for "Simple Network Of Workstations"), developed by Luke Tierney, A.J. Rossini, Na Li, and H. Sevcikova (cran.r-project.org/doc/packages/snow.pdf).
