bigdata-blog-2021-LahaLuhem

bigdata-blog-2021-LahaLuhem created by GitHub Classroom

This project is maintained by rubigdata

package org.rubigdata

// Class definitions we need in the remainder:
import org.apache.hadoop.io.NullWritable
import de.l3s.concatgz.io.warc.{WarcGzInputFormat,WarcWritable}
import de.l3s.concatgz.data.WarcRecord

import collection.JavaConverters._
import org.jsoup.Jsoup
import org.jsoup.nodes.{Document,Element}
import scala.util.matching.Regex

import java.text.NumberFormat



object RUBigDataApp {
  def main(args: Array[String]) {
    // Overrule default settings in order to use WARC
    import org.apache.spark.SparkConf
    import org.apache.spark.sql.SparkSession
    val sparkConf = new SparkConf()
                      .setAppName("RUBigDataApp")
                      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
                      .registerKryoClasses(Array(classOf[WarcRecord]))
    //                      .set("spark.dynamicAllocation.enabled", "true")
    implicit val spark = SparkSession.builder().config(sparkConf).getOrCreate()
    val sc = spark.sparkContext
    import spark.implicits._



    //Initialize and load WARC file
    //Use s"hdfs:///cc-index-subset"
    val warcfile_name = //"hdfs:///single-warc-segment/CC-MAIN-20210410105831-20210410135831-00613.warc.gz"

                        "hdfs:///single-warc-segment/CC-MAIN-20210410105831-20210410135831-000**.warc.gz,"+
                        "hdfs:///single-warc-segment/CC-MAIN-20210410105831-20210410135831-001**.warc.gz,"+
                        "hdfs:///single-warc-segment/CC-MAIN-20210410105831-20210410135831-0021*.warc.gz,"+
                        "hdfs:///single-warc-segment/CC-MAIN-20210410105831-20210410135831-0022*.warc.gz,"+
                        "hdfs:///single-warc-segment/CC-MAIN-20210410105831-20210410135831-0023*.warc.gz,"+
                        "hdfs:///single-warc-segment/CC-MAIN-20210410105831-20210410135831-0024*.warc.gz,"+
                        "hdfs:///single-warc-segment/CC-MAIN-20210410105831-20210410135831-0025*.warc.gz"


    val warcs = sc.newAPIHadoopFile(
                  warcfile_name,
                  classOf[WarcGzInputFormat],             // InputFormat
                  classOf[NullWritable],                  // Key
                  classOf[WarcWritable]                   // Value
                )



   // Preliminary filtering out
   val filt_warcs = warcs.
                     map{ wr => wr._2}.
                     filter{ _.isValid() }.
                     map( _.getRecord() ).
                     filter( _ != null ).
                     filter( _.getHeader.getHeaderValue("WARC-Type") == "response").
                     filter(
                       wr => {
                           val body: String = wr.getHttpHeaders.get("Content-Type");
                           body match {
                             case null => false
                             case   _  => body.startsWith("text/html")
                           }
                       }
                     )




    // Use JSoup to work with wr.getRecord.getHttpStringBody()...
    val analyze_warcs = filt_warcs.
                                   filter( _.getHttpStringBody != null ).
                                   map( wr =>
                                     try {
                                        val http_body = Jsoup.parse(wr.getHttpStringBody).body()
                                        val body_text = http_body.text()
                                        if (http_body != null && body_text != null) {
                                            body_text
                                        }
                                        else {
                                            ""
                                        }
                                     } catch {
                                        case e: java.lang.NullPointerException => ""
                                        case _: Throwable => ""
                                     }
                                   ).
                                   filter( _ != "" ).

                                   map( _.replaceAll("<([^>]*)>", "") ).                // remove full html-tags
                                   map( _.replaceAll("[^\\x00-\\x7F]", "") ).           // remove non-ASCII
                                   map( _.replaceAll("[^a-zA-Z\\d:'\\s\\.]", "") ).     // remove non-essential chars
                                   map( _.replaceAll("(\\s|\\.|'|:)(?=\\1)", "") )      // remove repetitive non-alphanumeric

                                   .cache()



    // Initializing the dictionary
    val freq_dict = init_dictionary()


    // Actual analysis
    for ( (gameID, (regex, freq)) <- freq_dict) {
        freq_dict(gameID) = (
                            null,
                            analyze_warcs.
                                        flatMap( regex findAllMatchIn (_) ).
                                        count().
                                        toInt
                            )
    }

    prettyPrint_winners ( freq_dict.map{ case (k, v) => (k, v._2)} )





    spark.stop
  }

  
  
