Channels ▼
RSS

Design

Introduction to Hadoop: Real-World Hadoop Clusters and Applications


In the previous two installments of this tutorial, I explained how to write and run a MapReduce job on a developer machine. In this final article in the series, I discuss higher-level query languages and examine what real-world Hadoop clusters and applications look like.

Beyond MapReduce

In Hadoop: Writing and Running Your First Project, I wrote a simple MapReduce program to analyze Google book data. Even though the analysis was simple, it required somewhat more than 50 lines of Java to implement the algorithm. By using a high-level language like Pig or Hive, I can achieve the same result with much less code.

Pig

Here is the Pig query that the equivalent of all that aforementioned Java code — in four lines:

records = LOAD 'data' AS (word:chararray, year:int, count:int,
  pages:int, books:int);
grouped_records = GROUP records BY word;
counts = FOREACH grouped_records GENERATE group, SUM(records.count);
DUMP counts;

Because Pig Latin (the language used to write Pig queries) is probably unfamiliar, let's go through this program line by line. The first line describes the input data we want to process. The LOAD operator takes a filename or directory ('data' refers to a directory here) and an optional AS clause to name the fields in the input dataset, along with their types. The result of the LOAD operator is a relation, which is a set of tuples. Pig programs are all about manipulating relations, and the language offers a variety of operators to transform relations in different ways.

The second line uses the GROUP BY operator to group the input data by the word field. The result of this step is a relation with one tuple per word in the input. Note that this relation is not necessarily materialized: Pig runs the program only when a DUMP or STORE statement is encountered, and then it has considerable leeway in how it chooses to produce the result.

On the third line, the grouped data is transformed to generate a set of tuples where the first field is the word (the group term refers to the grouping field word) and the second field is the per-group sum of the count field of the original records relation.

The final line runs the program and writes the result to the console. We can run the script as follows:

  % pig pig/aggregate.pig
  (dobbs,42)
  (doctor,1214191)

Hive

Hive, which uses SQL for its query language, may be more familiar. Before we can run a query in Hive, we need to define the structure of the data using a schema. In contrast to relational databases, the schema does not need to be defined before the data is loaded. Hive uses the schema at read time (schema on read), not at data load time (schema on write). Here, I tell Hive about the data that already exists in HDFS, and what its schema is:

CREATE EXTERNAL TABLE words (word STRING, year INT, occurrences INT,
  pages INT, books INT)
ROW FORMAT DELIMITED
  FIELDS TERMINATED BY '\t'
LOCATION '/user/cloudera/data';

The EXTERNAL modifier tells Hive that it shouldn't manage the data, so if you drop the table, it should not delete the data. Once Hive knows about the table, we can query it:

SELECT word, SUM(occurrences)
FROM words
GROUP BY word;

We can run the query as follows:

% hive -f hive/aggregate.sql
dobbs     42
doctor     1214191

More Complex Queries

Consider the following Hive query to find the top 10 words of all time in the books dataset:

SELECT word, SUM(occurrences) AS occurrences
FROM words
GROUP BY word
ORDER BY occurrences DESC
LIMIT 10;

The query produces the correct answer, but the current implementation of Hive is less efficient than it could be. This is not a criticism of Hive, since many Hive queries are very well optimized and only expert MapReduce programmers would be able to match their performance. Instead, it is a reminder that the performance of even seemingly simple constructs is ever-changing, so as a programmer, it pays to understand how they are implemented.

In this case, the ORDER BY followed by LIMIT is implemented as a MapReduce job, but rather than limiting the output to the top 10 occurrences on the map and the reduce side, Hive sends the entire dataset through the shuffle and only applies the limit in the reducer. (Note: This is true in Hive 0.10.0, but the optimization will be included in later versions. See Top-K Optimization for more details.)

While it's possible to implement Top-K in MapReduce to take advantage of the optimization (see the code that accompanies this series for an implementation), Java programmers might consider looking at a higher-level library such as Cascading or Crunch. Let's have a look at the implementation of a top 10 search in Crunch.

Crunch

The central data type in Crunch is a PCollection<T>, which represents a distributed, unordered collection of elements of type T. Think of it as a parallel version of java.util.Collection<T> that is not materialized in memory; or as a Java equivalent of a Pig relation. We start our Crunch program by loading the input data:

PCollection<String> lines = readTextFile(args[0]);

which uses the readTextFile() convenience function provided by CrunchTool (which our class extends, the full source is shown in Listing One) to represent the input (specified by args[0]) as a PCollection of lines of type String.

Listing One: TopK.java

package com.tom_e_white.drdobbs.crunch;

import org.apache.crunch.DoFn;
import org.apache.crunch.Emitter;
import org.apache.crunch.PCollection;
import org.apache.crunch.PGroupedTable;
import org.apache.crunch.PTable;
import org.apache.crunch.Pair;
import org.apache.crunch.fn.Aggregators;
import org.apache.crunch.lib.Aggregate;
import org.apache.crunch.types.avro.Avros;
import org.apache.crunch.util.CrunchTool;
import org.apache.hadoop.util.ToolRunner;

