
Bil Lewis

Dr. Dobb's Bloggers

The Pipeline

September 01, 2010


In "Multithreaded Programming with PThreads" and "Multithreaded Programming with Java," I reviewed the use of a pipeline for the purpose of allowing parallel processing and concluded that there wasn't a lot of value in it. It's easier to write a straightforward program that simply distributes chunks of data for processing to multiple threads.

I stand by my reasoning for the situations I described, but have subsequently encountered different situations that merit independent analysis. In particular, while working in Bioinformatics at MIT, I ran into the question of how to stream large amounts of data ("large" means "more than fits in memory") and for this purpose, pipelines suddenly make good sense--not for parallelization explicitly, but a nice parallelization model falls out of it.

Let us consider a typical situation, where I have a million sequences of DNA, each 10000 bases long. In a string format, this is 20GB of 16 bit characters to start with, too large for the machines I'm using. Assuming that each sequence is to be analysed independently from the others, then it is a simple matter to read in each sequence, send it into the pipeline, and then go back for the next.

In this fashion, each step in the pipeline deals with exactly one 10k base pair sequence at a time and memory size is not an issue.
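The read loop might look like the following sketch. It assumes a simplified one-sequence-per-line input format (real data would be FASTA or FASTQ) and uses a modern `Consumer` to stand in for the first pipeline step; all names here are illustrative, not from the original pipeline code.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.io.UncheckedIOException;
import java.util.function.Consumer;

class StreamingReader {
    // Read one sequence at a time: only a single 10k-base sequence
    // is ever in memory, never the whole 20GB file.
    static void stream(Reader source, Consumer<String> firstStep) {
        BufferedReader in = new BufferedReader(source);
        try {
            String line;
            while ((line = in.readLine()) != null) {
                firstStep.accept(line);  // hand one sequence to the pipeline
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The point is simply that memory use is bounded by the size of one sequence, regardless of how many sequences the file contains.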

Notice that we have not addressed parallelization yet.

A pipeline consists of a series of steps which are run one after the other, accepting a datum for input, processing it, and passing a different datum to the next step. For example, I may wish to read a sequence, trim off low quality bases, search for the most similar sequence in a database of known sequences, run an alignment against that known sequence, and output an assessment of the presumed species the DNA came from.

I would write a pipeline that looks like this:

pipeline = createPipeline(reader, trimmer, similarer, aligner, assessor, writer);

outputFile = pipeline.run(inputFile);

The reader would be passed one datum--the input file. It would read each sequence in the file, passing that sequence to the trimmer, which would drop zero or more bases and then pass the result on to the similarer. It would find the most similar known sequence and pass the pair of them on to the aligner, which would align them and pass that on to the assessor, which would assess the quality of the alignment and finally pass that on to the writer, which would write the results out to a file with a generated file name, which would be the final result of the pipeline.
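The machinery behind createPipeline() might be wired up as follows. This is a minimal sketch under my own assumptions--the article's actual infrastructure is not shown--with return-value plumbing and the special first/last pseudo steps omitted:

```java
// Each step only knows that "there is a next step"; the pipeline
// wires them together by setting each step's successor.
abstract class Step {
    private Step next;
    void setNext(Step next) { this.next = next; }
    protected void callNext(Object obj) {
        if (next != null) next.execute(obj);
    }
    public abstract void execute(Object obj);
}

class Pipeline {
    private final Step first;
    private Pipeline(Step first) { this.first = first; }

    // Chain the steps in order and remember the head of the chain.
    static Pipeline createPipeline(Step... steps) {
        for (int i = 0; i < steps.length - 1; i++)
            steps[i].setNext(steps[i + 1]);
        return new Pipeline(steps[0]);
    }

    void run(Object input) { first.execute(input); }
}
```

A step calls callNext() without knowing what comes after it; only createPipeline() knows the order.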

Before I arrived at the institute, their pipelines were generally even simpler than this one. In particular, almost all the steps took a sequence as input and passed out a sequence to the next step. As a result, steps were easily reused.

The problems came when the objects being passed to the different steps were not all sequences. Now the existing steps could not be used verbatim and the solutions to this were ad-hoc -- everyone did something different. For example, a common way of dealing with the "SIMILARER" step, which needed to pass two sequences down the pipeline, was to build an array of two objects. The receiving step would then take that array apart.

As long as we were only passing two or three objects, it wasn't too bad, but as the number of objects increased, things got more difficult. And should we decide to reorder some steps, things got downright ugly. With no way to verify the correctness of the arrays, we would often find that we were passing the wrong array. Something as trivial as one person writing a step that took an array [seq1, seq2] and another person writing a step that took [seq2, seq1] would mess us up.

The usual solution was to write a new step that looked just like the old step, but accepted a different set of arguments. Add to this that most pipelines were being written in Jython and things got really rough. The lack of automatic refactoring in Jython meant that we rarely refactored our steps, exacerbating the situation.

Pipelines could be written in Java, but rarely were. I will show all examples in Java because it will be easier to see what's going on.

A typical step looked about like this:

public class Similarer extends Step {

    public void execute(Object obj) {
        Sequence s1 = (Sequence) ((Object[]) obj)[0];
        Sequence s2 = (Sequence) ((Object[]) obj)[1];
        double minScore = calculateMin(s1);
        double maxMismatch = calculateMaxMis(s1, s2);
        Alignment a = align(s1, s2, minScore, maxMismatch);
        Object[] output = new Object[] {s1, s2, a};
        callNext(output);
    }
}

The thing to notice is that the execute() method does a lot of work inline which would have to be duplicated if the order of the input sequences was to be different. Add to this the fact that to test the method, it is necessary to build a pipeline because callNext() must have something to call.

The solution I came up with was to separate the input section, which needed to know about the object being passed down, from the processing section, which only needed the data itself:

public class Similarer extends Step {

    public void execute(Object obj) {
        Sequence s1 = (Sequence) ((Object[]) obj)[0];
        Sequence s2 = (Sequence) ((Object[]) obj)[1];
        Object[] output = process(s1, s2);
        callNext(output);
    }

    public Object[] process(Sequence s1, Sequence s2) {
        double minScore = calculateMin(s1);
        double maxMismatch = calculateMaxMis(s1, s2);
        Alignment a = align(s1, s2, minScore, maxMismatch);
        return new Object[] {s1, s2, a};
    }
}

This allowed us to test the processing portion of the step independently of a pipeline, vastly simplifying the procedure and making debugging much easier without all the pipeline machinery in the mix. It also allowed us to reuse the step with different input arguments, because now we could subclass the step, specializing the execute() method to the input while leaving the process() method alone:

public class Similarer2 extends Similarer {

    public void execute(Object obj) {
        Sequence s1 = (Sequence) ((Object[]) obj)[1];  // different order
        Sequence s2 = (Sequence) ((Object[]) obj)[0];
        Object[] output = process(s1, s2);
        callNext(output);
    }
}
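The testing payoff can be seen with a toy step in the same shape, using String in place of Sequence so the example is self-contained (the class and its "trimming" rule are mine, purely for illustration). Because process() takes plain arguments, a test can call it directly -- no pipeline, no callNext():

```java
class TrimStep {
    // process() does the real work and knows nothing about pipelines.
    public String process(String sequence, int trim) {
        // Illustrative "trimming": drop the last `trim` bases.
        return sequence.substring(0, Math.max(0, sequence.length() - trim));
    }
}
```

A test is then just an ordinary method call: new TrimStep().process("ACGTACGT", 3) returns "ACGTA".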

The array of objects worked reasonably as long as it was small and everybody ordered their arguments in the same fashion. In more complicated pipelines this remained a problem, until we simply got rid of it and created specific container objects to be passed down:

public class SimilarerContainer extends Container {
    Sequence s1, s2;

    Sequence getS1() { return s1; }
    ...
}

public class Similarer2 extends Similarer {

    public void execute(Object obj) {
        SimilarerContainer c = (SimilarerContainer) obj;
        Sequence s1 = c.getS1();
        Sequence s2 = c.getS2();
        Object[] output = process(s1, s2);
        callNext(output);
    }
}

The one other debugging problem that often occurred was when a pipeline ran correctly with all of the test arguments, but in a production run an unknown argument caused it to throw an exception. It was often difficult to figure out what the original input arguments were that caused the problem. So I added a standard catch clause with an error message that included the arguments:

public class Similarer2 extends Similarer {
    int nProcessed = 0;

    public void execute(Object obj) {
        SimilarerContainer c = (SimilarerContainer) obj;
        Sequence s1 = c.getS1();
        Sequence s2 = c.getS2();
        try {
            Object[] output = process(s1, s2);
            nProcessed++;
            callNext(output);
        } catch (Exception e) {
            throw new RuntimeException("Similarer2 " + nProcessed + " " + s1 + " " + s2, e);
        }
    }
}

This way when an exception did occur, the programmer would immediately know what the problematic arguments were and be able to isolate them. This made debugging MUCH easier.

====

You can see from the code examples above that each step knows nothing about the preceding or following steps, only that there is one. (The first and last steps can be special pseudo steps that know about starting and ending a pipeline.) In particular, a step doesn't know whether the other steps are in the same thread or even the same process. This leaves the pipeline free to decide how parallelization should occur.
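Because a step only ever calls callNext(), the pipeline can put a thread boundary between any two steps without the steps noticing. The article does not say how its infrastructure did this; one common way, sketched here under that assumption, is a blocking queue: the upstream step's callNext() becomes a queue put, and a worker thread drains the queue into the downstream step.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// A thread boundary between two steps. The bounded queue also gives
// back-pressure: a fast upstream step blocks when the queue is full.
class QueueLink {
    private final BlockingQueue<Object> queue = new ArrayBlockingQueue<>(64);

    // Called in place of callNext() by the upstream step's thread.
    void send(Object obj) {
        try {
            queue.put(obj);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    // Drained by the downstream step's worker thread.
    Object receive() {
        try {
            return queue.take();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }
}
```

Neither step's code changes; only the wiring between them does.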

Obviously, for steps that are in different processes (possibly on different machines), the objects being passed must be serializable. The default Java serialization is just fine for most objects, so we can ignore serialization for most purposes. About the only aspect of serialization we care about is its cost.
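That cost is easy to measure with stock Java serialization; a small helper like the following (my own, for illustration) reports the serialized size of any Serializable object, and timing the same loop gives the CPU cost:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

class SerializationCost {
    // Serialize to an in-memory buffer and report the byte count.
    // Default serialization: no custom writeObject() needed.
    static int serializedSize(Serializable obj) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(obj);
            }
            return bytes.size();
        } catch (IOException e) {
            throw new RuntimeException(e);  // cannot happen with a byte array sink
        }
    }
}
```

For a 10k-base sequence the payload is tiny compared to a 6-second alignment, which is why serialization cost can be ignored here.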

While running steps in parallel is a nice thing, it's not generally a big win. We let the pipeline take care of it and otherwise ignore it. The real parallelization win comes when we can parallelize a single step, which turns out to be fairly simple. The data must not have any order dependencies, of course, and any type of aggregation requires some extra work, but otherwise it's a simple declaration.

For example, the previous pipeline's middle steps are all independent and can be done in parallel. So we can make a pipeline like this:

subPipeline = createParallelPipeline(ungroup, trimmer, similarer, aligner, assessor);

pipeline = createPipeline(reader, group, subPipeline, ungroup, writer);

where the step GROUP will gather up the first N objects from the preceding step and send them down as a single list to the first of the parallel pipelines, then the next N to the second, third, etc. We leave it to the pipeline infrastructure to figure out the optimal number of subpipelines to create, and whether those subpipelines should be threads in a single process or full processes on possibly different machines.
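A group step might look like the following sketch (names and the Consumer standing in for callNext() are my assumptions; a real implementation would also flush the final partial batch at end-of-stream):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Buffer N inputs, then emit them downstream as a single List.
class Group {
    private final int batchSize;
    private final Consumer<List<Object>> next;  // stands in for callNext()
    private List<Object> batch = new ArrayList<>();

    Group(int batchSize, Consumer<List<Object>> next) {
        this.batchSize = batchSize;
        this.next = next;
    }

    void execute(Object obj) {
        batch.add(obj);
        if (batch.size() == batchSize) {
            next.accept(batch);         // one full batch goes downstream
            batch = new ArrayList<>();  // start collecting the next batch
        }
    }
}
```

An UNGROUP step is the mirror image: it receives a list and calls its successor once per element.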

In this example, the most expensive step is the aligner, which may take 6 seconds per alignment. The other steps combined will take a few hundred milliseconds. A group size of 100 (meaning each parallel subpipeline will take about 10 minutes to complete) would be reasonable. The serialization time for these sequences is also very small.

The net result of this is that a set of 1 million sequences will take about 6 million seconds (roughly 70 days) to complete in the first pipeline, but only about 6000 seconds (under two hours) if 1000 cores are available (a not unreasonable expectation).

So, in some situations, pipelines do rock!

-Bil
