
Jim Dempsey

Dr. Dobb's Bloggers

Two-Stage Input Parallel Pipeline: Part 2

January 28, 2010

In the previous post we looked at using the QuickThread toolkit to write a two-stage input pipeline to boost the performance of the input side of your application. Today we look at using QuickThread to write a high-performance output end of an application. QuickThread is available from QuickThread Programming.

Recap of the benchmark from Two-Stage Input Parallel Pipeline: Part 1:

a) Read in the option data, consisting of 10,000,000 options in ASCII text format, and convert it into internal data structures.

b) Pass the option data through the Black-Scholes function, computing the option price and saving it in an output array.

c) Convert the 10,000,000 price results back to text and write them to the output file.

On a Dell R610 with 2x Intel Xeon 5570 processors, with one iteration for step b), we have as time in seconds:

Serial:        a) 33.361, b) 0.859, c) 8.394, total 42.614
OpenMP:        a) 33.636, b) 0.107, c) 8.439, total 42.181
TBB:           a) 33.402, b) 0.108, c) 9.317, total 42.826
Win32 Threads: a) 33.378, b) 0.089, c) 8.421, total 41.888
QuickThread:   a)  4.393, b) 0.088, c) 1.565, total  6.047

In the first article we showed how to obtain a 7.8x improvement in the input phase of the program. Today we will look at how to attain over a 5x improvement in the output phase of the program (8.394s down to 1.565s in the table above).

First, we will digress a little by asking and answering a rhetorical question:

Why isn't a 3-stage parallel pipeline used?

The answer is: in a real application, you may very well structure the program as a 3-stage parallel pipeline. For the requirements of this benchmark, however, the center stage b) must iterate multiple times when run under the guise of the original benchmark, which measures the computational efficiency within region b). Because the runtime of region b) is relatively short, many iterations (typically 100) are required to obtain an accurate measure of performance.

While the Win32 Threads version splits the 10,000,000-option data set into number-of-threads-sized pieces (e.g. 10,000,000 / 16 = 625,000 options per thread), a parallel pipeline would divide it into chunks of either a default of 1,024 options or a size given by an input parameter. With the default buffer size of 1,024 options, the working data footprint is less than 32KB. This would give an unfair advantage to the parallel pipeline in the 2nd and later iterations, as virtually all of the working data would reside in the L1 cache.

This brings up an important point. In a real options application there would be an unending input stream of data arriving in packets, and the important factor is the latency of a packet through the pipeline. The lower the latency, the sooner you can place your buy/sell.

An additional problem is that the PARSEC-supplied timer code is not accumulative. That is, there is no way for each thread, or each use of a pipeline buffer, to accumulate its runtime into the time total for the region of interest. This is relatively easy to change with an atomic add, but doing so changes the nature of the PARSEC benchmark suite and would require the consent of the maintainers of the code base.

In all the threading models (except QuickThread) the arrays for the Structure of Arrays were allocated by the main thread. The QuickThread version gives you the option of using the same Structure of Arrays allocation, or a per-thread NUMA-allocated slice. The per-thread NUMA allocation was done in anticipation of the 100-iteration test benefiting from data residing within the NUMA node of the thread. However, this Black-Scholes test is not memory-bandwidth limited, and with cache-line prefetch enabled in the BIOS there was no benefit observed from the additional programming time spent incorporating the NUMA-allocated buffers.

A consequence of the per-thread allocated slices of the Structure of Arrays is that the AOS-to-SOA copy routine is slightly more complex than it needs to be, and the output from the SOA to the output file is substantially more complicated than if a single allocation of the Structure of Arrays had been used. A conditional-compilation macro was added to select which technique to use. Note that a different processor architecture might favor the NUMA allocation.

The following code snippets use the singly allocated Structure of Arrays.

The output phase of Black-Scholes is relatively simple, at least for the serial version of the code. With error tests removed (for brevity of this blog), the serial code condenses to:

// Write prices to output file
file = fopen(outputFile, "w");
rv = fprintf(file, "%i\n", numOptions);
for(i = 0; i < numOptions; i++)
{
    rv = fprintf(file, "%.18f\n", prices[i]);
}
rv = fclose(file);

