Lock-Free Queues

Source Code Accompanies This Article. Download It Now.


Petru is a Vice President for Morgan Stanley, where he works as a C++ senior programmer in investment banking. He can be contacted at petru.marginean@gmail.com.


This article as written assumes a sequentially consistent model. In particular, the code relies on a specific order of instructions in both the Consumer and Producer methods. However, unless proper memory barrier instructions are inserted, the compiler or the processor can reorder these instructions, with unpredictable results (see, for example, the classic Double-Checked Locking problem).

Another issue is the use of the standard std::list<T>. While the article mentions that it is the developer's responsibility to check that reading/writing std::list<T>::iterator is atomic, this turns out to be too restrictive. While gcc and MSVC++ 2003 have 4-byte iterators, MSVC++ 2005 has 8-byte iterators in Release mode and 12-byte iterators in Debug mode.

The way to prevent these problems is to use memory barriers/volatile variables. The downloadable code featured at the top of this article has been fixed accordingly.

Many thanks to Herb Sutter, who pointed out the issue and helped me fix the code. --P.M.

Queues can be useful in a variety of systems involving data-stream processing. Typically, you have a data source producing data—requests coming to a web server, market feeds, or digital telephony packets—at a variable pace, and you need to process the data as fast as possible so there are no losses. To do this, you can push data into a queue using one thread and process it using a different thread—a good utilization of resources on multicore processors. One thread inserts data into the queue, and the other reads/deletes elements from the queue. Your main requirement is that a high-rate data burst does not last longer than the system's ability to accumulate data while the consumer thread handles it. The queue you use has to be threadsafe to prevent race conditions when inserting/removing data from multiple threads. For obvious reasons, it is necessary that the queue mutual exclusion mechanism add as little overhead as possible.

In this article, I present a lock-free queue (the source code for the lockfreequeue class is available online; see www.ddj.com/code/) in which one thread can write to the queue and another read from it—at the same time without any locking.

To do this, the code implements these requirements:

  • There is a single writer (Producer) and a single reader (Consumer). When you have multiple producers and consumers, you can still use this queue with some external locking. You cannot have multiple producers writing at the same time (or multiple consumers consuming the data simultaneously), but you can have one producer and one consumer (two threads) accessing the queue at the same time (Responsibility: developer).
  • When inserting into or erasing from an std::list<T>, the iterators for the existing elements must remain valid (Responsibility: library implementor).
  • Only one thread modifies the queue; the producer thread both adds and erases elements in the queue (Responsibility: library implementor).
  • Besides the underlying std::list<T> used as the container, the lock-free queue class also holds two iterators pointing to the not-yet-consumed range of elements; each is modified by one thread and read by the other (Responsibility: library implementor).
  • Reading/writing list<T>::iterator is atomic on the machine upon which you run the application. If it is not atomic with your implementation of the STL, you should check whether raw pointer operations are atomic; you could easily replace the two iterators with raw pointers in the code (Responsibility: machine).
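One way to probe the "machine" assumption is sketched below. It assumes a C++11 compiler (std::atomic postdates this article), and the names AtomicityReport and CheckMachineAssumption are hypothetical. It reports the size of a list iterator next to the size of a raw pointer, and whether atomic operations on a pointer are lock-free on the target. A lock-free std::atomic<void*> is only a hint that pointer-sized loads and stores are indivisible on that hardware; it does not make plain, non-atomic iterator or pointer accesses safe under the C++ memory model.

```cpp
#include <atomic>
#include <cstddef>
#include <list>

// Hypothetical report type: what the toolchain says about the "machine" assumption.
struct AtomicityReport
{
    std::size_t iterator_size;   // sizeof(std::list<int>::iterator)
    std::size_t pointer_size;    // sizeof(void*)
    bool pointer_lock_free;      // true if std::atomic<void*> needs no internal lock
};

inline AtomicityReport CheckMachineAssumption()
{
    std::atomic<void*> p(nullptr);
    AtomicityReport r = { sizeof(std::list<int>::iterator),
                          sizeof(void*),
                          p.is_lock_free() };
    return r;
}
```

If iterator_size is larger than pointer_size (as with the 8-byte and 12-byte iterators mentioned in the note at the top), the iterator cannot be read or written as a single pointer-sized unit, and raw pointers are the safer building block.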

Because I use Standard C++, the code is portable under the aforementioned "machine" assumption:


#include <list>

template <typename T>
struct LockFreeQueue
{
  LockFreeQueue();
  void Produce(const T& t);
  bool Consume(T& t);
private:
  typedef std::list<T> TList;
  TList list;
  typename TList::iterator iHead, iTail;
};

Considering how simple this code is, you might wonder how it can be threadsafe. The magic is due to design, not implementation. Take a look at the implementation of the Produce() and Consume() methods. The Produce() method looks like this:

 
void Produce(const T& t)
{
  list.push_back(t);
  iTail = list.end();
  list.erase(list.begin(), iHead);
}

To understand how this works, mentally separate the data from LockFreeQueue<T> into two groups:

  • The list and the iTail iterator, modified by the Produce() method (Producer thread).
  • The iHead iterator, modified by the Consume() method (Consumer thread).

Produce() is the only method that changes the list (adding new elements and erasing the consumed elements), and it is essential that only one thread, the Producer thread, ever calls Produce(). The iTail iterator (manipulated only by the Producer thread) is updated only after a new element has been added to the list. This way, by the time the Consumer thread reads iTail, the newly added element is ready to be used. The Consume() method tries to read the next element in the range between iHead and iTail (excluding both ends):

 
bool Consume(T& t)
{
  typename TList::iterator iNext = iHead;
  ++iNext;
  if (iNext != iTail)
  {
    iHead = iNext;
    t = *iHead;
    return true;
  }
  return false;
}

This method reads the elements, but doesn't remove them from the list. Nor does it access the list directly, but through the iterators. They are guaranteed to be valid after std::list<T> is modified, so no matter what the Producer thread does to the list, you are safe to use them.
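The iterator guarantee can be checked in isolation. The following single-threaded sketch (the function name is hypothetical) keeps an iterator to one element, then appends to the list and erases the elements in front of it, which are the two operations Produce() performs. It says nothing about thread safety; it only shows that std::list leaves iterators to surviving elements untouched.

```cpp
#include <list>

// Returns true if an iterator to a surviving element still refers to the
// same element after other elements are appended and erased.
bool IteratorSurvivesListChanges()
{
    std::list<int> l;
    l.push_back(10);
    l.push_back(20);

    std::list<int>::iterator it = l.begin();
    ++it;                               // it refers to 20
    const int* address_before = &*it;   // remember where that element lives

    l.push_back(30);                    // append, as Produce() does
    l.erase(l.begin(), it);             // erase everything before it (here: 10)

    // Same value, same storage, and it is now the first element of the list.
    return *it == 20 && &*it == address_before && it == l.begin();
}
```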

The std::list<T> maintains an element (pointed to by iHead) that is considered already read. For this algorithm to work even when the queue was just created, I add an empty T() element in the constructor of the LockFreeQueue<T> (see Figure 1):

 

LockFreeQueue()
{
  list.push_back(T());
  iHead = list.begin();
  iTail = list.end();
}

Figure 1: Adding an empty T() element in the constructor of the LockFreeQueue<T>.
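Putting the three fragments together gives the class below, reproduced as in the article with no memory barriers, followed by a single-threaded walk-through (the WalkThrough function is an addition for illustration). Because it runs on one thread, it exercises only the iterator bookkeeping around the dummy element, not the concurrency claims.

```cpp
#include <list>

template <typename T>
struct LockFreeQueue
{
    LockFreeQueue()
    {
        list.push_back(T());          // dummy element, treated as already read
        iHead = list.begin();
        iTail = list.end();
    }
    void Produce(const T& t)
    {
        list.push_back(t);
        iTail = list.end();
        list.erase(list.begin(), iHead);   // drop elements already consumed
    }
    bool Consume(T& t)
    {
        typename TList::iterator iNext = iHead;
        ++iNext;
        if (iNext != iTail)
        {
            iHead = iNext;
            t = *iHead;
            return true;
        }
        return false;
    }
private:
    typedef std::list<T> TList;
    TList list;
    typename TList::iterator iHead, iTail;
};

// Single-threaded walk-through; returns true if every step behaves as described.
bool WalkThrough()
{
    LockFreeQueue<int> q;
    int v = 0;
    if (q.Consume(v)) return false;              // only the dummy: nothing to read
    q.Produce(1);
    q.Produce(2);
    if (!q.Consume(v) || v != 1) return false;   // iHead now refers to 1
    if (!q.Consume(v) || v != 2) return false;   // iHead now refers to 2
    if (q.Consume(v)) return false;              // drained again
    q.Produce(3);                                // also erases the dummy and 1
    return q.Consume(v) && v == 3;
}
```

Note that the element iHead refers to is never erased, so the list always holds at least one node, which is the role the dummy plays right after construction.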

Consume() may fail to read an element (and return false). Unlike traditional lock-based queues, this queue works fast when it is not empty, but needs external locking or polling to wait for data. Sometimes you want to wait if there is no element available in the queue, rather than return false. A naive approach to waiting is:

 
T Consume()
{
  T tmp;
  while (!Consume(tmp))
    ;
  return tmp;
}

This Consume() method will likely heat up one of your CPUs red-hot to 100-percent use if there are no elements in the queue. Nevertheless, it should perform well when the queue is not empty. However, if you think about it, a queue that's almost never empty is a sign of systemic trouble: It means the consumer is unable to keep pace with the producer, and sooner or later, the system is doomed to die of memory exhaustion. Call this approach NAIVE_POLLING.

A friendlier Consume() function does some polling and calls some sort of sleep() or yield() function available on your system:


T Consume(int wait_time = 1/*milliseconds*/)
{
  T tmp;
  while (!Consume(tmp))
  {
    DoSleep(wait_time/*milliseconds*/);
  }
  return tmp;
}

DoSleep() can be implemented using nanosleep() (POSIX) or Sleep() (Windows), or even better, using boost::thread::sleep(), which abstracts away system-dependent nomenclature. Call this approach SLEEP. Instead of simple polling, you can use more advanced techniques to signal the Consumer thread that a new element is available. I illustrate this in Listing One using a boost::condition variable.
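As a rough illustration of the SLEEP approach, here is one possible DoSleep() built on the C++11 <thread> header, which postdates this article and is an assumption here, not what the downloadable code uses. ConsumeWithSleep is a hypothetical free function that takes the non-blocking consume step as a callable, so the polling loop can be tried without a queue.

```cpp
#include <chrono>
#include <thread>

// One possible DoSleep(): suspends the calling thread for the given time.
inline void DoSleep(int milliseconds)
{
    std::this_thread::sleep_for(std::chrono::milliseconds(milliseconds));
}

// Polls try_consume(out) until it returns true, sleeping between attempts.
// try_consume is any callable with the shape of bool Consume(T&).
template <typename T, typename TryConsume>
T ConsumeWithSleep(TryConsume try_consume, int wait_time = 1 /*milliseconds*/)
{
    T tmp;
    while (!try_consume(tmp))
        DoSleep(wait_time);
    return tmp;
}
```

The trade-off is latency: an element that arrives just after a failed attempt waits up to wait_time before it is picked up.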

 

#include <boost/thread.hpp>
#include <boost/thread/condition.hpp>
#include <boost/thread/xtime.hpp>
    
template <typename T>
struct WaitFreeQueue
{
    void Produce(const T& t)
    {
        queue.Produce(t);
        cond.notify_one();
    }
    bool Consume(T& t)
    {
        return queue.Consume(t);
    }
    T Consume(int wait_time = 1/*milliseconds*/)
    {
        T tmp;
        if (Consume(tmp))
            return tmp;
        // the queue is empty, try again (possible waiting...)
        boost::mutex::scoped_lock lock(mtx);
        while (!Consume(tmp)) // line A
        {
            boost::xtime t;
            boost::xtime_get(&t, boost::TIME_UTC);
            AddMilliseconds(t, wait_time); // helper not shown in this listing: advances t by wait_time ms
            cond.timed_wait(lock, t); // line B
        }
        return tmp;
    }
private:
    LockFreeQueue<T> queue;
    boost::condition cond;
    boost::mutex mtx;
};

Listing One

I used timed_wait() instead of the simpler wait() to avoid a possible deadlock when Produce() is called between line A and line B in Listing One. In that case, wait() would miss the notify_one() call and would have to wait for the next produced element to wake up. If that element never comes (because no more elements are produced, or because the Produce() caller is itself waiting for Consume() to return), there's a deadlock. Call this approach TIME_WAIT.
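The same idea can be written against the C++11 standard library, which postdates this article. The sketch below is not Listing One: ConsumeTimedWait is a hypothetical free function that takes the non-blocking consume step as a callable, plus the mutex and condition variable that the producer side is assumed to notify after each insertion. Because each wait is bounded by wait_time, a notification that arrives between line A and line B costs at most one timeout instead of blocking forever.

```cpp
#include <chrono>
#include <condition_variable>
#include <mutex>

// Retries try_consume(out) until it succeeds. Between attempts it waits on
// cond, but never longer than wait_time, so a missed notify cannot hang it.
template <typename T, typename TryConsume>
T ConsumeTimedWait(TryConsume try_consume,
                   std::mutex& mtx,
                   std::condition_variable& cond,
                   std::chrono::milliseconds wait_time)
{
    T tmp;
    if (try_consume(tmp))                  // fast path: no locking at all
        return tmp;
    std::unique_lock<std::mutex> lock(mtx);
    while (!try_consume(tmp))              // line A
        cond.wait_for(lock, wait_time);    // line B: wakes on notify or timeout
    return tmp;
}
```

The return value of wait_for() is deliberately ignored: whether the wake-up came from a notification, a timeout, or a spurious wake-up, the loop simply tries to consume again.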

The queue is still wait-free as long as there are elements in it. In that case, the Consumer thread does no waiting and reads data as fast as possible (even while the Producer thread is inserting new elements). Only when the queue is exhausted does locking occur.
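The note at the top of this article points out that the ordering the queue depends on must be enforced with memory barriers. As a rough sketch of what that can look like with C++11 atomics (which postdate the article), the class below replaces std::list and its iterators with a hand-rolled singly linked list of raw node pointers. It is not the code from the download, and the name SpscQueue and its members are hypothetical. The release store that links a new node pairs with the consumer's acquire load, so the element is fully constructed before the consumer can see it; the release store to head_ pairs with the producer's acquire load, so a node is freed only after the consumer has moved past it.

```cpp
#include <atomic>

template <typename T>
class SpscQueue   // one producer thread, one consumer thread
{
    struct Node
    {
        T value;
        std::atomic<Node*> next;
        explicit Node(const T& v) : value(v), next(nullptr) {}
    };
public:
    SpscQueue()
    {
        Node* dummy = new Node(T());   // plays the role of the "already read" element
        first_ = tail_ = dummy;
        head_.store(dummy, std::memory_order_relaxed);
    }
    ~SpscQueue()                       // must run after both threads are done
    {
        while (first_)
        {
            Node* n = first_;
            first_ = n->next.load(std::memory_order_relaxed);
            delete n;
        }
    }
    SpscQueue(const SpscQueue&) = delete;
    SpscQueue& operator=(const SpscQueue&) = delete;

    void Produce(const T& t)           // Producer thread only
    {
        Node* n = new Node(t);
        tail_->next.store(n, std::memory_order_release);   // publish the element
        tail_ = n;
        // Free nodes strictly before the last consumed one.
        Node* head = head_.load(std::memory_order_acquire);
        while (first_ != head)
        {
            Node* old = first_;
            first_ = old->next.load(std::memory_order_relaxed);
            delete old;
        }
    }
    bool Consume(T& t)                 // Consumer thread only
    {
        Node* head = head_.load(std::memory_order_relaxed);
        Node* next = head->next.load(std::memory_order_acquire);
        if (!next)
            return false;              // nothing published yet
        t = next->value;
        head_.store(next, std::memory_order_release);      // hand old node back
        return true;
    }
private:
    Node* first_;               // producer only: oldest node not yet freed
    Node* tail_;                // producer only: most recently added node
    std::atomic<Node*> head_;   // written by consumer: last consumed node
};
```

As in the original design, only the producer allocates and frees nodes, and the node head_ refers to is never freed while the queue is in use, so the consumer always has a valid starting point.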

