Based on his experience working on distributed data processing systems at Twitter, Nathan Marz recently designed a generic architecture addressing common requirements, which he called the Lambda Architecture. Marz is well known in the Big Data community: he's the driving force behind Storm, and at Twitter he led the streaming compute team, which provides and develops shared infrastructure to support critical real-time applications.
Marz and his team described the underlying motivation for building systems with the lambda architecture as:
- The need for a robust system that is fault-tolerant, both against hardware failures and human mistakes.
- The ability to serve a wide range of workloads and use cases, including those in which low-latency reads and updates are required. Relatedly, the system should support ad-hoc queries.
- The system should be linearly scalable, and it should scale out rather than up, meaning that throwing more machines at the problem will do the job.
- The system should be extensible so that features can be added easily, and it should be easily debuggable and require minimal maintenance.
From a bird's eye view, the lambda architecture has three major components that interact with new data coming in and respond to queries, which in this article are driven from the command line:
Figure 1: Overview of the lambda architecture.
Essentially, the Lambda Architecture comprises the following components, processes, and responsibilities:
- New Data: All data entering the system is dispatched to both the batch layer and the speed layer for processing.
- Batch layer: This layer has two functions: (i) managing the master dataset, an immutable, append-only set of raw data, and (ii) pre-computing arbitrary query functions, called batch views. Hadoop's HDFS is typically used to store the master dataset, and the batch views are computed with MapReduce.
- Serving layer: This layer indexes the batch views so that they can be queried ad hoc with low latency. To implement the serving layer, technologies such as Apache HBase or ElephantDB are typically used. The Apache Drill project provides the capability to execute full ANSI SQL 2003 queries against batch views.
- Speed layer: This layer compensates for the high latency of updates to the serving layer caused by the batch layer. Using fast and incremental algorithms, the speed layer deals with recent data only. Storm is often used to implement this layer.
- Queries: Last but not least, any incoming query can be answered by merging results from batch views and real-time views.
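To make the last point concrete, here is a minimal, purely illustrative Python sketch of the query-time merge; the data shapes (per-key counts) and names are my assumptions, not code from the lambda architecture itself:

```python
# Illustrative sketch: answering a query by merging the pre-computed
# batch view with the incremental real-time view from the speed layer.

def merge_views(batch_view, realtime_view):
    """Merge per-key counts from the batch view and the real-time view."""
    merged = dict(batch_view)                        # start from the batch view
    for key, delta in realtime_view.items():
        merged[key] = merged.get(key, 0) + delta     # apply recent updates
    return merged

batch = {"alice": 120, "bob": 45}    # computed periodically by the batch layer
recent = {"bob": 3, "carol": 1}      # computed incrementally by the speed layer
print(merge_views(batch, recent))    # prints: {'alice': 120, 'bob': 48, 'carol': 1}
```

The batch view is authoritative but stale; the real-time view covers only the data that arrived since the last batch run, which is why the two must be merged at query time.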
Scope and Architecture of the Project
In this article, I employ the lambda architecture to implement what I call UberSocialNet (USN). This open-source project enables users to store and query acquaintanceship data. That is, I want to be able to capture whether I happen to know someone from multiple social networks, such as Twitter or LinkedIn, or from real-life circumstances. The aim is to scale out to several billion users while providing low-latency access to the stored information. To keep the system simple and comprehensible, I limit myself to bulk import of the data (no capabilities to live-stream data from social networks) and provide only a very simple command-line user interface. The guts, however, use the lambda architecture.
It's easiest to think about USN in terms of two orthogonal phases:
- Build-time, which includes the data pre-processing, generating the master dataset as well as creating the batch views.
- Runtime, in which the data is actually used, primarily via issuing queries against the data space.
The USN app architecture is shown below in Figure 2:
Figure 2: High-level architecture diagram of the USN app.
The following subsystems and processes, in line with the lambda architecture, are at work in USN:
- Data pre-processing. Strictly speaking, this can be considered part of the batch layer. It can also be seen as an independent process necessary to bring the data into a shape suitable for generating the master dataset.
- The batch layer. Here, a bash shell script drives a number of HiveQL queries (see the GitHub repo, in the batch-layer folder) that are responsible for loading the pre-processed input CSV data into HDFS.
- The serving layer. In this layer, a Python script loads the data from HDFS via Hive and inserts it into an HBase table, thereby creating a batch view of the data. This layer also provides the query capabilities needed in the runtime phase to serve the front-end.
- Command-line front-end. The USN app front-end is a bash shell script that interacts with the end user and provides operations such as listings, lookups, and search.
This is all there is from an architectural point of view. You may have noticed that there is no speed layer in USN, as of now. This is due to the scope I initially introduced above. At the end of this article, I'll revisit this topic.
The USN App Technology Stack and Data
Recently, Dr. Dobb's discussed Pydoop: Writing Hadoop Programs in Python, which serves as a gentle introduction to setting up and using Hadoop with Python. I'm going to use a mixture of Python and bash shell scripts to implement the USN app. However, I won't rely on the low-level MapReduce API provided by Pydoop, but rather on higher-level libraries that interface with Hive and HBase, both part of the Hadoop ecosystem. Note that the entire source code, including the test data, all queries, and the front-end, is available in a GitHub repository, which you'll need in order to follow along with this implementation.
Before I go into the technical details such as the concrete technology stack used, let's have a quick look at the data transformation happening between the batch and the serving layer.
Figure 3: Data transformation from batch to serving layer in the USN app.
As hinted in Figure 3, the master dataset (left) is a collection of atomic actions: either a user has added someone to their networks or the reverse has taken place, a person has been removed from a network. This form of the data is as raw as it gets in the context of our USN app and can serve as the basis for a variety of views that are able to answer different sorts of queries. For simplicity's sake, I only consider one possible view that is used in the USN app front-end: the "network-friends" view, per user, shown in the right part of Figure 3.
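To illustrate this transformation, here is a small Python sketch (not the actual USN code, whose serving layer runs via Hive and HBase) of how the master dataset's atomic ADD/REMOVE actions fold into a per-user, per-network "network-friends" view; the tuple layout mirrors the raw data columns described below:

```python
# Illustrative sketch: folding atomic ADD/REMOVE actions from the master
# dataset into the "network-friends" view of Figure 3.

def build_network_friends(actions):
    """actions: iterable of (timestamp, originator, action, network, target).

    Returns {(originator, network): set of current friends}.
    """
    view = {}
    # Sort by timestamp so later actions override earlier ones.
    for ts, originator, action, network, target in sorted(actions):
        friends = view.setdefault((originator, network), set())
        if action == "ADD":
            friends.add(target)
        elif action == "REMOVE":
            friends.discard(target)
    return view

actions = [
    ("2012-03-12T22:54:13-07:00", "Michael", "ADD", "I", "Ora Hatfield"),
    ("2012-04-01T10:00:00-07:00", "Michael", "ADD", "I", "Marvin Garrison"),
    ("2012-11-23T01:53:42-08:00", "Michael", "REMOVE", "I", "Ora Hatfield"),
]
print(build_network_friends(actions))
# prints: {('Michael', 'I'): {'Marvin Garrison'}}
```

Because the master dataset is immutable and append-only, the view is always reproducible: rerunning the fold over the raw actions yields the same result.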
Raw Input Data
The raw input data is a Comma Separated Value (CSV) file with the following format:
timestamp,originator,action,network,target,context
2012-03-12T22:54:13-07:00,Michael,ADD,I,Ora Hatfield, bla
2012-11-23T01:53:42-08:00,Ted,REMOVE,I,Marvin Garrison, meh
...
The raw CSV file contains the following six columns:
- timestamp is an ISO 8601-formatted date-time stamp that states when the action was performed (range: January 2012 to May 2013).
- originator is the name of the person who added or removed a person to or from one of his or her networks.
- action must be either ADD or REMOVE and designates the action that has been carried out. That is, it indicates whether a person has been added to or removed from the respective network.
- network is a single character indicating the respective network where the action has been performed. The possible values are:
- target is the name of the person added to or removed from the network.
- context is a free-text comment providing a hint as to why the person has been added/removed or where one has met the person in the first place.
There are no optional fields in the dataset; in other words, each row is completely filled. To generate some test data for the USN app, I created a raw input CSV file with generatedata.com in five runs, yielding some 500 rows of raw data.
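As a quick sanity check on this format, the following Python sketch parses one raw row into a dictionary keyed by the six column names; the validation rules (action must be ADD or REMOVE, network is one character) follow the column descriptions above, while the function and variable names are mine:

```python
import csv
import io

# Illustrative sketch: parsing one row of the six-column raw CSV.
FIELDS = ["timestamp", "originator", "action", "network", "target", "context"]
ROW = "2012-03-12T22:54:13-07:00,Michael,ADD,I,Ora Hatfield, bla\n"

def parse_row(line):
    """Parse a single raw CSV line into a field dictionary."""
    values = next(csv.reader(io.StringIO(line)))
    record = dict(zip(FIELDS, (v.strip() for v in values)))
    assert record["action"] in ("ADD", "REMOVE"), "unexpected action"
    assert len(record["network"]) == 1, "network is a single character"
    return record

print(parse_row(ROW)["target"])  # prints: Ora Hatfield
```

Note that the free-text context field arrives with a leading space in the sample data, which is why the sketch strips whitespace from every field.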
USN uses several software frameworks, libraries, and components, as I mentioned earlier. I've tested it with:
- Apache Hadoop 1.0.4
- Apache Hive 0.10.0
- Hiver for Hive access from Python
- Apache HBase 0.94.4
- HappyBase for HBase access from Python
I assume that you're familiar with the bash shell and have Python 2.7 or above installed. I've tested the USN app under Mac OS X 10.8 but there are no hard dependencies on any Mac OS X specific features, so it should run unchanged under any Linux environment.
Building the USN Data Space
The first step is to build the data space for the USN app, that is, the master dataset and the batch view. Afterwards, we'll take a closer look behind the scenes of each of the commands.
First, some pre-processing of the raw data, generated earlier:
$ pwd
/Users/mhausenblas2/Documents/repos/usn-app/data
$ ./usn-preprocess.sh < usn-raw-data.csv > usn-base-data.csv
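The actual clean-up is done by the usn-preprocess.sh script from the repo. Purely for illustration, here is a Python sketch of the kind of clean-up such a step might perform; the specific rules (trim whitespace, keep only complete six-column rows) are my assumptions, not the script's actual logic:

```python
import csv

# Illustrative sketch of raw-data pre-processing: trim whitespace in every
# field and drop rows that don't have exactly six columns. These rules are
# assumptions about what usn-preprocess.sh might do, not its actual code.

def preprocess(lines):
    cleaned = []
    for row in csv.reader(lines):
        if len(row) != 6:
            continue  # drop malformed rows
        cleaned.append([field.strip() for field in row])
    return cleaned

raw = [
    "2012-03-12T22:54:13-07:00,Michael,ADD,I,Ora Hatfield, bla",
    "broken,row",
]
print(preprocess(raw))
# prints: [['2012-03-12T22:54:13-07:00', 'Michael', 'ADD', 'I', 'Ora Hatfield', 'bla']]
```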
Next we want to build the batch layer. For this, I first need to make sure that the Hive Thrift service is running:
$ pwd
/Users/mhausenblas2/Documents/repos/usn-app/batch-layer
$ hive --service hiveserver
Starting Hive Thrift Server ...
Now, I can run the script that executes the Hive queries and builds our USN app master dataset, like so:
$ pwd
/Users/mhausenblas2/Documents/repos/usn-app/batch-layer
$ ./batch-layer.sh INIT
USN batch layer created.
$ ./batch-layer.sh CHECK
The USN batch layer seems OK.
This generates the batch layer, which lives in HDFS. Next, I create the serving layer in HBase by building a view of the relationships to people. For this, both the Hive and HBase Thrift services need to be running. Below, you see how to start HBase and its Thrift service:
$ echo $HBASE_HOME
/Users/mhausenblas2/bin/hbase-0.94.4
$ cd /Users/mhausenblas2/bin/hbase-0.94.4
$ ./bin/start-hbase.sh
starting master, logging to /Users/...
$ ./bin/hbase thrift start -p 9191
13/05/31 09:39:09 INFO util.VersionInfo: HBase 0.94.4

With both the Hive and HBase Thrift services up and running, I can run the Python script that builds the serving layer (from the respective directory, wherever you've unzipped or cloned the GitHub repository).
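Conceptually, the serving layer maps the batch view onto HBase rows. The following sketch shows one plausible shaping (one row per user, one column per network holding the friends list); the table name, column-family name, and helper function are made up for illustration and are not taken from the USN repo:

```python
# Illustrative sketch: shaping the "network-friends" batch view into
# HBase rows. Row key = user, column = family 'a' qualified by the
# network character, value = comma-separated friends list. The layout
# is an assumption, not the actual USN schema.

def to_hbase_rows(network_friends):
    """Map {(user, network): {friends}} to {row_key: {column: value}}."""
    rows = {}
    for (user, network), friends in network_friends.items():
        columns = rows.setdefault(user.encode(), {})
        columns[b"a:" + network.encode()] = ",".join(sorted(friends)).encode()
    return rows

view = {("Michael", "I"): {"Ora Hatfield", "Marvin Garrison"}}
rows = to_hbase_rows(view)
print(rows)
# prints: {b'Michael': {b'a:I': b'Marvin Garrison,Ora Hatfield'}}

# With the HBase Thrift service listening on port 9191, rows shaped like
# this could be written via HappyBase (table name 'usn_friends' assumed):
#   import happybase
#   conn = happybase.Connection('localhost', port=9191)
#   table = conn.table('usn_friends')
#   for row_key, columns in rows.items():
#       table.put(row_key, columns)
```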
Now, let's have a closer look at what is happening behind the scenes of each of the layers in the next sections.