open-data-2021-LahaLuhem

open-data-2021-LahaLuhem created by GitHub Classroom

Part A: Cleaning the Data

Obtaining the data and other preliminaries

The data was downloaded using the following shell script. The [ ! -f ... ] test guards each download with a check on file existence, to avoid downloading files again if they are already present.

mkdir -p /opt/hadoop/share/data
cd /opt/hadoop/share/data
pwd
echo BAG data...
[ ! -f BAG_ADRES.csv ] \
  && wget --quiet https://rubigdata.github.io/course/data/BAG_ADRES.csv \
  && echo Downloaded BAG data || echo BAG data already exists
echo Artworks data...
[ ! -f kunstopstraat-kunstwerk.csv ] \
  && wget --quiet -O kunstopstraat-kunstwerk.csv https://www.nijmegen.nl/kos/opendata/ \
  && echo Downloaded artworks data || echo Artworks file already exists

Next, Spark’s SQL types were imported for use later on

import org.apache.spark.sql.types._

Finally, the data was loaded using the following CSV reader call

val bagdata = spark.read.format("csv").option("header", true).load("file:///opt/hadoop/share/data/BAG_ADRES.csv").cache()

and was cached, so that later steps can reuse it without re-reading the file.


Data: first impressions

For a more generic impression of the data, running

bagdata.printSchema
bagdata.show(2)

would give us a summary of the variables it contains, along with their inferred types. Additionally, only the top 2 rows are ‘peeked’ at.
Summary obtained

show(), sample() and describe() can be used to gain more specific insights into the data. Running:

bagdata.describe().show()

Gives us a profiling report of the values present in each of the variables (headers) of the data.
Data-profiled
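As mentioned above, sample() can also be used for a quick look at a few random rows. A minimal illustration (the sampling fraction and number of rows shown are arbitrary choices):

bagdata.sample(withReplacement = false, fraction = 0.001).show(5)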


Cleaning out the NULLs

[df].na.drop can be used to drop records containing ‘nulls’ in columns, according to a criterion. Opting for a ruthless approach using

println(bagdata.count())
val null_cleared = bagdata.na.drop("any")
println(null_cleared.count())

the number of records dropped to 650 from the original 96,867 (only 650 records have no null values). So this is not a good idea.
From the profiling report above, it can be seen that ‘HUISNUMMERTOEVOEGING’ and ‘HUISLETTER’ have the lowest counts, meaning the most null/missing values. They are not very important variables anyway in my opinion, as the house number is enough.

The following variables were added to the filter incrementally, each kept only if not too many records were lost, arriving at the
final_list using

val null_cleared_data = bagdata.na.drop( Seq("ACTCODE", "ADRES_ID", ..., "WOONPLAATS") )

The final number of records was still at a viable 95,912, as luckily the null values occurred in the same records (rows) for each newly added column.
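A minimal sketch of how that incremental check could look (only the column names already shown are used; the full candidate list and the acceptance threshold are assumptions):

// Sketch: check how many records survive as columns are added to the null filter
val candidates = Seq("ACTCODE", "ADRES_ID", "WOONPLAATS") // illustrative subset of the full list
var kept = Seq.empty[String]
for (c <- candidates) {
  val remaining = bagdata.na.drop("any", kept :+ c).count()
  println(s"Dropping nulls in ${kept :+ c} keeps $remaining records")
  if (remaining > 90000) kept = kept :+ c // acceptance threshold is an assumption
}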


Schema: using types

The case class

Using the more structured Dataset API, a case class is defined to store the relevant variables.

case class Addr(street:String, quarter:String, x:Float, y:Float)

and the data is then parsed into it by

val addrDF = null_cleared_data.select($"STRAAT" as "street",
                              $"X_COORD".cast(FloatType) as "x",
                              $"Y_COORD".cast(FloatType) as "y",
                              $"WIJK_OMS" as "quarter").as[Addr].cache()
addrDF.show(2)

Format conversion

All of the parsing fails due to a number format problem (the coordinates are written as 0,5 instead of 0.5), causing the null values that appear in the result. Hence, there is a need for a custom converter. This was made in the form of

import java.text.NumberFormat
import java.util.Locale
val nf = NumberFormat.getInstance(Locale.forLanguageTag("nl")); // Handle floats written as 0,05 instead of 0.05

def convToFloat(s: String): Option[Float] = {
  try {
    Some(nf.parse(s).floatValue)
  } catch {
    case e: Exception => None
  }
}

which, when applied as
Seq( "0.045", "0,054", "abc", "45.789,34" ).map( x => convToFloat(x).getOrElse(null) )
gives
List(45.0, 0.054, null, 45789.34)
(with the Dutch locale the dot in "0.045" is treated as a grouping separator, hence 45.0).
This is then registered as a user-defined function so that it can be used seamlessly in the Spark SQL framework.
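A minimal sketch of that registration (the UDF name toFloat is an assumption):

import org.apache.spark.sql.functions.udf

// Wrap convToFloat so it can be applied to Columns and used in SQL queries;
// a None result becomes a null in the resulting column. The name is an assumption.
val toFloat = udf((s: String) => convToFloat(s))
spark.udf.register("toFloat", toFloat)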

The null-check was done by using:

printf(
  "%d points at origin, %d null values",
  addrDF.filter("x = 0 or y = 0").count,
  addrDF.filter('x.isNull or 'y.isNull).count
)

which reported 0 points at origin, 0 null values. So we’re good to go.
Additionally, describe was used to check again (using the profiling), which gave a green light: Report. Note that the previous work was performed on a Dataset, not a DataFrame.
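A minimal sketch of such a recheck on the coordinate columns:

addrDF.describe("x", "y").show()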


DataFrame Operators

How to find out the top 10 largest neighbourhoods in Nijmegen?

val qc_1 = addrDF.groupBy("quarter").count.cache()

val qc_1_top = qc_1.orderBy(col("count").desc).limit(10)

qc_1_top.show()

groupBy("quarter") can make a group of records of each value of the attribute ‘quarter’. And then the ordering is put in a decending order. The results are here.


Planning

explain() can be used to get the different kinds of plans, in the order in which they are produced, including the optimisation step. Running the following generated a dump of the plans for both the qc_1 and the qc_1_top dataframe actions, which can be found at qc1 Plans and qc1_10 Plans respectively.

println("Group by and count:")
println("===================")
qc_1.explain(true)

println("Order descending limit 10:")
println("==========================")
qc_1_top.explain(true)

Some operations, such as the filtering of nulls, can be seen directly as part of the plans.
The logical plan contains very high-level instructions, which are then optimized and translated into the physical plan. It is the steps of the physical plan that are actually carried out, for example the ‘Read from CSV’ step that corresponds to the root of the logical plan.


Using SQL inside of Spark

A SQL implementation can be used inside Spark, and it can be mixed and matched with operators from the DataFrame API, or even with direct RDD manipulation. SQL seems the more convenient way forward when handling increasingly complex queries. Taking the previous ‘Top 10 populous quarters’ as an example, the same result can be achieved using SQL as follows:

addrDF.createOrReplaceTempView("addresses")

val qc_2_top = spark.sql("SELECT quarter, count(quarter) AS qc FROM addresses GROUP BY quarter ORDER BY qc DESC LIMIT 10")

qc_2_top.explain(true)

Its query plan, though, happens to look much simpler. top10SQL Plan


Fun with artworks (SQL version)

Now, the focus is shifted to the data about artworks. In the following, we load the data and make some preliminary selections.

val arts = spark.read
    .format("csv")
    .option("header", "true") // Use first line of all files as header
    .option("inferSchema", "true") // Automatically infer data types
    .load("file:///opt/hadoop/share/data/kunstopstraat-kunstwerk.csv").cache()
    
// Select all the public art created before the year 2000
val artworks = arts.select("naam","locatie","latitude","longitude","bouwjaar","url").where("bouwjaar <= 1999")

// Since the floats in 'latitude' and 'longitude' columns are not recognized as floats, they are converted
val artsxy = artworks
  .withColumn("latitude", artworks.col("latitude").cast("float"))
  .withColumn("longitude", artworks.col("longitude").cast("float"))
  
// Checking validity
artworks.sample(true,0.1).show()
artsxy.describe().show()

The results of running the validity statements (at the end) can be found here. Seems that now we’re good to go.


More fun with SQL
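
The queries below assume that arts and artsxy have been registered as temporary views, e.g.:

arts.createOrReplaceTempView("arts")
artsxy.createOrReplaceTempView("artsxy")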

  1. Records where the latitude is missing, or the longitude is missing, but not both (XOR)
    spark.sql("select * from arts where (latitude is not null and longitude is null) or (latitude is null and longitude is not null)").show(15)
    

    Weirdly, a lot of records match this one.

  2. Records that were made in the future!!
    spark.sql("select * from arts where bouwjaar > 2021")
    .show()
    

    Proof of time travel

  3. Oldest year in which an artwork was made
    spark.sql("select MIN(bouwjaar) from artsxy")
         .show()
    

    Which turned out to be 12?!


Use later

In order to use these objects later on, without having to redo all the work, they can be stored directly in the filesystem as Parquet files:

spark.sql("select * from artsxy where (latitude is not null and longitude is not null) and bouwjaar < 9999")
     .write.parquet("file:///opt/hadoop/share/data/kos.parquet")
addrDF.write.parquet("file:///opt/hadoop/share/data/addr.parquet")
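
In Part B they can then be read back without redoing the cleaning; a sketch (the val names are illustrative):

val kos  = spark.read.parquet("file:///opt/hadoop/share/data/kos.parquet")
val addr = spark.read.parquet("file:///opt/hadoop/share/data/addr.parquet")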





Part B