// bigdata-blog-2021-LahaLuhem created by GitHub Classroom
// This project is maintained by rubigdata
package org.rubigdata
// Class definitions we need in the remainder:
import org.apache.hadoop.io.NullWritable
import de.l3s.concatgz.io.warc.{WarcGzInputFormat,WarcWritable}
import de.l3s.concatgz.data.WarcRecord
import collection.JavaConverters._
import org.jsoup.Jsoup
import org.jsoup.nodes.{Document,Element}
import scala.util.matching.Regex
import java.text.NumberFormat
object RUBigDataApp {
  import scala.util.control.NonFatal

  /**
   * Entry point: counts how often each game from [[init_dictionary]] is mentioned in the
   * visible body text of the HTML responses stored in a set of WARC files on HDFS, then
   * prints the most-mentioned game of each genre.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    // Overrule default settings in order to use WARC
    import org.apache.spark.SparkConf
    import org.apache.spark.sql.SparkSession

    val sparkConf = new SparkConf()
      .setAppName("RUBigDataApp")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .registerKryoClasses(Array(classOf[WarcRecord]))
    // .set("spark.dynamicAllocation.enabled", "true")
    implicit val spark = SparkSession.builder().config(sparkConf).getOrCreate()
    val sc = spark.sparkContext
    import spark.implicits._

    // Initialize and load the WARC files: a comma-separated list of HDFS globs.
    // Use s"hdfs:///cc-index-subset" for the index subset, or a single file such as
    // "hdfs:///single-warc-segment/CC-MAIN-20210410105831-20210410135831-00613.warc.gz".
    val warcfile_name =
      "hdfs:///single-warc-segment/CC-MAIN-20210410105831-20210410135831-000**.warc.gz," +
      "hdfs:///single-warc-segment/CC-MAIN-20210410105831-20210410135831-001**.warc.gz," +
      "hdfs:///single-warc-segment/CC-MAIN-20210410105831-20210410135831-0021*.warc.gz," +
      "hdfs:///single-warc-segment/CC-MAIN-20210410105831-20210410135831-0022*.warc.gz," +
      "hdfs:///single-warc-segment/CC-MAIN-20210410105831-20210410135831-0023*.warc.gz," +
      "hdfs:///single-warc-segment/CC-MAIN-20210410105831-20210410135831-0024*.warc.gz," +
      "hdfs:///single-warc-segment/CC-MAIN-20210410105831-20210410135831-0025*.warc.gz"

    val warcs = sc.newAPIHadoopFile(
      warcfile_name,
      classOf[WarcGzInputFormat], // InputFormat
      classOf[NullWritable],      // Key
      classOf[WarcWritable]       // Value
    )

    // Preliminary filtering: keep valid "response" records whose Content-Type is HTML.
    val filt_warcs = warcs
      .map { case (_, writable) => writable }
      .filter(_.isValid())
      .map(_.getRecord())
      .filter(_ != null)
      .filter(_.getHeader.getHeaderValue("WARC-Type") == "response")
      .filter { rec =>
        // A missing Content-Type header comes back as null; treat it as "not HTML".
        Option(rec.getHttpHeaders.get("Content-Type")).exists(_.startsWith("text/html"))
      }

    // Extract the <body> text with Jsoup (dropping pages without usable text), then
    // normalise it. Cached because every game's regex makes its own pass over it.
    val analyze_warcs = filt_warcs
      .flatMap(rec => extractBodyText(rec.getHttpStringBody).toList)
      .map(normalizeText)
      .cache()

    // One Spark job per game; every regex match in every page counts as one mention.
    // Built as a new map instead of mutating the dictionary while iterating over it.
    val mentionCounts = init_dictionary().map { case (gameID, (regex, _)) =>
      gameID -> analyze_warcs.flatMap(text => regex.findAllMatchIn(text)).count().toInt
    }

    prettyPrint_winners(mentionCounts)
    spark.stop()
  }

  /**
   * Parses an HTTP body as HTML and returns the text of its body element.
   *
   * @param html raw HTTP body; may be null
   * @return the body text, or None when the input is null, Jsoup yields no body element
   *         or no text, the text is empty, or parsing throws a non-fatal exception
   */
  private def extractBodyText(html: String): Option[String] =
    try {
      Option(html)
        .flatMap(h => Option(Jsoup.parse(h).body()))
        .flatMap(body => Option(body.text()))
        .filter(_.nonEmpty)
    } catch {
      // Malformed pages are skipped; fatal JVM errors (e.g. OutOfMemoryError) still propagate.
      case NonFatal(_) => None
    }

