Dataflow Programming: Handling Huge Data Loads Without Adding Complexity


Support for Big Data

Because the dataflow operators in a graph run in parallel, the model can overlap I/O operations with computation. This is a "whole application" approach to parallelization, as opposed to the many thread-oriented performance frameworks that focus on hot sections of code such as for loops. It addresses a key problem in processing "big data" on today's many-core processors: feeding data to the cores fast enough.
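
To make the overlap concrete, here is a minimal sketch using plain Java threads and a bounded queue (illustrative only; the class name and record format are hypothetical, and this is not a dataflow framework's API). An I/O stage reads records while a compute stage processes the records already read, so neither waits for the other to finish.

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class PipelineSketch {
    private static final String END = "";   // end-of-data marker

    public static void main(String[] args) throws InterruptedException {
        // A bounded queue plays the role of the dataflow edge between two operators.
        BlockingQueue<String> edge = new ArrayBlockingQueue<>(1024);

        // Stage 1: the "I/O" operator; stands in for reading records from disk or network.
        Thread reader = new Thread(() -> {
            try {
                for (int i = 0; i < 100_000; i++) {
                    edge.put("record-" + i);
                }
                edge.put(END);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        // Stage 2: the compute operator; runs concurrently with the reader,
        // so I/O and computation overlap instead of running one after the other.
        Thread compute = new Thread(() -> {
            try {
                long totalLength = 0;
                String record;
                while (!(record = edge.take()).equals(END)) {
                    totalLength += record.length();
                }
                System.out.println("Total length: " + totalLength);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        reader.start();
        compute.start();
        reader.join();
        compute.join();
    }
}

A dataflow framework builds this kind of structure for you from an operator graph; the point of the sketch is only that the stages run at the same time.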

While dataflow scales up well, it also scales down easily. This distinguishes it from technologies such as Hadoop and, to a lesser extent, MapReduce, which don't scale downward well because of their inherent complexity.

We've discussed how a dataflow architecture exploits multicore. The same principles apply to multi-node clusters: dataflow queues can be extended over the network so that a single dataflow graph executes on multiple systems in parallel. The compositional model of building dataflow graphs allows pieces of the graph to be replicated across multiple nodes. Scaling out extends the reach of dataflow to solve large data problems.
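
As a rough sketch of what "extending a queue over the network" means (illustrative only; the class name, record format, and port handling are hypothetical, not any framework's implementation), the upstream end of an edge can write records to a socket while the downstream end reads them on another node. Both ends run in one process here only to keep the example self-contained.

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.net.ServerSocket;
import java.net.Socket;

public class RemoteEdgeSketch {
    public static void main(String[] args) throws Exception {
        // Bind the "downstream" end first so the connection cannot be refused.
        ServerSocket server = new ServerSocket(0);     // any free local port
        int port = server.getLocalPort();

        // Downstream node: reads values arriving over the network edge.
        Thread downstream = new Thread(() -> {
            try (Socket socket = server.accept();
                 DataInputStream in = new DataInputStream(socket.getInputStream())) {
                long sum = 0;
                int value;
                while ((value = in.readInt()) >= 0) {  // a negative value marks end of data
                    sum += value;
                }
                System.out.println("Downstream sum: " + sum);
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
        downstream.start();

        // Upstream node: pushes values into the remote edge.
        try (Socket socket = new Socket("localhost", port);
             DataOutputStream out = new DataOutputStream(socket.getOutputStream())) {
            for (int i = 0; i < 100; i++) {
                out.writeInt(i);
            }
            out.writeInt(-1);                          // end-of-data marker
        }

        downstream.join();
        server.close();
    }
}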

Dataflow and Actors

The actor model has been popularized by languages such as Erlang and Scala. In this model, independent actors communicate using messages. When an actor receives a message, it acts on it as defined by functions inside the actor. Actors expose communication endpoints that allow other actors to find them and send them messages. In some implementations, messages are free form, and actors use pattern matching to distinguish messages and determine how each is handled.

Below is a very simple example of an actor in Scala, using the scala.actors library; the message types are defined as simple case objects. The MessageHandler class extends the Actor library class, overriding the act() method, which is invoked to handle incoming messages. The example receives messages in a loop until a Stop message arrives; once Stop is received, it invokes exit(), terminating the actor instance. Inside the loop, the receive block processes messages by type, with one case clause per message type that can be received; pattern matching determines which clause handles each message. A Message is handled by doing some work and then sending an acknowledgement back to the message sender. The sender reference is provided by Actor.

import scala.actors.Actor

// Message types used by the example
case object Message
case object Ack
case object Stop

class MessageHandler extends Actor {
    def act() {
        while (true) {
            receive {
                case Message =>
                    // Do some work to handle the message
                    // Send an acknowledgement to the sender
                    sender ! Ack
                case Stop =>
                    // Terminates the actor
                    exit()
            }
        }
    }
}

On the surface, the dataflow and actor models are very similar. They both support a functional, shared-nothing programming style. The concepts within each are easy to grasp. It is straightforward to pick up frameworks of either model and become proficient quickly. Both models are also easy to extend by writing new operators or actors. As with dataflow, the actor model hides multithreading issues from the developer. Having no shared memory means no synchronization worries.

As with dataflow, the actor model scales up as well as out. Both models also support either an active or a reactive implementation. An active implementation dedicates a thread to each operator or actor. A reactive implementation allows for work stealing, in which a bounded pool of threads serves the operators or actors, keeping resource usage contained and making it practical to run many thousands of actor instances.
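
As a rough illustration of the reactive style (a generic sketch, not how any particular actor or dataflow framework schedules its work; the class name is hypothetical), the fragment below runs ten thousand lightweight units of work on a small work-stealing pool instead of dedicating a thread to each.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.atomic.AtomicLong;

public class ReactiveSketch {
    public static void main(String[] args) throws Exception {
        // A small, work-stealing pool serves many lightweight units of work.
        ForkJoinPool pool = new ForkJoinPool(Runtime.getRuntime().availableProcessors());
        AtomicLong processed = new AtomicLong();

        List<Callable<Void>> units = new ArrayList<>();
        for (int i = 0; i < 10_000; i++) {
            units.add(() -> {
                processed.incrementAndGet();   // stand-in for reacting to one message
                return null;
            });
        }

        pool.invokeAll(units);                 // 10,000 logical units, only a handful of threads
        pool.shutdown();
        System.out.println("Units processed: " + processed.get());
    }
}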

Below is an example written in more of a dataflow style. It is a simple operator that takes strings as input and emits the length of each string as output. The looping construct is similar to the actor's, but it handles a single, statically typed input.

public class StringLength extends DataflowProcess {

    private final StringInput input;
    private final IntOutput output;

    public StringLength(Dataflow source) {
        // Handle properties, set up inputs and outputs …
    }

    @Override
    protected void execute() {
        // Read each input string and push its length downstream
        while (input.stepNext()) {
            output.push(input.asString().length());
        }

        // Signal that no more data will be produced on this output
        output.pushEndOfData();
    }
}

There are differences between the two models. The actor model does not guarantee immutability or message ordering, whereas dataflow does. Nor does the actor model optimize for large numbers of messages or for messages carrying large amounts of data.

Dataflow provides flow control where the actor model doesn't (at least, not out of the box). Without flow control, messages are either dropped or memory problems arise as communication endpoints fill up. Dataflow also generally binds components statically at composition time, whereas actors are much more dynamic. Static binding enables graph analysis to apply optimizations.
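
The bounded queues used as edges in the earlier pipeline sketch are a simple form of this flow control. The hypothetical fragment below isolates the effect: with a slow consumer, put() blocks the producer, pacing it to the consumer's rate instead of dropping messages or letting memory grow without bound.

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BackPressureSketch {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Integer> edge = new ArrayBlockingQueue<>(4);   // small, bounded edge

        Thread slowConsumer = new Thread(() -> {
            try {
                while (edge.take() >= 0) {
                    Thread.sleep(10);        // downstream operator that can't keep up
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        slowConsumer.start();

        long start = System.currentTimeMillis();
        for (int i = 0; i < 100; i++) {
            edge.put(i);                     // blocks whenever the queue is full
        }
        edge.put(-1);                        // end-of-data marker
        slowConsumer.join();

        // The producer is paced to roughly the consumer's rate (about a second here),
        // rather than queueing all 100 items in memory at once.
        System.out.println("Elapsed ms: " + (System.currentTimeMillis() - start));
    }
}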

The actor model appears much better suited to task-based parallelism where concurrency is important. For example, Erlang programs control telecom equipment with thousands of unique requests per minute. Meanwhile, dataflow is focused on data parallelism where single-job data processing and analysis performance are the main concerns. In a sense, they're slightly different solutions for different domains, although conceptually similar.

Performance Benchmarks

I used the MalStone B10 benchmark, which consists of 10 billion rows of log data in five fields, to measure the performance of a dataflow-based framework. The overall data size is nearly a terabyte. The benchmark code extracts the year and week of the year from a timestamp field and aggregates a compromise indicator by site identifier, year, and week. A count of overall site activity is included, and the summations are cumulative.
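
The heart of that aggregation looks roughly like the following single-threaded sketch. The pipe-delimited record layout (site identifier, timestamp, compromise flag) and the class name are assumptions for illustration, and the cumulative summation step is omitted; this is not the benchmark implementation itself.

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.time.temporal.WeekFields;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

public class AggregateSketch {
    // Running totals per (site, year, week) key.
    static final class Totals {
        long visits;
        long compromised;
    }

    public static void main(String[] args) {
        // Hypothetical log records: siteId|timestamp|compromiseFlag
        String[] records = {
            "site-1|2010-03-02T10:15:00|0",
            "site-1|2010-03-03T11:00:00|1",
            "site-2|2010-03-09T09:30:00|0"
        };

        DateTimeFormatter fmt = DateTimeFormatter.ISO_LOCAL_DATE_TIME;
        WeekFields wf = WeekFields.of(Locale.US);
        Map<String, Totals> totals = new HashMap<>();

        for (String record : records) {
            String[] fields = record.split("\\|");
            LocalDateTime ts = LocalDateTime.parse(fields[1], fmt);
            // Key on site identifier, year, and week of year.
            String key = fields[0] + ":" + ts.getYear() + ":" + ts.get(wf.weekOfYear());
            Totals t = totals.computeIfAbsent(key, k -> new Totals());
            t.visits++;                                   // overall site activity
            t.compromised += Long.parseLong(fields[2]);   // compromise indicator
        }

        totals.forEach((key, t) ->
            System.out.println(key + " visits=" + t.visits + " compromised=" + t.compromised));
    }
}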

The benchmark was run on a single-node machine with 32 cores. Figure 2 shows run times for the benchmark. The test was run with increasing numbers of cores, showing good scalability as compute resources increased.

The test was also run on a 20-node cluster of Amazon EC2 instances. Each instance had eight cores but a less capable I/O system. The overall runtime was 10 minutes, demonstrating the ability to scale large processing jobs across a cluster using dataflow techniques.

Figure 2: MalStone B10 run times using a dataflow framework.

Conclusion

Multicore and many-core will be processor providers' performance answer for years to come. Software has yet to catch up, but a mainstream evolution toward parallel, scalable software is taking place. As a software architecture, dataflow fits the big data domain well. It enables impressive multicore and multi-node utilization and scalability. It is easy to learn and use, scales well both up and out, and also scales down for smaller workloads.

Further Reading

Simple Concurrency with Dataflow Variables in C++


Jim Falgout is the chief technologist for Pervasive DataRush; Pervasive is a company that specializes in embedded, cloud-based, and highly parallel database technologies.