For the novice, and even for many experienced parallel programmers, the output phase appears to reduce to a single statement: the fprintf.

The typical thought process for the novice programmer is: how do you parallelize this loop without messing up the output?

The current output file is constructed in the sequence of the prices[i].

If you use a #pragma omp parallel for on the for loop, the prices data would no longer be written in the correct order. Additionally, the fprintf statements would be fighting for access to the output file buffer. So not only would your data be out of sequence, it may very well take longer to output (due to interference at a critical section within the C runtime system).

You could potentially resolve the out of sequence problem by including the sequence number in the output file:

rv = fprintf(file, "%d, %.18f\n", i, prices[i]);

But now your output file is larger, there is additional computation, and you have not eliminated the competition for the file output critical section.

Let's explore two parallelization techniques using QuickThread to improve the output performance. We will run this on an Intel Core 2 Quad Q6600 (4 cores, no HT). The statistics are:

Q6600 (4 cores, no HT)

Serial
  Total time spent prior to ROI:   47.243s
  Total time spent in ROI:          1.427s
  Total time spent following ROI:  13.494s
  Total time spent in application: 62.163s

QuickThread using dual-thread output
  Total time spent prior to ROI:   11.884s
  Total time spent in ROI:          0.378s
  Total time spent following ROI:  11.002s
  Total time spent in application: 23.264s

QuickThread using two-stage output pipeline
  Total time spent prior to ROI:   11.878s
  Total time spent in ROI:          0.379s
  Total time spent following ROI:   3.382s
  Total time spent in application: 15.639s

We will be examining the output side of the application, which is the "Total time spent following ROI".

The serial code takes 13.494 seconds to output the data.

The first technique is a QuickThread non-pipelined approach using one compute thread and one I/O thread; at 11.002 seconds it is only marginally faster (1.23x).

The two-stage QuickThread parallel pipeline takes only 3.382 seconds, 3.99x faster than the serial technique (a scaling factor of 0.997 on 4 cores).

QuickThread dual thread output with error tests removed (for brevity of this blog):

qt::qtControl OptionDataControl;
...
// Write prices to output file
FILE* file = fopen(outputFile, "w");
rv = fprintf(file, "%i\n", numOptions);
// output format
char* fmt = "%.18f\n";
char temp[MAX_INPUT_LINE_SIZE];
// determine typical formatted size
intptr_t fmtSize = sprintf(temp, fmt, prices[0]);
// over-estimate size requirement by factor of 2
// in the event of atypical prices
intptr_t fmtSizeX2 = fmtSize*2;
intptr_t allocationSize = fmtSizeX2 * bufferSizeInElements;
// partition options in groups of bufferSizeInElements
// (default is 1024)
for(intptr_t iBegin = 0; iBegin < numOptions; iBegin += bufferSizeInElements)
{
    // determine iEnd of range
    intptr_t iEnd = iBegin + bufferSizeInElements;
    if(iEnd > numOptions) iEnd = numOptions;
    // allocate output buffer to hold text
    char* aBuffer = new char[allocationSize];
    // place a NULL character into the output buffer
    // in the event that the loop generates no output
    aBuffer[0] = 0;
    // starting position in output buffer
    intptr_t offset = 0;
    // remaining writeable area in buffer
    intptr_t remainingBuffer = allocationSize;
    // iterate across range of options
    for(intptr_t i = iBegin; i < iEnd; ++i)
    {
        // shorten size of buffer to use with sprintf_s
        // (Debug version wipes buffer)
        intptr_t spaceToUse = fmtSizeX2;
        if(spaceToUse > remainingBuffer) spaceToUse = remainingBuffer;
        // append output to buffer
        intptr_t bytesWritten = sprintf_s(&aBuffer[offset], spaceToUse, fmt, prices[i]);
        // advance output offset by bytesWritten
        offset += bytesWritten;
        // reduce remaining buffer
        // to avoid write past end of buffer
        remainingBuffer -= bytesWritten;
    } // for(intptr_t i = iBegin; i < iEnd; ++i)
    // aBuffer now contains text to write to output file
    // now enqueue I/O task to output buffer to file
    qt::parallel_task_v1(
        qt::IOOnDone$,        // I/O class, in FIFO order
        &OptionDataControl,   // using this control object
        writeBuffer, aBuffer, file);
    // now loop back to next buffer
    // N.B. writeBuffer deletes aBuffer when done
} // for(intptr_t iBegin=0; ...)
OptionDataControl.WaitTillDone();
rv = fclose(file);
...