  /**
   * Cleans extracted page text before regex matching: strips leftover tag-like fragments,
   * non-ASCII characters and everything except letters, digits, whitespace, ':', '\'' and
   * '.', then collapses runs of the same separator character into one.
   */
  private def normalizeText(text: String): String =
    text
      .replaceAll("<([^>]*)>", "")            // remove full html-tags
      .replaceAll("[^\\x00-\\x7F]", "")        // remove non-ASCII
      .replaceAll("[^a-zA-Z\\d:'\\s\\.]", "")  // remove non-essential chars
      .replaceAll("(\\s|\\.|'|:)(?=\\1)", "")  // remove repetitive non-alphanumeric

  /**
   * Builds the hard-coded game dictionary.
   *
   * Each entry maps a game ID to a case-insensitive regex recognising mentions of that game,
   * paired with an initial count of 0. Short acronyms are wrapped in `\b` word boundaries so
   * they do not match inside ordinary words (e.g. "ff" in "off", "civ" in "civil", "ssb" in
   * "crossbow").
   *
   * NOTE(review): the text normalisation in `main` removes '&', so the "m&b" / "Mount & Blade"
   * alternatives can never match the cleaned text.
   *
   * @return a fresh mutable map from game ID to (pattern, initial count)
   */
  def init_dictionary(): collection.mutable.Map[String, (Regex, Int)] =
    collection.mutable.Map(
      "cs:go" -> ("(?i)cs:go|Counter Strike|csgo|c\\.s\\.g\\.o".r, 0),
      "halo" -> ("(?i)\\sHalo".r, 0),
      "doom" -> ("(?i)\\sDoom".r, 0),
      "mario" -> ("(?i)\\sMario".r, 0),
      "pop" -> ("(?i)Prince of Persia|P\\.o\\.P".r, 0),
      "mortal_kombat" -> ("(?i)Mortal Kombat|\\bm\\.k\\b".r, 0),
      "smash" -> ("(?i)Super Smash Bros|Super Smash Bros\\.|\\bSSB\\b".r, 0),
      "minecraft" -> ("(?i)Minecraft".r, 0),
      "dayz" -> ("(?i)\\sDayZ".r, 0),
      "fortnite" -> ("(?i)Fortnite".r, 0),
      "pubg" -> ("(?i)\\sPUBG|PlayerUnknown's Battlegrounds|PlayerUnknowns Battlegrounds|p\\.u\\.b\\.g".r, 0),
      "w.r.o.e.f" -> ("(?i)What Remains of Edith Finch|w\\.r\\.o\\.e\\.f|wroef".r, 0),
      "gone_home" -> ("(?i)Gone Home".r, 0),
      "s_and_w" -> ("(?i)Spider and Web".r, 0),
      "zork" -> ("(?i)\\sZork".r, 0),
      "b.o.t.d" -> ("(?i)Book of the Dead|B\\.o\\.t\\.D".r, 0),
      "mirrors_edge" -> ("(?i)Mirror's Edge|Mirrors Edge".r, 0),
      "lol" -> ("(?i)League of Legends|L\\.o\\.L".r, 0),
      "runescape" -> ("(?i)Runescape".r, 0),
      "m&b" -> ("(?i)Mount and Blade|Mount & Blade|m&b|m & b".r, 0),
      "gta" -> ("(?i)\\bGTA\\b|Grand Theft Auto|G\\.T\\.A".r, 0),
      "wasteland" -> ("(?i)Wasteland".r, 0),
      "star_ren" -> ("(?i)Star Renegades".r, 0),
      "final_fant" -> ("(?i)Final Fantasy|\\bff\\b|\\bf\\.f\\b".r, 0),
      "king_hearts" -> ("(?i)Kingdom Hearts".r, 0),
      "sims" -> ("(?i)Sims".r, 0),
      "animal_cross" -> ("(?i)Animal Crossing".r, 0),
      "flight_sim" -> ("(?i)Microsoft Flight Simulator|\\bMSFS\\b|\\bFSX\\b|m\\.s\\.f\\.s".r, 0),
      "trackmania" -> ("(?i)Trackmania".r, 0),
      "farm_sim" -> ("(?i)Farming Simulator".r, 0),
      "rim_world" -> ("(?i)RimWorld".r, 0),
      "a.o.e" -> ("(?i)Age of Empires|\\bAOE\\b|a\\.o\\.e".r, 0),
      "star_craft" -> ("(?i)StarCraft".r, 0),
      "civ" -> ("(?i)Civilization|\\bCiv\\b|\\bC(I+)V\\b".r, 0),
      "total_war" -> ("(?i)Total War".r, 0),
      "xcom" -> ("(?i)XCom".r, 0),
      "dota" -> ("(?i)\\bDotA\\b|Defense of the Ancients|D\\.o\\.t\\.A".r, 0),
      "h.o.t.s" -> ("(?i)Heroes of the Storm".r, 0),
      "forza" -> ("(?i)Forza".r, 0),
      "nfs" -> ("(?i)\\bNFS\\b|Need for Speed|n\\.f\\.s".r, 0),
      "pro_skate" -> ("(?i)Tony Hawk Pro Skater|\\bTHPS\\b|t\\.h\\.p\\.s".r, 0),
      "fifa" -> ("(?i)FIFA \\d+|F\\.I\\.F\\.A \\d+".r, 0),
      "ea_s_ufc" -> ("(?i)EAS UFC|EASports UFC".r, 0),
      "fight_night" -> ("(?i)Fight Night".r, 0)
    )

