You can use the sample code on this page in self-contained Spark Apps, following the instructions in part II.

The CC-INDEX files contain the following metadata for every crawl:

   root                                                                            
    |-- url_surtkey: string (nullable = true)
    |-- url: string (nullable = true)
    |-- url_host_name: string (nullable = true)
    |-- url_host_tld: string (nullable = true)
    |-- url_host_2nd_last_part: string (nullable = true)
    |-- url_host_3rd_last_part: string (nullable = true)
    |-- url_host_4th_last_part: string (nullable = true)
    |-- url_host_5th_last_part: string (nullable = true)
    |-- url_host_registry_suffix: string (nullable = true)
    |-- url_host_registered_domain: string (nullable = true)
    |-- url_host_private_suffix: string (nullable = true)
    |-- url_host_private_domain: string (nullable = true)
    |-- url_protocol: string (nullable = true)
    |-- url_port: integer (nullable = true)
    |-- url_path: string (nullable = true)
    |-- url_query: string (nullable = true)
    |-- fetch_time: timestamp (nullable = true)
    |-- fetch_status: short (nullable = true)
    |-- fetch_redirect: string (nullable = true)
    |-- content_digest: string (nullable = true)
    |-- content_mime_type: string (nullable = true)
    |-- content_mime_detected: string (nullable = true)
    |-- content_charset: string (nullable = true)
    |-- content_languages: string (nullable = true)
    |-- content_truncated: string (nullable = true)
    |-- warc_filename: string (nullable = true)
    |-- warc_record_offset: integer (nullable = true)
    |-- warc_record_length: integer (nullable = true)
    |-- warc_segment: string (nullable = true)

Directory /cc-index-subset/subset=warc/ contains a copy of the CC-INDEX data for CC-MAIN-2021-17, the crawl that we have copied onto the cluster.

We ran the following queries to select useful subsets of WARCs. First, we identified the WARCs that most likely contain Dutch text, and wrote a selection of the metadata of interest to a new dataset on the cluster, available at hdfs:///user/core/cc-index-nl:

import spark.implicits._
val df = spark.read
  .option("mergeSchema", "true")
  .option("enable.summary-metadata", "false")
  .option("enable.dictionary", "true")
  .option("filterPushdown", "true")
  .parquet("hdfs://gelre/cc-index-subset/subset=warc/")

df.createOrReplaceTempView("ccindex")

// Query CC-INDEX to find WARCs with HTML or XHTML data, that contain most likely Dutch text.
val nldf = spark.sql("SELECT url, warc_filename, warc_record_offset, warc_record_length "+
                     "FROM ccindex "+
                     "WHERE content_languages=\"nld\" "+
                     "AND (content_mime_detected=\"text/html\" OR content_mime_detected=\"application/xhtml+xml\")")
val nldfc = nldf.coalesce(100)
nldfc.write.parquet("hdfs:///user/core/cc-index-nl")

The coalesce reduces the original 2K partitions to 100 only, so that future queries on this subset take less time to load. The schema of the resulting dataset:

nldfc.printSchema()
    root
    |-- url: string (nullable = true)
    |-- warc_filename: string (nullable = true)
    |-- warc_record_offset: integer (nullable = true)
    |-- warc_record_length: integer (nullable = true)

The following query reduces our snapshot of the CC-INDEX to those WARC files that are part of the single segment that we first copied onto the cluster:

// Select the CC-INDEX subset corresponding to the single segment that was copied to HDFS already
val sdf = spark.sql("SELECT * FROM ccindex WHERE warc_filename LIKE 'crawl-data/CC-MAIN-2021-17/segments/1618038056869.3%'")
val sdfc = sdf.coalesce(20)
sdfc.write.parquet("hdfs:///user/core/cc-index-single-segment")

Of course, we can combine these queries to identify the (likely) Dutch content in the randomly chosen crawl segment:

import spark.implicits._
val df = spark.read
  .option("mergeSchema", "true")
  .option("enable.summary-metadata", "false")
  .option("enable.dictionary", "true")
  .option("filterPushdown", "true")
  .parquet("hdfs://gelre/user/core/cc-index-single-segment")

df.createOrReplaceTempView("ccindex")

// Query CC-INDEX to find WARCs with HTML or XHTML data, that contain most likely Dutch text.
val nldf = spark.sql("SELECT url, warc_filename, warc_record_offset, warc_record_length "+
                     "FROM ccindex "+
                     "WHERE content_languages=\"nld\" "+
                     "AND (content_mime_detected=\"text/html\" OR content_mime_detected=\"application/xhtml+xml\")")
nldf.count
nldf.show(20,false)

Back to assignments overview / part I / part II / part III