Channels ▼
RSS

.NET

Implementing Producer-Consumer Hand-Offs in C


Producer-consumer situations, in which one thread generates data for a second thread to use, occur frequently in multithreaded code. As a result, it's important to know how to code the hand-off of data between threads correctly and efficiently. This article examines an implementation in C using threading primitives from Microsoft Windows. The concepts, however, are universal and the primitives used here map to most other threading APIs.

When working with a producer-consumer situation a finite-sized queue is often the data structure of choice. It is convenient to think of the queue as circular, as in Figure 1.

Figure 1: An empty fixed-size circular queue, waiting for the first item. Both head and tail point to the first element.

Using this design, the producer thread adds chunks of data at the tail, while the consumer thread reads them from the head of the queue. Figure 2 shows a queue after the addition of a single word: "It."

Figure 2: A word has been added as the first entry in the queue.

Notice that the head is pointing at the first entry. The tail is pointing at the element in which we will place the next item. When the queue is full, but before we update tail, the queue appears as it does in Figure 3.

Figure 3: The full queue before updating the tail pointer.

When we advance the tail to point to the next element where an item would go, we will have both head and tail pointing to the upper-right element, as in Figure 1. The difference is that unlike Figure 1, the queue is full, not empty. This situation tells us that we cannot use solely the relative positions of the head and tail to determine whether the queue is full or empty. We need a flag of some sort. In practice, a counter of elements is favored over a flag. This counter will, of course, disambiguate the situation, as well as make it simple for functions to check on waiting data.

Returning to the diagrams, the queue is now full and no more words can be added until at least one is removed from the head, as in Figure 4.

Figure 4: An item has been removed from the queue and room now exists for a new item to be added.

As can be seen from the sequence of figures, head and tail chase each other around the queue, stopping only when the queue is full.

Implementing the Queue in Code

Queues are typically implemented as a structure containing a pointer to the head and tail, a counter of the number of items in the queue, and the set of elements that make up the queue itself. In our code, we will use a simple array of 10 elements.

#define Q_SIZE           10
#define Q_END     (Q_SIZE-1)
struct queue {     // the queue data structure
    int  head;
    int  tail;
    long count;
    struct item entry[Q_SIZE];
} q;

As explained, the counter helps us keep track of how many items are in the queue, which is necessary to disambiguate when the queue is full or empty. These elements will be used in a program in which the producer thread reads data from a file in blocks that are placed in a queue. The consumer thread reads those blocks and prints them to the screen. Each queue element consists of a text area and two integers: one containing the amount of data in the block, the other the block number.

#define BLK_SIZE        100
struct item {            		// the items in the queue: 
    int    blk_num;     		// the block sequence number
    size_t data_len;       	// how much data in the block
    char   data [BLK_SIZE+1];	// the block data
   };
   bool atEOF;		   	// so we know when to stop

Initializing the queue is trivial: The head, tail, and counter are all set to zero. head and tail are subscripts into the array of items rather than actual pointers, so setting them to zero points them both at the first element, as illustrated in Figure 1.

Adding items to the queue is done with the following code:

int addQueue ( struct queue *pq, char *text, size_t len )
{
static block_count = 0;
if ( pq->count == Q_SIZE )  	// if queue is full, wait
                              	// until space opens
{
    WaitForSingleObject ( hSpaceOpen, INFINITE );
    ResetEvent ( hSpaceOpen );	// and reset the event
}
memcpy ( pq->entry[pq->tail].data, text, len );
pq->entry[pq->tail].data_len = len;
pq->entry[pq->tail].blk_num  = ++block_count;
InterlockedExchangeAdd ( &(pq->count), 1L );
if ( pq->count == 1L ) 		// only true if previously was 0
    SetEvent ( hDataReady ); 	// so announce that there is
                             	//  data ready
if ( pq->tail == Q_END )
     pq->tail = 0;
else
     pq->tail++;
return ( 0 );
}

The function takes three parameters: a pointer to the queue, the text to be added to the queue, and an integer specifying the length of the text. The function first checks to see whether the queue is full. If so, it must halt the current thread (which is both reading and adding to the queue). It does this by waiting on a Windows event, hSpaceOpen. This event is signaled only when the consumer thread removes an item from a full queue, which tells the function the addition of an element may proceed. Because of this, if the queue is full, the thread will wait for the consumer thread. This is handled by the call to WaitFor­SingleObject(). Note that when Windows signals an event (that is, allows threads waiting on it to proceed), that event remains signaled until it is reset. So here, the full-queue event would remain signaled if we did not reset it immediately. However, we always want the thread to hold up when the queue is full. So, the first instruction once the event is signaled is to reset the event, so that the next time through, the thread will have to wait again if the queue is full.

After determining that there is room in the queue, the code loads the data into the queue item indicated by the tail. It then increments the queue's item counter via Windows' interlocked functions. The interlocked functions are an efficient mechanism for updating counters on a mutually exclusive basis — no other thread can access the counter during the interlock operation. Note that in our implementation of the queue, in which one thread writes to the queue and the other removes (simply by reading) items, the only data item they might both need to modify at the same time is the counter. Hence the use of mutual exclusion on this one data item. After incrementing the counter, the code checks to see whether the new item is the only one in the queue. If it is, the function knows that the consumer thread was waiting for it, because the consumer thread (whose code follows) always goes into a wait state when the queue is empty. Hence, the producer code checks for this condition, and if it's found, it sets the event that alerts the consumer code to retrieve the newly enqueued item.

