hey-spark-2021-LahaLuhem



PartB: The Advanced

A Range of Numbers

Using the following expressions:

import org.apache.spark.HashPartitioner

val rddRange = sc.parallelize(0 to 999,8)

printf( "Number of partitions: %d\n", rddRange.partitions.length)
rddRange.partitioner

the output is, as expected, Number of partitions: 8.
This way the first 1000 natural numbers can be created quickly as an RDD, using 8 partitions in parallel. If the number of partitions is not supplied beforehand, the framework automatically defaults to the number of cores allotted to the Docker engine.
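As a quick, minimal check of that default (rddRangeDefault is just an illustrative name; the exact number simply reflects the cores available to the local setup):

// Default parallelism used when no partition count is supplied
println(sc.defaultParallelism)

// Creating the same range without an explicit partition count
val rddRangeDefault = sc.parallelize(0 to 999)
printf( "Default number of partitions: %d\n", rddRangeDefault.partitions.length)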


Pairs

Using default partitioners

In order to create a <K,V> pair from the RDD initialized before, we will use the following mapping:

val rddPairs = rddRange.map(x => (x % 100, 1000 - x))

by which each original value x in the RDD is used to derive a pair (x % 100, 1000 - x). The resulting sequence therefore contains 100 distinct keys (ranging from 0 to 99), each occurring 10 times, with the corresponding values decreasing in steps of 1 from 1000.
Using the take(15) action confirms this prediction.
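For reference, the check looks like this; the expected pairs follow directly from the mapping x => (x % 100, 1000 - x):

// Expected: (0,1000), (1,999), (2,998), ..., (14,986)
rddPairs.take(15)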

Using custom partitioners

Using org.apache.spark.HashPartitioner(int partitions), a new partitioning scheme can be defined as done below:

val rddPairsPart2 = rddPairs.partitionBy(new HashPartitioner(2))

which will now use 2 partitions, with each pair assigned to a partition based on the hash of its key. The order of the pairs will be different, however, as HashPartitioner places pairs by key hash rather than by their original position or value.
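A minimal sketch of how that placement works: the partition index is the key's hashCode modulo the number of partitions (made non-negative), so with 2 partitions all even keys land in partition 0 and all odd keys in partition 1 (part2 is just an illustrative name):

val part2 = new HashPartitioner(2)

// Partition index per key: key.hashCode modulo 2 (made non-negative)
(0 to 5).foreach(k => printf("key %d -> partition %d\n", k, part2.getPartition(k)))

// Number of pairs that ended up in each of the two partitions of rddPairsPart2
rddPairsPart2.mapPartitionsWithIndex( (idx, it) => Iterator((idx, it.size)) ).collect()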

Comparison between approaches

Even though the end result is the same dataset (each of the keys from 0 to 99 will have 10 values), the ordering of the pairs is different (and unpredictable) for the version that uses the HashPartitioner.
Furthermore, the number of partitions defaults back to the original 8 when using rddPairs.groupByKey() (observable through rddPairs.groupByKey().partitions.map(p => (p, p.index, p.hashCode)), which gives 8 elements), whereas rddPairsPart2.groupByKey().partitions.map(p => (p, p.index, p.hashCode)) gives 2 elements, because the HashPartitioner(2) is reused.
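One way to verify that only the physical layout differs, not the data itself (a small sketch with illustrative names; collect() is acceptable here because the dataset is tiny):

val grouped8 = rddPairs.groupByKey()        // defaults back to 8 partitions
val grouped2 = rddPairsPart2.groupByKey()   // reuses the HashPartitioner(2)

printf( "Partitions: %d vs %d\n", grouped8.partitions.length, grouped2.partitions.length)

// Sorting both sides makes the (differently ordered) results comparable
val fromDefault = grouped8.mapValues(_.toList.sorted).collect().sortBy(_._1)
val fromHashed  = grouped2.mapValues(_.toList.sorted).collect().sortBy(_._1)
println(fromDefault.sameElements(fromHashed))   // expected: true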


And the story continues…

Some more sample runs were performed, and their output was predicted:

  1. val rddPGP2Count = rddPairsGroupPart2
         .map( { case (x, y) =>
             (
                 x,
                 y.reduce((a, b) => a + b)
             )
         } )
    

    For every <k,v> pair present, k will be a number in [0, 99] and v will be the collection of all the values that were grouped under that k (each within [1, 1000]). So case(x,y) will match every element, and y will be reduced to the sum of all the values for that key.
    The result was exactly as expected (a consolidated sketch of this chain is given after the list).

  2. rddPGP2Count.partitions.size
    

    This yields 2: the number of partitions is inherited from the HashPartitioner(2) used upstream, since map() preserves the partition count.

  3. printf( "Number of partitions: %d\n", rddPGP2Count.partitions.length)
    rddPGP2Count.partitioner
    

    yields Number of partitions: 2 and Option[org.apache.spark.Partitioner] = None. The partitioner is None now because a plain map() may change the keys, so Spark can no longer guarantee that the HashPartitioner layout still holds; the partitioning scheme is therefore dropped, even though the data still physically sits in 2 partitions.

  4. val rddPairsPart4 = rddPairs.partitionBy(new HashPartitioner(4))
    
    rddPairsPart4.take(10)
    rddPairsPart4.partitioner
    

    Only when a new HashPartitioner is defined again explicitly will the pairs be repartitioned using that scheme. This was verified by getting Option[org.apache.spark.Partitioner] = Some(org.apache.spark.HashPartitioner@4) as the result.

  5. val rddA = rddPairsPart4.values.map( x => x  + 10 )
       
    printf( "Number of partitions: %d\n", rddA.partitions.length)
    rddA.partitioner 
    

    With a new transformation operation the keys can no longer be relied upon (values drops them entirely, and map() may change the result arbitrarily), so the partitioner switches back to the default (= None).
    This was confirmed by getting Option[org.apache.spark.Partitioner] = None and Number of partitions: 4 as the result of the above code. In summary: whenever a transformation cannot guarantee that the existing partitioning scheme still applies, the RDD falls back to having no partitioner for subsequent operations, although the physical number of partitions is kept.
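For completeness, a consolidated sketch of the chain walked through above. Note that rddPairsGroupPart2 is not defined in the snippets shown; it is presumably rddPairsPart2.groupByKey(), which is what this sketch assumes:

// Presumed definition of the grouped RDD used in step 1
val rddPairsGroupPart2 = rddPairsPart2.groupByKey()                     // 2 partitions, HashPartitioner(2)

// Step 1: sum the grouped values per key; plain map() drops the partitioner (step 3)
val rddPGP2Count = rddPairsGroupPart2.map { case (x, y) => (x, y.reduce(_ + _)) }
printf( "Number of partitions: %d\n", rddPGP2Count.partitions.length)   // 2
println(rddPGP2Count.partitioner)                                       // None

// Step 4: an explicit HashPartitioner(4) re-establishes a partitioning scheme
val rddPairsPart4 = rddPairs.partitionBy(new HashPartitioner(4))
println(rddPairsPart4.partitioner)                                      // Some(org.apache.spark.HashPartitioner@4)

// Step 5: values.map() loses the keys, and with them the partitioner
val rddA = rddPairsPart4.values.map( x => x + 10 )
println(rddA.partitioner)                                               // None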

Differing results for values.map() and mapValues()

val rddA = rddPairsPart4.values.map( x => x  + 10 )
val rddB = rddPairsPart4.mapValues( x => x  + 10 )

rddA.take(10)
rddB.take(10)

Both RDDs contain elements, but of different shapes. rddA is a plain RDD of integers: values selects only the 'V' part of each pair, and map() then adds 10, so the keys (and with them the HashPartitioner) are gone. rddB is still an RDD of (K, V) pairs: mapValues() is explicitly meant for pair RDDs, transforms only the 'V' part, and guarantees the keys are unchanged, so the HashPartitioner(4) is preserved.
This difference is visible in the take() results: rddA.take(10) returns ten integers, while rddB.take(10) returns ten (key, value) pairs.
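The partitioner difference can also be checked directly (a small verification sketch):

// rddA is a plain RDD of integers: no keys left, hence no partitioner
println(rddA.partitioner)    // None
rddA.take(10)                // ten integers

// rddB is still an RDD of (K, V) pairs: mapValues keeps the keys, so the partitioner survives
println(rddB.partitioner)    // Some(org.apache.spark.HashPartitioner@4)
rddB.take(10)                // ten (key, value) pairs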


Controlling parallelism 2

Parallelism can be controlled more finely by using repartition() and coalesce():

val rddC = rddA.repartition(2)
val rddD = rddB.coalesce(2)

printf( "Number of rddC partitions: %d\n", rddC.partitions.length)
printf( "Number of rddD partitions: %d\n", rddD.partitions.length)

Different under-the-hood mechanisms

The resulting partition counts of the new RDDs are both 2 (down from the original 4), but the manner in which the partitions are merged differs between repartition() and coalesce(), as seen by running:

println(rddA.toDebugString)
println()
println(rddC.toDebugString)


println(rddB.toDebugString)
println()
println(rddD.toDebugString)

resulting in

(4) MapPartitionsRDD[93] at map at <console>:29 []
 |  MapPartitionsRDD[92] at values at <console>:29 []
 |  ShuffledRDD[91] at partitionBy at <console>:29 []
 +-(8) MapPartitionsRDD[1] at map at <console>:27 []
    |  ParallelCollectionRDD[0] at parallelize at <console>:28 []

(2) MapPartitionsRDD[114] at repartition at <console>:31 []
 |  CoalescedRDD[113] at repartition at <console>:31 []
 |  ShuffledRDD[112] at repartition at <console>:31 []
 +-(4) MapPartitionsRDD[111] at repartition at <console>:31 []
    |  MapPartitionsRDD[93] at map at <console>:29 []
    |  MapPartitionsRDD[92] at values at <console>:29 []
    |  ShuffledRDD[91] at partitionBy at <console>:29 []
    +-(8) MapPartitionsRDD[1] at map at <console>:27 []
       |  ParallelCollectionRDD[0] at parallelize at <console>:28 []

and

(4) MapPartitionsRDD[94] at mapValues at <console>:29 []
 |  ShuffledRDD[91] at partitionBy at <console>:29 []
 +-(8) MapPartitionsRDD[1] at map at <console>:27 []
    |  ParallelCollectionRDD[0] at parallelize at <console>:28 []

(2) CoalescedRDD[115] at coalesce at <console>:31 []
 |  MapPartitionsRDD[94] at mapValues at <console>:29 []
 |  ShuffledRDD[91] at partitionBy at <console>:29 []
 +-(8) MapPartitionsRDD[1] at map at <console>:27 []
    |  ParallelCollectionRDD[0] at parallelize at <console>:28 []

From these comparisons it can be seen that repartition() creates new partitions and performs a complete shuffle (an extra ShuffledRDD stage appears in rddC's lineage), whereas coalesce() works more in-situ, re-using existing partitions (rddD's lineage only adds a CoalescedRDD on top of rddB's).

Answer to shuffle-question

Using coalesce() requires at least one shuffle less: unlike repartition(), which builds brand-new partitions and therefore needs a full shuffle, existing partitions can simply be 'coalesced' into fewer ones. This may result in partitions of uneven sizes, whereas repartition(), by redistributing all the data from scratch, keeps the partition sizes more or less equal.
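A quick way to make that size difference observable is to count the elements per partition; the sketch below uses mapPartitionsWithIndex for this (partitionSizes is just an illustrative helper name):

def partitionSizes[T](rdd: org.apache.spark.rdd.RDD[T]): Array[(Int, Int)] =
  rdd.mapPartitionsWithIndex( (idx, it) => Iterator((idx, it.size)) ).collect()

// repartition() reshuffles all elements, so the two partitions should end up roughly equal
println(partitionSizes(rddC).mkString(", "))

// coalesce() merges whole existing partitions locally; in general this can leave the
// result uneven (here the four inputs happen to be of similar size already)
println(partitionSizes(rddD).mkString(", "))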