RDW Open Data

In this blog post we’re going to analyse a public dataset from the RDW (Rijksdienst voor het Wegverkeer), the Dutch government agency for road traffic. The RDW registers every vehicle that is allowed on public roads, together with its licence plate and a lot of supplemental data. The dataset can be found on their website; at 6.4 GB it is quite large.

Create the Spark session

import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder()
  .appName("Spark SQL on SQL data")
  .getOrCreate()

// For implicit conversions like converting RDDs to DataFrames

import spark.implicits._

Open the data file from the RDW

This dataset has lots of columns: 60 in total!

val rdwdata = spark.read
  .format("csv")
  .option("inferSchema", "true")
  .option("dateFormat", "dd/MM/yyyy")
  .option("header", true)
  .load("/data/bigdata/Open_Data_RDW__Gekentekende_voertuigen.csv")
  .cache()
rdwdata.printSchema

root
 |-- Kenteken: string (nullable = true)
 |-- Voertuigsoort: string (nullable = true)
 |-- Merk: string (nullable = true)
 |-- Handelsbenaming: string (nullable = true)
 |-- Vervaldatum APK: string (nullable = true)
 |-- Datum tenaamstelling: string (nullable = true)
 |-- Bruto BPM: integer (nullable = true)
 |-- Inrichting: string (nullable = true)
 |-- Aantal zitplaatsen: integer (nullable = true)
 |-- Eerste kleur: string (nullable = true)
 |-- Tweede kleur: string (nullable = true)
 |-- Aantal cilinders: integer (nullable = true)
 |-- Cilinderinhoud: integer (nullable = true)
 |-- Massa ledig voertuig: integer (nullable = true)
 |-- Toegestane maximum massa voertuig: integer (nullable = true)
 |-- Massa rijklaar: integer (nullable = true)
 |-- Maximum massa trekken ongeremd: integer (nullable = true)
 |-- Maximum trekken massa geremd: integer (nullable = true)
 |-- Retrofit roetfilter: string (nullable = true)
 |-- Zuinigheidslabel: string (nullable = true)
 |-- Datum eerste toelating: string (nullable = true)
 |-- Datum eerste afgifte Nederland: string (nullable = true)
 |-- Wacht op keuren: string (nullable = true)
 |-- Catalogusprijs: integer (nullable = true)
 |-- WAM verzekerd: string (nullable = true)
 |-- Maximale constructiesnelheid (brom/snorfiets): integer (nullable = true)
 |-- Laadvermogen: integer (nullable = true)
 |-- Oplegger geremd: integer (nullable = true)
 |-- Aanhangwagen autonoom geremd: integer (nullable = true)
 |-- Aanhangwagen middenas geremd: integer (nullable = true)
 |-- Vermogen (brom/snorfiets): double (nullable = true)
 |-- Aantal staanplaatsen: integer (nullable = true)
 |-- Aantal deuren: integer (nullable = true)
 |-- Aantal wielen: integer (nullable = true)
 |-- Afstand hart koppeling tot achterzijde voertuig: integer (nullable = true)
 |-- Afstand voorzijde voertuig tot hart koppeling: integer (nullable = true)
 |-- Afwijkende maximum snelheid: integer (nullable = true)
 |-- Lengte: integer (nullable = true)
 |-- Breedte: integer (nullable = true)
 |-- Europese voertuigcategorie: string (nullable = true)
 |-- Europese voertuigcategorie toevoeging: string (nullable = true)
 |-- Europese uitvoeringcategorie toevoeging: string (nullable = true)
 |-- Plaats chassisnummer: string (nullable = true)
 |-- Technische max. massa voertuig: integer (nullable = true)
 |-- Type: string (nullable = true)
 |-- Type gasinstallatie: string (nullable = true)
 |-- Typegoedkeuringsnummer: string (nullable = true)
 |-- Variant: string (nullable = true)
 |-- Uitvoering: string (nullable = true)
 |-- Volgnummer wijziging EU typegoedkeuring: integer (nullable = true)
 |-- Vermogen massarijklaar: double (nullable = true)
 |-- Wielbasis: integer (nullable = true)
 |-- Export indicator: string (nullable = true)
 |-- Openstaande terugroepactie indicator: string (nullable = true)
 |-- Vervaldatum tachograaf: string (nullable = true)
 |-- API Gekentekende_voertuigen_assen: string (nullable = true)
 |-- API Gekentekende_voertuigen_brandstof: string (nullable = true)
 |-- API Gekentekende_voertuigen_carrosserie: string (nullable = true)
 |-- API Gekentekende_voertuigen_carrosserie_specifiek: string (nullable = true)
 |-- API Gekentekende_voertuigen_voertuigklasse: string (nullable = true)

Let’s strip the data to the columns we’re going to use

val vehDF = rdwdata.select("Kenteken", "Merk", "Voertuigsoort", "Datum tenaamstelling")

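Summarize the data. The counts differ between columns, so some columns contain NULL values. Note that the dates are still strings, so min and max compare them lexicographically rather than chronologically.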

vehDF.describe().show()

+-------+--------+------------+-------------+--------------------+
|summary|Kenteken|        Merk|Voertuigsoort|Datum tenaamstelling|
+-------+--------+------------+-------------+--------------------+
|  count|13681070|    13681067|     13681070|            13681064|
|   mean|    null|        null|         null|                null|
| stddev|    null|        null|         null|                null|
|    min|  0001ES|3DOG CAMPING| Aanhangwagen|          01/01/1954|
|    max|  ZZ9999|  wiltrailer| Personenauto|          31/12/2016|
+-------+--------+------------+-------------+--------------------+
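The counts above only hint at where the NULLs are. One way to make this explicit is to aggregate a NULL indicator per column. The following is a minimal sketch, not part of the original analysis: `sampleDF` is a hypothetical hand-made stand-in for `vehDF`, and the block starts its own local session so it can run on its own (in the notebook, `getOrCreate()` simply returns the existing one).

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, sum, when}

// Local session for this sketch only (assumption: no cluster needed)
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("null-count-sketch")
  .getOrCreate()
import spark.implicits._

// Hypothetical sample rows standing in for vehDF
val sampleDF = Seq(
  ("0001ES", Some("VOLKSWAGEN"), "Personenauto", Some("01/01/1954")),
  ("ZZ9999", None,               "Aanhangwagen", Some("31/12/2016")),
  ("AB12CD", Some("OPEL"),       "Personenauto", None)
).toDF("Kenteken", "Merk", "Voertuigsoort", "Datum tenaamstelling")

// One aggregate per column: add 1 for every NULL value, 0 otherwise
val nullCounts = sampleDF.select(
  sampleDF.columns.map(c => sum(when(col(c).isNull, 1).otherwise(0)).alias(c)): _*
)

nullCounts.show()
```

Replacing `sampleDF` with `vehDF` should give the number of missing values per column in a single pass over the data.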

Remove the rows that contain nulls and summarize again: 9 rows have been removed (13681070 before, 13681061 after)

val vehFilteredDF = vehDF
  .filter($"Kenteken".isNotNull)
  .filter($"Merk".isNotNull)
  .filter($"Voertuigsoort".isNotNull)
  .filter($"Datum tenaamstelling".isNotNull)

// Rename the date column to the shorter "Datum"
val vehF2DF = vehFilteredDF.withColumnRenamed("Datum tenaamstelling", "Datum")

vehF2DF.describe().show()

vehF2DF.printSchema()

+-------+--------+------------+-------------+----------+
|summary|Kenteken|        Merk|Voertuigsoort|     Datum|
+-------+--------+------------+-------------+----------+
|  count|13681061|    13681061|     13681061|  13681061|
|   mean|    null|        null|         null|      null|
| stddev|    null|        null|         null|      null|
|    min|  0001ES|3DOG CAMPING| Aanhangwagen|01/01/1954|
|    max|  ZZ9999|  wiltrailer| Personenauto|31/12/2016|
+-------+--------+------------+-------------+----------+

root
 |-- Kenteken: string (nullable = true)
 |-- Merk: string (nullable = true)
 |-- Voertuigsoort: string (nullable = true)
 |-- Datum: string (nullable = true)
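As an alternative to chaining one `isNotNull` filter per column, Spark's `DataFrameNaFunctions` (reachable through `.na`) can drop every row that has a NULL in any column. The sketch below is an illustration under assumptions: `sampleDF` is a hypothetical sample in place of `vehDF`, and the local session is created only so the block is self-contained.

```scala
import org.apache.spark.sql.SparkSession

// Local session for this sketch only
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("na-drop-sketch")
  .getOrCreate()
import spark.implicits._

// Hypothetical sample rows standing in for vehDF
val sampleDF = Seq(
  ("0001ES", Some("VOLKSWAGEN"), "Personenauto", Some("01/01/1954")),
  ("ZZ9999", None,               "Aanhangwagen", Some("31/12/2016")),
  ("AB12CD", Some("OPEL"),       "Personenauto", None)
).toDF("Kenteken", "Merk", "Voertuigsoort", "Datum tenaamstelling")

// Drop every row with a NULL in any column, then shorten the date column name
val cleanedDF = sampleDF.na.drop()
  .withColumnRenamed("Datum tenaamstelling", "Datum")

cleanedDF.show()
```

To restrict the check to particular columns, `na.drop` also accepts a list of column names, for example `na.drop(Seq("Merk"))`.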

Create a custom type to convert our SQL rows into

case class Auto(Kenteken: String, Merk: String, Voertuigsoort: String, Datum: String)
val autoDS = vehF2DF.as[Auto]

We are only interested in the year component of the date

def dateToYear(a: Auto): Auto = {
  // Dates are formatted as dd/MM/yyyy, so the year is the third component
  val year = a.Datum.split("/")(2)
  a.copy(Datum = year)
}
val autoDSS = autoDS.map(dateToYear).cache()
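Splitting on "/" works as long as every date really has the form dd/MM/yyyy; a malformed value would throw an exception inside the Spark job. A more defensive variant could parse the string with `java.time` and return an `Option`. This is a minimal sketch and not what the analysis above uses; the names `YearParsing` and `yearOf` are made up for illustration.

```scala
import java.time.LocalDate
import java.time.format.DateTimeFormatter
import scala.util.Try

object YearParsing {
  // The date layout used in the RDW export, e.g. 31/12/2016
  private val rdwFormat = DateTimeFormatter.ofPattern("dd/MM/yyyy")

  // Returns the year, or None when the input is null or not a valid date
  def yearOf(datum: String): Option[Int] =
    Try(LocalDate.parse(datum.trim, rdwFormat).getYear).toOption
}

println(YearParsing.yearOf("31/12/2016")) // Some(2016)
println(YearParsing.yearOf("not a date")) // None
```

Returning an `Option` leaves it to the caller to decide whether to drop unparseable rows or to count them separately.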

Create a temp view of our data. This enables us to run SQL queries on the dataset

autoDSS.createOrReplaceTempView("kentekens")

Now we can finally do some real analysis on the data!

Let’s count the licence plates grouped by year

spark.sql("SELECT Datum, Count(Datum) as DatumCount FROM kentekens GROUP BY Datum ORDER BY DatumCount DESC LIMIT 10").collect()

Datum group
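The same aggregation can also be written with the DataFrame API instead of a SQL string, which lets the compiler catch misspelled method names. The sketch below runs on a hypothetical three-row sample (`sampleYears`) rather than on `autoDSS`, and creates a local session only to be self-contained. Since rows with a NULL date were removed earlier, counting rows per group gives the same result as `Count(Datum)`.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.desc

// Local session for this sketch only
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("groupby-sketch")
  .getOrCreate()
import spark.implicits._

// Hypothetical sample standing in for autoDSS (Datum already reduced to the year)
val sampleYears = Seq(
  ("0001ES", "VOLKSWAGEN", "Personenauto", "2016"),
  ("AB12CD", "OPEL",       "Personenauto", "2016"),
  ("ZZ9999", "VOLKSWAGEN", "Aanhangwagen", "2015")
).toDF("Kenteken", "Merk", "Voertuigsoort", "Datum")

// Count rows per year, most frequent year first, top 10 only
val perYear = sampleYears
  .groupBy("Datum")
  .count()
  .withColumnRenamed("count", "DatumCount")
  .orderBy(desc("DatumCount"))
  .limit(10)

perYear.show()
```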

spark.sql("SELECT Merk, Count(Merk) as MerkCount FROM kentekens GROUP BY Merk ORDER BY MerkCount DESC LIMIT 10").collect()

Merk group

How many Volkswagens are driving on our roads grouped by year?

spark.sql("SELECT Datum, Count(Datum) AS DatumCount FROM kentekens WHERE Merk = 'VOLKSWAGEN' GROUP BY Datum ORDER BY DatumCount DESC LIMIT 10").collect()

Volkswagens

Which types of vehicles are there on our roads?

spark.sql("SELECT Voertuigsoort, Count(Voertuigsoort) as vehcount FROM kentekens GROUP BY Voertuigsoort ORDER BY vehcount DESC LIMIT 10").collect()

Voertuigsoort

The 50 least common brands

spark.sql("SELECT Merk, Count(Merk) as MerkCount FROM kentekens GROUP BY Merk ORDER BY MerkCount ASC LIMIT 50").collect()

Uncommon

Some rare brands (occurring at least 30 but fewer than 100 times)

autoDSS.groupBy("Merk").count().filter($"count" >= 30 && $"count" < 100).collect()

Rare