Part A: The Basics
Environment versions
The Spark and Scala versions were checked using
sc.version, util.Properties.versionString
and found to be Spark 3.1.1 and Scala 2.12.10.
For the rest of the assignment, Scala is treated as the host (base) language and Spark as one of the guest ones. Spark’s functionality is therefore accessed via a SparkContext in Zeppelin notebooks, which is available as a variable called sc.
Some very basic examples were tried out to get a grip on Scala. This was easy for me, as I already have a lot of experience with languages such as Java, Dart, Kotlin and Haskell. Hence, I’ll refrain from shedding more light on my adventures with Scala.
My first RDD
Using the statement
val rdd = sc.parallelize(0 to 999,8)
a Scala collection (the range 0 to 999) is distributed directly to form an RDD (in 8 partitions). No output is shown because of lazy evaluation: the statement only defines the RDD, and without an action nothing triggers actual execution.
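The lazy behaviour can be imitated locally with a plain Scala view. This is only a loose analogy and not how Spark works internally; the object name LazySketch and the counter evaluated are made up for illustration.

```scala
object LazySketch {
  // Counts how often the mapping function has actually been run.
  var evaluated = 0

  // A view is lazy, loosely like an RDD definition: nothing is computed here.
  val doubled = (0 to 999).view.map { x => evaluated += 1; x * 2 }

  // Forcing the view plays the role of an action and triggers the computation.
  def firstFour: List[Int] = doubled.take(4).toList
}
```

Reading LazySketch.evaluated before calling firstFour should give 0, because defining the view alone runs nothing.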
The first action comes in the form of the statement
val sample = rdd.takeSample(false, 4)
which takes 4 samples from the rdd (without replacement, as the first argument is false), giving
sample: Array[Int] = Array(305, 480, 91, 76)
along with a higher execution time (~12 s) compared to merely defining the RDD. Additionally, the Spark UI is updated to show 8 completed jobs.
The number 8 matches the number of partitions: Spark processes the partitions in parallel with one task each, so these are most likely 8 tasks (one per partition) rather than 8 separate jobs.
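What "sampling without replacement" means can be sketched on a local collection. This is a simplified stand-in and not Spark's sampling algorithm; the helper sampleWithoutReplacement and the seed value are assumptions made for the example.

```scala
import scala.util.Random

object TakeSampleSketch {
  // Hypothetical local stand-in for rdd.takeSample(false, n):
  // shuffle the data and keep the first n elements, so no element repeats.
  def sampleWithoutReplacement(data: Seq[Int], n: Int, seed: Long): Seq[Int] =
    new Random(seed).shuffle(data).take(n)

  // Four distinct values drawn from 0 to 999.
  val sample = sampleWithoutReplacement(0 to 999, 4, seed = 42L)
}
```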
Counting words (‘Hello World’)
Gutenberg data (with Shakespeare plays) was downloaded and placed in the working directory successfully.
Preliminaries
The number of lines and characters was counted using the following:
val lines = sc.textFile("file:///opt/hadoop/100.txt")
println( "Lines:\t", lines.count, "\n" +
"Chars:\t", lines.map(s => s.length).
reduce((v_i, v_j) => v_i + v_j))
which gives the output of
(Lines: ,147838,
Chars: ,5545144)
The map transforms each line (sentence) into its length in characters, and the following reduce adds these lengths together pairwise until a single total remains.
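The same map and reduce steps can be tried on a small local collection. The sample lines below are made up and only stand in for the contents of the RDD.

```scala
object CharCountSketch {
  // Hypothetical sample lines standing in for the lines of 100.txt.
  val sampleLines = Seq("To be, or not to be,", "that is the question:", "")

  // Number of lines, like lines.count.
  val lineCount = sampleLines.size

  // map: each line becomes its length; reduce: the lengths are summed pairwise.
  val charCount = sampleLines.map(s => s.length).reduce((v_i, v_j) => v_i + v_j)
}
```

Here lineCount is 3 and charCount is 41 (20 + 21 + 0).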
The length of the longest sentence in the corpus
println(
"Longest sentence:\t", lines.map(s => s.length)
.reduce( (s1, s2) => s1 max s2 )
)
The reduce keeps the larger of the two lengths at every step, so the final result is the length of the longest sentence. The output was Longest sentence: 78 characters.
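A function passed to reduce should be associative and commutative, because Spark does not guarantee the order in which partial results are combined. A small local sketch with made-up line lengths shows that max satisfies this:

```scala
import scala.util.Random

object MaxReduceSketch {
  // Hypothetical line lengths standing in for lines.map(s => s.length).
  val lengths = Seq(12, 78, 0, 45, 33)

  // Pairwise maximum, as in the statement above.
  val longest = lengths.reduce((s1, s2) => s1 max s2)

  // Combining the same values in a different order gives the same result.
  val longestShuffled =
    new Random(7L).shuffle(lengths).reduce((s1, s2) => s1 max s2)
}
```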
map vs flatMap
Running the following step by step, and inspecting the intermediate results, leads to the observations below.
val words = lines.flatMap(line => line.split(" "))
.filter(_ != "")
.map(word => (word,1))
val wc = words.reduceByKey(_ + _)
wc.take(10)
If map is used instead of flatMap, the result is a nested structure, Array(Array(""), Array(Project, Gutenberg’s, ...), ...), whereas flatMap unwraps it into a single flat Array("", Project, Gutenberg’s, ..., by).
Then the ""s are omitted by using the filter() transformation. The last map pairs each word with a counter of 1.
The ‘reduceByKey()’ adds up the counters (each =1) of all occurrences of the same word, giving one count per unique word. The take(10) action then displays the first 10 of these (word, count) pairs; it does not sample them.
The full process tree of the query was seen by using wc.toDebugString and checking against the Spark UI.
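The difference between map and flatMap, and the effect of reduceByKey, can be reproduced on a local collection. This is a sketch with made-up sample lines; groupBy followed by a sum is used here as a local stand-in for reduceByKey(_ + _), which plain Scala collections do not have.

```scala
object FlatMapSketch {
  // Hypothetical sample lines, including an empty one as in the corpus.
  val sampleLines = Seq("", "Project Gutenberg", "to be or not to be")

  // map keeps one result per line, so the tokens stay nested per line.
  val nested: Seq[List[String]] = sampleLines.map(_.split(" ").toList)

  // flatMap concatenates the per-line results into one flat sequence.
  val flat: Seq[String] = sampleLines.flatMap(_.split(" "))

  // Drop the empty tokens, pair each word with 1, then sum the 1s per word.
  val counts: Map[String, Int] =
    flat.filter(_ != "")
      .map(word => (word, 1))
      .groupBy(_._1)
      .map { case (word, pairs) => (word, pairs.map(_._2).sum) }
}
```

The empty line produces a single "" token, which is why the filter step is needed before counting.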
To count or not to count
To find the most frequent words, val top10 = wc.takeOrdered(10) was used at first. But this ordered the pairs by the default tuple ordering, i.e. by the words themselves (compared by character code), and not by the count. To fix this, a custom ordering was supplied to the method: wc.takeOrdered(10)(Ordering[Int].reverse.on(x=>x._2)).
This pegs the ordering to the second element of the tuple (x=>x._2), i.e. the count. The order is reversed because takeOrdered would otherwise return the 10 smallest counts (ascending order).
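The effect of the custom ordering can be checked on a small local list, where sorted(ord).take(n) plays the role of takeOrdered(n)(ord). The sample counts below are made up for illustration.

```scala
object TopNSketch {
  // Hypothetical (word, count) pairs standing in for wc.
  val sampleCounts = Seq(("the", 5), ("and", 3), ("zebra", 1), ("to", 4))

  // Default tuple ordering compares the word first: alphabetical, not by count.
  val byDefault = sampleCounts.sorted.take(2)

  // Order by the count (second element), largest first.
  val byCountDesc: Ordering[(String, Int)] =
    Ordering[Int].reverse.on[(String, Int)](_._2)

  val top2 = sampleCounts.sorted(byCountDesc).take(2)
}
```

With the default ordering the first two entries are ("and", 3) and ("the", 5); with the custom ordering they are ("the", 5) and ("to", 4).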
Specific word frequencies
Frequencies for a specific word (e.g. Name) were found using
wc.filter(_._1 == word).collect
which keeps only the tuples whose key part (=_._1) equals the given word and collects them to the driver.
Alternatives
Several alternatives were mentioned for carrying out the same task of computing the top N results.
Some alternatives took much longer than others, and some were deliberately convoluted. The purpose of each element in each of the alternatives was nevertheless understood thoroughly.
Results I/O
The individual word-pairs were stored in the local filesystem by issuing the following:
words.saveAsTextFile("file:///opt/hadoop/wc")
For checking, a Shell command was issued directly from the Notebook
ls -ahl /opt/hadoop/wc
The following files were listed:
-rw-r--r-- 1 hadoop hadoop 8 Apr 4 15:59 ._SUCCESS.crc
-rw-r--r-- 1 hadoop hadoop 70K Apr 4 15:59 .part-00000.crc
-rw-r--r-- 1 hadoop hadoop 0 Apr 4 15:59 _SUCCESS
-rw-r--r-- 1 hadoop hadoop 8.7M Apr 4 15:59 part-00000
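of which only the last two were viewable (the .crc files are checksum files). The empty _SUCCESS file marks that the submitted job completed successfully, and part-00000 holds the actual records. Since words was saved, these are the individual (word, 1) pairs rather than the unique word counts held in wc.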
How to count
val words = lines.flatMap(line => line.split(" "))
.map(w => w.toLowerCase().replaceAll("(^[^a-z]+|[^a-z]+$)", ""))
.filter(_ != "")
.map(w => (w,1))
.reduceByKey( _ + _ )
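[^a-z]+ matches a run of characters that are not lowercase letters. The ^ anchors the first alternative to the beginning of the string and the $ anchors the second to the end, so replaceAll strips such runs from the start and the end of each lowercased token and leaves the word in between untouched.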
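The behaviour of the pattern can be checked on a few tokens without Spark. The helper name normalize and the sample tokens are made up for this sketch; the expression itself is the one used in the pipeline above.

```scala
object NormalizeSketch {
  // Lowercase the token, then strip non-letter runs from its start and end.
  def normalize(token: String): String =
    token.toLowerCase.replaceAll("(^[^a-z]+|[^a-z]+$)", "")

  // Hypothetical tokens as they might come out of line.split(" ").
  val samples = Seq("Macbeth,", "\"Macbeth", "MACBETH.", "Macbeth's", "1606", "o'er")

  val normalized = samples.map(normalize)
}
```

The first three tokens all become macbeth, 1606 becomes the empty string (and is then removed by the filter), while Macbeth's and o'er keep their inner apostrophe because only the edges are stripped.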
The following was used to get the number of appearances of ‘Macbeth’ in the complete works again:
words
.filter(_._1 == "macbeth")
.collect
.map({case (w,c) => "%s occurs %d times"
.format(w,c)
})
.foreach(println)
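The following output is received: macbeth occurs 285 times.
The count is now higher because, thanks to the lowercasing and the regex pattern, edge cases such as “Macbeth,” or “MACBETH.” are also accounted for, whereas before only the exact token “Macbeth” was being counted. Note that the pattern strips only leading and trailing non-letters, so a possessive such as “Macbeth’s” still ends up under its own key.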
Head over to the Advanced Part (B)