Lock-Free Queues

One thread can write and another read—at the same time!


July 01, 2008
URL:http://www.drdobbs.com/cpp/lock-free-queues/208801974

Petru is a Vice President for Morgan Stanley, where he works as a C++ senior programmer in investment banking. He can be contacted at [email protected].


This article as written assumes a sequentially consistent model. In particular, the code relies on specific order of instructions in both Consumer and Producer methods. However, without inserting proper memory barrier instructions, these instructions can be reordered with unpredictable results (see, for example, the classic Double-Checked Locking problem).

Another issue is using the standard std::list<T>. While the article mentions that it is the developer responsibility to check that the reading/writing std::list<T>::iterator is atomic, this turns out to be too restrictive. While gcc/MSVC++2003 has 4-byte iterators, the MSVC++2005 has 8-byte iterators in Release Mode and 12-byte iterators in the Debug Mode.

The solution to prevent this is to use memory barriers/volatile variables. The downloadable code featured at the top of this article has fixed that issue.

Many thanks to Herb Sutter who signaled the issue and helped me fix the code. --P.M.

Queues can be useful in a variety of systems involving data-stream processing. Typically, you have a data source producing data—requests coming to a web server, market feeds, or digital telephony packets—at a variable pace, and you need to process the data as fast as possible so there are no losses. To do this, you can push data into a queue using one thread and process it using a different thread—a good utilization of resources on multicore processors. One thread inserts data into the queue, and the other reads/deletes elements from the queue. Your main requirement is that a high-rate data burst does not last longer than the system's ability to accumulate data while the consumer thread handles it. The queue you use has to be threadsafe to prevent race conditions when inserting/removing data from multiple threads. For obvious reasons, it is necessary that the queue mutual exclusion mechanism add as little overhead as possible.

In this article, I present a lock-free queue (the source code for the lockfreequeue class is available online; see www.ddj.com/code/) in which one thread can write to the queue and another read from it—at the same time without any locking.

To do this, the code implements these requirements:

Because I use Standard C++, the code is portable under the aforementioned "machine" assumption:


template <typename T>
struct LockFreeQueue
{
  LockFreeQueue();
  void Produce(const T& t);
  bool Consume(T& t);
private:
  typedef std::list<T> TList;
  TList list;
  typename TList::iterator iHead, iTail;
};

Considering how simple this code is, you might wonder how can it be threadsafe. The magic is due to design, not implementation. Take a look at the implementation of the Produce() and Consume() methods. The Produce() method looks like this:

 
void Produce(const T& t)
{
  list.push_back(t);
  iTail = list.end();
  list.erase(list.begin(), iHead);
}

To understand how this works, mentally separate the data from LockFreeQueue<T> into two groups:

Produce() is the only method that changes the list (adding new elements and erasing the consumed elements), and it is essential that only one thread ever calls Produce()—it's the Producer thread! The iterator (iTail) (only manipulated by the Producer thread) changes it only after a new element is added to the list. This way, when the Consumer thread is reading the iTail element, the new added element is ready to be used. The Consume() method tries to read all the elements between iHead and iTail (excluding both ends).

 
bool Consume(T& t)
{
  typename TList::iterator iNext = iHead;
  ++iNext;
  if (iNext != iTail)
  {
    iHead = iNext;
    t = *iHead;
    return true;
  }
  return false;
}

This method reads the elements, but doesn't remove them from the list. Nor does it access the list directly, but through the iterators. They are guaranteed to be valid after std::list<T> is modified, so no matter what the Producer thread does to the list, you are safe to use them.

The std::list<T> maintains an element (pointed to by iHead) that is considered already read. For this algorithm to work even when the queue was just created, I add an empty T() element in the constructor of the LockFreeQueue<T> (see Figure 1):

 

LockFreeQueue()
{
  list.push_back(T());
  iHead = list.begin();
  iTail = list.end();
}

[Click image to view at full size]

Figure 1: Adding an empty T() element in the constructor of the LockFreeQueue<T>.

Consume() may fail to read an element (and return false). Unlike traditional lock-based queues, this queue works fast when the queue is not empty, but needs an external locking or polling method to wait for data. Sometimes you want to wait if there is no element available in the queue, and avoid returning false. A naive approach to waiting is:

 
T Consume()
{
  T tmp;
  while (!Consume(tmp))
    ;
  return tmp;
}

This Consume() method will likely heat up one of your CPUs red-hot to 100-percent use if there are no elements in the queue. Nevertheless, this should have good performance when the queue is not empty. However, if you think of it, a queue that's almost never empty is a sign of systemic trouble: It means the consumer is unable to keep pace with the producer, and sooner or later, the system is doomed to die of memory exhaustion. Call this approach NAIVE_POLLING.

A friendlier Consume() function does some pooling and calls some sort of sleep() or yield() function available on your system:


T Consume(int wait_time = 1/*milliseconds*/)
{
  T tmp;
  while (!Consume(tmp))
  {
    Sleep(wait_time/*milliseconds*/);
  }
  return tmp;
}

