Channels ▼


Applying the Big Data Lambda Architecture

Based on his experience working on distributed data processing systems at Twitter, Nathan Marz recently designed a generic architecture addressing common requirements, which he called the Lambda Architecture. Marz is well-known in Big Data: He's the driving force behind Storm and at Twitter he  led the streaming compute team, which provides and develops shared infrastructure to support critical real-time applications.

Marz and his team described the underlying motivation for building systems with the lambda architecture as:

  • The need for a robust system that is fault-tolerant, both against hardware failures and human mistakes.
  • To serve a wide range of workloads and use cases, in which low-latency reads and updates are required. Related to this point, the system should support ad-hoc queries.
  • The system should be linearly scalable, and it should scale out rather than up, meaning that throwing more machines at the problem will do the job.
  • The system should be extensible so that features can be added easily, and it should be easily debuggable and require minimal maintenance.

From a bird's eye view the lambda architecture has three major components that interact with new data coming in and responds to queries, which in this article are driven from the command line:

lambda architecture
Figure 1: Overview of the lambda architecture.

Essentially, the Lambda Architecture comprises the following components, processes, and responsibilities:

  • New Data: All data entering the system is dispatched to both the batch layer and the speed layer for processing.
  • Batch layer: This layer has two functions: (i) managing the master dataset, an immutable, append-only set of raw data, and (ii) to pre-compute arbitrary query functions, called batch views. Hadoop's HDFS is typically used to store the master dataset and perform the computation of the batch views using MapReduce.
  • Serving layer: This layer indexes the batch views so that they can be queried in ad hoc with low latency. To implement the serving layer, usually technologies such as Apache HBase or ElephantDB are utilized. The Apache Drill project provides the capability to execute full ANSI SQL 2003 queries against batch views.
  • Speed layer:This layer compensates for the high latency of updates to the serving layer, due to the batch layer. Using fast and incremental algorithms, the speed layer deals with recent data only. Storm is often used to implement this layer.
  • Queries: Last but not least, any incoming query can be answered by merging results from batch views and real-time views.

Scope and Architecture of the Project

In this article, I employ the lambda architecture to implement what I call UberSocialNet (USN). This open-source project enables users to store and query acquaintanceship data. That is, I want to be able to capture whether I happen to know someone from multiple social networks, such as Twitter or LinkedIn, or from real-life circumstances. The aim is to scale out to several billions of users while providing low-latency access to the stored information. To keep the system simple and comprehensible, I limit myself to bulk import of the data (no capabilities to live-stream data from social networks) and provide only a very simple a command-line user interface. The guts, however, use the lambda architecture.

It's easiest to think about USN in terms of two orthogonal phases:

  • Build-time, which includes the data pre-processing, generating the master dataset as well as creating the batch views.
  • Runtime, in which the data is actually used, primarily via issuing queries against the data space.

The USN app architecture is shown below in Figure 2:

Figure 2: High-level architecture diagram of the USN app.

The following subsytems and processes, in line with the lambda architecture, are at work in USN:

  • Data pre-processing. Strictly speaking this can be considered part of the batch layer. It can also be seen as an independent process necessary to bring the data into a shape that is suitable for the master dataset generation.
  • The batch layer. Here, a bash shell script is used to drive a number of HiveQL queries (see the GitHub repo, in the batch-layer folder) that are responsible to load the pre-processed input CSV data into HDFS.
  • The serving layer. In this layer, we use a Python script that loads the data from HDFS via Hive and inserts it into a HBase table, and hence creating a batch view of the data. This layer also provides query capabilities, necessary in the runtime phase to serve the front-end.
  • Command-line front end. The USN app front-end is a bash shell script interacting with the end-user and providing operations such as listings, lookups, and search.

This is all there is from an architectural point of view. You may have noticed that there is no speed layer in USN, as of now. This is due to the scope I initially introduced above. At the end of this article, I'll revisit this topic.

The USN App Technology Stack and Data

Recently, Dr. Dobb's discussed Pydoop: Writing Hadoop Programs in Python, which will serve as a gentle introduction into setting up and using Hadoop with Python. I'm going to use a mixture of Python and bash shell scripts to implement the USN. However, I won't rely on the low-level MapReduce API provided by Pydoop, but rather on higher-level libraries that interface with Hive and HBase, which are part of Hadoop. Note that the entire source code, including the test data and all queries as well as the front-end, is available in a GitHub repository, and it is necessary to follow along with this implementation. 

Before I go into the technical details such as the concrete technology stack used, let's have a quick look at the data transformation happening between the batch and the serving layer.

Figure 3: Data transformation from batch to serving layer in the USN app.

As hinted in Figure 3, the master dataset (left) is a collection of atomic actions: either a user has added someone to their networks or the reverse has taken place, a person has been removed from a network. This form of the data is as raw as it gets in the context of our USN app and can serve as the basis for a variety of views that are able to answer different sorts of queries. For simplicity's sake, I only consider one possible view that is used in the USN app front-end: the "network-friends" view, per user, shown in the right part of Figure 3.

