Before we run our job, we need some driver code to wire up the mapper and reducer, which we do using the AggregateJob class, shown in Listing Three (AggregateJob.java).
Listing Three: AggregateJob.java.
package com.tom_e_white.drdobbs.mapreduce;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.reduce.LongSumReducer;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class AggregateJob extends Configured implements Tool {

  @Override
  public int run(String[] args) throws Exception {
    Job job = new Job(getConf());
    job.setJarByClass(getClass());
    job.setJobName(getClass().getSimpleName());

    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    job.setMapperClass(ProjectionMapper.class);
    job.setCombinerClass(LongSumReducer.class);
    job.setReducerClass(LongSumReducer.class);

    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(LongWritable.class);

    return job.waitForCompletion(true) ? 0 : 1;
  }

  public static void main(String[] args) throws Exception {
    int rc = ToolRunner.run(new AggregateJob(), args);
    System.exit(rc);
  }
}
AggregateJob extends Hadoop's Configured class and implements the Tool interface's run() method. Doing this allows us to pass in configuration options at runtime, which can be very handy, as we'll see shortly.
The code to launch a job is in the run() method, and it is managed by the Job class from the org.apache.hadoop.mapreduce package. The Job instance specifies various things about the job: a name for display purposes, the mapper and reducer classes that we discussed earlier, and the job output types, which have to match the mapper and reducer output types. (If your map output types are different from your reduce output types, you should also call setMapOutputKeyClass() and setMapOutputValueClass().) There are a few other things that we set that are less obvious.
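For example, if (hypothetically) the mapper emitted IntWritable counts while the reducer wrote LongWritable totals, the extra calls in run() would look something like this sketch. AggregateJob itself doesn't need them, because its map and reduce output types are the same:

// Hypothetical sketch: declare the map output types explicitly because
// they differ from the final (reduce) output types set below.
job.setMapOutputKeyClass(Text.class);          // key type the mapper emits
job.setMapOutputValueClass(IntWritable.class); // value type the mapper emits
job.setOutputKeyClass(Text.class);             // final key type the reducer writes
job.setOutputValueClass(LongWritable.class);   // final value type the reducer writes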
The call to setJarByClass() is needed because MapReduce is a distributed system, and Hadoop needs to know which JAR file to ship to the nodes in the cluster running the map and reduce tasks. By calling this method, we tell Hadoop to look for the local JAR that contains the AggregateJob class. As long as the JAR contains both AggregateJob and ProjectionMapper, the job will run, as there are no other dependencies. (If you need to ship third-party libraries, you can use the -libjars command-line flag, which takes a comma-separated list of local JAR file names.)
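For instance, a run that shipped a couple of extra libraries might look something like the following sketch; the JAR paths are placeholders, not dependencies of this project. Note that -libjars, like -D, is handled by ToolRunner and must appear before the positional arguments:

% hadoop jar target/hadoop-drdobbs-1.0.0.jar \
    com.tom_e_white.drdobbs.mapreduce.AggregateJob \
    -libjars /path/to/extra-lib1.jar,/path/to/extra-lib2.jar \
    data output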
We also need to tell the job what input data to process and where to place the output. We do this via the static APIs on FileInputFormat and FileOutputFormat, using positional command-line arguments to specify the file paths.
The last line of the run() method launches the job and waits for it to complete. While it is running, it prints the progress on the console.
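As an aside, if you don't want to block, Job also offers a non-blocking submit() method that you could combine with polling — a minimal sketch of an alternative that AggregateJob doesn't use:

// Alternative sketch (not used in AggregateJob): submit the job without
// blocking, then poll for completion.
job.submit();
while (!job.isComplete()) {
  Thread.sleep(5000);   // check every five seconds
}
return job.isSuccessful() ? 0 : 1;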
Running the Job
Let's run the program:
% $HADOOP_HOME/bin/hadoop jar target/hadoop-drdobbs-1.0.0.jar \
    com.tom_e_white.drdobbs.mapreduce.AggregateJob data output
The hadoop jar command is a convenient way to run Hadoop programs because it adds all the Hadoop JARs onto the client classpath. It takes the JAR file and the main class as arguments and passes the remaining arguments to the program. In this case, they are interpreted as the input and output directories. When the program has completed, we can look at the output. Note that both the input and the output are on the local filesystem, since that is Hadoop's default.
% cat output/part-r-00000
dobbs   42
doctor  1214191
The output files are named after the reduce task that produced them. Because we ran only a single reducer (the default), we have a single output file: part-r-00000. Happily, it contains the output that we expected: the tab-separated word counts.
A Single-Node Cluster
We don't need to make any changes to the code to run the same program against a Hadoop cluster. We do need to load the data into HDFS, though, and tell the program which cluster to run against, so let's see how to do that.
The first thing we need is a suitable cluster. If you don't have access to a Hadoop cluster, you can install a single-node cluster, either by following the Apache Hadoop instructions or by using one of the Hadoop VMs that several Hadoop vendors provide for free.
Instead of running against data on the local filesystem, we will run against HDFS. Let's copy the sample data from the local filesystem to HDFS using the hadoop fs command:
% hadoop fs -copyFromLocal data data
The -copyFromLocal subcommand takes two or more arguments: all but the last refer to files or directories on the local filesystem, and the last refers to a file or directory on HDFS. After issuing this command, the data resides in the /user/<username>/data directory in HDFS. Now we can run the program again:
% hadoop jar target/hadoop-drdobbs-1.0.0.jar \
    com.tom_e_white.drdobbs.mapreduce.AggregateJob data output
This command assumes that the client Hadoop configuration files are set up appropriately to point to your cluster. Configuration files are found in the Hadoop installation's conf directory or in /etc/hadoop/conf. The two relevant settings here are:
fs.default.name to specify the HDFS namenode URL (in core-site.xml)
mapred.job.tracker to specify the MapReduce jobtracker host and port (in mapred-site.xml)
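As a rough sketch, the relevant entries might look like this — the localhost addresses match the command-line example that follows, so substitute your own namenode and jobtracker locations:

<!-- core-site.xml -->
<configuration>
  <property>
    <name>fs.default.name</name>
    <value>hdfs://localhost:8020</value>
  </property>
</configuration>

<!-- mapred-site.xml -->
<configuration>
  <property>
    <name>mapred.job.tracker</name>
    <value>localhost:8021</value>
  </property>
</configuration>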
It's possible to specify these settings on the command line, too, which is handy if you ever need to switch between clusters:
% hadoop jar target/hadoop-drdobbs-1.0.0.jar \
    com.tom_e_white.drdobbs.mapreduce.AggregateJob \
    -D fs.default.name=hdfs://localhost:8020 \
    -D mapred.job.tracker=localhost:8021 \
    data output
Note that a space is required between the -D argument and the key-value pair (in contrast to specifying Java system properties).
The output is in HDFS, so we use the hadoop fs command again to view it:
% hadoop fs -cat output/part-r-00000
dobbs   42
doctor  1214191
A Multi-Node Cluster
Running on a multi-node cluster is no different from running on a single-node cluster, although it is wise to set the number of reducers to something higher than the default (one). On an otherwise unused cluster, the number of reducers can be set to the number of reduce slots in the whole cluster (that is, the number of task trackers × the number of reduce task slots per task tracker). In practice, and particularly on a heavily used cluster, you should set the number of reducers to a smaller fraction of the cluster's total reduce capacity.
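If you would rather fix the number of reducers in code than on the command line, you could add a single call to the run() method — a sketch, with four chosen only to match the command-line example below:

// Sketch: request four reduce tasks for this job. Leaving this out and
// passing -D mapred.reduce.tasks=4 instead (as below) keeps it configurable.
job.setNumReduceTasks(4);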
I ran the job over the full dataset using four reducers with the following command (the cluster settings were in core-site.xml and mapred-site.xml, so they were not needed on the command line):
% hadoop jar target/hadoop-drdobbs-1.0.0.jar \
    com.tom_e_white.drdobbs.mapreduce.AggregateJob \
    -D mapred.reduce.tasks=4 \
    data output
% hadoop fs -cat output/part-r-00000 | head -5
!        202142786
""""     3312106937
$0.00005 52
$0.0003  111
$0.0007  117
The output is sorted by key (within each reduce partition), so at the beginning of the file we get 1-grams that start with punctuation. In the next installment of the series, which runs next week, we'll write some more complex queries using higher-level languages and take a look at real-world Hadoop clusters and applications.
Tom White has been an Apache Hadoop committer since February 2007, and is a member of the Apache Software Foundation. He is an engineer at Cloudera, a company set up to offer Hadoop tools, support, and training. He is the author of the best-selling O'Reilly book Hadoop: The Definitive Guide.