Simple Concurrency with Dataflow Variables in C++


Dataflow Variables and Concurrency

Now, here's the nifty part: dataflow variables make it really easy to parallelize an application. If the only state shared between the separate tasks that set the dataflow variables is itself encapsulated in dataflow variables, then the implementation can choose to run some or all of the tasks on separate threads and everything will "just work". If the code works fine on a single thread and shares state only through dataflow variables, then it will also work fine with multiple threads.

Of course, it requires discipline to ensure that the only shared state is in dataflow variables. That means no global variables that aren't dataflow variables, so std::cin and std::cout are off limits. You also need to ensure that if you pass a pointer through a dataflow variable then the data being pointed to isn't going to change.

If you really need to access shared state that isn't covered by dataflow variables then you need to ensure that everything is correctly synchronized. For example, you could have a global mutex that you use to protect uses of std::cin and std::cout. If you do this, you must ensure that you're not holding locks when you query a dataflow variable: querying may cause a task to be run, which may in turn try to acquire the same lock, and thus deadlock your program.

Of course, you're not limited to automatic parallelism: you can also spawn your own threads to set the values of dataflow variables. Even if you do, managing shared state with dataflow variables will still make your code easier to reason about and less error-prone. For example, you can only get a deadlock from dataflow variables if there is a circular dependency, in which case you get the same deadlock even in a single-threaded execution. The potential for race conditions is also much reduced: since the variables are write-once, the biggest source of race conditions (mutating state) is gone.

Under the Hood

Now that we've looked at the potential benefits of dataflow variables, let's take a look at what makes them tick. I'm going to describe a simple implementation based on the new C++0x thread library. This has the benefit of being portable between platforms, at the cost of missing out on potential optimizations and relinquishing control of the number of threads to the C++0x library.

Our variables have three operations we can do to them:

  • Assign a value directly
  • Specify that the value is the result of a task
  • Wait for and retrieve the value

Assigning a value and then waiting for it is easily done with a std::promise/std::shared_future pair, whereas specifying that the value is the result of a task is done with a std::async/std::shared_future pair. The catch is that we don't know in advance which we're going to have, and we can't wait on a future until it has been associated with something. We therefore need a mechanism for waiting that allows us to first wait until the future is valid, and then wait on the future itself.

This is a problem that's been around for as long as there's been multithreaded code, and the solution is almost as old: Use a condition variable. In the implementation of get() we use a standard condition-variable wait loop to wait until the future is valid. Once the future is valid we can then wait on the future to retrieve the value.


    T const& get()
    {
        std::unique_lock<std::mutex> lk(m);
        while(!data.valid())
        {
            cond.wait(lk);
        }
        lk.unlock();
        return data.get();
    }

Listing 3: The DataFlow<T>::get() member function.

In the assignment operator we can then create a promise, set the stored future to that from the promise, and set the value of the promise. We can then notify every thread waiting on the condition variable that the future has been set, and that the value is ready to retrieve.


    DataFlow& operator=(T value)
    {
        std::unique_lock<std::mutex> lk(m);
        if(data.valid())
        {
            throw std::runtime_error("Source already set");
        }
        std::promise<T> p;
        p.set_value(std::move(value));
        data=p.get_future();
        lk.unlock();
        cond.notify_all();
        return *this;
    }

Listing 4: The DataFlow<T> assignment operator.

Similarly, in the task() member function we can launch the task with std::async, set the future to that returned from the std::async call and notify all threads waiting on the condition variable:


    template<typename F>
    void task(F&& f)
    {
        {
            std::lock_guard<std::mutex> lk(m);
            if(data.valid())
            {
                throw std::runtime_error("Source already set");
            }
            data=std::async(std::function<T()>(std::forward<F>(f)));
        }
        cond.notify_all();
    }

Listing 5: The DataFlow<T>::task() member function.

Note that since this is a write-once variable, both the task() member function and assignment operator throw an exception if the future is already valid, indicating that the variable has already been set to a value or a task result. Also note that in both cases the mutex is unlocked before the condition variable is notified. This means that the waking threads don't have to block waiting for the notifying thread to unlock the mutex before proceeding.

The use of std::async does give us a modicum of control over the use of concurrency. In particular, it gives us three options:

  • Completely synchronous execution. The tasks are only executed when the variable's value is requested in a call to get(). This allows us to easily check the logic of our code: any circular dependency will deterministically hang the program every time.
  • Completely asynchronous execution. This allows for maximum concurrency, but potentially leads to oversubscription by spawning a huge number of threads. It might also result in hitting resource limits on the number of threads.
  • Allowing the implementation to choose between synchronous and asynchronous execution. This is what the implementation of task() shown above does. This is likely to provide at least some level of parallelism, but the maximum potential concurrency might not be realized if the implementation makes poor choices of which tasks to execute asynchronously. In particular, if all the asynchronous tasks block on the same variable then the concurrency is limited, even if there are other tasks that could be run in parallel.

The choice of option can be specified by providing an additional first parameter to the std::async call. Specifying std::async(std::launch::sync,std::forward<F>(f)) means the task will be called synchronously by the first thread to call get(), whereas specifying std::async(std::launch::async,std::forward<F>(f)) means the task will run on its own thread. The third option, which is equivalent to not specifying the policy, is to pass std::launch::any as the launch policy. This leaves it up to the implementation to choose.

The ideal schedule on a given set of hardware can probably best be obtained with a specially-designed thread pool. However, the use of std::async goes a fair way towards that, and can easily be replaced later if profiling demonstrates that there is appreciable opportunity for concurrency that is not being exploited. It also provides an easy way to debug the dataflow logic by using std::launch::sync.

Try It Out

The simple implementation of dataflow variables based on the C++0x thread library described here can be downloaded from my web site or from Dr. Dobb's. Let me know what you think. Has the use of dataflow variables (either in C++ or any other language) enabled you to simplify your multithreaded code?

