This is a hand-in for the Map Reduce assignment of the Big Data
course at the Radboud University, Nijmegen.
This blog post uses a Docker container derived from a Spark notebook. The Dockerfile can be found at https://github.com/rubigdata/hello-hadoop-camilstaps/, if you have access rights. The Dockerfile takes care of installing Hadoop, setting up SSH, and compiling the FrequencyAnalyser.java that we will use in the example below.
To build the Docker image, simply run:
$ docker build -t hadoop .
Then you can run it with:
$ docker run --rm --name hadoop hadoop
And attach a new shell to it using:
$ docker exec -it hadoop /bin/bash
To gain some experience with Hadoop, we will run a simple example command, both on a single host and on a simulated cluster.
Hadoop comes with a set of examples. From /opt/hadoop-2.7.3, we can for example do the following:
# cp -R etc/hadoop input
# hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.7.3.jar grep input output 'dfs[a-z.]+'
This command gives a lot of logging output, telling us about the different jobs that start and finish. It essentially greps through the settings (etc/hadoop), which we had copied to input, for the regular expression dfs[a-z.]+, which roughly speaking matches settings related to the distributed file system. This is a filter job.
Then, all the lines left are mapped with \s -> (s,1): for each setting that we found, we create a tuple containing the setting and the occurrence count (at that point 1).
In the reduce step, tuples with equal first elements (the settings) are combined by summing their second elements, effectively counting all occurrences of each setting in etc/hadoop.
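To make this concrete, here is a minimal sketch of a mapper that performs both the filtering and the \s -> (s,1) mapping. This is not the code of the bundled Grep example (which differs in its details and also takes care of the sorting); the class and field names are made up for illustration.

import java.io.IOException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Sketch only: emit (match, 1) for every occurrence of the pattern in a line.
public class MatchCountMapper extends Mapper<Object, Text, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1);
    private final static Pattern PATTERN = Pattern.compile("dfs[a-z.]+");
    private final Text match = new Text();

    @Override
    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        Matcher m = PATTERN.matcher(value.toString());
        while (m.find()) {             // filter: lines without a match emit nothing
            match.set(m.group());
            context.write(match, one); // \s -> (s,1)
        }
    }
}

The reduce side is then the standard sum-per-key reducer, which is shown in full further below.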
The end result is sorted and written to the output. We can inspect it:
# cat output/part-*
6 dfs.audit.logger
4 dfs.class
3 dfs.server.namenode.
2 dfs.period
2 dfs.audit.log.maxfilesize
2 dfs.audit.log.maxbackupindex
1 dfsmetrics.log
1 dfsadmin
1 dfs.servers
1 dfs.file
The output directory that has been created also contains a _SUCCESS file, which can be seen as the job’s ‘exit code’, and two .crc files which are used for integrity checks.
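Downstream tooling often keys off that marker; as a hedged illustration (a hypothetical snippet, not part of the assignment code), one could check for it with Hadoop's FileSystem API before consuming the results:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Hypothetical helper: only read the job output once the _SUCCESS marker exists.
public class OutputCheck {
    public static void main(String[] args) throws IOException {
        FileSystem fs = FileSystem.get(new Configuration());
        if (fs.exists(new Path("output/_SUCCESS")))
            System.out.println("Job finished; output/part-* is safe to read.");
        else
            System.out.println("No _SUCCESS marker (yet).");
    }
}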
The grep example above ran its map and reduce tasks on a single host. We can also simulate a cluster on a single machine. This is done by editing two configuration files (e.g., using vim):
/opt/hadoop-2.7.3/etc/hadoop/core-site.xml:
<configuration>
    <property>
        <name>fs.defaultFS</name>
        <value>hdfs://localhost:9000</value>
    </property>
</configuration>
This makes Hadoop use the distributed file system (instead of the local one, as above) by default.
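As a hedged illustration (a hypothetical snippet, not part of the assignment code): any code that asks Hadoop for the default file system now gets an HDFS client instead of the local one.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

// Hypothetical snippet: with fs.defaultFS pointing at hdfs://localhost:9000,
// the default FileSystem resolves to HDFS rather than the local disk.
public class WhichFileSystem {
    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration(); // picks up core-site.xml from the classpath
        FileSystem fs = FileSystem.get(conf);
        System.out.println(fs.getUri());          // hdfs://localhost:9000 instead of file:///
    }
}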
/opt/hadoop-2.7.3/etc/hadoop/hdfs-site.xml:
<configuration>
    <property>
        <name>dfs.replication</name>
        <value>1</value>
    </property>
</configuration>
In this toy example, we don’t care too much about our data and don’t need to have it replicated. Also, in the end this still runs on one machine, so replication is only useful to a rather limited extent. We therefore override the default replication factor of 3.
We now need to format the file system, start it up (this requires SSH access to our own machine, since on a real cluster we would SSH to the other nodes, so we also start the SSH daemon), and create some required directories:
# service ssh start
# hdfs namenode -format
# /opt/hadoop-2.7.3/sbin/start-dfs.sh
# hdfs dfs -mkdir /user
# hdfs dfs -mkdir /user/root
We copy our data to the DFS, and run the same Hadoop command as before. However, because we changed the configuration, it will now run on the simulated cluster. We then get the output from the DFS in order to inspect it.
# hdfs dfs -put etc/hadoop input
# hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.7.3.jar grep input output 'dfs[a-z.]+'
# hdfs dfs -get output output
The output differs slightly from before:
# cat output/part-*
6 dfs.audit.logger
4 dfs.class
3 dfs.server.namenode.
2 dfs.period
2 dfs.audit.log.maxfilesize
2 dfs.audit.log.maxbackupindex
1 dfsmetrics.log
1 dfsadmin
1 dfs.servers
1 dfs.replication
1 dfs.file
The dfs.replication line did not exist above. This is expected, because we added that setting ourselves in hdfs-site.xml.
To clean up, run sbin/stop-dfs.sh.
Frequency analysis is a method in cryptanalysis which can be used to break classical ciphers such as Caesar and Vigenère. It makes use of the fact that in natural language, letters are not distributed uniformly. Hence, counting how often letters occur in ciphertext and comparing that to a typical distribution of a particular language may give some insight into which ciphertext letter maps to which plaintext letter. In a more advanced setting, we can also analyse digraphs and trigraphs: groups of two and three letters. For example, in English, the trigraph the is very common. If in a ciphertext the trigraph wkh is roughly equally common, that ciphertext may have been encrypted with Caesar-3 (a shift of three letters, which maps the to wkh). Besides cryptanalysis, frequency analysis also has applications in other fields, such as historical linguistic research.
Counting letter, digraph and trigraph occurrences can be represented as a Map Reduce task. The mapper extracts all groups of n letters (1 for single letters, 2 for digraphs, etc.) and emits each of them with a count of 1; for instance, with n = 2 the word the yields (th, 1) and (he, 1). The reducer combines all inputs with the same key by summing their counts.
An example is given in FrequencyAnalyser.java. In the Docker image we have used, this has already been compiled. The image also includes the complete works of William Shakespeare, taken from Project Gutenberg, in /data/100.txt.utf-8.
We can run the frequency analyser on this file using:
$ cd /env
$ hadoop jar fa.jar FrequencyAnalyser /data/100.txt.utf-8 output 3
The 3 in this command makes the analyser search for trigraphs. This is done using Hadoop’s Configuration class, as explained by Praveen Sripati:
Configuration conf = new Configuration();
if (args.length >= 3)
    conf.set("sublen", args[2]);
And in the mapper (some implementation details have been omitted):
private int subLen = 1;

protected void setup(Context context) {
    // Read the group length (1, 2, 3, ...) set by the driver
    subLen = Integer.parseInt(
        context.getConfiguration().get("sublen", "1"));
}

public void map(...) {
    // ...
    // Emit every substring of length subLen with a count of 1
    for (int i = 0; i < curWord.length() - subLen + 1; i++) {
        word.set(curWord.substring(i, i + subLen));
        context.write(word, one);
    }
    // ...
}
The context.write() call emits the (substring, 1) pair, which is subsequently picked up by a reducer. The reducer sums the counts of its inputs in a straightforward manner:
public void reduce(
        Text key,
        Iterable<IntWritable> values,
        Context context) throws IOException, InterruptedException {
    int sum = 0;
    for (IntWritable val : values)
        sum += val.get();
    result.set(sum);
    context.write(key, result);
}
The final output is placed in output, but, unlike in the grep example above, it is not sorted. We can easily sort it ourselves:
$ sort -k2n output/part-r-00000
...
hat 18814
her 18985
you 22280
and 32737
the 52081
Perhaps the trigraph hat is surprising. This is from the archaic hath.
Did Shakespeare write more about Romeo or Juliet? Of course, we can answer this question by inspecting the whole list of word counts, but this does not scale. Instead, we can write a slightly modified version of the frequency analyser. We only need to adapt the mapper:
private final static Text romeo = new Text("R");
private final static Text juliet = new Text("J");

public void map(...) {
    // ...
    if (curWord.equals("romeo"))
        context.write(romeo, one);
    else if (curWord.equals("juliet"))
        context.write(juliet, one);
    // ...
}
This analyser is included in the GitHub repository as well, and built into the Docker image. It can be run using the same arguments as the FrequencyAnalyser. The result:
J 76
R 137
So Romeo is mentioned more frequently.
By only emitting events for romeo and juliet, we greatly reduce overhead and reducer work. Here are timing statistics for the FrequencyAnalyser doing a basic word count and for RomeoOrJuliet:
Measure | FrequencyAnalyser | RomeoOrJuliet |
---|---|---|
real | 0m4.800s | 0m3.706s |
user | 0m8.696s | 0m5.192s |
sys | 0m0.244s | 0m0.196s |
We can optimise this even further by emitting 1 for Romeo and -1 for Juliet. The outcome (a single number) is then the difference in occurrences between the two: positive when Romeo occurs more often, negative when Juliet does. Since this uses only one key, we could also override the partitioner and simply ignore the key.
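A sketch of that variant (hypothetical; not included in the repository), written in the same elided style as the mapper above: all signed counts go to one shared key, so the standard summing reducer directly produces the difference.

// Hypothetical variant: emit +1 for Romeo and -1 for Juliet under a single key,
// so the summed result is (#romeo - #juliet).
private final static Text diff = new Text("romeo-minus-juliet");
private final static IntWritable plusOne = new IntWritable(1);
private final static IntWritable minusOne = new IntWritable(-1);

public void map(...) {
    // ...
    if (curWord.equals("romeo"))
        context.write(diff, plusOne);
    else if (curWord.equals("juliet"))
        context.write(diff, minusOne);
    // ...
}

With a single key, the choice of partitioner indeed no longer matters.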
Combiners can be placed between mappers and reducers to reduce disk and network overhead. In Hadoop, a combiner is simply an extension of Reducer with the same input and output key and value types. In our case, the combiner can even be the same class as the reducer, because summing integers is associative and commutative and the reducer’s input and output types coincide:
job.setCombinerClass(IntSumReducer.class);
This is, in fact, how the boilerplate code we started from was already set up.
To measure the efficiency gain, we can compare the running time of the FrequencyAnalyser both with and without the combiner. Here are the results:
Measure | Without combiner | With combiner |
---|---|---|
real | 0m5.908s | 0m4.800s |
user | 0m9.300s | 0m8.696s |
sys | 0m0.252s | 0m0.244s |
The speed gain is noticeable and will be substantial on larger data sets.