Finally, the code increments tail, making the proper adjustment to wrap around to the beginning of the array/queue as needed.

The consumer thread is coded as follows:

int getQueue ( struct queue *pq, struct item *dest )
{
if ( pq->count <= 0L ) 	// if queue is empty, check EOF.
				// If not EOF,
   if ( atEOF == TRUE ) 	// then wait for more data
       return ( 0 );
   else
   {
       WaitForSingleObject ( hDataReady, INFINITE );
       ResetEvent ( hDataReady );
   }                 
   memcpy ( dest->data, pq->entry[pq->head].data,
            pq->entry[pq->head].data_len );
   dest->blk_num = pq->entry[pq->head].blk_num;
   dest->data_len = pq->entry[pq->head].data_len;
   InterlockedExchangeAdd ( &(pq->count), -1L ); 
   if ( pq->count == ( Q_SIZE - 1 ))	// only true if
                                      	// queue was full
       SetEvent ( hSpaceOpen );     	// so announce
                                    	// there's room     
   if ( pq->head == Q_END )
       pq->head = 0;
   else
       pq->head++;
   return ( 0 );
}

This code bears a lot of similarity to the producer routine. Because it's trying to remove items from the queue, it first checks whether the queue is empty. If so, it must make an additional check: Is there any more data to come? It does this by checking the atEOF flag. If it's not end of file (EOF), then the thread waits to be signaled by the addQueue() function, as was just explained. It immediately resets the event, copies the file chunk, decrements the item counter via interlocked functions, and then checks to see whether the queue was full prior to removing this item. If the queue was indeed full, it knows the producer thread is waiting or will eventually be waiting to add an item (since we're not at EOF), and so it signals the hSpaceOpen event, which informs the producer thread that a slot is available and to proceed. It then adjusts the head of the queue, making the adjustment for wrapping around to the beginning of the array, as needed. Notice, that it signals the producer thread before it adjusts the head. It could just as well signal it after. But since the producer thread never accesses the head data item, the function does not need to make the producer thread wait. A central idea of parallel programming is to keep threads waiting on each other as little as possible. Hence, once the code knows the slot is available and it's safe to announce it, it does so before doing anything else.

Now, let's look at the consumer thread:

unsigned __stdcall ProcessData ( void* pv )
{
   int i;
   struct item *data_out;
   data_out = (struct item*) malloc ( sizeof (struct item)); 
   // wait to be told there's data waiting
   WaitForSingleObject ( hDataReady, INFINITE );
   ResetEvent ( hDataReady );
   while ( 1 )
   {
      if ( q.count == 0 && atEOF == TRUE )
          break;
      else
      {
          getQueue ( &q, data_out );
          // print the fetched data
          for ( i = 0; i < data_out->data_len; i++ )
                printf ( "%c", data_out->data[i] );
      }
   }
   return ( 0 );
}

This code is straightforward. It waits for a Windows event to signal that there is an item in the queue. It then enters an infinite loop that exits only when the queue is empty and the producer thread has reached end of file (EOF). The loop simply gets queue items and prints the data to the screen. Notice that all the negotiation between threads as to when data is ready is handled within the queue primitives. Hence, the logic of this consumer thread is minimal.


This article was updated and adapted from "Programming with Hyper-Threading Technology" by Rich Gerber and Andrew Binstock, published by Intel Press. Portions are copyright Intel Corp.


Related Reading


More Insights






Currently we allow the following HTML tags in comments:

Single tags

These tags can be used alone and don't need an ending tag.

<br> Defines a single line break

<hr> Defines a horizontal line

Matching tags

These require an ending tag - e.g. <i>italic text</i>

<a> Defines an anchor

<b> Defines bold text

<big> Defines big text

<blockquote> Defines a long quotation

<caption> Defines a table caption

<cite> Defines a citation

<code> Defines computer code text

<em> Defines emphasized text

<fieldset> Defines a border around elements in a form

<h1> This is heading 1

<h2> This is heading 2

<h3> This is heading 3

<h4> This is heading 4

<h5> This is heading 5

<h6> This is heading 6

<i> Defines italic text

<p> Defines a paragraph

<pre> Defines preformatted text

<q> Defines a short quotation

<samp> Defines sample computer code text

<small> Defines small text

<span> Defines a section in a document

<s> Defines strikethrough text

<strike> Defines strikethrough text

<strong> Defines strong text

<sub> Defines subscripted text

<sup> Defines superscripted text

<u> Defines underlined text

Dr. Dobb's encourages readers to engage in spirited, healthy debate, including taking us to task. However, Dr. Dobb's moderates all comments posted to our site, and reserves the right to modify or remove any content that it determines to be derogatory, offensive, inflammatory, vulgar, irrelevant/off-topic, racist or obvious marketing or spam. Dr. Dobb's further reserves the right to disable the profile of any commenter participating in said activities.

 
Disqus Tips To upload an avatar photo, first complete your Disqus profile. | View the list of supported HTML tags you can use to style comments. | Please read our commenting policy.
 

Comments:

Video