Basics

HBase, like MapReduce itself, ships two MapReduce packages: org.apache.hadoop.hbase.mapred and org.apache.hadoop.hbase.mapreduce. The former supports the old-style API and the latter the new one. The latter has more features, though you can usually find an equivalent in the older package. Pick the package that matches your MapReduce deployment. When in doubt or starting from scratch, pick org.apache.hadoop.hbase.mapreduce.

MapReduce Basics

Hadoop MapReduce is a software framework for easily writing applications which process vast amounts of data (multi-terabyte data-sets) in-parallel on large clusters (thousands of nodes) of commodity hardware in a reliable, fault-tolerant manner.

A MapReduce job usually splits the input data-set into independent chunks which are processed by the map tasks in a completely parallel manner. The framework sorts the outputs of the maps, which are then input to the reduce tasks. Typically both the input and the output of the job are stored in a file-system. The framework takes care of scheduling tasks, monitoring them, and re-executing failed tasks.

Typically the compute nodes and the storage nodes are the same, that is, the MapReduce framework and the Hadoop Distributed File System are running on the same set of nodes. This configuration allows the framework to effectively schedule tasks on the nodes where data is already present, resulting in very high aggregate bandwidth across the cluster.

The MapReduce framework consists of a single master ResourceManager, one slave NodeManager per cluster-node, and MRAppMaster per application. Minimally, applications specify the input/output locations and supply map and reduce functions via implementations of appropriate interfaces and/or abstract-classes. These, and other job parameters, comprise the job configuration.

The Hadoop job client then submits the job (jar/executable etc.) and configuration to the ResourceManager which then assumes the responsibility of distributing the software/configuration to the slaves, scheduling tasks and monitoring them, providing status and diagnostic information to the job-client.

Although the Hadoop framework is implemented in Java™, MapReduce applications need not be written in Java.

  • Hadoop Streaming is a utility which allows users to create and run jobs with any executables (e.g. shell utilities) as the mapper and/or the reducer.
  • Hadoop Pipes is a SWIG-compatible C++ API to implement MapReduce applications (non JNI™ based).

Inputs and Outputs

The MapReduce framework operates exclusively on <key, value> pairs, that is, the framework views the input to the job as a set of <key, value> pairs and produces a set of <key, value> pairs as the output of the job, conceivably of different types.

The key and value classes have to be serializable by the framework and hence need to implement the Writable interface. Additionally, the key classes have to implement the WritableComparable interface to facilitate sorting by the framework.
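As a rough illustration of what that contract asks of a key class, the sketch below mirrors the shape of Writable's write and readFields methods plus a compareTo for sorting. The class name UserVisitKey and its fields are hypothetical. So that it compiles without Hadoop on the classpath, it implements only Comparable; a real key would declare WritableComparable instead.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical key type. In a real job it would declare
// "implements WritableComparable<UserVisitKey>"; here it only implements
// Comparable so the sketch has no Hadoop dependency.
final class UserVisitKey implements Comparable<UserVisitKey> {
    private String userId = "";
    private long timestamp;

    // No-arg constructor so an empty instance can be created and then filled by readFields.
    UserVisitKey() { }

    UserVisitKey(String userId, long timestamp) {
        this.userId = userId;
        this.timestamp = timestamp;
    }

    // Same shape as Writable#write: serialize the fields in a fixed order.
    public void write(DataOutput out) throws IOException {
        out.writeUTF(userId);
        out.writeLong(timestamp);
    }

    // Same shape as Writable#readFields: read the fields back in the same order.
    public void readFields(DataInput in) throws IOException {
        userId = in.readUTF();
        timestamp = in.readLong();
    }

    // Sort order for keys: by user id, then by timestamp.
    @Override
    public int compareTo(UserVisitKey other) {
        int byUser = userId.compareTo(other.userId);
        return byUser != 0 ? byUser : Long.compare(timestamp, other.timestamp);
    }

    // Serializes the key to bytes and deserializes it into a fresh instance.
    static UserVisitKey roundTrip(UserVisitKey key) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            key.write(new DataOutputStream(buffer));
            UserVisitKey copy = new UserVisitKey();
            copy.readFields(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        UserVisitKey original = new UserVisitKey("alice", 42L);
        // A key that survives serialization compares equal to the original.
        System.out.println(original.compareTo(roundTrip(original)) == 0);
    }
}
```

The important detail is that write and readFields handle the fields in the same order, and that compareTo defines the order in which the framework presents keys to the reducer.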

Input and Output types of a MapReduce job:

(input) <k1, v1> -> map -> <k2, v2> -> combine -> <k2, v2> -> reduce -> <k3, v3> (output)
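The type flow above can be traced with a small plain-Java model. This is only a sketch: it uses no Hadoop classes, and the class and method names are illustrative. Here k1/v1 are a line offset and a line of text, k2/v2 are a word and a count of 1, and k3/v3 are a word and its total.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model of <k1,v1> -> map -> <k2,v2> -> combine -> <k2,v2> -> reduce -> <k3,v3>.
final class TypeFlowSketch {

    // map: <Long offset, String line> -> list of <String word, Integer 1>
    static List<Map.Entry<String, Integer>> map(long offset, String line) {
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        for (String word : line.trim().split("\\s+")) {
            if (!word.isEmpty()) {
                out.add(new AbstractMap.SimpleEntry<>(word, 1));
            }
        }
        return out;
    }

    // combine and reduce share one shape here: sum the values per key.
    // TreeMap keeps keys sorted, standing in for the framework's sort step.
    static Map<String, Integer> sumByKey(List<Map.Entry<String, Integer>> pairs) {
        Map<String, Integer> sums = new TreeMap<>();
        for (Map.Entry<String, Integer> pair : pairs) {
            sums.merge(pair.getKey(), pair.getValue(), Integer::sum);
        }
        return sums;
    }

    // Treats each line as one split: map and combine per line, then reduce over all of them.
    static Map<String, Integer> run(List<String> lines) {
        List<Map.Entry<String, Integer>> combined = new ArrayList<>();
        long offset = 0;
        for (String line : lines) {
            combined.addAll(sumByKey(map(offset, line)).entrySet());
            offset += line.length() + 1;
        }
        return sumByKey(combined);
    }

    public static void main(String[] args) {
        // prints {hadoop=2, hbase=2, mapreduce=1}
        System.out.println(run(Arrays.asList("hbase hadoop hbase", "hadoop mapreduce")));
    }
}
```

Because the combiner emits the same types it consumes, it can be applied zero or more times without changing what the reducer sees.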

HBase, MapReduce, and the CLASSPATH

By default, MapReduce jobs deployed to a MapReduce cluster do not have access to either the HBase configuration under $HBASE_CONF_DIR or the HBase classes.

To give the MapReduce jobs the access they need, you could add hbase-site.xml to $HADOOP_HOME/conf and add HBase jars to the $HADOOP_HOME/lib directory. You would then need to copy these changes across your cluster. Or you could edit $HADOOP_HOME/conf/hadoop-env.sh and add HBase dependencies to the HADOOP_CLASSPATH variable. Neither approach is recommended, because both pollute your Hadoop install with HBase references and both require restarting the Hadoop cluster before Hadoop can use the HBase data.

The recommended approach is to let HBase add its dependency jars and use HADOOP_CLASSPATH or -libjars.

Since HBase 0.90.x, HBase adds its dependency JARs to the job configuration itself. The dependencies only need to be available on the local CLASSPATH; from there they are picked up and bundled into the fat job jar deployed to the MapReduce cluster. A basic trick is to pass the full HBase classpath (all HBase and dependent jars as well as configuration) to the MapReduce job runner and let the HBase utility pick out what it needs from that classpath and add it to the MapReduce job configuration (see the source of TableMapReduceUtil#addDependencyJars for how this is done).

The following example runs the bundled HBase RowCounter MapReduce job against a table named usertable. It sets HADOOP_CLASSPATH to the jars HBase needs to run in a MapReduce context (including configuration files such as hbase-site.xml). Be sure to use the correct version of the HBase JAR for your system; replace the VERSION string in the command line below with the version of your local HBase install. The backticks (` symbols) cause the shell to execute the sub-command and place the output of hbase classpath into HADOOP_CLASSPATH. This example assumes you use a BASH-compatible shell.

$ HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase classpath` \
  ${HADOOP_HOME}/bin/hadoop jar ${HBASE_HOME}/lib/hbase-mapreduce-VERSION.jar \
  org.apache.hadoop.hbase.mapreduce.RowCounter usertable

The above command launches a row-counting MapReduce job against the HBase cluster that your local configuration points to, on the cluster that your Hadoop configuration points to.

The main class of hbase-mapreduce.jar is a Driver that lists a few basic MapReduce tasks that ship with HBase. For example, presuming your install is HBase 2.0.0-SNAPSHOT:

$ HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase classpath` \
  ${HADOOP_HOME}/bin/hadoop jar ${HBASE_HOME}/lib/hbase-mapreduce-2.0.0-SNAPSHOT.jar

An example program must be given as the first argument.
Valid program names are:
  CellCounter: Count cells in HBase table.
  WALPlayer: Replay WAL files.
  completebulkload: Complete a bulk data load.
  copytable: Export a table from local cluster to peer cluster.
  export: Write table data to HDFS.
  exportsnapshot: Export the specific snapshot to a given FileSystem.
  import: Import data written by Export.
  importtsv: Import data in TSV format.
  rowcounter: Count rows in HBase table.
  verifyrep: Compare the data from tables in two different clusters. WARNING: It doesn't work for incrementColumnValues'd cells since the timestamp is changed after being appended to the log.

You can use the short names listed above to run MapReduce jobs, as in the following re-run of the row counter job (again, presuming your install is HBase 2.0.0-SNAPSHOT):

$ HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase classpath` \
  ${HADOOP_HOME}/bin/hadoop jar ${HBASE_HOME}/lib/hbase-mapreduce-2.0.0-SNAPSHOT.jar \
  rowcounter usertable

You might find the output of the more selective hbase mapredcp tool of interest; it lists the minimum set of jars needed to run a basic MapReduce job against an HBase install. It does not include configuration, so you will probably need to add that if you want your MapReduce job to find the target cluster. You will probably also have to add pointers to extra jars once you start to do anything of substance. Specify the extras by passing the system property -Dtmpjars when you run hbase mapredcp.

For jobs that do not package their dependencies or call TableMapReduceUtil#addDependencyJars, the following command structure is necessary:

$ HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase mapredcp`:${HBASE_HOME}/conf hadoop jar MyApp.jar MyJobMainClass -libjars $(${HBASE_HOME}/bin/hbase mapredcp | tr ':' ',') …

MapReduce Scan Caching

TableMapReduceUtil now restores the option to set scanner caching (the number of rows which are cached before returning the result to the client) on the Scan object that is passed in. This functionality was lost due to a bug in HBase 0.95 (HBASE-11558), which is fixed in HBase 0.98.5 and 0.96.3. The priority order for choosing the scanner caching is as follows:

  • Caching settings which are set on the scan object.
  • Caching settings which are specified via the configuration option hbase.client.scanner.caching, which can either be set manually in hbase-site.xml or via the helper method TableMapReduceUtil.setScannerCaching().
  • The default value HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING, which is set to 100.

Optimizing the caching settings is a balance between the time the client waits for a result and the number of sets of results the client needs to receive. If the caching setting is too large, the client could end up waiting for a long time or the request could even time out. If the setting is too small, the scan needs to return results in several pieces. If you think of the scan as a shovel, a bigger cache setting is analogous to a bigger shovel, and a smaller cache setting is equivalent to more shoveling in order to fill the bucket.

The list of priorities mentioned above allows you to set a reasonable default, and override it for specific operations.
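The priority order and the shovel trade-off can be modeled in a few lines of plain Java. This is a sketch, not HBase code: the class and method names are hypothetical, representing "unset" as null or a non-positive value is an assumption, and the default of 100 is simply the figure quoted above. The batch count is a rough ceil(rows / caching) estimate that ignores other limits such as result size.

```java
// Plain-Java model of the scanner caching rules described above; not HBase code.
final class ScannerCachingSketch {

    // Default quoted in the text for HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING.
    static final int DEFAULT_CACHING = 100;

    // scanCaching: value set on the Scan object, or null if unset (assumption).
    // configuredCaching: value of hbase.client.scanner.caching, or null if unset.
    static int resolveCaching(Integer scanCaching, Integer configuredCaching) {
        if (scanCaching != null && scanCaching > 0) {
            return scanCaching;          // 1. the setting on the scan object wins
        }
        if (configuredCaching != null && configuredCaching > 0) {
            return configuredCaching;    // 2. then the configuration option
        }
        return DEFAULT_CACHING;          // 3. otherwise the default
    }

    // Rough number of batches needed to return `rows` rows: ceil(rows / caching).
    static long batches(long rows, int caching) {
        return (rows + caching - 1) / caching;
    }

    public static void main(String[] args) {
        int caching = resolveCaching(null, 500);
        // prints "500 rows per batch, 2000 batches for 1,000,000 rows"
        System.out.println(caching + " rows per batch, "
            + batches(1_000_000L, caching) + " batches for 1,000,000 rows");
    }
}
```

With the default of 100, the same one million rows would take about 10,000 batches, which is the "more shoveling" side of the trade-off.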

HBase as a MapReduce Job Data Source and Data Sink

HBase can be used as a data source (TableInputFormat) and as a data sink (TableOutputFormat or MultiTableOutputFormat) for MapReduce jobs. When writing MapReduce jobs that read or write HBase, it is advisable to subclass TableMapper and/or TableReducer. See the do-nothing pass-through classes IdentityTableMapper and IdentityTableReducer for basic usage. For a more involved example, see RowCounter or review the org.apache.hadoop.hbase.mapreduce.TestTableMapReduce unit test.

If you run MapReduce jobs that use HBase as a source or sink, you need to specify the source and sink table and column names in your configuration.

When you read from HBase, TableInputFormat requests the list of regions from HBase and creates either one map per region or mapreduce.job.maps maps, whichever is smaller. If your job only has two maps, raise mapreduce.job.maps to a number greater than the number of regions. Maps run on the adjacent TaskTracker/NodeManager if you are running a TaskTracker/NodeManager and a RegionServer on each node.

When writing to HBase, it may make sense to skip the Reduce step and write back into HBase from within your map. This approach works when your job does not need the sort and collation that MapReduce performs on the map-emitted data. HBase sorts on insert, so there is no point in double-sorting (and shuffling data around your MapReduce cluster) unless you need to. If you do not need the Reduce, your map might emit counts of records processed for reporting at the end of the job, or you can set the number of Reduces to zero and use TableOutputFormat. If running the Reduce step makes sense in your case, you should typically use multiple reducers so that load is spread across the HBase cluster.

A new HBase partitioner, the HRegionPartitioner, can run as many reducers as there are existing regions. The HRegionPartitioner is suitable when your table is large and your upload will not greatly alter the number of existing regions upon completion. Otherwise use the default partitioner.
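To make the idea of region-aligned partitioning concrete, the following plain-Java sketch routes each row key to the reducer that corresponds to the region containing it. It is not the HRegionPartitioner source: the class name, the use of String keys (HBase compares raw bytes), and the modulo fallback for fewer reducers than regions are all assumptions made for illustration.

```java
import java.util.Arrays;

// Plain-Java model of region-aligned partitioning; not HBase's HRegionPartitioner.
final class RegionPartitionSketch {

    // Sorted region start keys; the first region must start at the empty key.
    private final String[] startKeys;

    RegionPartitionSketch(String... sortedStartKeys) {
        this.startKeys = sortedStartKeys.clone();
    }

    // Index of the region whose range [startKey, nextStartKey) contains rowKey.
    int regionIndex(String rowKey) {
        int pos = Arrays.binarySearch(startKeys, rowKey);
        // Exact hit: pos is the region. Miss: binarySearch returns -(insertionPoint) - 1,
        // and the containing region is the one just before the insertion point.
        return pos >= 0 ? pos : -pos - 2;
    }

    // Reducer for a row key. With at least one reducer per region the mapping is
    // one-to-one; with fewer reducers, regions are folded together by modulo
    // (an assumption of this sketch).
    int partition(String rowKey, int numReducers) {
        return regionIndex(rowKey) % numReducers;
    }

    public static void main(String[] args) {
        // Four regions: ["", "g"), ["g", "n"), ["n", "t"), ["t", end).
        RegionPartitionSketch table = new RegionPartitionSketch("", "g", "n", "t");
        System.out.println(table.partition("hbase", 4)); // prints 1
    }
}
```

When each reducer receives only the keys of one region, its writes land on a single region, which is why this scheme fits best when the upload will not change the region boundaries much.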