  /**
  * Encapuslation for initializing the hard-coded datastructure(s)
  */
  def init_dictionary() : collection.mutable.Map[String, (Regex, Int)] = {
    return collection.mutable.Map(
                                        "cs:go" -> ("(?i)cs:go|Counter Strike|csgo|c\\.s\\.g\\.o".r, 0),
                                        "halo" -> ("(?i)\\sHalo".r, 0),
                                        "doom" -> ("(?i)\\sDoom".r, 0),
                                        "mario" -> ("(?i)\\sMario".r, 0),
                                        "pop" -> ("(?i)Prince of Persia|P\\.o\\.P".r, 0),
                                        "mortal_kombat" -> ("(?i)Mortal Kombat|m\\.k".r, 0),
                                        "smash" -> ("(?i)Super Smash Bros|Super Smash Bros\\.|SSB".r, 0),
                                        "minecraft" -> ("(?i)Minecraft".r, 0),
                                        "dayz" -> ("(?i)\\sDayZ".r, 0),
                                        "fortnite" -> ("(?i)Fortnite".r, 0),
                                        "pubg" -> ("(?i)\\sPUBG|PlayerUnknown's Battlegrounds|PlayerUnknowns Battlegrounds|p\\.u\\.b\\.g".r, 0),

                                        "w.r.o.e.f" -> ("(?i)What Remains of Edith Finch|w\\.r\\.o\\.e\\.f|wroef".r, 0),
                                        "gone_home" -> ("(?i)Gone Home".r, 0),
                                        "s_and_w" -> ("(?i)Spider and Web".r, 0),
                                        "zork" -> ("(?i)\\sZork".r, 0),
                                        "b.o.t.d" -> ("(?i)Book of the Dead|B\\.o\\.t\\.D".r, 0),
                                        "mirrors_edge" -> ("(?i)Mirror's Edge|Mirrors Edge".r, 0),

                                        "lol" -> ("(?i)League of Legends|L\\.o\\.L".r, 0),
                                        "runescape" -> ("(?i)Runescape".r, 0),
                                        "m&b" -> ("(?i)Mount and Blade|Mount & Blade|m&b|m & b".r, 0),
                                        "gta" -> ("(?i)GTA|Grand Theft Auto|G\\.T\\.A".r, 0),
                                        "wasteland" -> ("(?i)Wasteland".r, 0),
                                        "star_ren" -> ("(?i)Star Renegades".r, 0),
                                        "final_fant" -> ("(?i)Final Fantasy|ff|f\\.f".r, 0),
                                        "king_hearts" -> ("(?i)Kingdom Hearts".r, 0),

                                        "sims" -> ("(?i)Sims".r, 0),
                                        "animal_cross" -> ("(?i)Animal Crossing".r, 0),
                                        "flight_sim" -> ("(?i)Microsoft Flight Simulator|MSFS|FSX|m\\.s\\.f\\.s".r, 0),
                                        "trackmania" -> ("(?i)Trackmania".r, 0),
                                        "farm_sim" -> ("(?i)Farming Simulator".r, 0),
                                        "rim_world" -> ("(?i)RimWorld".r, 0),

                                        "a.o.e" -> ("(?i)Age of Empires|AOE|a\\.o\\.e".r, 0),
                                        "star_craft" -> ("(?i)StarCraft".r, 0),
                                        "civ" -> ("(?i)Civilization|Civ|C(I+)V".r, 0),
                                        "total_war" -> ("(?i)Total War".r, 0),
                                        "xcom" -> ("(?i)XCom".r, 0),
                                        "dota" -> ("(?i)DotA|Defense of the Ancients|D\\.o\\.t\\.A".r, 0),
                                        "h.o.t.s" -> ("(?i)Heroes of the Storm".r, 0),

                                        "forza" -> ("(?i)Forza".r, 0),
                                        "nfs" -> ("(?i)NFS|Need for Speed|n\\.f\\.s".r, 0),
                                        "pro_skate" -> ("(?i)Tony Hawk Pro Skater|THPS|t\\.h\\.p\\.s".r, 0),
                                        "fifa" -> ("(?i)FIFA \\d+|F\\.I\\.F\\.A d+".r, 0),
                                        "ea_s_ufc" -> ("(?i)EAS UFC|EASports UFC".r, 0),
                                        "fight_night" -> ("(?i)Fight Night".r, 0)
                                  )
  }


  /**
  * Calculates the winning game from each of the pre-defined genre, and prints it out to stdout
  */
  def prettyPrint_winners (dict: collection.mutable.Map[String, Int]) : Unit = {
    var action_winner, advent_winner, rpg_winner, simu_winner, strat_winner, sport_winner = ("", 0)

    for ( (gameID, count) <- dict ) {
        if ( List("cs:go", "halo", "doom", "mario", "mortal_kombat", "smash", "minecraft", "dayz", "fortnite", "pubg").contains(gameID) && count > action_winner._2 ) action_winner = (gameID, count)
        else if ( List("w.r.o.e.f", "gone_home", "s_and_w", "zork", "b.o.t.d", "mirrors_edge").contains(gameID) && count > advent_winner._2 ) advent_winner = (gameID, count)
        else if ( List("lol", "runescape", "m&b", "gta", "wasteland", "star_ren", "final_fant", "king_hearts").contains(gameID) && count > rpg_winner._2 ) rpg_winner = (gameID, count)
        else if ( List("sims", "animal_cross", "flight_sim", "trackmania", "farm_sim", "rim_world").contains(gameID) && count > simu_winner._2 ) simu_winner = (gameID, count)
        else if ( List("a.o.e", "starcraft", "civ", "total_war", "xcom", "dota", "h.o.t.s").contains(gameID) && count > strat_winner._2 ) strat_winner = (gameID, count)
        else if ( List("forza", "nfs", "pro_skate", "fifa", "ea_s_ufc", "fight_night").contains(gameID) && count > sport_winner._2 ) sport_winner = (gameID, count)
    }


    val nf = NumberFormat.getInstance()

    println("Action winner: " + action_winner._1 + "(" + nf.format(action_winner._2) + " references found)")
    println("Adventure winner: " + advent_winner._1 + "(" + nf.format(advent_winner._2) + " references found)")
    println("RPG winner: " + rpg_winner._1 + "(" + nf.format(rpg_winner._2) + " references found)")
    println("Simulation winner: " + simu_winner._1 + "(" + nf.format(simu_winner._2) + " references found)")
    println("Strategy winner: " + strat_winner._1 + "(" + nf.format(strat_winner._2) + " references found)")
    println("Sports winner: " + sport_winner._1 + "(" + nf.format(sport_winner._2) + " references found)")
  }
}