import java.io.Serializable;

public class TopK extends CrunchTool implements Serializable {

  @Override
  public int run(String[] args) throws Exception {
    int k = Integer.parseInt(args[2]);
    PCollection<String> lines = readTextFile(args[0]);
    PTable<String, Long> records = lines
      .parallelDo(new DoFn<String, Pair<String, Long>>() {
        @Override
        public void process(String line, Emitter<Pair<String, Long>> emitter) 
        {
          String[] split = line.split("\t+");
          String word = split[0];
          long count = Long.parseLong(split[2]);
          emitter.emit(Pair.of(word, count));
        }
      }, Avros.tableOf(Avros.strings(), Avros.longs()));
    PGroupedTable<String, Long> groupedRecords = records
      .groupByKey();
    PTable<String, Long> counts = groupedRecords
      .combineValues(Aggregators.SUM_LONGS());
    PTable<String, Long> topk = Aggregate.top(counts, k, true);
    writeTextFile(topk, args[1]);
    run();
    return 0;
  }

  public static void main(String[] args) throws Exception {
    int rc = ToolRunner.run(new TopK(), args);
    System.exit(rc);
  }
}

There are various operations you can perform on PCollection. The one we are interested in here is parallelDo(), which applies a function to every element in the collection and returns a new PCollection, possibly of a different type. Let's map a String to a Pair<String, Long> (Pair is a Crunch built-in type), parsing each line to extract the word and the number of occurrences, just like we did in the ProjectionMapper in the previous installment in this series.

PTable<String, Long> records = lines
  .parallelDo(new DoFn<String, Pair<String, Long>>() {
      @Override
      public void process(String line, 
             Emitter<Pair<String, Long>> emitter) {
        String[] split = line.split("\t+");
        String word = split[0];
        long count = Long.parseLong(split[2]);
        emitter.emit(Pair.of(word, count));
      }
    }, Avros.tableOf(Avros.strings(), Avros.longs()));

The result is a PTable<K, V>, a subtype of PCollection that represents a distributed multi-map with keys of type K and values of type V. In this case, the return type is PTable<String, Long>. The second argument to parallelDo() specifies how the return type is serialized. In this example, I choose to use Avro as an intermediate storage representation: The declaration Avros.tableOf(Avros.strings(), Avros.longs()) says we want a PTable<String, Long>, rather than a PCollection<Pair<String, Long>>. Both types are conceptually similar, but the reason we want a PTable is that it provides a grouping operation, which I use next:

PGroupedTable<String, Long> groupedRecords = 
     records.groupByKey();

This pattern should be familiar; it is the same operation as Pig's GROUP BY. In Crunch, the return type is a PGroupedTable, with the combineValues() method for aggregating the grouped values, here just by summing:

PTable<String, Long> counts = groupedRecords
.combineValues(Aggregators.SUM_LONGS());

The result so far is that of word count, a PTable<String, Long> of words and their global counts over all years. To get the top 10 results, we use a Crunch library:

PTable<String, Long> topk = Aggregate.top(counts, k, true);


Related Reading


More Insights






Currently we allow the following HTML tags in comments:

Single tags

These tags can be used alone and don't need an ending tag.

<br> Defines a single line break

<hr> Defines a horizontal line

Matching tags

These require an ending tag - e.g. <i>italic text</i>

<a> Defines an anchor

<b> Defines bold text

<big> Defines big text

<blockquote> Defines a long quotation

<caption> Defines a table caption

<cite> Defines a citation

<code> Defines computer code text

<em> Defines emphasized text

<fieldset> Defines a border around elements in a form

<h1> This is heading 1

<h2> This is heading 2

<h3> This is heading 3

<h4> This is heading 4

<h5> This is heading 5

<h6> This is heading 6

<i> Defines italic text

<p> Defines a paragraph

<pre> Defines preformatted text

<q> Defines a short quotation

<samp> Defines sample computer code text

<small> Defines small text

<span> Defines a section in a document

<s> Defines strikethrough text

<strike> Defines strikethrough text

<strong> Defines strong text

<sub> Defines subscripted text

<sup> Defines superscripted text

<u> Defines underlined text

Dr. Dobb's encourages readers to engage in spirited, healthy debate, including taking us to task. However, Dr. Dobb's moderates all comments posted to our site, and reserves the right to modify or remove any content that it determines to be derogatory, offensive, inflammatory, vulgar, irrelevant/off-topic, racist or obvious marketing or spam. Dr. Dobb's further reserves the right to disable the profile of any commenter participating in said activities.

 
Disqus Tips To upload an avatar photo, first complete your Disqus profile. | View the list of supported HTML tags you can use to style comments. | Please read our commenting policy.
 

Video