Little stub task to write buffer:

void writeBuffer(char* aBuffer, FILE* file)
{
    if(fputs(aBuffer, file) < 0)
    {
        writeToOutputFileError = true;
    }
    delete [] aBuffer;
}

The above is not too hard to follow (keeping in mind the error tests have been removed).

The dual thread technique improved the performance by 22.6%. Not a bad gain when compared with typical program improvements.

The temptation then is to enhance the above code to use multiple compute threads in parallel, each building batches of 1,024 formatted outputs, then sequentially enqueuing the outputs (one per thread), and continuing to build and enqueue batches.

What you would be attempting is to duplicate the code already available in the QuickThread parallel pipeline. The end result: you will get faster code than the two-thread variant, but you will expend more coding effort, and it will not be nearly as fast as the highly tuned QuickThread parallel pipeline.

Hand-writing such code (not shown) using parallel_distribute reduced the runtime of the output phase to 4.870 seconds, 2.77x faster than the original code (using 4 threads).

While this improvement is impressive, and certainly worth the extra coding effort, it is nowhere near the performance of the QuickThread parallel pipeline, which, more importantly, requires less coding effort.

Now let's look at how to construct the two-stage parallel pipeline, which uses all of the processor cores (4 in this case) plus an additional thread for I/O. You will find the parallel pipeline code relatively easy to write (after you have done it once or twice).

// Define an output pipeline I/O context
class MyOutputPipelineIoContext : public qt::PipelineIoContext
{
public:
    FILE *file;        // file for stream output
    int arrayCount;    // tunable parameter

    // Because the two-stage output pipeline has no input I/O stage,
    // it is our responsibility to keep track of the
    // buffer sequence numbers
    long outputBufferSequenceNumber;

    // ctor
    MyOutputPipelineIoContext()
    {
        file = NULL;
        arrayCount = 0;
        outputBufferSequenceNumber = 0;
    }

    // dtor
    ~MyOutputPipelineIoContext()
    {
        // nothing to do
    }

    // Init function
    void Init(FILE *_file, int _arrayCount)
    {
        file = _file;              // save file stream
        arrayCount = _arrayCount;  // # elements per buffer
    }
}; // class MyOutputPipelineIoContext

// define the output pipeline buffer
class MyOutputPipelineBuffer : public qt::PipelineBuffer
{
public:
    int arrayCount;    // number of elements
    char* aBuffer;     // allocated proportional to arrayCount
    char* fmt;         // output format (could be global literal)
    intptr_t fmtSize;
    intptr_t fmtSizeX2;
    intptr_t allocationSize;
    // address of shared sequence number
    long* outputBufferSequenceNumber;

    // The ctor of the pipeline buffer is called from
    // within the startup code of the pipeline.
    // Your pipeline buffer may have fixed size buffers
    // and/or dynamically allocated buffers.
    // Here we use dynamically allocated buffers.
    MyOutputPipelineBuffer(MyOutputPipelineIoContext* pio)
    {
        arrayCount = pio->arrayCount;
        // point at the shared sequence number in the I/O context
        outputBufferSequenceNumber = &pio->outputBufferSequenceNumber;
        fmt = "%.18f\n";
        char temp[MAX_INPUT_LINE_SIZE];
        // determine typical formatted size
        fmtSize = sprintf(temp, fmt, prices[0]);
        // over-estimate size requirement by factor of 2
        // in the event of atypical prices
        fmtSizeX2 = fmtSize*2;
        allocationSize = fmtSizeX2 * arrayCount;

        // allocate the text buffer that will hold
        // the batch of concatenated output option prices
        aBuffer = new char[allocationSize];
        if(!aBuffer)
        {
            writeToOutputFileError = true;
            printf("ERROR: Unable to allocate buffer.\n");
        }
    }

