Hadoop MapReduce uses typed data whenever it interacts with user-provided Mappers and Reducers: the data read from files into Mappers, emitted by Mappers to Reducers, and emitted by Reducers into output files is all stored in Java objects.
Writable Types
Objects which can be marshaled to or from files and across the network must obey a particular interface, called Writable, which allows Hadoop to read and write the data in a serialized form for transmission. Hadoop provides several stock classes which implement Writable: Text (which stores String data), IntWritable, LongWritable, FloatWritable, BooleanWritable, and several others. The entire list is in the org.apache.hadoop.io package of the Hadoop source.
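Each of these classes can serialize itself to any DataOutput and rebuild itself from a DataInput. The following minimal sketch, written for this discussion rather than taken from the Hadoop sources, shows a round trip with IntWritable:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;

public class WritableRoundTrip {
  public static void main(String[] args) throws IOException {
    IntWritable original = new IntWritable(42);

    // Serialize: write() emits the binary form to any DataOutput.
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    original.write(new DataOutputStream(bytes));

    // Deserialize: readFields() rebuilds the value from a DataInput.
    IntWritable copy = new IntWritable();
    copy.readFields(new DataInputStream(
        new ByteArrayInputStream(bytes.toByteArray())));

    System.out.println(copy.get());   // prints 42
  }
}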
In addition to these types, you are free to define your own classes which implement Writable. You can organize a structure of virtually any layout to fit your data and be transmitted by Hadoop. As a motivating example, consider a mapper which emits key-value pairs where the key is the name of an object, and the value is its coordinates in some 3-dimensional space. The key is some string-based data, and the value is a structure of the form:
struct point3d {
  float x;
  float y;
  float z;
}
The key can be represented as a Text object, but what about the value? How do we build a Point3D class which Hadoop can transmit? The answer is to implement the Writable interface, which requires two methods:
public interface Writable {
  void readFields(DataInput in) throws IOException;
  void write(DataOutput out) throws IOException;
}
The first of these methods initializes all of the fields of the object from data contained in the binary stream in. The latter writes all the information needed to reconstruct the object to the binary stream out. The DataInput and DataOutput interfaces (part of java.io) provide methods to serialize and deserialize the basic Java types; the important contract between your readFields() and write() methods is that they read and write the data to and from the binary stream in the same order. The following code implements a Point3D class usable by Hadoop:
A Point3D class which implements Writable
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

public class Point3D implements Writable {
  public float x;
  public float y;
  public float z;

  public Point3D(float x, float y, float z) {
    this.x = x;
    this.y = y;
    this.z = z;
  }

  public Point3D() {
    this(0.0f, 0.0f, 0.0f);
  }

  public void write(DataOutput out) throws IOException {
    out.writeFloat(x);
    out.writeFloat(y);
    out.writeFloat(z);
  }

  public void readFields(DataInput in) throws IOException {
    x = in.readFloat();
    y = in.readFloat();
    z = in.readFloat();
  }

  public String toString() {
    return Float.toString(x) + ", "
        + Float.toString(y) + ", "
        + Float.toString(z);
  }
}
Hadoop’s Interface – Hadoop specifies the Writable interface as
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public interface Writable {
  void write(DataOutput out) throws IOException;
  void readFields(DataInput in) throws IOException;
}
The main purpose of this interface is to provide mechanisms for serializing and deserializing data as it is passed across the network or read from and written to disk. Every data type used as a mapper or reducer value (that is, V1, V2, or V3) must implement this interface.
Data to be used as keys (K1, K2, K3) has a stricter requirement: in addition to Writable, it must also implement the standard Java Comparable interface, which is specified as
public interface Comparable {
  public int compareTo(Object obj);
}
The compareTo() method returns a negative integer, zero, or a positive integer depending on whether the current object is less than, equal to, or greater than the object it is compared with. Hadoop combines these two requirements in the WritableComparable interface:
public interface WritableComparable extends Writable, Comparable
{}
Wrapper Classes – Hadoop 1 also provides wrapper classes for the common data types; a brief usage sketch follows this list:
- Primitive data types – These wrappers hold a single primitive value that can be set either at construction or via a setter method. They include BooleanWritable, ByteWritable, DoubleWritable, FloatWritable, IntWritable, LongWritable, VIntWritable (a variable-length integer type), and VLongWritable (a variable-length long type)
- Array – These classes provide Writable wrappers for arrays of other Writable objects: ArrayWritable and TwoDArrayWritable
- Map – These classes wrap maps of key-value pairs. They include:
- AbstractMapWritable: This is a base class for other concrete Writable map implementations
- MapWritable: This is a general purpose map mapping Writable keys to Writable values
- SortedMapWritable: This is a specialization of the MapWritable class that also implements the SortedMap interface
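Here is a brief sketch of some of these wrappers in use; the class name, keys, and values are illustrative only:

import java.util.Map;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;

public class WrapperDemo {
  public static void main(String[] args) {
    IntWritable count = new IntWritable();     // primitive wrapper
    count.set(7);                              // value set via a setter method

    MapWritable scores = new MapWritable();    // Writable keys -> Writable values
    scores.put(new Text("alice"), new IntWritable(90));
    scores.put(new Text("bob"), new IntWritable(85));

    for (Map.Entry<Writable, Writable> e : scores.entrySet()) {
      System.out.println(e.getKey() + " -> " + e.getValue());
    }
  }
}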
Custom Key Types
As written, the Point3D type will work as a value type, as required for the mapper problem described above. But what if we want to emit Point3D objects as keys too? In Hadoop MapReduce, if the (key, value) pairs sent to a single reduce task include multiple keys, the reducer processes the keys in sorted order. Key types must therefore implement a stricter interface, WritableComparable: in addition to being Writable so they can be transmitted over the network, they must also implement Java’s Comparable interface. The following code listing extends Point3D to meet this interface:
A WritableComparable version of Point3D
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class Point3D implements WritableComparable<Point3D> {
  public float x;
  public float y;
  public float z;

  public Point3D(float x, float y, float z) {
    this.x = x;
    this.y = y;
    this.z = z;
  }

  public Point3D() {
    this(0.0f, 0.0f, 0.0f);
  }

  public void write(DataOutput out) throws IOException {
    out.writeFloat(x);
    out.writeFloat(y);
    out.writeFloat(z);
  }

  public void readFields(DataInput in) throws IOException {
    x = in.readFloat();
    y = in.readFloat();
    z = in.readFloat();
  }

  public String toString() {
    return Float.toString(x) + ", "
        + Float.toString(y) + ", "
        + Float.toString(z);
  }

  /** Return the Euclidean distance from (0, 0, 0). */
  public float distanceFromOrigin() {
    return (float) Math.sqrt(x*x + y*y + z*z);
  }

  public int compareTo(Point3D other) {
    float myDistance = distanceFromOrigin();
    float otherDistance = other.distanceFromOrigin();
    return Float.compare(myDistance, otherDistance);
  }

  public boolean equals(Object o) {
    if (!(o instanceof Point3D)) {
      return false;
    }
    Point3D other = (Point3D) o;
    return this.x == other.x && this.y == other.y
        && this.z == other.z;
  }

  public int hashCode() {
    return Float.floatToIntBits(x)
        ^ Float.floatToIntBits(y)
        ^ Float.floatToIntBits(z);
  }
}
It is important for key types to implement hashCode() as well, because Hadoop’s default partitioner uses the key’s hash code to decide which reduce task receives each (key, value) pair. This version of the class therefore provides both hashCode() and equals().
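For reference, the partitioning step looks roughly like the following; this is an illustrative paraphrase of the default HashPartitioner, not a verbatim copy of the Hadoop source:

public int getPartition(K key, V value, int numReduceTasks) {
  // non-negative hash of the key, modulo the number of reduce tasks
  return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}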
Using Custom Types
Now that you have created a custom data type, Hadoop must be told to use it. You can control the output key or value data type for a job by using the setOutputKeyClass() and setOutputValueClass() methods of the JobConf object that defines your job. By default, this will set the types expected as output from both the map and reduce phases. If your Mapper emits different types than the Reducer, you can set the types emitted by the mapper with the JobConf’s setMapOutputKeyClass() and setMapOutputValueClass() methods. These implicitly set the input types expected by the Reducer. The types delivered as input to the Mapper are governed by the InputFormat used.
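As a sketch, a driver using the old-style JobConf API might configure these types as follows; the MyDriver class and the job name are made up for illustration:

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;

JobConf conf = new JobConf(MyDriver.class);   // MyDriver: hypothetical driver class
conf.setJobName("point3d-example");           // hypothetical job name
// Types emitted by the mapper:
conf.setMapOutputKeyClass(Text.class);
conf.setMapOutputValueClass(Point3D.class);
// Types emitted by the reducer (and assumed for the mapper too, if the
// map output types above had not been set explicitly):
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(Text.class);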
Faster Comparison Operations
The default sorting process for keys reads instances of the key type in from a stream, parses each byte stream with the readFields() method of the key class, and then calls the compareTo() method of the key class on the two objects. For faster performance, it may be possible to decide on an ordering between two keys just by looking at the byte streams, without parsing all of the data they contain. For example, consider comparing strings of text: if characters are read in sequentially, a decision can be made on their ordering as soon as a character position is found where the two strings differ. Even if all of the bytes for the object must be read in, the object itself does not necessarily need to be instantiated around those bytes. To support this higher-speed sorting mechanism, you can extend the WritableComparator class with a comparator specific to your own data type. In particular, the method which should be overridden is
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2)
The default implementation is in the class org.apache.hadoop.io.WritableComparator. The relevant method has been reproduced here:
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
  try {
    buffer.reset(b1, s1, l1);   // parse key1
    key1.readFields(buffer);
    buffer.reset(b2, s2, l2);   // parse key2
    key2.readFields(buffer);
  } catch (IOException e) {
    throw new RuntimeException(e);
  }
  return compare(key1, key2);   // compare them
}
Its operation is exactly as described above; it performs the straightforward comparison of the two objects after they have been individually deserialized from their separate byte streams (the b variables), which each have their own start offset (s) and length (l) attributes. Both objects must be fully constructed and deserialized before comparison can occur. The Text class, on the other hand, allows incremental comparison via its own implementation of this method. The code from org.apache.hadoop.io.Text is shown here:
/** A WritableComparator optimized for Text keys. */
public static class Comparator extends WritableComparator {
  public Comparator() {
    super(Text.class);
  }

  public int compare(byte[] b1, int s1, int l1,
                     byte[] b2, int s2, int l2) {
    int n1 = WritableUtils.decodeVIntSize(b1[s1]);
    int n2 = WritableUtils.decodeVIntSize(b2[s2]);
    return compareBytes(b1, s1+n1, l1-n1, b2, s2+n2, l2-n2);
  }
}
The Text object is serialized by first writing its length to the byte stream as a variable-length integer, followed by the UTF-encoded string. The decodeVIntSize() method determines how many bytes that length prefix occupies. The comparator skips these bytes and directly compares the UTF-encoded bytes of the actual string portion of the stream using compareBytes(). As soon as it finds a byte position where the two streams differ, it returns a result without examining the rest of the strings.
You do not need to manually specify this comparator’s use in your Hadoop programs. Hadoop automatically uses this special comparator implementation for Text data due to the following code being added to Text’s static initialization:
static {
  // register this comparator
  WritableComparator.define(Text.class, new Comparator());
}
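As a concrete illustration, here is a sketch of what a raw comparator for the Point3D class above might look like. It assumes the serialized form is exactly the three consecutive 4-byte floats written by write(), and it reproduces the ordering of Point3D’s compareTo() (distance from the origin) directly from the raw bytes. This is an example written for this discussion, not part of the Hadoop distribution:

/** A sketch of a raw comparator for Point3D keys, nested inside Point3D. */
public static class Comparator extends WritableComparator {
  public Comparator() {
    super(Point3D.class);
  }

  public int compare(byte[] b1, int s1, int l1,
                     byte[] b2, int s2, int l2) {
    // Decode the floats straight from the serialized bytes; no Point3D
    // objects are instantiated.
    return Float.compare(distance(b1, s1), distance(b2, s2));
  }

  private static float distance(byte[] b, int s) {
    float x = Float.intBitsToFloat(readInt(b, s));
    float y = Float.intBitsToFloat(readInt(b, s + 4));
    float z = Float.intBitsToFloat(readInt(b, s + 8));
    return (float) Math.sqrt(x * x + y * y + z * z);
  }
}

// Registered in Point3D's static initializer, mirroring the Text example:
static {
  WritableComparator.define(Point3D.class, new Comparator());
}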
Defining custom Writable types allows you to use Hadoop to manipulate higher-level data structures directly, without needing toString() to convert all your data types to text for sending over the network. If you will be using a type in many MapReduce jobs, or must process a very large volume of records (as is usually the case in Hadoop), defining your own data type classes can provide a significant performance benefit.