Profiling
Profiling is a utility to get a representative (2 or 3) sample of built-in java profiler for a sample of maps and reduces.
In Hadoop 1, user can specify whether the system should collect profiler information for some of the tasks in the job by setting the configuration property mapred.task.profile. The value can be set using the api JobConf.setProfileEnabled(boolean). If the value is set true, the task profiling is enabled. The profiler information is stored in the user log directory. By default, profiling is not enabled for the job.
Once user configures that profiling is needed, she/he can use the configuration property mapred.task.profile.{maps|reduces} to set the ranges of MapReduce tasks to profile. The value can be set using the api JobConf.setProfileTaskRange(boolean,String). By default, the specified range is 0-2.
User can also specify the profiler configuration arguments by setting the configuration property mapred.task.profile.params. The value can be specified using the api JobConf.setProfileParams(String). If the string contains a %s, it will be replaced with the name of the profiling output file when the task runs. These parameters are passed to the task child JVM on the command line. The default value for the profiling parameters is -agentlib:hprof=cpu=samples,heap=sites,force=n,thread=y,verbose=n,file=%s
In Hadoop 2, user can specify whether the system should collect profiler information for some of the tasks in the job by setting the configuration property mapreduce.task.profile. The value can be set using the api Configuration.set(MRJobConfig.TASK_PROFILE, boolean). If the value is set true, the task profiling is enabled. The profiler information is stored in the user log directory. By default, profiling is not enabled for the job.
Once user configures that profiling is needed, she/he can use the configuration property mapreduce.task.profile.{maps|reduces} to set the ranges of MapReduce tasks to profile. The value can be set using the api Configuration.set(MRJobConfig.NUM_{MAP|REDUCE}_PROFILES, String). By default, the specified range is 0-2.
User can also specify the profiler configuration arguments by setting the configuration property mapreduce.task.profile.params. The value can be specified using the api Configuration.set(MRJobConfig.TASK_PROFILE_PARAMS, String). If the string contains a %s, it will be replaced with the name of the profiling output file when the task runs. These parameters are passed to the task child JVM on the command line. The default value for the profiling parameters is -agentlib:hprof=cpu=samples,heap=sites,force=n,thread=y,verbose=n,file=%s.
Debugging
The MapReduce framework provides a facility to run user-provided scripts for debugging. When a MapReduce task fails, a user can run a debug script, to process task logs for example. The script is given access to the task’s stdout and stderr outputs, syslog and jobconf. The output from the debug script’s stdout and stderr is displayed on the console diagnostics and also as part of the job UI.
The user needs to use DistributedCache to distribute and symlink the script file.
In Hadoop 1, a quick way to submit the debug script is to set values for the properties mapred.map.task.debug.script and mapred.reduce.task.debug.script, for debugging map and reduce tasks respectively. These properties can also be set by using APIs JobConf.setMapDebugScript(String) and JobConf.setReduceDebugScript(String) . In streaming mode, a debug script can be submitted with the command-line options -mapdebug and -reducedebug, for debugging map and reduce tasks respectively. In Hadoop 2, to submit the debug script is to set values for the properties mapreduce.map.debug.script and mapreduce.reduce.debug.script, for debugging map and reduce tasks respectively. These properties can also be set by using APIs Configuration.set(MRJobConfig.MAP_DEBUG_SCRIPT, String) and Configuration.set(MRJobConfig.REDUCE_DEBUG_SCRIPT, String). In streaming mode, a debug script can be submitted with the command-line options -mapdebug and -reducedebug, for debugging map and reduce tasks respectively.
The arguments to the script are the task’s stdout, stderr, syslog and jobconf files. The debug command, run on the node where the MapReduce task failed, is:
$script $stdout $stderr $syslog $jobconf
Pipes programs have the c++ program name as a fifth argument for the command. Thus for the pipes programs the command is
$script $stdout $stderr $syslog $jobconf $program
For pipes, a default script is run to process core dumps under gdb, prints stack trace and gives info about running threads.
Hadoop MapReduce provides facilities for the application-writer to specify compression for both intermediate map-outputs and the job-outputs i.e. output of the reduces. It also comes bundled with CompressionCodec implementation for the zlib compression algorithm. The gzip, bzip2, snappy, and lz4 file format are also supported.
Hadoop also provides native implementations of the above compression codecs for reasons of both performance (zlib) and non-availability of Java libraries. More details on their usage and availability are available here.
Intermediate Output – In Hadoop 1, applications can control compression of intermediate map-outputs via the JobConf.setCompressMapOutput(boolean) api and the CompressionCodec to be used via the JobConf.setMapOutputCompressorClass(Class) api. In Hadoop 2, applications can control compression of intermediate map-outputs via the Configuration.set(MRJobConfig.MAP_OUTPUT_COMPRESS, boolean) api and the CompressionCodec to be used via the Configuration.set(MRJobConfig.MAP_OUTPUT_COMPRESS_CODEC, Class) api.
Job Output – In Hadoop 1, applications can control compression of job-outputs via the FileOutputFormat.setCompressOutput(JobConf, boolean) api and the CompressionCodec to be used can be specified via the FileOutputFormat.setOutputCompressorClass(JobConf, Class) api. If the job outputs are to be stored in the SequenceFileOutputFormat, the required SequenceFile.CompressionType (i.e. RECORD / BLOCK – defaults to RECORD) can be specified via the SequenceFileOutputFormat.setOutputCompressionType(JobConf, SequenceFile.CompressionType) api.
In Hadoop 2, applications can control compression of job-outputs via the FileOutputFormat.setCompressOutput(Job, boolean) api and the CompressionCodec to be used can be specified via the FileOutputFormat.setOutputCompressorClass(Job, Class) api. If the job outputs are to be stored in the SequenceFileOutputFormat, the required SequenceFile.CompressionType (i.e. RECORD / BLOCK – defaults to RECORD) can be specified via the SequenceFileOutputFormat.setOutputCompressionType(Job, SequenceFile.CompressionType) api. If the job outputs are to be stored in the SequenceFileOutputFormat, the required SequenceFile.CompressionType (i.e. RECORD / BLOCK – defaults to RECORD) can be specified via the SequenceFileOutputFormat.setOutputCompressionType(Job, SequenceFile.CompressionType) api.
Bad Records – Hadoop provides an option where a certain set of bad input records can be skipped when processing map inputs. Applications can control this feature through the SkipBadRecords class.
This feature can be used when map tasks crash deterministically on certain input. This usually happens due to bugs in the map function. Usually, the user would have to fix these bugs. This is, however, not possible sometimes. The bug may be in third party libraries, for example, for which the source code is not available. In such cases, the task never completes successfully even after multiple attempts, and the job fails. With this feature, only a small portion of data surrounding the bad records is lost, which may be acceptable for some applications (those performing statistical analysis on very large data, for example).
By default this feature is disabled. For enabling it, refer to SkipBadRecords.setMapperMaxSkipRecords(Configuration, long) and SkipBadRecords.setReducerMaxSkipGroups(Configuration, long).
With this feature enabled, the framework gets into ‘skipping mode’ after a certain number of map failures. In ‘skipping mode’, map tasks maintain the range of records being processed. To do this, the framework relies on the processed record counter. This counter enables the framework to know how many records have been processed successfully, and hence, what record range caused a task to crash. On further attempts, this range of records is skipped.
The number of records skipped depends on how frequently the processed record counter is incremented by the application. It is recommended that this counter be incremented after every record is processed. This may not be possible in some applications that typically batch their processing. In such cases, the framework may skip additional records surrounding the bad record. Users can control the number of skipped records through SkipBadRecords.setMapperMaxSkipRecords(Configuration, long) and SkipBadRecords.setReducerMaxSkipGroups(Configuration, long). The framework tries to narrow the range of skipped records using a binary search-like approach. The skipped range is divided into two halves and only one half gets executed. On subsequent failures, the framework figures out which half contains bad records. A task will be re-executed till the acceptable skipped value is met or all task attempts are exhausted. To increase the number of task attempts, use JobConf.setMaxMapAttempts(int) and JobConf.setMaxReduceAttempts(int).
Skipped records are written to HDFS in the sequence file format, for later analysis. The location can be changed through SkipBadRecords.setSkipOutputPath(JobConf, Path).