Hadoop & MapReduce Tutorial | The Reduce Phase

The Reduce Phase

The Reduce phase has three main sub-phases:

  • Shuffle
  • Merge/Sort
  • Invocation of the reduce() method.

The Shuffle phase – The Shuffle phase, which is part of the Reduce phase, ensures that each partition reaches the appropriate Reducer. During the Shuffle phase, each Reducer retrieves its own partitions from the Mapper nodes over HTTP. The number of threads a Reducer uses to pull partitions in parallel is set by the property mapreduce.reduce.shuffle.parallelcopies and defaults to five.
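The parallel pull described above can be pictured with a small, self-contained sketch. It is not Hadoop's fetcher code: the class name, the fetchPartition helper, and the host names are hypothetical stand-ins, and the HTTP request is replaced by a string so the example runs without a cluster. The only idea carried over from the text is a fixed pool of worker threads whose size plays the role of mapreduce.reduce.shuffle.parallelcopies.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class ParallelFetchSketch {

    // Hypothetical stand-in for the HTTP request a Reducer sends to one Mapper host.
    static String fetchPartition(String mapHost, int reducerId) {
        return mapHost + "/partition-" + reducerId;
    }

    // Fetches this Reducer's partition from every Mapper host, using at most
    // parallelCopies worker threads at a time. Results keep the order of mapHosts.
    public static List<String> fetchAll(List<String> mapHosts, int reducerId, int parallelCopies) {
        ExecutorService pool = Executors.newFixedThreadPool(parallelCopies);
        try {
            List<Future<String>> pending = new ArrayList<>();
            for (String host : mapHosts) {
                Callable<String> task = () -> fetchPartition(host, reducerId);
                pending.add(pool.submit(task));
            }
            List<String> fetched = new ArrayList<>();
            for (Future<String> result : pending) {
                fetched.add(result.get()); // blocks until that copy has finished
            }
            return fetched;
        } catch (Exception e) {
            throw new RuntimeException("fetch failed", e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // Five threads mirrors the default mentioned in the text.
        List<String> hosts = Arrays.asList("map-host-1", "map-host-2", "map-host-3");
        System.out.println(fetchAll(hosts, 0, 5));
    }
}
```

Raising the pool size lets more copies run concurrently, at the cost of more simultaneous connections to the Mapper nodes.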

But how do the Reducers know which nodes to query for their partitions? The Application Master coordinates this. As each Mapper instance completes, it notifies the Application Master about the partitions it produced during its run. Each Reducer periodically queries the Application Master for Mapper hosts until it has received the final list of nodes hosting its partitions.

The Reduce phase begins when the fraction of Mapper instances that have completed exceeds the value of the property mapreduce.job.reduce.slowstart.completedmaps. This does not mean that reduce() invocations can begin; at this point only the download of partitions from the Mapper nodes to the Reducer nodes can start.
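As a rough illustration of the slow-start rule, the check can be written as a plain fraction comparison. This is a sketch under assumptions, not the scheduler's actual code: the class and method names are made up, the comparison uses "greater than or equal" so that a setting of 1.0 means "wait for every Mapper", and the real framework may round differently. The value 0.05 used in main is, as far as I know, the shipped default for mapreduce.job.reduce.slowstart.completedmaps.

```java
class SlowStartSketch {

    // Returns true once enough Mapper instances have completed for Reducers to
    // start downloading partitions. It says nothing about reduce() calls, which
    // still have to wait for the shuffle and the merge to finish.
    public static boolean shuffleMayStart(int completedMaps, int totalMaps, double slowstartFraction) {
        if (totalMaps <= 0) {
            return true; // assumption: with no Mappers there is nothing to wait for
        }
        return (double) completedMaps / totalMaps >= slowstartFraction;
    }

    public static void main(String[] args) {
        double slowstart = 0.05; // illustrative setting
        System.out.println(shuffleMayStart(4, 100, slowstart));  // false: only 4% of Mappers done
        System.out.println(shuffleMayStart(50, 100, slowstart)); // true: 50% of Mappers done
    }
}
```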

Merge/Sort – At this point the Reducer has received all the partitions it needs to process, having downloaded them from the Mapper nodes over HTTP. The key-value pairs received from the Mappers need to be sorted by the Mapper output key (which is the Reducer input key).

Each partition downloaded from a Mapper is already sorted by the Mapper output key. However, the partitions received from all the Mappers still need to be merged into a single sequence sorted by that key. This is the Merge/Sort phase. Its end result is that all the records meant for the Reducer are sorted by the Mapper output key (the Reducer input key). Only when this phase completes is the framework ready to make the reduce() calls.
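Because every downloaded partition is already sorted, combining them is a merge rather than a full sort. The sketch below shows one common way to do that, a k-way merge driven by a priority queue. It is only an illustration with plain strings as keys and in-memory lists standing in for partition files; the class and method names are hypothetical, and Hadoop's own merger works on serialized segments and may spill to disk.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

class MergeSortedSegments {

    // One heap entry: the current smallest key of a segment, plus the rest of that segment.
    private static final class Head {
        final String key;
        final Iterator<String> rest;

        Head(String key, Iterator<String> rest) {
            this.key = key;
            this.rest = rest;
        }
    }

    // Merges segments that are each sorted by key into one sorted list.
    public static List<String> merge(List<List<String>> sortedSegments) {
        PriorityQueue<Head> heap = new PriorityQueue<>((a, b) -> a.key.compareTo(b.key));
        for (List<String> segment : sortedSegments) {
            Iterator<String> it = segment.iterator();
            if (it.hasNext()) {
                heap.add(new Head(it.next(), it));
            }
        }
        List<String> merged = new ArrayList<>();
        while (!heap.isEmpty()) {
            Head smallest = heap.poll();   // smallest key across all segments
            merged.add(smallest.key);
            if (smallest.rest.hasNext()) { // refill the heap from the same segment
                heap.add(new Head(smallest.rest.next(), smallest.rest));
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        List<List<String>> segments = Arrays.asList(
            Arrays.asList("apple", "cat", "dog"), // from Mapper 1
            Arrays.asList("bat", "cat"),          // from Mapper 2
            Arrays.asList("ant", "zebra"));       // from Mapper 3
        System.out.println(merge(segments));
        // prints [ant, apple, bat, cat, cat, dog, zebra]
    }
}
```

Note how the two "cat" records from different Mappers end up next to each other, which is exactly what the reduce() call relies on.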

Invocation of the reduce() method – MapReduce developers often wonder why the sort phase is needed. The answer lies in how reduce() is called: the reduce() method in the Reducer handles all the values for one key in a single invocation.

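// These imports belong at the top of the enclosing source file.
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Sums the counts emitted by the Mappers for each word.
// Declared static because it is nested inside the job's driver class.
public static class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        // All values for this key arrive in this single reduce() call.
        for (IntWritable val : values) {
            sum += val.get();
        }
        context.write(key, new IntWritable(sum));
    }
}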

In the signature of reduce(), the values arrive as an Iterable instance. But are all the values for a given key held in memory during the execution of a reduce() call? No! That would easily overwhelm the JVM's memory when one key has millions of values. The Iterable is just that: an interface. In reality, the framework is simply iterating over a file of Reducer input key-value pairs sorted by the Reducer input key.

When a new key is encountered, a new reduce() call is made. As the values are iterated, the framework reuses the same value object and overwrites its contents with each new value, which is the main reason you should not hold on to a reference to an earlier value in the iteration (copy it if you need it later). If the iteration over the values is skipped or breaks off early, the framework ensures that the pointer moves to the next key in the file being processed by the Reducer. This is the file produced by the Merge/Sort process described earlier. It is the sort phase that allows millions of values for a given key to be processed efficiently inside a single reduce() invocation through the familiar Iterable interface: the framework makes one pass over a file whose records are sorted by the Reducer input key, without running out of memory or making multiple passes over the Reducer input file.
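The single-pass behaviour described above can be imitated in a few lines of plain Java. The sketch below is a simplification under assumptions, not the framework's implementation: an in-memory list of sorted (key, value) entries stands in for the merged file, a shared cursor stands in for the file pointer, and the names GroupedPassSketch, run, and wordCount are hypothetical. It shows the two points from the text: values are handed out lazily through an Iterable, and any values the reduce logic leaves unread are skipped so the next call starts at the next key.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.function.BiConsumer;

class GroupedPassSketch {

    // Walks the sorted records once and calls reduce once per distinct key.
    public static void run(List<Map.Entry<String, Integer>> sorted,
                           BiConsumer<String, Iterable<Integer>> reduce) {
        int[] pos = {0}; // one cursor shared by this loop and the value iterator
        while (pos[0] < sorted.size()) {
            String key = sorted.get(pos[0]).getKey();
            // Lazy view over the records: nothing is copied or buffered per key.
            Iterable<Integer> values = () -> new Iterator<Integer>() {
                @Override
                public boolean hasNext() {
                    return pos[0] < sorted.size() && sorted.get(pos[0]).getKey().equals(key);
                }

                @Override
                public Integer next() {
                    if (!hasNext()) {
                        throw new NoSuchElementException();
                    }
                    return sorted.get(pos[0]++).getValue();
                }
            };
            reduce.accept(key, values);
            // If reduce stopped early, skip the rest of this key's values.
            while (pos[0] < sorted.size() && sorted.get(pos[0]).getKey().equals(key)) {
                pos[0]++;
            }
        }
    }

    // Word-count style reduce logic expressed against the sketch above.
    public static List<String> wordCount(List<Map.Entry<String, Integer>> sorted) {
        List<String> out = new ArrayList<>();
        run(sorted, (key, values) -> {
            int sum = 0;
            for (int v : values) {
                sum += v;
            }
            out.add(key + "=" + sum);
        });
        return out;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> sorted = new ArrayList<>();
        for (String word : new String[] {"a", "a", "b", "c", "c", "c"}) {
            sorted.add(new SimpleEntry<>(word, 1));
        }
        System.out.println(wordCount(sorted)); // prints [a=2, b=1, c=3]
    }
}
```

Because the values come from a forward-only cursor, the Iterable in this sketch can be walked only once per key, which mirrors why the real values should be consumed in a single loop.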

Optimizing the Sort/Merge phase on the Reducer node

Some of the parameters that can be used to tune the sort behavior on the Reducer node are:

  • mapreduce.reduce.shuffle.input.buffer.percent – The proportion of heap memory dedicated to storing map outputs retrieved from the Mappers during the Shuffle phase. If a given Mapper output is small enough, it is retrieved into this memory; otherwise it is persisted to the local disk of the Reducer.
  • mapreduce.reduce.shuffle.merge.percent – The usage threshold of the memory defined by mapreduce.reduce.shuffle.input.buffer.percent at which an in-memory merge of the Mapper outputs held in memory is initiated.
  • mapreduce.reduce.merge.inmem.threshold – The number of Mapper outputs accumulated in memory after which the in-memory merge process is initiated. Reaching either this threshold or the one defined by mapreduce.reduce.shuffle.merge.percent is enough to initiate the in-memory merge.
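How the three settings above interact can be sketched as a small decision function. This is a simplified model, not Hadoop's merge manager: the class and method names are hypothetical, the byte counts are illustrative, and treating a threshold of zero or less as "no count-based trigger" is an assumption about the property's semantics. The values 0.70, 0.66, and 1000 used in main are the defaults as I understand them; check the mapred-default.xml of your Hadoop version before relying on them.

```java
class InMemoryMergeTrigger {

    // Size of the shuffle buffer: a share of the Reducer's heap
    // (the role of mapreduce.reduce.shuffle.input.buffer.percent).
    public static long shuffleBufferBytes(long reducerHeapBytes, double inputBufferPercent) {
        return (long) (reducerHeapBytes * inputBufferPercent);
    }

    // An in-memory merge starts when EITHER trigger fires:
    //  - buffer usage reaches mergePercent of the shuffle buffer
    //    (mapreduce.reduce.shuffle.merge.percent), or
    //  - the number of buffered Mapper outputs reaches inmemThreshold
    //    (mapreduce.reduce.merge.inmem.threshold).
    public static boolean shouldMerge(long bytesInBuffer, int outputsInBuffer,
                                      long bufferBytes, double mergePercent, int inmemThreshold) {
        boolean memoryTrigger = bytesInBuffer >= (long) (bufferBytes * mergePercent);
        // Assumption: a threshold of 0 or less disables the count-based trigger.
        boolean countTrigger = inmemThreshold > 0 && outputsInBuffer >= inmemThreshold;
        return memoryTrigger || countTrigger;
    }

    public static void main(String[] args) {
        long heap = 1024L * 1024L * 1024L;              // illustrative 1 GiB Reducer heap
        long buffer = shuffleBufferBytes(heap, 0.70);   // share of heap used for map outputs
        long halfFull = buffer / 2;

        // Half-full buffer with few outputs: neither trigger fires.
        System.out.println(shouldMerge(halfFull, 10, buffer, 0.66, 1000));   // false
        // Same usage, but many small outputs: the count trigger fires.
        System.out.println(shouldMerge(halfFull, 1000, buffer, 0.66, 1000)); // true
        // Nearly full buffer: the memory trigger fires.
        System.out.println(shouldMerge(buffer, 10, buffer, 0.66, 1000));     // true
    }
}
```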
