open-data-2021-LahaLuhem created by GitHub Classroom
The data was downloaded using the following shell script. The ([ ! -f ... ])
block was used to guard with a test on file existence, in order to downloading files multiple times, if already downloaded.
mkdir -p /opt/hadoop/share/data
cd /opt/hadoop/share/data
pwd
echo BAG data...
[ ! -f BAG_ADRES.csv ] \
&& wget --quiet https://rubigdata.github.io/course/data/BAG_ADRES.csv \
&& echo Downloaded BAG data || echo BAG data already exists
echo Artworks data...
[ ! -f kunstopstraat-kunstwerk.csv ] \
&& wget --quiet -O kunstopstraat-kunstwerk.csv https://www.nijmegen.nl/kos/opendata/ \
&& echo Downloaded artworks data || echo Artworks file already exists
Next, Spark’s SQL types were imported for use later on
import org.apache.spark.sql.types._
Finally, the data was loaded using the following import for CSV files
val bagdata = spark.read.format("csv").option("header", true).load("file:///opt/hadoop/share/data/BAG_ADRES.csv").cache()
and was put asked to be held in RDD cache.
For a more generic impression of the data, running
bagdata.printSchema
bagdata.show(2)
would give us the summary of the variables in it, along with their inferred types as seen. Additionally only the top 2
rows can be ‘peaked’ at.
Summary obtained
show()
, sample()
and describe()
can be used to gain more specifc insights of the data. Running:
bagdata.describe().show()
Gives us a profiled report of the values present in each of the variables (headers) of the data.
Data-profiled
[df].na.drop
can be used to drop records containg ‘nulls’ in columns, by a criterion. Opting for a ruthless approach using
println(bagdata.count())
val null_cleared = bagdata.na.drop("any")
println(null_cleared.count())
the number of records dropped to 650 from the original 96,867 (only 650 records have no null values). So this in not a good idea.
From the Data-profiled above, it can be seen that ‘HUISNUMMERTOEVOEGING’ and ‘HUISLETTER’ have the least count, meaning most null/missing values. They are not a very important variable anyways in my opinion, as the house number is enough.
The following variables were added to the filtering incrementally (if not too many records were lost):
final_list using
val null_cleared_data = bagdata.na.drop( Seq("ACTCODE", "ADRES_ID", ..., "WOONPLAATS") )
The final number of records were still at a viable 95912, as luckily the null-values occured at the same record (row) for every new column-based stripping.
Using the more structured Dataset API a case class
is defined for storing some relevant variables.
case class Addr(street:String, quarter:String, x:Float, y:Float)
and then parsed the data into it by
val addrDF = null_cleared_data.select($"STRAAT" as "street",
$"X_COORD".cast(FloatType) as "x",
$"Y_COORD".cast(FloatType) as "y",
$"WIJK_OMS" as "quarter").as[Addr].cache()
addrDF.show(2)
The all the parsing fails due to number format exception (using 0,5 instead of 0.5), causing the null-values to appear, as seen in the result. Hence, there is a need for a custom converter. This was made in the form of
import java.text.NumberFormat
import java.util.Locale
val nf = NumberFormat.getInstance(Locale.forLanguageTag("nl")); // Handle floats written as 0,05 instead of 0.05
def convToFloat(s: String): Option[Float] = {
try {
Some(nf.parse(s).floatValue)
} catch {
case e: Exception => None
}
}
which when applied to
Seq( "0.045", "0,054", "abc", "45.789,34" ).map( x => convToFloat(x).getOrElse(null))
would give
List(45.0, 0.054, null, 45789.34)
This is then registered as a user-defined function so that it can be used in the Spark SQL framework flawlessly.
The null-check was done by using:
printf(
"%d points at origin, %d null values",
addrDF.filter("x = 0 or y = 0").count,
addrDF.filter('x.isNull or 'y.isNull).count
)
that gave out 0 points at origin, 0 null values
. So we’re good to go.
Addionally, ``describe was used to check again (using the profiling), which was a green-light. Report. The previous work was performed on a Dataset, not a Dataframe.
How to find out the top 10 largest neighbourhoods in Nijmegen?
val qc_1 = addrDF.groupBy("quarter").count.cache()
val qc_1_top = qc_1.orderBy(col("count").desc).limit(10)
qc_1_top.show()
groupBy("quarter")
can make a group of records of each value of the attribute ‘quarter’. And then the ordering is put in a decending order. The results are here.
explain()
can be used to get the different kinds of plans, in the order of which they were carried out, inclusive of optimising. Runnin the following generated the dump of the plans for both, the qc_1
and the qc_1_top
dataframe actions, which can be found at qc1 Plans and qc1_10 Plans respectively.
println("Group by and count:")
println("===================")
qc_1.explain(true)
println("Order descending limit 10:")
println("==========================")
qc_1_top.explain(true)
Some operations (amongst other things) such as the filtering nulls etc. can be directly seen as a part of them.
Logical plan seems to contain very high-level instructions, which are then optimized and translated to the physical plan. It is the steps in the physical plan that are actually carried out, such as the ‘Read from CSV’ part at the root of the logical plan.
An SQL implementaion can be used inside of Spark, which can be mixed and matched with operators from Dataframe API or even down to the level of direct RDD manipulation itself. SQL seems to be a more convenient way forward when it comes to handling more and more complex queries. Using the previous ‘Top 10 populous quarters’, the same result can be achieved by using SQL as follows:
addrDF.createOrReplaceTempView("addresses")
val qc_2_top = spark.sql("SELECT quarter, count(quarter) AS qc FROM addresses GROUP BY quarter ORDER BY qc DESC LIMIT 10")
qc_2_top.explain(true)
It’s query plan, though, happens to look much simpler. top10SQL Plan
Now, the focus is shifted to the data about artworks. In the following, we load the data and make some preliminary selections.
val arts = spark.read
.format("csv")
.option("header", "true") // Use first line of all files as header
.option("inferSchema", "true") // Automatically infer data types
.load("file:///opt/hadoop/share/data/kunstopstraat-kunstwerk.csv").cache()
// Select all the public art created before the year 2000
val artworks = arts.select("naam","locatie","latitude","longitude","bouwjaar","url").where("bouwjaar <= 1999")
// Since the floats in 'latitude' and 'longitude' columns are not regonized as floats, they are converted
val artsxy = artworks
.withColumn("latitude", artworks.col("latitude").cast("float"))
.withColumn("longitude", artworks.col("longitude").cast("float"))
// Checking validity
artworks.sample(true,0.1).show()
artsxy.describe().show()
The results of running the validity statments (in the end) can be found here. Seems that now we’re good to go.
spark.sql("select * from arts where (latitude is not null and longitude is null) or (latitude is null and longitude is not null)").show(15)
A lot of records with this one weirdly.
spark.sql("select * from arts where bouwjaar > 2021")
.show()
spark.sql("select MIN(bouwjaar) from artsxy")
.show()
Which turned out to be 12?!
In order to use the Objects that were made in the future, without having to redo all the work, they can be stored in the filesystem directly by:
spark.sql("select * from artsxy where (latitude is not null and longitude is not null) and bouwjaar < 9999")
.write.parquet("file:///opt/hadoop/share/data/kos.parquet")
addrDF.write.parquet("file:///opt/hadoop/share/data/addr.parquet")