A Combiner, also known as a semi-reducer, is an optional class that accepts the output of the Map class and passes summarized key-value pairs on to the Reducer class. The main function of a Combiner is to summarize the map output records that share the same key. The combiner's output (a key-value collection) is then sent over the network to the actual Reducer task as input.
The Combiner class sits between the Map class and the Reduce class and is used to reduce the volume of data transferred between them. The output of the map task is usually large, so the amount of data moved to the reduce task is correspondingly high. In the MapReduce pipeline, the combiner phase runs between the map phase and the shuffle that feeds the reduce phase.
A combiner does not have a predefined interface of its own; it must implement the Reducer interface's reduce() method. A combiner operates on the output of a single map task, key by key. Its input and output key-value types must match the map output types, which are also the Reducer's input types. A combiner can produce summary information from a large dataset because it replaces the original map output with locally aggregated records. Although the Combiner is optional, it pre-aggregates the map output into groups for the Reduce phase, which makes the data easier to process.
Combiner Advantage
When a MapReduce job is run on a large dataset, the Hadoop Mapper generates large chunks of intermediate data that must be passed to the Hadoop Reducer for further processing, which can lead to massive network congestion. So how do we go about reducing this congestion? Does Hadoop provide a function to address the issue? The MapReduce framework offers a function known as the 'Combiner' that can play a crucial role in reducing network congestion; in fact, the Combiner is also termed a 'mini-reducer'. The primary job of a Hadoop Combiner is to process the output of the Hadoop Mapper before it is passed to the Hadoop Reducer. In practice, a Combiner and a Reducer often share the same code, as in the example below.
In order to understand the concept of the Hadoop Combiner, let's consider the following use case, which counts the number of complaints reported in each state. Below is the driver code:
package com.evoke.bigdata.mr.complaint;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class ComplaintCombiner extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        String input, output;
        if (args.length == 2) {
            input = args[0];
            output = args[1];
        } else {
            input = "your-input-dir";
            output = "your-output-dir";
        }

        JobConf conf = new JobConf(getConf(), ComplaintCombiner.class);
        conf.setJobName(this.getClass().getName());

        FileInputFormat.setInputPaths(conf, new Path(input));
        FileOutputFormat.setOutputPath(conf, new Path(output));

        conf.setMapperClass(StateMapper.class);
        conf.setReducerClass(SumCombiner.class);
        // Uncomment to also run SumCombiner as a combiner on the map side:
        // conf.setCombinerClass(SumCombiner.class);

        conf.setMapOutputKeyClass(Text.class);
        conf.setMapOutputValueClass(IntWritable.class);
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);

        JobClient.runJob(conf);
        return 0;
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new ComplaintCombiner(), args);
        System.exit(exitCode);
    }
}
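Assuming the classes are packaged into a jar (the jar name below is illustrative, not from the original article), the job can be launched with the standard hadoop jar command:

hadoop jar complaint.jar com.evoke.bigdata.mr.complaint.ComplaintCombiner <input-dir> <output-dir>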
Below is the mapper code that retrieves the complaints for each state. It assumes the state code is the sixth comma-separated field of each record; for example, a record whose sixth field is TX yields the pair <TX, 1>:
package com.evoke.bigdata.mr.complaint;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
public class StateMapper extends MapReduceBase implements
        Mapper<LongWritable, Text, Text, IntWritable> {

    @Override
    public void map(LongWritable key, Text value,
            OutputCollector<Text, IntWritable> output, Reporter reporter)
            throws IOException {
        String[] fields = value.toString().split(",");
        // fields[5] is assumed to hold the state code in the complaint records;
        // the length check guards against short or malformed lines
        if (fields.length > 5) {
            output.collect(new Text(fields[5]), new IntWritable(1));
        }
    }
}
And lastly, here’s the Reducer/Combiner code that provides the total number of complaints received in each state:
package com.evoke.bigdata.mr.complaint;
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
public class SumCombiner extends MapReduceBase implements
        Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    public void reduce(Text key, Iterator<IntWritable> values,
            OutputCollector<Text, IntWritable> output, Reporter reporter)
            throws IOException {
        int stateCount = 0;
        while (values.hasNext()) {
            IntWritable value = values.next();
            stateCount += value.get();
        }
        output.collect(key, new IntWritable(stateCount));
    }
}
If the same job is executed with the Combiner enabled, that is, with SumCombiner also registered via setCombinerClass in the driver, it drastically reduces the number of bytes transmitted over the network. Furthermore, we can clearly observe in the JobTracker UI that far less data is transferred to the Reduce phase.
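A minimal sketch of that change in the driver, keeping SumCombiner as the reducer while also registering it as the combiner:

conf.setMapperClass(StateMapper.class);
conf.setCombinerClass(SumCombiner.class); // local aggregation on each map task's output
conf.setReducerClass(SumCombiner.class);  // final aggregation across all map outputs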
Combiner Example
The following example walks through, step by step, what a combiner does. Let us assume we have the following input text file named input.txt for MapReduce.
What do you mean by Object
What do you know about Java
What is Java Virtual Machine
How Java enabled High Performance
The important phases of a MapReduce program with a Combiner are described below.
Record Reader – This is the first phase of MapReduce, where the Record Reader reads every line from the input text file and yields it as a key-value pair. (With the default TextInputFormat, the key is actually the byte offset of the line within the file; simple line numbers are shown below for readability.)
- Input − Line by line text from the input file.
- Output − Forms the key-value pairs. The following is the set of expected key-value pairs.
<1, What do you mean by Object>
<2, What do you know about Java>
<3, What is Java Virtual Machine>
<4, How Java enabled High Performance>
Map Phase – The Map phase takes input from the Record Reader, processes it, and produces the output as another set of key-value pairs.
- Input − The following key-value pairs are the input taken from the Record Reader.
<1, What do you mean by Object>
<2, What do you know about Java>
<3, What is Java Virtual Machine>
<4, How Java enabled High Performance>
The Map phase reads each key-value pair, splits the value into words using StringTokenizer, and emits each word as a key with a count of 1 as its value. The following code snippet shows the Mapper class and its map function.
public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        StringTokenizer itr = new StringTokenizer(value.toString());
        while (itr.hasMoreTokens()) {
            word.set(itr.nextToken());
            context.write(word, one);
        }
    }
}
- Output − The expected output is as follows.
<What,1> <do,1> <you,1> <mean,1> <by,1> <Object,1>
<What,1> <do,1> <you,1> <know,1> <about,1> <Java,1>
<What,1> <is,1> <Java,1> <Virtual,1> <Machine,1>
<How,1> <Java,1> <enabled,1> <High,1> <Performance,1>
Combiner Phase – The Combiner phase takes the key-value pairs from the Map phase, aggregates them locally within each map task, and produces a smaller set of key-value pairs.
- Input − The following key-value pairs are the input taken from the Map phase.
<What,1> <do,1> <you,1> <mean,1> <by,1> <Object,1>
<What,1> <do,1> <you,1> <know,1> <about,1> <Java,1>
<What,1> <is,1> <Java,1> <Virtual,1> <Machine,1>
<How,1> <Java,1> <enabled,1> <High,1> <Performance,1>
The framework first groups each map task's output by key, so the combiner sees every word together with the collection of its counts, for example <What,1,1,1>. Because the combiner here is the same IntSumReducer class used in the Reduce phase, it sums each collection locally. Note that a function used as a combiner must be commutative and associative, since the framework may invoke it zero, one, or several times; summation qualifies. Usually, the code and operation of a Combiner are similar to those of a Reducer. Following is the code snippet for the Mapper, Combiner and Reducer class declarations.
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
- Output − Assuming all four input lines are handled by a single map task, the expected combiner output is
<What,3> <do,2> <you,2> <mean,1> <by,1> <Object,1>
<know,1> <about,1> <Java,3>
<is,1> <Virtual,1> <Machine,1>
<How,1> <enabled,1> <High,1> <Performance,1>
Reducer Phase – The Reducer phase takes the locally aggregated key-value pairs from the Combiner phase, groups them by key, and produces the final output as key-value pairs. Note that the Combiner ran the same code as the Reducer.
- Input − The input is the Combiner output, grouped by key after the shuffle. With a single map task, each key arrives with exactly one partial sum:
<What,3> <do,2> <you,2> <mean,1> <by,1> <Object,1>
<know,1> <about,1> <Java,3>
<is,1> <Virtual,1> <Machine,1>
<How,1> <enabled,1> <High,1> <Performance,1>
(With several map tasks, the Reducer would instead receive one partial sum per task for each key, for example <What,2,1>.)
The Reducer phase sums the partial counts for each key. Following is the code for IntSumReducer, which is used as both the Combiner and the Reducer.
public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        result.set(sum);
        context.write(key, result);
    }
}
- Output − The expected output from the Reducer phase is as follows.
<What,3> <do,2> <you,2> <mean,1> <by,1> <Object,1>
<know,1> <about,1> <Java,3>
<is,1> <Virtual,1> <Machine,1>
<How,1> <enabled,1> <High,1> <Performance,1>
Record Writer – This is the last phase of MapReduce where the Record Writer writes every key-value pair from the Reducer phase and sends the output as text.
- Input − Each key-value pair from the Reducer phase along with the Output format.
- Output − It gives you the key-value pairs in text format. With the default TextOutputFormat, each pair is written on its own line with the key and value separated by a tab character. Following is the expected output.
What | 3
do | 2
you | 2
mean | 1
by | 1
Object | 1
know | 1
about | 1
Java | 3
is | 1
Virtual | 1
Machine | 1
How | 1
enabled | 1
High | 1
Performance | 1
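For completeness, the following is a minimal driver that wires TokenizerMapper and IntSumReducer together using the org.apache.hadoop.mapreduce API. It is a sketch: the class name WordCount and the use of command-line arguments for the input and output paths are illustrative choices, and it assumes TokenizerMapper and IntSumReducer are declared as static nested classes of WordCount, as in the snippets above.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "word count with combiner");
        job.setJarByClass(WordCount.class);

        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class); // local aggregation per map task
        job.setReducerClass(IntSumReducer.class);  // final aggregation across map outputs

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));   // e.g. input.txt
        FileOutputFormat.setOutputPath(job, new Path(args[1])); // output directory
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}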