  /**
   * Determines the most-mentioned game of each pre-defined genre and prints one line per
   * genre to stdout.
   *
   * A game wins only with a strictly higher count than the current best. Ties therefore go
   * to the game listed first in its genre, and a genre without any mentions prints an empty
   * winner with 0 references. IDs missing from `dict` count as 0.
   *
   * @param dict mention count per game ID (keys as produced by [[init_dictionary]])
   */
  def prettyPrint_winners(dict: collection.Map[String, Int]): Unit = {
    // Printed genre label paired with the dictionary keys of the games in that genre.
    val genres: Seq[(String, Seq[String])] = Seq(
      "Action" -> Seq("cs:go", "halo", "doom", "mario", "pop", "mortal_kombat", "smash",
        "minecraft", "dayz", "fortnite", "pubg"),
      "Adventure" -> Seq("w.r.o.e.f", "gone_home", "s_and_w", "zork", "b.o.t.d", "mirrors_edge"),
      "RPG" -> Seq("lol", "runescape", "m&b", "gta", "wasteland", "star_ren", "final_fant",
        "king_hearts"),
      "Simulation" -> Seq("sims", "animal_cross", "flight_sim", "trackmania", "farm_sim",
        "rim_world"),
      "Strategy" -> Seq("a.o.e", "star_craft", "civ", "total_war", "xcom", "dota", "h.o.t.s"),
      "Sports" -> Seq("forza", "nfs", "pro_skate", "fifa", "ea_s_ufc", "fight_night")
    )

    val nf = NumberFormat.getInstance()
    for ((genre, members) <- genres) {
      val (winner, count) = members.foldLeft(("", 0)) { case (best @ (_, bestCount), id) =>
        val c = dict.getOrElse(id, 0)
        if (c > bestCount) (id, c) else best
      }
      println(s"$genre winner: ${winner}(${nf.format(count)} references found)")
    }
  }
}