In the previous two installments of this tutorial, I explained how to write and run a MapReduce job on a developer machine. In this final article in the series, I discuss higher-level query languages and examine what real-world Hadoop clusters and applications look like.
Beyond MapReduce
In Hadoop: Writing and Running Your First Project, I wrote a simple MapReduce program to analyze Google book data. Even though the analysis was simple, it required somewhat more than 50 lines of Java to implement the algorithm. By using a high-level language like Pig or Hive, I can achieve the same result with much less code.
Pig
Here is the Pig query that is the equivalent of all that Java code, in four lines:
records = LOAD 'data' AS (word:chararray, year:int, count:int, pages:int, books:int);
grouped_records = GROUP records BY word;
counts = FOREACH grouped_records GENERATE group, SUM(records.count);
DUMP counts;
Because Pig Latin (the language used to write Pig queries) is probably unfamiliar, let's go through this program line by line. The first line describes the input data we want to process. The LOAD operator takes a filename or directory ('data' refers to a directory here) and an optional AS clause to name the fields in the input dataset, along with their types. The result of the LOAD operator is a relation, which is a set of tuples. Pig programs are all about manipulating relations, and the language offers a variety of operators to transform relations in different ways.
The second line uses the GROUP BY operator to group the input data by the word field. The result of this step is a relation with one tuple per word in the input. Note that this relation is not necessarily materialized: Pig runs the program only when a DUMP or STORE statement is encountered, and then it has considerable leeway in how it chooses to produce the result.
On the third line, the grouped data is transformed to generate a set of tuples where the first field is the word (the group term refers to the grouping field, word) and the second field is the per-group sum of the count field of the original records relation.
The final line runs the program and writes the result to the console. We can run the script as follows:
% pig pig/aggregate.pig
(dobbs,42)
(doctor,1214191)
Hive
Hive, which uses SQL for its query language, may be more familiar. Before we can run a query in Hive, we need to define the structure of the data using a schema. In contrast to relational databases, the schema does not need to be defined before the data is loaded. Hive uses the schema at read time (schema on read), not at data load time (schema on write). Here, I tell Hive about the data that already exists in HDFS, and what its schema is:
CREATE EXTERNAL TABLE words (word STRING, year INT, occurrences INT, pages INT, books INT)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
LOCATION '/user/cloudera/data';
The EXTERNAL modifier tells Hive that it shouldn't manage the data, so dropping the table does not delete the underlying data. Once Hive knows about the table, we can query it:
SELECT word, SUM(occurrences) FROM words GROUP BY word;
We can run the query as follows:
% hive -f hive/aggregate.sql
dobbs   42
doctor  1214191
More Complex Queries
Consider the following Hive query to find the top 10 words of all time in the books dataset:
SELECT word, SUM(occurrences) AS occurrences
FROM words
GROUP BY word
ORDER BY occurrences DESC
LIMIT 10;
The query produces the correct answer, but the way Hive currently executes it is less efficient than it could be. This is not a criticism of Hive: many Hive queries are very well optimized, and only expert MapReduce programmers would be able to match their performance. Rather, it is a reminder that the performance of even seemingly simple constructs changes over time, so as a programmer, it pays to understand how they are implemented.
In this case, the ORDER BY followed by LIMIT is implemented as a MapReduce job, but rather than limiting the output to the top 10 occurrences on both the map and the reduce side, Hive sends the entire dataset through the shuffle and only applies the limit in the reducer. (Note: This is true in Hive 0.10.0, but the optimization will be included in later versions. See Top-K Optimization for more details.)
While it's possible to implement Top-K in MapReduce to take advantage of the optimization (see the code that accompanies this series for an implementation), Java programmers might consider looking at a higher-level library such as Cascading or Crunch. Let's have a look at the implementation of a top 10 search in Crunch.
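To make the idea concrete, here is a minimal sketch of a Top-K job in plain MapReduce. It is not the code that accompanies this series; it assumes the input is already-aggregated, tab-separated word/count lines (for example, the output of the earlier aggregation job), that the job is configured with a single reducer (job.setNumReduceTasks(1)), and the class names are purely illustrative:

import java.io.IOException;
import java.util.Map;
import java.util.TreeMap;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

// Illustrative sketch only: not the code that accompanies this series.
public class TopKSketch {

  private static final int K = 10;

  // Each map task keeps only its local top K entries in a bounded TreeMap
  // (keyed by count) and emits them in cleanup(), so at most K records per
  // map task cross the shuffle. Ties on count overwrite each other here; a
  // production implementation would handle them properly.
  public static class TopKMapper
      extends Mapper<LongWritable, Text, LongWritable, Text> {

    private final TreeMap<Long, String> topK = new TreeMap<Long, String>();

    @Override
    protected void map(LongWritable offset, Text line, Context context) {
      String[] fields = line.toString().split("\t");
      topK.put(Long.parseLong(fields[1]), fields[0]);
      if (topK.size() > K) {
        topK.remove(topK.firstKey()); // drop the current smallest count
      }
    }

    @Override
    protected void cleanup(Context context)
        throws IOException, InterruptedException {
      for (Map.Entry<Long, String> entry : topK.entrySet()) {
        context.write(new LongWritable(entry.getKey()), new Text(entry.getValue()));
      }
    }
  }

  // A single reducer sees at most K pairs per map task and applies the same
  // bounded-TreeMap trick to produce the global top K.
  public static class TopKReducer
      extends Reducer<LongWritable, Text, Text, LongWritable> {

    private final TreeMap<Long, String> topK = new TreeMap<Long, String>();

    @Override
    protected void reduce(LongWritable count, Iterable<Text> words, Context context) {
      for (Text word : words) {
        topK.put(count.get(), word.toString());
        if (topK.size() > K) {
          topK.remove(topK.firstKey());
        }
      }
    }

    @Override
    protected void cleanup(Context context)
        throws IOException, InterruptedException {
      // Emit results in descending order of count.
      for (Map.Entry<Long, String> entry : topK.descendingMap().entrySet()) {
        context.write(new Text(entry.getValue()), new LongWritable(entry.getKey()));
      }
    }
  }
}

The point of the pattern is that only a handful of records ever reach the shuffle, which is exactly the optimization Hive 0.10.0 misses. As the article notes, though, a higher-level library saves you from writing this boilerplate at all.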
Crunch
The central data type in Crunch is a PCollection&lt;T&gt;, which represents a distributed, unordered collection of elements of type T. Think of it as a parallel version of java.util.Collection&lt;T&gt; that is not materialized in memory, or as a Java equivalent of a Pig relation. We start our Crunch program by loading the input data:
PCollection<String> lines = readTextFile(args[0]);
which uses the readTextFile() convenience function provided by CrunchTool (which our class extends; the full source is shown in Listing One) to represent the input (specified by args[0]) as a PCollection of lines of type String.
Listing One: TopK.java
package com.tom_e_white.drdobbs.crunch;

import org.apache.crunch.DoFn;
import org.apache.crunch.Emitter;
import org.apache.crunch.PCollection;
import org.apache.crunch.PGroupedTable;
import org.apache.crunch.PTable;
import org.apache.crunch.Pair;
import org.apache.crunch.fn.Aggregators;
import org.apache.crunch.lib.Aggregate;
import org.apache.crunch.types.avro.Avros;
import org.apache.crunch.util.CrunchTool;
import org.apache.hadoop.util.ToolRunner;

import java.io.Serializable;

public class TopK extends CrunchTool implements Serializable {

  @Override
  public int run(String[] args) throws Exception {
    int k = Integer.parseInt(args[2]);

    PCollection<String> lines = readTextFile(args[0]);

    PTable<String, Long> records = lines
        .parallelDo(new DoFn<String, Pair<String, Long>>() {
          @Override
          public void process(String line, Emitter<Pair<String, Long>> emitter) {
            String[] split = line.split("\t+");
            String word = split[0];
            long count = Long.parseLong(split[2]);
            emitter.emit(Pair.of(word, count));
          }
        }, Avros.tableOf(Avros.strings(), Avros.longs()));

    PGroupedTable<String, Long> groupedRecords = records
        .groupByKey();

    PTable<String, Long> counts = groupedRecords
        .combineValues(Aggregators.SUM_LONGS());

    PTable<String, Long> topk = Aggregate.top(counts, k, true);

    writeTextFile(topk, args[1]);

    run();
    return 0;
  }

  public static void main(String[] args) throws Exception {
    int rc = ToolRunner.run(new TopK(), args);
    System.exit(rc);
  }
}
There are various operations you can perform on a PCollection. The one we are interested in here is parallelDo(), which applies a function to every element in the collection and returns a new PCollection, possibly of a different type. Let's map a String to a Pair&lt;String, Long&gt; (Pair is a Crunch built-in type), parsing each line to extract the word and the number of occurrences, just as we did in the ProjectionMapper in the previous installment in this series.
PTable<String, Long> records = lines
    .parallelDo(new DoFn<String, Pair<String, Long>>() {
      @Override
      public void process(String line, Emitter<Pair<String, Long>> emitter) {
        String[] split = line.split("\t+");
        String word = split[0];
        long count = Long.parseLong(split[2]);
        emitter.emit(Pair.of(word, count));
      }
    }, Avros.tableOf(Avros.strings(), Avros.longs()));
The result is a PTable&lt;K, V&gt;, a subtype of PCollection that represents a distributed multi-map with keys of type K and values of type V. In this case, the return type is PTable&lt;String, Long&gt;. The second argument to parallelDo() specifies how the return type is serialized. In this example, I choose to use Avro as the intermediate storage representation: the declaration Avros.tableOf(Avros.strings(), Avros.longs()) says we want a PTable&lt;String, Long&gt; rather than a PCollection&lt;Pair&lt;String, Long&gt;&gt;. Both types are conceptually similar, but the reason we want a PTable is that it provides a grouping operation, which I use next:
PGroupedTable<String, Long> groupedRecords = records.groupByKey();
This pattern should be familiar; it is the same operation as Pig's GROUP BY. In Crunch, the return type is a PGroupedTable, which provides the combineValues() method for aggregating the grouped values, here just by summing:
PTable<String, Long> counts = groupedRecords.combineValues(Aggregators.SUM_LONGS());
The result so far is that of word count: a PTable&lt;String, Long&gt; of words and their global counts over all years. To get the top 10 results, we use a Crunch library method:
PTable<String, Long> topk = Aggregate.top(counts, k, true);
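Finally, as Listing One shows, the remaining two calls write the top-k table to the output path given by args[1] and execute the pipeline:

writeTextFile(topk, args[1]);  // write the top-k results as text to args[1]
run();                         // execute the Crunch pipeline

Only at the call to run() does Crunch actually plan and launch the underlying MapReduce work, much as Pig defers execution until a DUMP or STORE statement.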