From 2562a5257a4e95d0caf1640295d806d9683e59c8 Mon Sep 17 00:00:00 2001
From: Daoud Clarke <daoud.clarke@gmail.com>
Date: Sun, 5 Dec 2021 22:25:37 +0000
Subject: [PATCH] Extract locally

---
 extract.py         | 138 +--------------------------------------------
 extract_local.py   |  29 ++++++++++
 extract_process.py | 144 ++++++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 174 insertions(+), 137 deletions(-)
 create mode 100644 extract_local.py
 create mode 100644 extract_process.py

diff --git a/extract.py b/extract.py
index 11a968e..24aff83 100644
--- a/extract.py
+++ b/extract.py
@@ -2,29 +2,15 @@
 Extract content from HTML files and store it as compressed JSON
 """
 
-from io import BytesIO
 from urllib.parse import urlparse
 
-import boto3
-from justext import get_stoplist
-from justext.core import LENGTH_LOW_DEFAULT, LENGTH_HIGH_DEFAULT, STOPWORDS_LOW_DEFAULT, \
-    STOPWORDS_HIGH_DEFAULT, MAX_LINK_DENSITY_DEFAULT, NO_HEADINGS_DEFAULT, \
-    MAX_HEADING_DISTANCE_DEFAULT, DEFAULT_ENCODING, DEFAULT_ENC_ERRORS, preprocessor, html_to_dom, \
-    ParagraphMaker, classify_paragraphs, revise_paragraph_classification
-from langdetect import detect
-from lxml.etree import ParserError
-from pyspark.sql import SparkSession, SQLContext
+from pyspark.sql import SparkSession
 from pyspark.sql.functions import col
 from pyspark.sql.types import StructType, StructField, StringType, LongType, IntegerType
-from warcio import ArchiveIterator
 
 RECORDS_PATH = 's3://tinysearch/outputs/records'
 OUTPUT_PATH = 's3://tinysearch/outputs/index'
 
-MAX_URI_LENGTH = 150
-NUM_CHARS_TO_ANALYSE = 1000
-NUM_TITLE_CHARS = 65
-NUM_EXTRACT_CHARS = 155
 NUM_PAGES = 1024
 MAX_RESULTS_PER_HASH = 200
 PAGE_SIZE = 4096
@@ -82,132 +68,10 @@ def run():
     # output.write.option('compression', 'gzip').format('json').mode('overwrite').save(OUTPUT_PATH)
 
 
-def fetch_process_warc_records(rows):
-    """Fetch all WARC records defined by filenames and offsets in rows,
-    parse the records and the contained HTML, split the text into words
-    and emit pairs <word, 1>"""
-    s3client = boto3.client('s3')
-    for row in rows:
-        warc_path = row['warc_filename']
-        offset = int(row['warc_record_offset'])
-        length = int(row['warc_record_length'])
-        rangereq = 'bytes={}-{}'.format(offset, (offset+length-1))
-        response = s3client.get_object(Bucket='commoncrawl',
-        Key=warc_path,
-        Range=rangereq)
-        record_stream = BytesIO(response["Body"].read())
-        for record in ArchiveIterator(record_stream):
-            for result in process_record(record):
-                yield result
-
-
 def get_domain_rating(url):
     domain = urlparse(url).netloc
     return DOMAINS.get(domain)
 
 
-def is_html(record):
-    """Return true if (detected) MIME type of a record is HTML"""
-    html_types = ['text/html', 'application/xhtml+xml']
-    if (('WARC-Identified-Payload-Type' in record.rec_headers) and
-        (record.rec_headers['WARC-Identified-Payload-Type'] in
-         html_types)):
-        return True
-    content_type = record.http_headers.get_header('content-type', None)
-    if content_type:
-        for html_type in html_types:
-            if html_type in content_type:
-                return True
-    return False
-
-
-def justext(html_text, stoplist, length_low=LENGTH_LOW_DEFAULT,
-            length_high=LENGTH_HIGH_DEFAULT, stopwords_low=STOPWORDS_LOW_DEFAULT,
-            stopwords_high=STOPWORDS_HIGH_DEFAULT, max_link_density=MAX_LINK_DENSITY_DEFAULT,
-            max_heading_distance=MAX_HEADING_DISTANCE_DEFAULT, no_headings=NO_HEADINGS_DEFAULT,
-            encoding=None, default_encoding=DEFAULT_ENCODING,
-            enc_errors=DEFAULT_ENC_ERRORS, preprocessor=preprocessor):
-    """
-    Converts an HTML page into a list of classified paragraphs. Each paragraph
-    is represented as instance of class ˙˙justext.paragraph.Paragraph˙˙.
-    """
-    dom = html_to_dom(html_text, default_encoding, encoding, enc_errors)
-    print("Parsed HTML")
-
-    try:
-        title = dom.find(".//title").text
-    except AttributeError:
-        title = None
-
-    preprocessed_dom = preprocessor(dom)
-
-    paragraphs = ParagraphMaker.make_paragraphs(preprocessed_dom)
-    print("Got paragraphs")
-
-    classify_paragraphs(paragraphs, stoplist, length_low, length_high,
-                        stopwords_low, stopwords_high, max_link_density, no_headings)
-    revise_paragraph_classification(paragraphs, max_heading_distance)
-
-    return paragraphs, title
-
-
-def process_record(record):
-    # print("Record", record.format, record.rec_type, record.rec_headers, record.raw_stream,
-    #       record.http_headers, record.content_type, record.length)
-
-    if record.rec_type != 'response':
-        # skip over WARC request or metadata records
-        return
-    if not is_html(record):
-        return
-
-    uri = record.rec_headers.get_header('WARC-Target-URI')
-    if len(uri) > MAX_URI_LENGTH:
-        print("URI too long", len(uri))
-        return
-
-    # rating = get_domain_rating(uri)
-    # print("Rating", rating)
-    # if rating is None:
-    #     return
-
-    content = record.content_stream().read().strip()
-    # print("Content", uri, content[:100])
-
-    if not content:
-        return
-
-    try:
-        all_paragraphs, full_title = justext(content, get_stoplist('English'))
-    except UnicodeDecodeError:
-        print("Unable to decode unicode")
-        return
-    except ParserError:
-        print("Unable to parse")
-        return
-
-    if full_title is None:
-        print("Missing title")
-        return
-
-    title = full_title[:NUM_TITLE_CHARS] + '…' \
-        if len(full_title) > NUM_TITLE_CHARS else full_title
-
-    text = '\n'.join([p.text for p in all_paragraphs
-                      if not p.is_boilerplate])[:NUM_CHARS_TO_ANALYSE]
-    print("Paragraphs", text)
-
-    if len(text) < NUM_EXTRACT_CHARS:
-        return
-
-    language = detect(text)
-    print("Got language", language)
-    if language != 'en':
-        return
-
-    extract = text[:NUM_EXTRACT_CHARS]
-    yield uri, title, extract
-
-
 if __name__ == '__main__':
     run()
diff --git a/extract_local.py b/extract_local.py
new file mode 100644
index 0000000..ea4bc01
--- /dev/null
+++ b/extract_local.py
@@ -0,0 +1,29 @@
+import gzip
+import json
+from glob import glob
+from itertools import islice
+
+from extract_process import fetch_process_warc_records
+
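+# Gzipped record listings, downloaded locally from the Spark job's RECORDS_PATH output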
+ARCHIVE_INFO_GLOB = 'outputs/records/*.gz'
+
+
+def get_records():
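+    """Yield archive-info records parsed from the gzipped, newline-delimited JSON files."""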
+    for path in glob(ARCHIVE_INFO_GLOB):
+        with gzip.open(path) as data_file:
+            for line in data_file:
+                yield json.loads(line)
+
+
+def run():
+    records = get_records()
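+    # Only process the first 10 records for a quick local smoke test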
+    processed = fetch_process_warc_records(islice(records, 10))
+    for row in processed:
+        print("Processed", row)
+
+
+if __name__ == '__main__':
+    run()
diff --git a/extract_process.py b/extract_process.py
new file mode 100644
index 0000000..54eb247
--- /dev/null
+++ b/extract_process.py
@@ -0,0 +1,144 @@
+from io import BytesIO
+
+import boto3
+from justext import get_stoplist
+from justext.core import LENGTH_LOW_DEFAULT, LENGTH_HIGH_DEFAULT, STOPWORDS_LOW_DEFAULT, STOPWORDS_HIGH_DEFAULT, \
+    MAX_LINK_DENSITY_DEFAULT, MAX_HEADING_DISTANCE_DEFAULT, NO_HEADINGS_DEFAULT, DEFAULT_ENCODING, DEFAULT_ENC_ERRORS, \
+    preprocessor, html_to_dom, ParagraphMaker, classify_paragraphs, revise_paragraph_classification
+from langdetect import detect
+from lxml.etree import ParserError
+from warcio import ArchiveIterator
+
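+# Limits used to filter pages and truncate titles and extracts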
+MAX_URI_LENGTH = 150
+NUM_CHARS_TO_ANALYSE = 1000
+NUM_TITLE_CHARS = 65
+NUM_EXTRACT_CHARS = 155
+
+
+def fetch_process_warc_records(rows):
+    """Fetch all WARC records defined by filenames and offsets in rows,
+    parse the records and the contained HTML, split the text into words
+    and emit pairs <word, 1>"""
+    s3client = boto3.client('s3')
+    for row in rows:
+        warc_path = row['warc_filename']
+        offset = int(row['warc_record_offset'])
+        length = int(row['warc_record_length'])
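+        # S3 Range requests use inclusive byte ranges, hence offset + length - 1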
+        rangereq = 'bytes={}-{}'.format(offset, (offset+length-1))
+        response = s3client.get_object(Bucket='commoncrawl',
+                                       Key=warc_path,
+                                       Range=rangereq)
+        record_stream = BytesIO(response["Body"].read())
+        for record in ArchiveIterator(record_stream):
+            for result in process_record(record):
+                yield result
+
+
+def is_html(record):
+    """Return true if (detected) MIME type of a record is HTML"""
+    html_types = ['text/html', 'application/xhtml+xml']
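+    # Prefer the crawler-detected payload type; fall back to the HTTP Content-Type header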
+    if (('WARC-Identified-Payload-Type' in record.rec_headers) and
+        (record.rec_headers['WARC-Identified-Payload-Type'] in
+         html_types)):
+        return True
+    content_type = record.http_headers.get_header('content-type', None)
+    if content_type:
+        for html_type in html_types:
+            if html_type in content_type:
+                return True
+    return False
+
+
+def justext(html_text, stoplist, length_low=LENGTH_LOW_DEFAULT,
+            length_high=LENGTH_HIGH_DEFAULT, stopwords_low=STOPWORDS_LOW_DEFAULT,
+            stopwords_high=STOPWORDS_HIGH_DEFAULT, max_link_density=MAX_LINK_DENSITY_DEFAULT,
+            max_heading_distance=MAX_HEADING_DISTANCE_DEFAULT, no_headings=NO_HEADINGS_DEFAULT,
+            encoding=None, default_encoding=DEFAULT_ENCODING,
+            enc_errors=DEFAULT_ENC_ERRORS, preprocessor=preprocessor):
+    """
+    Converts an HTML page into a list of classified paragraphs. Each paragraph
+    is represented as an instance of class ``justext.paragraph.Paragraph``.
+    """
+    dom = html_to_dom(html_text, default_encoding, encoding, enc_errors)
+    print("Parsed HTML")
+
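+    # dom.find returns None when there is no <title>, making .text raise AttributeError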
+    try:
+        title = dom.find(".//title").text
+    except AttributeError:
+        title = None
+
+    preprocessed_dom = preprocessor(dom)
+
+    paragraphs = ParagraphMaker.make_paragraphs(preprocessed_dom)
+    print("Got paragraphs")
+
+    classify_paragraphs(paragraphs, stoplist, length_low, length_high,
+                        stopwords_low, stopwords_high, max_link_density, no_headings)
+    revise_paragraph_classification(paragraphs, max_heading_distance)
+
+    return paragraphs, title
+
+
+def process_record(record):
+    # print("Record", record.format, record.rec_type, record.rec_headers, record.raw_stream,
+    #       record.http_headers, record.content_type, record.length)
+
+    if record.rec_type != 'response':
+        # skip over WARC request or metadata records
+        return
+    if not is_html(record):
+        return
+
+    uri = record.rec_headers.get_header('WARC-Target-URI')
+    if len(uri) > MAX_URI_LENGTH:
+        print("URI too long", len(uri))
+        return
+
+    # rating = get_domain_rating(uri)
+    # print("Rating", rating)
+    # if rating is None:
+    #     return
+
+    content = record.content_stream().read().strip()
+    # print("Content", uri, content[:100])
+
+    if not content:
+        return
+
+    try:
+        all_paragraphs, full_title = justext(content, get_stoplist('English'))
+    except UnicodeDecodeError:
+        print("Unable to decode unicode")
+        return
+    except ParserError:
+        print("Unable to parse")
+        return
+
+    if full_title is None:
+        print("Missing title")
+        return
+
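+    # Truncate long titles to NUM_TITLE_CHARS and append an ellipsis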
+    title = (full_title[:NUM_TITLE_CHARS] + '…'
+             if len(full_title) > NUM_TITLE_CHARS else full_title)
+
+    text = '\n'.join([p.text for p in all_paragraphs
+                      if not p.is_boilerplate])[:NUM_CHARS_TO_ANALYSE]
+    print("Paragraphs", text)
+
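+    # Require enough non-boilerplate text to build a full-length extract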
+    if len(text) < NUM_EXTRACT_CHARS:
+        return
+
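+    # Keep only pages whose extracted text is detected as English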
+    language = detect(text)
+    print("Got language", language)
+    if language != 'en':
+        return
+
+    extract = text[:NUM_EXTRACT_CHARS]
+    yield uri, title, extract