sparkling-streams-2021-LahaLuhem

sparkling-streams-2021-LahaLuhem created by GitHub Classroom

Assignment 5: Sparkling Spark

We will use the abstractions offered by Spark Structured Streaming to analyze the data from an online marketplace from the popular game of Runescape.

Initial set-up

  1. The docker container to be used was being run in the Huygens building, as done before. A recap of the instructions can be found in an older blog-post here.

  2. The hey-spark container needed a bit of modification, which was achieved by:
    sed -i -e 's|hdfs://localhost:9000|file:///tmp|' /opt/hadoop/etc/hadoop/core-site.xml
    
  3. It was known that there may certain external errors that were possible to be run into. Luckily, there were some diagnostic remedies that were made available here.

  4. A python file for running a simulation was needed, that would output Runescape-like output to port 9999. This was done by:
    wget https://raw.githubusercontent.com/rubigdata-dockerhub/hadoop-dockerfile/master/stream.py
    docker cp stream.py hey-spark:/
       
    docker exec hey-spark sh -c "python3 stream.py &"
    

    The first two need to be run only once, and the last one (actual execution), everytime the container is run.

  5. Some more libraries to be imported:
    import spark.implicits._
    import org.apache.spark.sql.streaming.Trigger
    


Initializing variables and peeking output

// Create a dataframe tied to the TCP/IP stream on localhost port 9999 using the readStream operation:
val socketDF = spark.readStream
  .format("socket")
  .option("host", "0.0.0.0")
  .option("port", 9999)
  .load()

// Check whether the object is specifically a 'Streaming' Dataframe. Should return 'true':
socketDF.isStreaming

// Writing a sample of data from the TCP/IP connection into an in-memory dataframe for further analysis:
val streamWriterMem = socketDF
  .writeStream
  .outputMode("append")
  .format("memory")



// Start streaming!
val memoryQuery = streamWriterMem  
  .queryName("memoryDF")
  .start()

// Run for 5 second...
memoryQuery
  .awaitTermination(5000)
  
// ... and stop the query, to avoid filling up memory:
memoryQuery
  .stop()

Now we should have access to a 5 second long window of data (not evalueated yet though due to lazy-evaluation!) into a variable memoryDF.
Using SQL, we can now peek the top 10 rows:

-- Query the top 10 rows
select * from memoryDF LIMIT 10

which shows us this finally after actually executing.


Stream Processing (dynamic)

Since now, we were working on a static 5-second window of the generated data. Now, we extend to actual continuously incoming. Spark’s Dataframe enables us to treate the stream as normal static data, for the most part. We acheive this by using streaming queries as follows:

// Create and start a streaming query on the same TCP/IP input stream
val consoleQuery = socketDF
  .writeStream
  .outputMode("append")
  .format("console")
  .start()


// Running this a couple of times shows the current state of the incoming data
spark.streams.active

// To stop simply use this
consoleQuery.stop()

Application to Runescape data

We define RuneData class to be able to encompass the points of interest more compactly, along with regexes to help parse the incoming data according to the strcture defined in RuneData:

// Class definition
case class RuneData(material: String, type: String, price: Integer)


// Defining the regex for parsing the text (only exact classes (with '()') are usable as variables later on)
val myregex = "\"^([A-Z].+) ([A-Z].+) was sold for (\\\\d+)\""

// Assigning the parsed text appropriately
val q = f"select regexp_extract(value, $myregex%s, 1) as material, regexp_extract(value, $myregex%s, 2) as type, cast(regexp_extract(value, $myregex%s, 3) as Integer) as price from runeUpdatesDF"


// Finally linking them together into a streaming query plan
val runes = spark.sql(q).as[RuneData]

Using a similar procedure for inspecting before, the output can be checked mid-stream. And sure enough, the required format is obtained as can be seen here


Output to disk

Spark Strucuted Streaming makes it very easy to write to a file-system, as a Paraquet file:

mkdir -p /opt/hadoop/share/runedata
Setup another writer to copy the query output to disk, for 3 seconds
val streamWriterDisk = runes
  .writeStream
  .outputMode("append")
  .option("checkpointLocation", "file:///tmp/checkpoint")
  .trigger(Trigger.ProcessingTime("3 seconds"))

//Start the query!
val stream2disk = streamWriterDisk
                  .start("file:///opt/hadoop/share/runedata")

Checking the data manually, the paraquet files present without any errors. The option(...) basically commands the writer to create a checkpoint every second, to provide a lot of fault tolerance.


Working with data (finally)

We load the data from the previously made parquet files by:

val runes = spark
  .read
  .parquet("file:///opt/hadoop/share/runedata/part-*")
  .createOrReplaceTempView("runes")

and just to check, we run a simple SQL query on it:

-- Average price per material:
SELECT material, avg(price) FROM runes GROUP BY material

and we get the expected table with average price of each of the materials that appeared in the data. Output not shown here for reasons of brevity.

Answering predefined questions

  1. How many rune items were sold?
    SELECT count(*) FROM runes   WHERE material = "Rune"
    

    Which gave 4993

  2. How many of each item type was sold?
    SELECT type, count(*) FROM runes   GROUP BY type
    

    Result

  3. How much gold was spent buying swords?
    SELECT sum(price) FROM runes   WHERE type = "Sword"
    

    Which gives us 544123620 Gold spent in total.

  4. I was not able to completely figure out how to do the batch updating. But after some time of diving into the Documentation for Streaming Dataframe, I reckoned that it had to do be done by specifying a different mode of updating, along with defining a trigger for such an update (to ‘when new n number of entries come in’).

  5. Additionally for doing any more interseting analysis, I do not know the game of Runescape myself, and so do not have an adequate amount of domain knowledge myself.