    // dtor
    ~MyOutputPipelineBuffer()
    {
        if(aBuffer) delete [] aBuffer;
    }

    void ConvertPricesToText()
    {
        if(arrayCount == 0) return; // End of Data

        // obtain our sequence number (thread-safe)
        SequenceNumber = qt::AtomicAdd(outputBufferSequenceNumber, 1) - 1;

        // place a NULL character into the output buffer
        // in the event that the thread generates no output
        aBuffer[0] = 0;

        // compute start point
        intptr_t iBegin = SequenceNumber * arrayCount;
        if(iBegin >= numOptions)    // >= : exact multiple also ends data
        {
            arrayCount = 0;         // indicate End of Data
            Status = qt::Fail$;
            return;
        }

        // compute end point (+1)
        intptr_t iEnd = iBegin + arrayCount;
        if(iEnd > numOptions) iEnd = numOptions;

        // initialize remaining writeable area for thread
        intptr_t remainingBuffer = allocationSize;

        // end of string offset
        intptr_t offset = 0;

        // concatenate loop
        for(intptr_t i = iBegin; i < iEnd; ++i)
        {
            // shorten size of buffer to use with sprintf_s
            // (Debug version wipes buffer)
            intptr_t spaceToUse = fmtSizeX2;
            if(spaceToUse > remainingBuffer) spaceToUse = remainingBuffer;

            // append output to buffer
            intptr_t bytesWritten = sprintf_s(&aBuffer[offset], spaceToUse, fmt, prices[i]);
            if(bytesWritten <= 0)
            {
                writeToOutputFileError = true;
                Status = qt::ExitFail$;   // Hard error
                return;
            }
            offset += bytesWritten;
            remainingBuffer -= bytesWritten;
        } // for(intptr_t i = iBegin; i < iEnd; ++i)
    } // void ConvertPricesToText()
}; // class MyOutputPipelineBuffer

// task to process buffer
void PipelineProcessOutputBuffer(MyOutputPipelineBuffer* b)
{
    b->ConvertPricesToText();
}

// task to write buffer
void PipelineWriteBuffer(
    MyOutputPipelineIoContext* io,
    MyOutputPipelineBuffer* b)
{
    // test for prior error
    if(b->Status != qt::Continue$)
    {
        io->Status = b->Status;   // up-level error code
        return;
    }
    // my termination condition
    if(b->arrayCount == 0)
    {
        b->Status = qt::ExitSuccess$;
        io->Status = qt::ExitSuccess$;
        return;
    }

    // here with data to write
    // output aBuffer to file
    if(fputs(b->aBuffer, io->file) < 0)
    {
        printf("ERROR: Unable to write to file.\n");
        // Use ExitFail$ to shut down pipeline
        b->Status = qt::ExitFail$;
        io->Status = qt::ExitFail$;   // Hard error
        writeToOutputFileError = true;
        return;
    }
} // void PipelineWriteBuffer(

...
// Now the code in main to create and run the pipeline

// instantiate a pipeline I/O context
MyOutputPipelineIoContext Outputpio;

// Initialize our context information
Outputpio.Init(file, bufferSizeInElements);

// construct the parallel pipeline object
qt::qtPipeline Outputpipeline;

// reduce number of buffers from default (threads*4 + 2)
// to threads+2 (this may have to vary depending on system)
Outputpipeline.initBuffers(&Outputpio, nThreads+2);

// add the two stages of the pipeline
Outputpipeline.addPipe(PipelineProcessOutputBuffer);
Outputpipeline.addPipe(PipelineWriteBuffer);

// start up the pipeline to output the data

// (you may run other code here prior to waiting)

// wait for pipeline to complete
Outputpipeline.WaitTillDone();

// close output file
rv = fclose(file);
... cleanup and exit

The code to drive the parallel pipeline is a minor reorganization of the lesser-performing two-thread output code shown earlier. The little bit of extra coding effort to add a parallel pipeline pays off with significantly better performance.

Jim Dempsey
http://www.quickthreadprogramming.com
