Crunch's implementation of top is efficient because it prunes results in the map to ensure that the minimum set of intermediate data is sent over the shuffle. More importantly, this example hints at the fact that Crunch operations are composable. That is, top works with any PTable at any stage in the pipeline. Contrast this with MapReduce, where reusable, composable libraries are not common (in Hadoop itself, ChainMapper and ChainReducer allow mappers and reducers to be chained together, but they are awkward to use and only work in the context of a single job).
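To make the composability concrete, here is a minimal sketch of a pipeline that counts words and then applies top to the resulting PTable. It is not the program from this series: the word-splitting DoFn, class name, and command-line arguments are illustrative assumptions, and error handling is omitted. Its final write-and-run steps mirror the ones described next.

import org.apache.crunch.DoFn;
import org.apache.crunch.Emitter;
import org.apache.crunch.PCollection;
import org.apache.crunch.PTable;
import org.apache.crunch.Pipeline;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.crunch.lib.Aggregate;
import org.apache.crunch.types.writable.Writables;

public class TopWords {
  public static void main(String[] args) throws Exception {
    Pipeline pipeline = new MRPipeline(TopWords.class);
    PCollection<String> lines = pipeline.readTextFile(args[0]);

    // Split each line into words, then count occurrences of each word.
    PTable<String, Long> counts = lines.parallelDo(new DoFn<String, String>() {
      @Override
      public void process(String line, Emitter<String> emitter) {
        for (String word : line.split("\\s+")) {
          emitter.emit(word);
        }
      }
    }, Writables.strings()).count();

    // top works on any PTable, at any stage of the pipeline:
    // keep only the 10 entries with the largest counts.
    PTable<String, Long> topk = Aggregate.top(counts, 10, true);

    pipeline.writeTextFile(topk, args[1]);
    pipeline.done();  // run the pipeline and clean up
  }
}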
The final part of the Crunch program writes the result to the output directory specified by args[1]:

writeTextFile(topk, args[1]);
run();
The run() method executes the entire pipeline that we constructed. We can run the Crunch pipeline in the same way as we ran the MapReduce job:
% $HADOOP_HOME/bin/hadoop jar target/hadoop-drdobbs-1.0.0-job.jar com.tom_e_white.drdobbs.crunch.TopK data outputc 10
% cat outputc/part-r-00000
[doctor,5327002]
[dobbs,114]
Running the pipeline over the full dataset on a cluster reveals that the top words (excluding punctuation) in the book dataset are the, of, and, to, in, and a.
% hadoop jar target/hadoop-drdobbs-1.0.0-job.jar com.tom_e_white.drdobbs.crunch.TopK data outputc 10
% hadoop fs -cat outputc/part-r-00000
[",21396850115]
[the,18399669358]
[.,16834514285]
[of,12042045526]
[and,8588851162]
[to,7305545226]
[in,5956669421]
[a,5422911334]
["""",3312106937]
[-,3285590930]
Anatomy of a Hadoop Cluster
Let's turn now to running these tools on real Hadoop clusters. What does a typical Hadoop cluster look like? For a small Hadoop cluster consisting of a few machines, the answer is typically similar to Figure 1.
Figure 1: A typical small Hadoop cluster.
A single master machine (top) runs the HDFS namenode and secondary namenode, as well as the MapReduce jobtracker. The remaining machines run HDFS datanodes and MapReduce tasktrackers. Each of these processes is a Java daemon, with the following responsibilities:
- The HDFS namenode manages the filesystem namespace for the cluster. Information about the filesystem tree and metadata for all the files is stored in memory and persistently on disk in two files: the namespace image and the edits log.
- The HDFS secondary namenode runs a periodic checkpointing function to generate a new namespace image so that the edits log doesn't become too large. Note that the secondary namenode does not adequately guard against failure of the primary. For that, HDFS has to be set up to run in a high-availability configuration (to be discussed shortly).
- HDFS datanodes are responsible for storing and retrieving blocks (fixed-size chunks of files, typically 128 MB). Datanodes send periodic heartbeats to the namenode to report the list of blocks that they are managing, and carry out the namenode's instructions (sent in the heartbeat reply), such as replicating blocks to other datanodes or deleting old blocks. (The sketch after this list shows how this division of labor looks to a client.)
- The MapReduce jobtracker manages all the jobs that have been submitted to the cluster. It is responsible for scheduling the jobs according to the cluster policy. The jobtracker archives completed jobs and makes details about them available to clients that may ask for logs or other information about job runs.
- MapReduce tasktrackers run the map and reduce tasks on behalf of clients. Each tasktracker periodically sends heartbeats to the jobtracker indicating whether it is ready to run a new task (each tasktracker has a preconfigured number of slots that it can run tasks in). If ready, the jobtracker will allocate it a task. Tasks are run in a new JVM for isolation. Tasktrackers also run an HTTP server to serve map outputs to reduce tasks in the shuffle phase of MapReduce.
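As a client-side illustration of the namenode/datanode split, the following sketch (with a hypothetical file path) asks the namenode, via the FileSystem API, which hosts hold the blocks of a file; the block data itself would then be read directly from those datanodes.

import java.util.Arrays;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ListBlockLocations {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);

    // The namenode answers metadata queries like this one...
    FileStatus status = fs.getFileStatus(new Path("/data/part-r-00000"));  // hypothetical path
    BlockLocation[] blocks = fs.getFileBlockLocations(status, 0, status.getLen());

    // ...while the blocks themselves are stored on (and served by) the datanodes.
    for (BlockLocation block : blocks) {
      System.out.printf("offset %d, length %d, hosts %s%n",
          block.getOffset(), block.getLength(), Arrays.toString(block.getHosts()));
    }
  }
}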
On a larger cluster, the master roles are run on separate machines because the memory capacity of the master is usually the limiting factor (memory usage by the master daemons is a function of cluster size and usage). Also, by separating roles, hardware failures have less of an impact. That being said, as described so far, the namenode is a Single Point of Failure (SPOF) in Hadoop. This SPOF can be avoided by running two namenodes in an active-standby configuration, so that if one fails, the other can take over as the active node and clients can continue to read and write data without interruption. Such a high-availability (HA) configuration also requires a ZooKeeper ensemble (typically three nodes), which elects the active namenode, and a shared storage service, such as a set of journal nodes, to store the edits log.
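To give a flavor of what HA involves, the sketch below sets the relevant properties programmatically for a hypothetical nameservice called mycluster with made-up host names; in a real cluster these properties would normally live in hdfs-site.xml (and the ZooKeeper quorum in core-site.xml) rather than in code.

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HaClientSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();

    // A logical nameservice hides the two physical namenodes from clients.
    conf.set("dfs.nameservices", "mycluster");
    conf.set("dfs.ha.namenodes.mycluster", "nn1,nn2");
    conf.set("dfs.namenode.rpc-address.mycluster.nn1", "master1:8020");
    conf.set("dfs.namenode.rpc-address.mycluster.nn2", "master2:8020");

    // Namenode-side setting (shown for completeness): the shared edits log
    // is stored on a quorum of journal nodes.
    conf.set("dfs.namenode.shared.edits.dir",
        "qjournal://journal1:8485;journal2:8485;journal3:8485/mycluster");

    // The client fails over between the active and standby namenodes
    // via this proxy provider.
    conf.set("dfs.client.failover.proxy.provider.mycluster",
        "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");

    // Clients address the nameservice, not an individual namenode.
    FileSystem fs = FileSystem.get(URI.create("hdfs://mycluster"), conf);
    System.out.println(fs.exists(new Path("/")));
  }
}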
With HA, the complexity of the cluster is increased, but many real-world clusters actually run a variety of extra services in addition. For example, Figure 2 shows a cluster running Hive, HBase, Flume, and Oozie services, in addition to HDFS and MapReduce. (Note that Pig and Sqoop do not appear because they do not have a cluster component; they are both client-side programs.)
Figure 2: A larger Hadoop cluster.
While running a small cluster with just a few services is feasible using manual scripts, when it comes to a larger cluster like the one in Figure 2, dedicated cluster management software such as Apache Ambari or Cloudera Manager is a must.
Data Application Pipelines
We've seen in detail some of the different ways to run queries on data in Hadoop. In any real-world Hadoop application, there are wider questions, such as how data enters the system and how production jobs are run. There is no single answer to these questions, but some patterns have emerged as Hadoop has become more mainstream.
For data sources that produce real-time events, like sensor data or transaction logs, a typical data pipeline is shown in Figure 3:
Figure 3: A Hadoop pipeline for real-time events.
Flume agents collect and aggregate the events and send them to HDFS; Oozie then periodically triggers Hive jobs to produce reports or to transform the data for further downstream analysis. This pattern is explored in depth in the article Analyzing Twitter Data with Apache Hadoop by Jon Natkins.
For online data in a relational database, Hadoop performs some traditional ETL tasks (using Sqoop, Oozie, and a tool like Pig, Hive, Cascading, or Crunch for transformation). In addition, it provides ad hoc query access to the original data. Note that Hadoop complements existing BI tools: Traditional data marts still have a role for rich business analytics over recent or high-value data that is the output of the ETL process. Such a pipeline is illustrated in Figure 4.
Figure 4: A Hadoop pipeline for a data mart.
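As a hypothetical example of the ingest step in such a pipeline, a Sqoop import along the following lines (the database host, credentials, and table name are made up) pulls a table from the relational database into HDFS, where it can then be transformed and queried:

% sqoop import --connect jdbc:mysql://dbserver/sales --username etl -P --table orders --target-dir /data/sales/orders -m 4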
For a more detailed discussion of Hadoop's role in BI systems, I recommend the article Big Data's New Use Cases: Transformation, Active Archive, and Exploration by Amr Awadallah.
Where To Go From Here
As illustrated in this series, there are lots of different ways to get started with Hadoop. The easiest is to download a Hadoop VM and try out some of the tools on a single node. The next step is to try a multi-node cluster to get a feel for processing larger datasets. These days, there are many freely downloadable open datasets that are helpful. The list curated by Hilary Mason is a great starting point. After that, you might build a more complex Hadoop application pipeline using tools such as Flume, Sqoop, and Oozie to round out the overall data flow. You'll soon find that processing large datasets is made considerably simpler by using Hadoop. Enjoy!
Tom White has been an Apache Hadoop committer since February 2007, and is a member of the Apache Software Foundation. He is an engineer at Cloudera, a company set up to offer Hadoop tools, support, and training. He is the author of the best-selling O'Reilly book Hadoop: The Definitive Guide.