Job Formats

In Hadoop 1, JobClient is the primary interface by which user-job interacts with the JobTracker. JobClient provides facilities to submit jobs, track their progress, access component-tasks’ reports and logs, get the MapReduce cluster’s status information and so on. The job submission process involves

  • Checking the input and output specifications of the job.
  • Computing the InputSplit values for the job.
  • Setting up the requisite accounting information for the DistributedCache of the job, if necessary.
  • Copying the job’s jar and configuration to the MapReduce system directory on the FileSystem.
  • Submitting the job to the JobTracker and optionally monitoring it’s status.

Job history files are also logged to user specified directory hadoop.job.history.user.location which defaults to job output directory. The files are stored in “_logs/history/” in the specified directory. Hence, by default they will be in mapred.output.dir/_logs/history. User can stop logging by giving the value none for hadoop.job.history.user.location

User can view the history logs summary in specified directory using the following command

$ bin/hadoop job -history output-dir

This command will print job details, failed and killed tip details. More details about the job such as successful tasks and task attempts made for each task can be viewed using the following command

$ bin/hadoop job -history all output-dir

User can use OutputLogFilter to filter log files from the output directory listing. Normally the user creates the application, describes various facets of the job via JobConf, and then uses the JobClient to submit the job and monitor its progress.

Job Authorization – In Hadoop 1, job level authorization and queue level authorization are enabled on the cluster, if the configuration mapred.acls.enabled is set to true. When enabled, access control checks are done by (a) the JobTracker before allowing users to submit jobs to queues and administering these jobs and (b) by the JobTracker and the TaskTracker before allowing users to view job details or to modify a job using MapReduce APIs, CLI or web user interfaces.

A job submitter can specify access control lists for viewing or modifying a job via the configuration properties mapreduce.job.acl-view-job and mapreduce.job.acl-modify-job respectively. By default, nobody is given access in these properties.

However, irrespective of the job ACLs configured, a job’s owner, the superuser and cluster administrators (mapreduce.cluster.administrators) and queue administrators of the queue to which the job was submitted to (mapred.queue.queue-name.acl-administer-jobs) always have access to view and modify a job.

A job view ACL authorizes users against the configured mapreduce.job.acl-view-job before returning possibly sensitive information about a job, like:

  • job level counters
  • task level counters
  • tasks’s diagnostic information
  • task logs displayed on the TaskTracker web UI
  • xml showed by the JobTracker’s web UI

Other information about a job, like its status and its profile, is accessible to all users, without requiring authorization.

A job modification ACL authorizes users against the configured mapreduce.job.acl-modify-job before allowing modifications to jobs, like:

  • killing a job
  • killing/failing a task of a job
  • setting the priority of a job

These operations are also permitted by the queue level ACL, “mapred.queue.queue-name.acl-administer-jobs”, configured via mapred-queue-acls.xml. The caller will be able to do the operation if he/she is part of either queue admins ACL or job modification ACL.

Hadoop 1 Job Control – Users may need to chain MapReduce jobs to accomplish complex tasks which cannot be done via a single MapReduce job. This is fairly easy since the output of the job typically goes to distributed file-system, and the output, in turn, can be used as the input for the next job.

However, this also means that the onus on ensuring jobs are complete (success/failure) lies squarely on the clients. In such cases, the various job-control options are:

  • runJob(JobConf) : Submits the job and returns only after the job has completed.
  • submitJob(JobConf) : Only submits the job, then poll the returned handle to the RunningJob to query status and make scheduling decisions.
  • setJobEndNotificationURI(String) : Sets up a notification upon job-completion, thus avoiding polling.

In Hadoop 2, job is the primary interface by which user-job interacts with the ResourceManager. Job provides facilities to submit jobs, track their progress, access component-tasks’ reports and logs, get the MapReduce cluster’s status information and so on. The job submission process involves:

  • Checking the input and output specifications of the job.
  • Computing the InputSplit values for the job.
  • Setting up the requisite accounting information for the DistributedCache of the job, if necessary.
  • Copying the job’s jar and configuration to the MapReduce system directory on the FileSystem.
  • Submitting the job to the ResourceManager and optionally monitoring it’s status.

Job history files are also logged to user specified directory mapreduce.jobhistory.intermediate-done-dir and mapreduce.jobhistory.done-dir, which defaults to job output directory. User can view the history logs summary in specified directory using the following command $ mapred job -history output.jhist This command will print job details, failed and killed tip details. More details about the job such as successful tasks and task attempts made for each task can be viewed using the following command $ mapred job -history all output.jhist. Normally the user uses Job to create the application, describe various facets of the job, submit the job, and monitor its progress.

Hadoop 2 Job Control – The onus on ensuring jobs are complete (success/failure) lies squarely on the clients. In such cases, the various job-control options are:

  • submit() : Submit the job to the cluster and return immediately.
  • waitForCompletion(boolean) : Submit the job to the cluster and wait for it to finish.

Job Input

InputFormat describes the input-specification for a MapReduce job.

The MapReduce framework relies on the InputFormat of the job to

  • Validate the input-specification of the job.
  • Split-up the input file(s) into logical InputSplit instances, each of which is then assigned to an individual Mapper.
  • Provide the RecordReader implementation used to glean input records from the logical InputSplit for processing by the Mapper.

