Hadoop and Map Reduce

This is a hand-in for the Map Reduce assignment of the Big Data course at the Radboud University, Nijmegen.
Back to the blog index.

Hadoop

This blog post uses a Docker container derived from a Spark notebook. The Dockerfile can be found at https://github.com/rubigdata/hello-hadoop-camilstaps/, if you have access rights. The Dockerfile takes care of installing Hadoop, setting up SSH and compiles the FrequencyAnalyser.java that we will use in the example below.

To build the Docker image, simply run:

$ docker build -t hadoop .

Then you can run it with:

$ docker run --rm --name hadoop hadoop

And attach a new shell to it using:

$ docker exec hadoop /bin/bash

To gain some experience with Hadoop, we will run a simple example command, both on a single host and on a simulated cluster.

Standalone operation

Hadoop comes with a set of examples. From /opt/hadoop-2.7.3, we can for example do the following:

# cp -R etc/hadoop input
# hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.7.3.jar grep input output 'dfs[a-z.]+'

This command gives a lot of logging output, telling us about different jobs that start and finish. It basically greps through the settings (etc/hadoop) which we had copied to the input for the regular expression dfs[a-z.]+, which roughly speaking gives us settings related to the distributed file system. This is a filter job.

Then, all the lines left are mapped with \s -> (s,1): for each setting that we found, we create a tuple containing the setting and the occurrence count (at that point 1).

In the reduce step, tuples with equal first elements (the settings) are combined by summing their second elements, effectively counting all occurrences of the setting in etc/hadoop.

The end result is sorted and written to the output. We can inspect it:

# cat output/part-*
6	dfs.audit.logger
4	dfs.class
3	dfs.server.namenode.
2	dfs.period
2	dfs.audit.log.maxfilesize
2	dfs.audit.log.maxbackupindex
1	dfsmetrics.log
1	dfsadmin
1	dfs.servers
1	dfs.file

The output directory that has been created also contains a _SUCCESS file, which can be seen as the job’s ‘exit code’, and two .crc files which are used for integrity checks.

Pseudo-distributed cluster

The above ran mapper and reduction jobs on a single host. We can also simulate a cluster on a single machine. This is done by editing two configuration files (e.g., using vim):

This makes Hadoop use the distributed file system (instead of the local one, as above) by default.

In this toy example, we don’t care too much about our data and don’t need to have it replicated. Also, in the end this still runs on one machine, so replication is only useful to a rather limited extent. We therefore override the default replication factor of 3.

We now need to format the file system, start it up (for which we need to SSH to our own machine - on a real cluster, we would SSH to another machine - so we need to start the SSH daemon) and create some required files:

# service start ssh
# /opt/hadoop-2.7.3/sbin/start-dfs.sh
# hdfs dfs -mkdir /user
# hdfs dfs -mkdir /user/root

We copy our data to the DFS, and run the same Hadoop command as before. However, because we changed the configuration, it will now run on the simulated cluster. We then get the output from the DFS in order to inspect it.

hdfs dfs -put etc/hadoop input
hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.7.3.jar grep input output 'dfs[a-z.]+'
hdfs dfs -get output output

The output differs slightly from before:

# cat output/part-*
6	dfs.audit.logger
4	dfs.class
3	dfs.server.namenode.
2	dfs.period
2	dfs.audit.log.maxfilesize
2	dfs.audit.log.maxbackupindex
1	dfsmetrics.log
1	dfsadmin
1	dfs.servers
1	dfs.replication
1	dfs.file

The dfs.replication line did not exist above. This is expected, because we have added that setting ourselves.

To clean up, run sbin/stop-dfs.sh.

Map Reduce example: frequency analysis

Frequency analysis is a method in cryptanalysis which can be used to break classical ciphers such as Caesar and Vigènere. It makes use of the fact that in natural language, letters are not distributed uniformly. Hence, counting how often letters occur in ciphertext and comparing that to a typical distribution of a particular language may give some insight in what ciphertext letter maps to what plaintext letter. In a more advanced setting, we can also analyse digraphs and trigraphs: groups of two and three letters. For example, in English, the trigraph the is very common. If in a ciphertext the trigraph wkh is roughly equally common, that ciphertext may have been encoded with Caesar-3. Besides cryptanalysis, frequency analysis also has applications in other fields, such as historical linguistic research.