Raw Input Data

The raw input data is a Comma Separated Value (CSV) file with the following format:

2012-03-12T22:54:13-07:00,Michael,ADD,I,Ora Hatfield, bla 
2012-11-23T01:53:42-08:00,Ted,REMOVE,I,Marvin Garrison, meh

The raw CSV file contains the following six columns:

  • timestamp is an ISO 8601 formatted date-time stamp that states when the action was performed (range: January 2012 to May 2013).
  • originator is the name of the person who added or removed a person to or from one of his or her networks.
  • action must be either ADD or REMOVE and designates the action that has been carried out. That is, it indicates whether a person has been added or removed from the respective network.
  • network is a single character indicating the respective network where the action has been performed. The possible values are: I, in-real-life; T, Twitter; L, LinkedIn; F, Facebook; G, Google+
  • target is the name of the person added to or removed from the network.
  • context is a free-text comment, providing a hint why the person has been added/removed or where one has met the person in the first place.

There are no optional fields in the dataset. In other words: each row is completely filled. In order to generate some test data to be used in the USN app, I've created a raw input CSV file from in five runs, yielding some 500 rows of raw data.

Technology Stack

USN uses several software frameworks, libraries, and components, as I mentioned earlier. I've tested it with:

I assume that you're familiar with the bash shell and have Python 2.7 or above installed. I've tested the USN app under Mac OS X 10.8 but there are no hard dependencies on any Mac OS X specific features, so it should run unchanged under any Linux environment.

Building the USN Data Space

The first step is to build the data space for the USN app, that is, the master dataset and the batch view, and then we will have a closer look behind the scenes of each of the commands.

First, some pre-processing of the raw data, generated earlier:

$ pwd
$ ./ < usn-raw-data.csv > usn-base-data.csv

Next we want to build the batch layer. For this, I first need to make sure that the Hive Thrift service is running:

$ pwd
$ hive --service hiveserver
Starting Hive Thrift Server

Now, I can run the script that execute the Hive queries and builds our USN app master dataset, like so:

$ pwd
$ ./ INIT
USN batch layer created.
$ ./ CHECK
The USN batch layer seems OK.

This generates the batch layer, which is in HDFS. Next, I create the serving layer in HBase by building a view of the relationships to people. For this, both the Hive and HBase Thrift services need to be running. Below, you see how you start the HBase Thrift service:

$ echo $HBASE_HOME
$ cd /Users/mhausenblas2/bin/hbase-0.94.4
$ ./bin/
starting master, logging to /Users/...
$ ./bin/hbase thrift start -p 9191
13/05/31 09:39:09 INFO util.VersionInfo: HBase 0.94.4

As now both Hive and HBase Thrift services are up and running, I can run the following command (in the respective directory, wherever you've unzipped or cloned the GitHub repository):

$ echo $HBASE_HOME
$ cd /Users/mhausenblas2/bin/hbase-0.94.4
$ ./bin/ 
starting master, logging to /Users/...
$ ./bin/hbase thrift start -p 9191
13/05/31 09:39:09 INFO util.VersionInfo: HBase 0.94.4

Now, let's have a closer look at what is happening behind the scenes of each of the layers in the next sections.

Related Reading

More Insights

Currently we allow the following HTML tags in comments:

Single tags

These tags can be used alone and don't need an ending tag.

<br> Defines a single line break

<hr> Defines a horizontal line

Matching tags

These require an ending tag - e.g. <i>italic text</i>

<a> Defines an anchor

<b> Defines bold text

<big> Defines big text

<blockquote> Defines a long quotation

<caption> Defines a table caption

<cite> Defines a citation

<code> Defines computer code text

<em> Defines emphasized text

<fieldset> Defines a border around elements in a form

<h1> This is heading 1

<h2> This is heading 2

<h3> This is heading 3

<h4> This is heading 4

<h5> This is heading 5

<h6> This is heading 6

<i> Defines italic text

<p> Defines a paragraph

<pre> Defines preformatted text

<q> Defines a short quotation

<samp> Defines sample computer code text

<small> Defines small text

<span> Defines a section in a document

<s> Defines strikethrough text

<strike> Defines strikethrough text

<strong> Defines strong text

<sub> Defines subscripted text

<sup> Defines superscripted text

<u> Defines underlined text

Dr. Dobb's encourages readers to engage in spirited, healthy debate, including taking us to task. However, Dr. Dobb's moderates all comments posted to our site, and reserves the right to modify or remove any content that it determines to be derogatory, offensive, inflammatory, vulgar, irrelevant/off-topic, racist or obvious marketing or spam. Dr. Dobb's further reserves the right to disable the profile of any commenter participating in said activities.

Disqus Tips To upload an avatar photo, first complete your Disqus profile. | View the list of supported HTML tags you can use to style comments. | Please read our commenting policy.