Big Data is in the news these days, and Apache Hadoop is one of the most popular platforms for working with Big Data. Hadoop itself is undergoing tremendous growth as new features and components are added, and for this reason alone, it can be difficult to know how to start working with it. In this three-part series, I explain what Hadoop is and how to use it, presenting simple, hands-on examples that you can try yourself. First, though, let's look at the problem that Hadoop was designed to solve.
In 2002, Doug Cutting and Mike Cafarella started building a Web crawler and searcher, called Nutch, that they wanted to scale up to crawl and search the entire Web (several billion pages at the time). The difficult part, of course, was designing something that could cope with this amount of data; and it wasn't until 2003, when Google published papers on its data processing infrastructure that solved the same problems, that "the route became clear," as Cutting put it.
The route was to build a distributed file system and a distributed processing engine that could scale to thousands of nodes. That project became Hadoop, named after the toy elephant of one of Cutting’s children. Hadoop had two principal parts: the Hadoop Distributed File System (HDFS) and MapReduce. The latter is the name Google engineers gave the distributed processing model and its implementation.
HDFS makes it easy to store large files. It optimizes for transfer speed over latency, which is what you want when you are writing files with billions of records (Web pages, in the original use case). You don't need to alter the files once they are written, because the next crawl will rewrite the whole file anew.
MapReduce enables you to operate on the contents of a file in parallel by having an independent process read each chunk (or "block") of the file. With HDFS and MapReduce, the speed of analysis scales to the size of the cluster: A well-written job can run twice as fast on a cluster that is twice as big.
While the initial application of Web search is not one that many organizations need to solve, it turns out that HDFS and MapReduce are general enough to support a very broad range of applications. Like any computer system, they both have limitations, the main one being that MapReduce is batch-oriented; that is, operations take minutes or hours, not seconds, to complete. But even with these limitations, the number of problems the technologies can solve is still impressive (see MapReduce Design Patterns for a comprehensive catalog). And over time, we have seen the limitations being relaxed, both incrementally (as with the performance improvements that are going into HDFS reads) and by major platform changes (as with the appearance of new processing frameworks running on data in HDFS that eschew MapReduce).
So how do you know if you need Hadoop? Simply put, Hadoop is useful when one machine is not big enough to process the data you need to process in a reasonable amount of time. Of course, what counts as reasonable depends on the task at hand, but if you find it's taking 10 minutes, say, to grep a bunch of log files, or if generating your SQL reports is taking hours, then that is a sign that Hadoop is worth investigating.
To understand the power of Hadoop, it helps to understand how MapReduce works in a bit more detail. I’ll discuss how to use the MapReduce API later, although you may be surprised to learn that these days, MapReduce is considered a low-level interface, and there are better tools for processing, which we will look at in the next article in this series.
A MapReduce job proceeds in two phases, called the map phase and the reduce phase. Each phase is simply a function that transforms the input to output. The inputs and outputs for each phase are a set of key-value pairs. MapReduce is typed, so the input key-value pairs can be of different types than the outputs. However, the map output and the reduce input must have the same types for keys and values.
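The type contract between the two phases can be sketched as a pair of plain Python functions. This is only an illustrative model, not the actual Hadoop API (which is Java); the function names and type annotations here are my own.

```python
from typing import Iterable, Iterator, Tuple

# map: (K1, V1) -> (K2, V2) pairs; the output key-value types
# may differ from the input types.
def word_count_map(offset: int, line: str) -> Iterator[Tuple[str, int]]:
    for word in line.split():
        yield (word, 1)

# reduce: (K2, list of V2) -> (K3, V3) pairs; its input types must
# match the map function's output types.
def word_count_reduce(word: str, counts: Iterable[int]) -> Iterator[Tuple[str, int]]:
    yield (word, sum(counts))
```

The classic word-count example shown here makes the typing visible: the map takes (int, str) pairs in and emits (str, int) pairs out, and the reduce must accept exactly the map's output types.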
Let's look at an example using the Google Books Ngram Dataset. This dataset is a collection of files containing n-grams that have been extracted from Google's corpus of scanned books. An n-gram is a sequence of n words that occur together, in order, somewhere in the text. For example, the 2-grams of the sentence "I like Hadoop" are (I, like) and (like, Hadoop).
The files in the dataset are arranged so that the 1-grams are in one set of files, the 2-grams in another set of files, and so on up to 5-grams. Each file has one n-gram per line containing the n-gram, the year, the number of occurrences in that year, the number of books it appeared in during that year, and the number of pages it appeared on during that year. Here's a sample line from a 1-gram file, showing the number of occurrences (20) for the word 'dobbs' in books from 2007:
dobbs 2007 20 18 15
MapReduce is ideally suited for text processing, so let's use it to find the total count for each word in the dataset. For this, we need to use the 1-grams and aggregate over the year field. Our input comprises lines of text, which in Hadoop are usually represented as key-value pairs where the key is the offset of the line within the file and the value is the line of text. Here's a small sample:
(0, "dobbs 2007 20 18 15")
(20, "dobbs 2008 22 20 12")
(40, "doctor 2007 545525 366136 57313")
(72, "doctor 2008 668666 446034 72694")
We care only about the word and the number of occurrences, so we ignore the other fields, and the key. The map emits the word as the key and the number of occurrences as the value:
("dobbs", 20)
("dobbs", 22)
("doctor", 545525)
("doctor", 668666)
This data is then fed into the reduce function. However, the MapReduce framework performs an important transformation before it does this: the values for a given key are brought together so that the reduce function processes them as a group. In this example, the input to the reduce function would look like this:
("dobbs", [20, 22])
("doctor", [545525, 668666])
The reduce function iterates over the values for each key and does whatever processing it likes. Here we want an aggregate count, so we simply sum the values:
("dobbs", 42)
("doctor", 1214191)
The final result is the number of times each word occurred in the whole dataset.
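The whole job can be simulated in a few lines of plain Python. To be clear, this is a sketch of the data flow described above, not the Hadoop API; the variable names and the in-memory grouping step (which the framework performs for you between the phases) are my own.

```python
from collections import defaultdict

# The sample input: (byte offset, line) pairs from a 1-gram file.
lines = [
    (0,  "dobbs 2007 20 18 15"),
    (20, "dobbs 2008 22 20 12"),
    (40, "doctor 2007 545525 366136 57313"),
    (72, "doctor 2008 668666 446034 72694"),
]

# Map phase: emit (word, occurrences), ignoring the key and other fields.
mapped = []
for offset, line in lines:
    fields = line.split()
    mapped.append((fields[0], int(fields[2])))

# Shuffle: group the values for each key, as the framework does
# between the map and reduce phases.
groups = defaultdict(list)
for word, count in mapped:
    groups[word].append(count)

# Reduce phase: sum the grouped values for each key.
result = {word: sum(counts) for word, counts in groups.items()}
print(result)  # {'dobbs': 42, 'doctor': 1214191}
```

Running this produces the totals shown above; on a real cluster, the same three steps run distributed across many machines instead of in one process.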
MapReduce on a Cluster
The key point of MapReduce is that it can run at massive scale. Consider a Hadoop cluster where the input dataset is stored in HDFS (see Figure 1).
Figure 1: A Hadoop cluster.
Files are broken into 128MB blocks and each block is replicated threefold on distinct nodes in the cluster. In this diagram, we have only shown one file, composed of three blocks; with threefold replication, that makes a total of nine stored blocks.
MapReduce takes full advantage of these properties. First, by choosing to run a map for each block, the amount of work that each map does is normally relatively small (in the order of seconds) and can be performed in parallel across the cluster. So even a job running on a large amount of input data can run the map phase in one wave of maps if the cluster is large enough. Second, maps are generally run on the same node as one of the block's replicas, which achieves data locality, thus conserving precious network bandwidth.
Figure 2 shows three maps running simultaneously on three different parts (called file splits) of the input dataset (the three highlighted HDFS blocks). The output from the maps is stored on local disk, and not written back to HDFS.
Figure 2: Three maps running simultaneously.
Notice that the map outputs are partitioned, with one partition for each reducer. The partitioning function is normally simple hash partitioning, but it is possible to override it. Here, we have shown two partitions, corresponding to two reducers. (On a typical cluster, the number of reducers would be much larger.) Since the partitions for each reducer are spread across the cluster, it is not possible for the reducers to run on the same node as their input. Instead, the partitions are copied across the network to the reducers in a process known as "the shuffle," shown as dark arrows in Figure 2.
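Hash partitioning amounts to one line of arithmetic. The sketch below is only illustrative: Hadoop's default partitioner does the equivalent computation in Java using the key's hashCode, so this Python version models the idea rather than reproducing Hadoop's exact behavior.

```python
# Assign each map output key to one of the reducers. All values for the
# same key hash to the same partition, so they meet at the same reducer.
def partition(key: str, num_reducers: int) -> int:
    # In Python, % with a positive modulus always yields a value
    # in the range [0, num_reducers).
    return hash(key) % num_reducers
```

The essential property is determinism within a job: every map task computes the same partition for a given key, which is what guarantees that a key's values are all grouped at a single reducer.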