Counting letter, digraph and trigraph occurrences can be represented as a Map Reduce task. The mapper extracts all groups of n letters (1 for letters, 2 for digraphs, etc.) and emits them with a count of 1. The reducer reduces a number of inputs with the same letter by summing their counts.

An example is given in FrequencyAnalyser.java. In the Docker image we have used, this has already been compiled. It also includes the complete works of William Shakespeare, taken from project Gutenberg, in /data/100.txt.utf-8.

We can run the frequency analyser on this file using:

$ cd /env
$ hadoop jar fa.jar FrequencyAnalyser /data/100.txt.utf-8 output 3

The 3 in this commands makes that we search for trigraphs. This is done using Hadoop’s Configuration class, as explained by Praveen Sripati:

Configuration conf = new Configuration();
if (args.length >= 3)
	conf.set("sublen", args[2]);

And in the mapper (some implementation details have been omitted):

private int subLen = 1;

protected void setup(Context context) {
	subLen = Integer.parseInt(
			context.getConfiguration().get("sublen", "1"));
}

public void map(...) {
	// ...
	for (int i = 0; i < curWord.length() - subLen + 1; i++) {
		word.set(curWord.substring(i, i + subLen));
		context.write(word, one);
	}
	// ...
}

The context.write() call emits the event, which is subsequently picked up by a reducer. The reducer sums the counts of its inputs in a straightforward manner:

public void reduce(
		Text key,
		Iterable<IntWritable> values,
		Context context) throws IOException, InterruptedException {
	int sum = 0;
	for (IntWritable val : values)
		sum += val.get();
	result.set(sum);
	context.write(key, result);
}

The final output is placed in output, but is not sorted, as the grep example above. We can easily sort it ourselves:

$ sort -k2n output/part-r-00000
...
hat	18814
her	18985
you	22280
and	32737
the	52081

Perhaps the trigraph hat is surprising. This is from Olde English hath.

Example

Did Shakespeare write more about Romeo or Juliet? Of course, we can answer this question by inspecting the whole list of word counts, but this does not scale. Instead, we can write a slightly modified version of the frequency analyser. We only need to adapt the mapper:

private final static Text romeo = new Text("R");
private final static Text juliet = new Text("J");

public void map(...) {
	// ...
	if (curWord.equals("romeo"))
		context.write(romeo, one);
	else if (curWord.equals("juliet"))
		context.write(juliet, one);
	// ...
}

This analyser is included in the GitHub repository as well, and built into the Docker image. It can be run using the same arguments as the FrequencyAnalyser. The result:

J	76
R	137

So Romeo is mentioned more frequently.

By only emitting events for romeo and juliet, we greatly reduce overhead and reducer work. Here are timing statistics for the FrequencyAnalyser doing a basic word count and RomeoOrJuliet:

Measure FrequencyAnalyser RomeoOrJuliet
real 0m4.800s 0m3.706s
user 0m8.696s 0m5.192s
sys 0m0.244s 0m0.196s

We can optimise this even further, by emitting 1 for Romeo and -1 for Juliet. The outcome (one number) is then the difference in occurrences between the two. It is positive when Romeo occurs more often, and negative when Juliet occurs more often. Since this only uses one key, we could override the partitioner and ignore the key.

Combiners

Combiners can be placed between mappers and reducers to reduce disk and network overhead. In Hadoop, a combiner is simply an extension of Reducer with the same input and output key and value types. In our case, the combiner can even be the same as the reducer:

job.setCombinerClass(IntSumReducer.class);

This is how we initially got the boilerplate code.

To measure the efficiency gain, we can compare the running time of the FrequencyAnalyser both with and without the combiner. Here are the results:

Measure Without combiner With combiner
real 0m5.908s 0m4.800s
user 0m9.300s 0m8.696s
sys 0m0.252s 0m0.244s

The speed gain is noticeable and will be substantial on larger data sets.