Anthony Williams is the author of the book C++ Concurrency in Action and of the just::thread C++0x thread library. He can be contacted at anthony.ajw@gmail.com.
Imagine for a moment a spreadsheet. Some of the cells hold plain values, but others hold formulae that determine the value, possibly based on other cell values. Some are simple calculations such as the sum of a range of cells, while others are more complex and perform intricate calculations involving many other cell values in a multitude of ways. Naturally, the cells that are used in a formula may themselves contain formulae, in which case the spreadsheet calculation engine has to ensure that they are calculated first. If it turns out that the formula in A1 depends on the value of B27, which in turn depends on A1, then you have a circular reference and the answer is undefined.
Dataflow variables are like spreadsheet cells. They may be simple values, or the result of complex calculations involving other variables, but they can only be set once. Any calculation that needs the value of a variable that has not been set must block until the value has been set. This makes dataflow variables ideal for concurrency -- the implementation can take care of all the synchronization internally; all you need to do is reference the variables where they are needed and ensure that there are no loops in your logic. Remember the spreadsheet -- if a cell needs its own value to resolve its formula then you have an undefined value. The same happens with dataflow variables, though there you'll likely get a hung program rather than an error.
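To make the two rules concrete (set once, block until set), here is a minimal sketch built on the standard std::promise and std::shared_future rather than on any dataflow library. The function names are made up for this illustration, and the 50ms delay is only there so that the reader really does have to wait.

```cpp
#include <cassert>
#include <chrono>
#include <future>
#include <thread>

// Returns the value a reader sees after blocking on a variable
// that has not been set yet.
int read_after_delayed_write()
{
    std::promise<int> writer;                                      // write end: can be set once
    std::shared_future<int> reader = writer.get_future().share();  // read end: get() blocks
    assert(reader.valid());

    std::thread producer([&writer] {
        std::this_thread::sleep_for(std::chrono::milliseconds(50));
        writer.set_value(42);                                      // the one and only write
    });

    int const value = reader.get();  // blocks until the producer has called set_value()
    producer.join();
    return value;
}

// Returns true if a second write to the same variable is rejected.
bool second_write_is_rejected()
{
    std::promise<int> writer;
    writer.set_value(1);
    try {
        writer.set_value(2);         // the value is already set, so this throws
    } catch (std::future_error const& e) {
        return e.code() ==
            std::make_error_code(std::future_errc::promise_already_satisfied);
    }
    return false;
}
```

Calling read_after_delayed_write() returns 42 once the producer thread has written the value, and second_write_is_rejected() returns true because std::promise refuses a second write. A dataflow variable hides this promise/future pair behind a single object.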
Using Dataflow Variables
Dataflow variables are available for multiple languages, including Oz, Ruby and Groovy. Also, because of their write-once property, code that uses them is very similar in structure to code written in a functional language such as Erlang or Haskell. Let's take a look at a few examples, then look at one way of implementing them in C++.
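#include <iostream>

// DataFlow<T> is the dataflow variable class template discussed in this article.
int main()
{
    DataFlow<int> x,y,z;
    // z is computed by a task that blocks until both x and y have been set
    z.task([&](){return x.get()+y.get();});
    y=99;
    x=123;
    std::cout<<"z="<<z.get()<<std::endl;
}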
This simple example declares three dataflow variables which store integers. It then specifies that z is the result of a task, passing in a lambda function that calculates the sum of x and y as that task. Then it sets the values of x and y explicitly, before printing the value of z, which is now available since x and y have been set.
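If you want to experiment with the example before reaching a real implementation, the following stand-in provides just the three operations the listing uses: task(), assignment and get(). It is a rough sketch under stated assumptions, not the implementation this article is about: it assumes one thread per task and at most one task per variable, and listing1_sum() is a hypothetical helper that repeats the body of the example so the result can be checked.

```cpp
#include <cassert>
#include <exception>
#include <future>
#include <thread>
#include <utility>

// Hypothetical stand-in offering the interface the listing relies on.
template<typename T>
class DataFlow
{
    std::promise<T> promise_;
    std::shared_future<T> value_;
    std::thread worker_;             // runs the task, if one was supplied
public:
    DataFlow() : value_(promise_.get_future().share()) {}
    DataFlow(DataFlow const&) = delete;
    DataFlow& operator=(DataFlow const&) = delete;
    ~DataFlow() { if (worker_.joinable()) worker_.join(); }

    // Set the value directly; a second write throws std::future_error.
    DataFlow& operator=(T value)
    {
        promise_.set_value(std::move(value));
        return *this;
    }

    // Compute the value on a new thread; f may block on other variables.
    template<typename F>
    void task(F f)
    {
        assert(!worker_.joinable()); // this sketch allows one task per variable
        worker_ = std::thread([this, f]() mutable {
            try {
                promise_.set_value(f());
            } catch (...) {
                // Pass the failure on to readers; if the value was already
                // set by an assignment, there is nothing more to report.
                try { promise_.set_exception(std::current_exception()); }
                catch (...) {}
            }
        });
    }

    // Block until the value is available, then return a copy of it.
    T get() const { return value_.get(); }
};

// Same steps as the example above, returning z instead of printing it.
int listing1_sum()
{
    DataFlow<int> x, y, z;
    z.task([&] { return x.get() + y.get(); });
    y = 99;
    x = 123;
    return z.get();
}
```

Here listing1_sum() returns 222. Note two limits of the sketch: the destructor joins the task thread, so a variable whose task waits on a value that is never set will hang at scope exit, and a task must not outlive the variables it reads, which holds in the example because z is destroyed before y and x.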
Okay, how about something a bit more complex? Listing 2 shows the use of dataflow variables to calculate the mean and standard deviation of a set of data:
#include <cmath>
#include <iostream>
#include <numeric>
#include <vector>

// Population standard deviation of values, given their mean
double standard_deviation(std::vector<double> const& values,double mean)
{
    double const variance=
        std::accumulate(values.begin(),values.end(),0.0,
            [=](double prev,double x){return prev+(x-mean)*(x-mean);})/values.size();
    return std::sqrt(variance);
}

double calculate_mean(std::vector<double> const& values)
{
    return std::accumulate(values.begin(),values.end(),0.0)/values.size();
}

// get_data() is assumed to be defined elsewhere and to return the values to analyze
int main()
{
    DataFlow<std::vector<double> > data;
    DataFlow<double> sd,mean;
    // sd is specified before mean, but its task blocks until mean is available
    sd.task([&](){return standard_deviation(data.get(),mean.get());});
    mean.task([&](){return calculate_mean(data.get());});
    data=get_data();
    std::cout<<"standard deviation="<<sd.get()<<", mean="<<mean.get()<<std::endl;
}
In this case, both the mean and standard deviation depend on the data, but the standard deviation also depends on the mean. The implementation ensures that all the dependencies are ordered correctly, so when you wait for the standard deviation by calling sd.get(), the implementation ensures that the task to calculate it has run. Since that task in turn calls mean.get(), the implementation also ensures that the task to calculate the mean has run.
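The chain of get() calls described above can be observed with nothing but the standard library. The sketch below is one possible way of getting this demand-driven ordering, using std::async with std::launch::deferred so that each task runs only when its result is first requested. It is an illustration under that assumption, not necessarily how a dataflow implementation schedules its tasks; demand_driven_sd and run_order are names invented for the example.

```cpp
#include <cassert>
#include <cmath>
#include <future>
#include <numeric>
#include <string>
#include <vector>

// Filled in by the tasks so the order of execution can be inspected.
std::vector<std::string> run_order;

double demand_driven_sd(std::vector<double> const& values)
{
    assert(!values.empty());

    // Nothing runs yet: launch::deferred postpones each task until get().
    std::shared_future<double> mean =
        std::async(std::launch::deferred, [&values] {
            run_order.push_back("mean");
            return std::accumulate(values.begin(), values.end(), 0.0)
                / values.size();
        }).share();

    std::shared_future<double> sd =
        std::async(std::launch::deferred, [&values, mean] {
            run_order.push_back("sd begins");
            double const m = mean.get();  // runs the mean task on demand
            double const variance =
                std::accumulate(values.begin(), values.end(), 0.0,
                    [m](double prev, double x) { return prev + (x - m) * (x - m); })
                / values.size();
            run_order.push_back("sd ends");
            return std::sqrt(variance);
        }).share();

    return sd.get();  // asking for sd pulls in mean along the way
}
```

For the values 2, 4, 4, 4, 5, 5, 7, 9 the function returns 2, and run_order ends up as "sd begins", "mean", "sd ends": the mean task runs only because the standard deviation task asked for its value.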