The DoSleep() can be implemented using nanosleep() (POSIX) or Sleep() (Windows), or even better, using boost::thread::sleep(), which abstracts away system-dependent nomenclature. Call this approach SLEEP. Instead of simple polling, you can use more advanced techniques to signal the Consumer thread that a new element is available. I illustrate this in Listing One using a boost::condition variable.

 

#include <boost/thread.hpp>
#include <boost/thread/condition.hpp>
#include <boost/thread/xtime.hpp>
    
template <typename T>
struct WaitFreeQueue
{
    void Produce(const T& t)
    {
        queue.Produce(t);
        cond.notify_one();
    }
    bool Consume(T& t)
    {
        return queue.Consume(t);
    }
    T Consume(int wait_time = 1/*milliseconds*/)
    {
        T tmp;
        if (Consume(tmp))
            return tmp;
        // the queue is empty, try again (possible waiting...)
        boost::mutex::scoped_lock lock(mtx);
        while (!Consume(tmp)) // line A
        {
            boost::xtime t;
            boost::xtime_get(&t, boost::TIME_UTC);
            AddMilliseconds(t, wait_time);
            cond.timed_wait(lock, t); // line B
        }
        return tmp;
    }
private:
    LockFreeQueue<T> queue;
    boost::condition cond;
    boost::mutex mtx;
};

Listing One

I used the timed_wait() instead of the simpler wait() to solve a possible deadlock when Produce() is called between line A and line B in Listing One. Then wait() will miss the notify_one() call and have to wait for the next produced element to wake up. If this element never comes (no more produced elements or if the Produce() call actually waits for Consume() to return), there's a deadlock. Call this approach TIME_WAIT.

The lock is still wait-free as long as there are elements in the queue. In this case, the Consumer() thread does no waiting and reads data as fast as possible (even with the Producer() that is inserting new elements). Only when the queue is exhausted does locking occur.

The Ping-Pong Test

To compare the three approaches (NAIVE_POLLING, SLEEP, and TIME_WAIT), I implemented a test called "Ping-Pong" that is similar to the game of table tennis (the source code is available online). In Figure 2, there are two identical queues between the threads T1 and T2. You first load one of the queues with a number of "balls," then ask each thread to read from one queue and write to the other. The result is a controlled infinite loop. By limiting the game to a fixed number of reads/writes ("shots"), you get an understanding of how the queue behaves when varying the waiting/sleep time and strategy and the number of "balls." The faster the game, the better the performance. You should also check CPU usage to see how much of it is used for real work.

Figure 2: The Ping-Pong test.

In a wait-free system, the more balls in the game, the better the performance gain compared to the classic locking strategy. This is because wait-free is an optimistic concurrency control method (works best when there is no contention), while classical lock-based concurrency control is pessimistic (assumes contention happens and preemptively inserts locking).

Ready to play? Here is the Ping-Pong test command line:


$> ./pingpong [strategy] [timeout] [balls] [shots]

When you run the program, the tests show the results in the table shown in Figure 3:

[Click image to view at full size]

Figure 3: Ping-Pong test results.

Figure 4 presents a table with the results for a classic approach (see SafeQueue). These results show that this queue is, on average, more than four-times slower than the LockFreeQueue. The slowdown comes from the synchronization between threads. Both Produce() and Consume() have to wait for each other to finish. CPU usage is almost 100 percent for this test (similar to the NO_WAIT strategy, but not even close to its performance).

[Click image to view at full size]

Figure 4: Classic approach results.

Final Considerations

The single-threaded code below shows the value of the list.size() when Producing/ Consuming elements:

 
LockFreeQueue<int> q;   // list.size() == 1
q.Produce(1);    // list.size() == 2
int i;
q.Consume(i);  // list.size() == still 2!;
               // Consume() doesn't modify the list
q.Produce(i);    // list.size() == 2 again;

The size of the queue is 1 if Produce() was never called and greater than 1 if any element was produced.

No matter how many times Consume() is called, the list's size will stay constant. It is Produce() that is increasing the size (by 1); and if there were consumed elements, it will also delete them from the queue. In a way, Produce() acts as a simple garbage collector. The whole thread safety comes from the fact that specific data is modified from single threads only. The synchronization between threads is done using iterators (or pointers, whichever has atomic read/write operation on your machine). Also consider this code:

 
usleep(1000);    // sleep 1 microsecond

On the face of it, this line of code makes a thread sleep for 1 microsecond, and then continue. In reality, 1 microsecond is just a lower bound to the duration of the call.

The man page for usleep() says, "The usleep() function suspends execution of the calling process for (at least) usec microseconds. The sleep may be lengthened slightly by any system activity or by the time spent processing the call or by the granularity of system timers," or if you use the nanosleep() function. "Therefore, nanosleep() always pauses for at least the specified time; however, it can take up to 10 ms longer than specified until the process becomes runnable again."

So if the process is not scheduled under a real-time policy, there's no guarantee when your thread will be running again. I've done some tests and (to my surprise) there are situations when code such as:

 
cond.timed_wait(lock, x);    // x = e.g. 1 millisecond

will actually wait for more than 1 second.

Acknowledgments

Many thanks to Andrei Alexandrescu who took the time to review this article. Also, thanks to Radu Duta for making useful corrections.

Terms of Service | Privacy Statement | Copyright © 2024 UBM Tech, All rights reserved.