Hadoop streaming is a utility that comes with the Hadoop 1 distribution. The utility allows you to create and run Map/Reduce jobs with any executable or script as the mapper and/or the reducer. For example:
$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \
-input myInputDirs \
-output myOutputDir \
-mapper /bin/cat \
-reducer /bin/wc
Streaming Workflow
In the above example, both the mapper and the reducer are executables that read the input from stdin (line by line) and emit the output to stdout. The utility will create a Map/Reduce job, submit the job to an appropriate cluster, and monitor the progress of the job until it completes.
When an executable is specified for mappers, each mapper task will launch the executable as a separate process when the mapper is initialized. As the mapper task runs, it converts its inputs into lines and feed the lines to the stdin of the process. In the meantime, the mapper collects the line oriented outputs from the stdout of the process and converts each line into a key/value pair, which is collected as the output of the mapper. By default, the prefix of a line up to the first tab character is the key and the rest of the line (excluding the tab character) will be the value. If there is no tab character in the line, then entire line is considered as key and the value is null. However, this can be customized, as discussed later.
When an executable is specified for reducers, each reducer task will launch the executable as a separate process then the reducer is initialized. As the reducer task runs, it converts its input key/values pairs into lines and feeds the lines to the stdin of the process. In the meantime, the reducer collects the line oriented outputs from the stdout of the process, converts each line into a key/value pair, which is collected as the output of the reducer. By default, the prefix of a line up to the first tab character is the key and the rest of the line (excluding the tab character) is the value. However, this can be customized, as discussed later.
This is the basis for the communication protocol between the Map/Reduce framework and the streaming mapper/reducer.
You can supply a Java class as the mapper and/or the reducer. The above example is equivalent to:
$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \
-input myInputDirs \
-output myOutputDir \
-mapper org.apache.hadoop.mapred.lib.IdentityMapper \
-reducer /bin/wc
User can specify stream.non.zero.exit.is.failure as true or false to make a streaming task that exits with a non-zero status to be Failure or Success respectively. By default, streaming tasks exiting with non-zero status are considered to be failed tasks.
Streaming Command Options
Streaming supports streaming command options as well as generic command options. The general command line syntax is shown below. Be sure to place the generic options before the streaming options, otherwise the command will fail.
bin/hadoop command [genericOptions] [streamingOptions]
The Hadoop streaming command options are
Parameter | Optional/Required | Description |
-input directoryname or filename | Required | Input location for mapper |
-output directoryname | Required | Output location for reducer |
-mapper executable or JavaClassName | Required | Mapper executable |
-reducer executable or JavaClassName | Required | Reducer executable |
-file filename | Optional | Make the mapper, reducer, or combiner executable available locally on the compute nodes |
-inputformat JavaClassName | Optional | Class you supply should return key/value pairs of Text class. If not specified, TextInputFormat is used as the default |
-outputformat JavaClassName | Optional | Class you supply should take key/value pairs of Text class. If not specified, TextOutputformat is used as the default |
-partitioner JavaClassName | Optional | Class that determines which reduce a key is sent to |
-combiner streamingCommand or JavaClassName | Optional | Combiner executable for map output |
-cmdenv name=value | Optional | Pass environment variable to streaming commands |
-inputreader | Optional | For backwards-compatibility: specifies a record reader class (instead of an input format class) |
-verbose | Optional | Verbose output |
-lazyOutput | Optional | Create output lazily. For example, if the output format is based on FileOutputFormat, the output file is created only on the first call to output.collect (or Context.write) |
-numReduceTasks | Optional | Specify the number of reducers |
-mapdebug | Optional | Script to call when map task fails |
-reducedebug | Optional | Script to call when reduce task fails |