The default behavior of file-based InputFormat implementations, typically sub-classes of FileInputFormat, is to split the input into logical InputSplit instances based on the total size, in bytes, of the input files. However, the FileSystem blocksize of the input files is treated as an upper bound for input splits. A lower bound on the split size can be set via mapred.min.split.size.

Clearly, logical splits based on input-size is insufficient for many applications since record boundaries must be respected. In such cases, the application should implement a RecordReader, who is responsible for respecting record-boundaries and presents a record-oriented view of the logical InputSplit to the individual task.

TextInputFormat is the default InputFormat. If TextInputFormat is the InputFormat for a given job, the framework detects input-files with the .gz extensions and automatically decompresses them using the appropriate CompressionCodec. However, it must be noted that compressed files with the above extensions cannot be split and each compressed file is processed in its entirety by a single mapper.

InputSplit – InputSplit represents the data to be processed by an individual Mapper.

Typically InputSplit presents a byte-oriented view of the input, and it is the responsibility of RecordReader to process and present a record-oriented view. FileSplit is the default InputSplit. It sets map.input.file to the path of the input file for the logical split.

RecordReader – RecordReader reads <key, value> pairs from an InputSplit. Typically the RecordReader converts the byte-oriented view of the input, provided by the InputSplit, and presents a record-oriented to the Mapper implementations for processing. RecordReader thus assumes the responsibility of processing record boundaries and presents the tasks with keys and values.

Job Output

OutputFormat describes the output-specification for a MapReduce job. The MapReduce framework relies on the OutputFormat of the job to:

  • Validate the output-specification of the job; for example, check that the output directory doesn’t already exist.
  • Provide the RecordWriter implementation used to write the output files of the job. Output files are stored in a FileSystem.

TextOutputFormat is the default OutputFormat.

OutputCommitter – OutputCommitter describes the commit of task output for a MapReduce job. The MapReduce framework relies on the OutputCommitter of the job to

  • Setup the job during initialization. For example, create the temporary output directory for the job during the initialization of the job. Job setup is done by a separate task when the job is in PREP state and after initializing tasks. Once the setup task completes, the job will be moved to RUNNING state.
  • Cleanup the job after the job completion. For example, remove the temporary output directory after the job completion. Job cleanup is done by a separate task at the end of the job. Job is declared SUCCEDED/FAILED/KILLED after the cleanup task completes.
  • Setup the task temporary output. Task setup is done as part of the same task, during task initialization.
  • Check whether a task needs a commit. This is to avoid the commit procedure if a task does not need commit.
  • Commit of the task output. Once task is done, the task will commit it’s output if required.
  • Discard the task commit. If the task has been failed/killed, the output will be cleaned-up. If task could not cleanup (in exception block), a separate task will be launched with same attempt-id to do the cleanup.

FileOutputCommitter is the default OutputCommitter. Job setup/cleanup tasks occupy map or reduce slots, whichever is free on the TaskTracker. And JobCleanup task, TaskCleanup tasks and JobSetup task have the highest priority, and in that order.

Task Side-Effect Files – In some applications, component tasks need to create and/or write to side-files, which differ from the actual job-output files.

In such cases there could be issues with two instances of the same Mapper or Reducer running simultaneously (for example, speculative tasks) trying to open and/or write to the same file (path) on the FileSystem. Hence the application-writer will have to pick unique names per task-attempt (using the attemptid, say attempt_200709221812_0001_m_000000_0), not just per task.

To avoid these issues the MapReduce framework, when the OutputCommitter is FileOutputCommitter, maintains a special ${mapred.output.dir}/_temporary/_${taskid} sub-directory accessible via ${mapred.work.output.dir} for each task-attempt on the FileSystem where the output of the task-attempt is stored. On successful completion of the task-attempt, the files in the ${mapred.output.dir}/_temporary/_${taskid} (only) are promoted to ${mapred.output.dir}. Of course, the framework discards the sub-directory of unsuccessful task-attempts. This process is completely transparent to the application.

The application-writer can take advantage of this feature by creating any side-files required in ${mapred.work.output.dir} during execution of a task via FileOutputFormat.getWorkOutputPath(), and the framework will promote them similarly for succesful task-attempts, thus eliminating the need to pick unique paths per task-attempt.

Note: The value of ${mapred.work.output.dir} during execution of a particular task-attempt is actually ${mapred.output.dir}/_temporary/_{$taskid}, and this value is set by the MapReduce framework. So, just create any side-files in the path returned by FileOutputFormat.getWorkOutputPath() from MapReduce task to take advantage of this feature.

The entire discussion holds true for maps of jobs with reducer=NONE (i.e. 0 reduces) since output of the map, in that case, goes directly to HDFS.

RecordWriter – RecordWriter writes the output <key, value> pairs to an output file. RecordWriter implementations write the job outputs to the FileSystem.

MapReduce Internals
Debugging and Profiling

Get industry recognized certification – Contact us

keyboard_arrow_up
Open chat
Need help?
Hello 👋
Can we help you?