From 00d18c34749714eda6ce5df712433e4227903a40 Mon Sep 17 00:00:00 2001
From: Daoud Clarke
Date: Wed, 23 Feb 2022 21:59:24 +0000
Subject: [PATCH] Remove unused code

---
 Dockerfile                                 |   1 +
 mwmbl/indexer/bootstrap.sh                 |  15 --
 mwmbl/indexer/crawl.py                     |  45 ------
 mwmbl/indexer/deploy.sh                    |  20 ---
 mwmbl/indexer/extract.py                   |  73 ----------
 mwmbl/indexer/extract_local.py             |  63 --------
 mwmbl/indexer/extract_process.py           | 137 ------------------
 mwmbl/indexer/index_glob.py                |  47 ------
 mwmbl/indexer/index_queue.py               |  31 ----
 mwmbl/indexer/indexcc.py                   |  49 -------
 .../top_links.py => mwmbl/indexer/links.py |   0
 mwmbl/indexer/paths.py                     |   3 -
 12 files changed, 1 insertion(+), 483 deletions(-)
 delete mode 100644 mwmbl/indexer/bootstrap.sh
 delete mode 100644 mwmbl/indexer/crawl.py
 delete mode 100644 mwmbl/indexer/deploy.sh
 delete mode 100644 mwmbl/indexer/extract.py
 delete mode 100644 mwmbl/indexer/extract_local.py
 delete mode 100644 mwmbl/indexer/extract_process.py
 delete mode 100644 mwmbl/indexer/index_glob.py
 delete mode 100644 mwmbl/indexer/index_queue.py
 delete mode 100644 mwmbl/indexer/indexcc.py
 rename analyse/top_links.py => mwmbl/indexer/links.py (100%)

diff --git a/Dockerfile b/Dockerfile
index 4ac39f6..bebe590 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -39,4 +39,5 @@ COPY data /app/data
 COPY config /app/config
 
 # Using the mwmbl-tinysearchengine binary/entrypoint which comes packaged with mwmbl
+# TODO: fix the arguments for the recent changes
 CMD ["/venv/bin/mwmbl-tinysearchengine", "--config", "config/tinysearchengine.yaml"]
diff --git a/mwmbl/indexer/bootstrap.sh b/mwmbl/indexer/bootstrap.sh
deleted file mode 100644
index 6186fae..0000000
--- a/mwmbl/indexer/bootstrap.sh
+++ /dev/null
@@ -1,15 +0,0 @@
-#!/bin/bash -xe
-
-sudo python3 -m pip uninstall numpy -y
-sudo python3 -m pip uninstall numpy -y
-sudo python3 -m pip uninstall numpy -y
-
-sudo python3 -m pip install boto3==1.19.7 botocore==1.22.7 jusText==3.0.0 langdetect==1.0.9 \
-    lxml==4.6.3 numpy==1.21.3 pandas==1.2.5 pyarrow==6.0.0 spacy==2.3.5 \
-    warcio==1.7.4 zstandard==0.16.0
-
-sudo python3 -m spacy download en_core_web_sm
-
-echo "========================"
-echo "Normal python pip freeze"
-python3 -m pip freeze
diff --git a/mwmbl/indexer/crawl.py b/mwmbl/indexer/crawl.py
deleted file mode 100644
index 11405d0..0000000
--- a/mwmbl/indexer/crawl.py
+++ /dev/null
@@ -1,45 +0,0 @@
-"""
-Crawl the web
-"""
-import gzip
-import hashlib
-import os
-import sys
-from traceback import print_tb, print_exc
-
-import pandas as pd
-import requests
-
-from .paths import DATA_DIR, HN_TOP_PATH, CRAWL_PREFIX
-
-
-def crawl():
-    data = pd.read_csv(HN_TOP_PATH)
-
-    for url in data['url']:
-        filename = hashlib.md5(url.encode('utf8')).hexdigest()
-        path = os.path.join(DATA_DIR, f"{CRAWL_PREFIX}{filename}.html.gz")
-        if os.path.isfile(path):
-            print("Path already exists, skipping", url)
-            continue
-
-        print("Fetching", url)
-        try:
-            html = fetch(url)
-        except Exception:
-            print_exc(file=sys.stderr)
-            print("Unable to fetch", url)
-            continue
-
-        with gzip.open(path, 'wt') as output:
-            output.write(url + '\n')
-            output.write(html)
-
-
-def fetch(url):
-    page_data = requests.get(url, timeout=10)
-    return page_data.text
-
-
-if __name__ == '__main__':
-    crawl()
diff --git a/mwmbl/indexer/deploy.sh b/mwmbl/indexer/deploy.sh
deleted file mode 100644
index b17ddb2..0000000
--- a/mwmbl/indexer/deploy.sh
+++ /dev/null
@@ -1,20 +0,0 @@
-cat hn-top-domains-filtered.py extract.py > runextract.py
-
-aws s3 cp runextract.py s3://tinysearch/code/
-aws s3 cp bootstrap.sh s3://tinysearch/code/
-
-
-aws emr create-cluster \
-    --applications Name=Spark Name=Zeppelin \
-    --ec2-attributes '{"InstanceProfile":"EMR_EC2_DefaultRole","SubnetId":"subnet-03c33360c68f73a48"}' \
-    --service-role EMR_DefaultRole \
-    --enable-debugging \
-    --release-label emr-5.33.1 \
-    --log-uri 's3n://tinysearch/pyspark-logs/' \
-    --bootstrap-actions '{"Path": "s3://tinysearch/code/bootstrap.sh"}' \
-    --steps '[{"Args":["spark-submit","--deploy-mode","cluster","s3n://tinysearch/code/runextract.py"],"Type":"CUSTOM_JAR","ActionOnFailure":"CONTINUE","Jar":"command-runner.jar","Properties":"","Name":"Spark application"}]' \
-    --name 'TinySearch' \
-    --instance-groups '[{"InstanceCount":2,"EbsConfiguration":{"EbsBlockDeviceConfigs":[{"VolumeSpecification":{"SizeInGB":32,"VolumeType":"gp2"},"VolumesPerInstance":1}]},"InstanceGroupType":"CORE","InstanceType":"m4.large","Name":"Core Instance Group"},{"InstanceCount":1,"EbsConfiguration":{"EbsBlockDeviceConfigs":[{"VolumeSpecification":{"SizeInGB":32,"VolumeType":"gp2"},"VolumesPerInstance":1}]},"InstanceGroupType":"MASTER","InstanceType":"m4.large","Name":"Master Instance Group"}]' \
-    --configurations '[{"Classification":"spark","Properties":{}}]' \
-    --scale-down-behavior TERMINATE_AT_TASK_COMPLETION --region us-east-1 \
-    --auto-terminate
diff --git a/mwmbl/indexer/extract.py b/mwmbl/indexer/extract.py
deleted file mode 100644
index a397e05..0000000
--- a/mwmbl/indexer/extract.py
+++ /dev/null
@@ -1,73 +0,0 @@
-"""
-Extract content from HTML files and store it as compressed JSON
-"""
-
-from urllib.parse import urlparse
-
-from pyspark.sql import SparkSession
-from pyspark.sql.functions import col
-from pyspark.sql.types import StructType, StructField, StringType, LongType, IntegerType
-
-RECORDS_PATH = 's3://tinysearch/outputs/records'
-OUTPUT_PATH = 's3://tinysearch/outputs/index'
-
-
-index_schema = StructType([
-    StructField("term_hash", LongType(), False),
-    StructField("data", StringType(), False),
-    StructField("top", StringType(), False),
-])
-
-
-output_schema = StructType([
-    StructField("uri", StringType(), False),
-    StructField("title", StringType(), False),
-    StructField("extract", StringType(), False),
-])
-
-
-record_schema = StructType([
-    StructField("url", StringType(), False),
-    StructField("warc_filename", StringType(), False),
-    StructField("warc_record_offset", IntegerType(), False),
-    StructField("warc_record_length", IntegerType(), False),
-])
-
-
-spark = SparkSession \
-    .builder \
-    .appName("Python Spark SQL basic example") \
-    .config("spark.some.config.option", "some-value") \
-    .getOrCreate()
-
-
-def run():
-    # sqlc = SQLContext(sparkContext=spark)
-
-    df = spark.read.load('s3://commoncrawl/cc-index/table/cc-main/warc/')
-    df.createOrReplaceTempView('ccindex')
-    sqldf = spark.sql('''SELECT url, warc_filename, warc_record_offset,
-                                warc_record_length
-                         FROM ccindex
-                         WHERE crawl = 'CC-MAIN-2021-43'
-                         AND subset = 'warc'
-                      ''')
-    sqldf = sqldf.sample(fraction=0.01)
-    sqldf = sqldf.filter(col('url_host_name').isin(list(DOMAINS.keys())))
-    # print("Got rows", sqldf.take(10))
-    # print("Num rows", sqldf.count())
-    sqldf.write.option('compression', 'gzip').format('json').mode('overwrite').save(RECORDS_PATH)
-
-    # warc_recs = sqldf.select("url", "warc_filename", "warc_record_offset", "warc_record_length").rdd
-    # rdd = warc_recs.mapPartitions(fetch_process_warc_records)
-    # output = sqlc.createDataFrame(rdd, schema=output_schema)
-    # output.write.option('compression', 'gzip').format('json').mode('overwrite').save(OUTPUT_PATH)
-
-
-def get_domain_rating(url):
-    domain = urlparse(url).netloc
-    return DOMAINS.get(domain)
-
-
-if __name__ == '__main__':
-    run()
diff --git a/mwmbl/indexer/extract_local.py b/mwmbl/indexer/extract_local.py
deleted file mode 100644
index b293f08..0000000
--- a/mwmbl/indexer/extract_local.py
+++ /dev/null
@@ -1,63 +0,0 @@
-import gzip
-import json
-import os
-from glob import glob
-from multiprocessing import Process, Lock
-
-from .extract_process import fetch_process_warc_records
-from .fsqueue import FSQueue, GzipJsonRowSerializer
-from .paths import DATA_DIR
-
-ARCHIVE_INFO_GLOB = 'outputs/records/*.gz'
-
-NUM_PROCESSES = 8
-
-
-def get_records():
-    for path in glob(ARCHIVE_INFO_GLOB):
-        with gzip.open(path) as data_file:
-            for line in data_file:
-                yield json.loads(line)
-
-
-def process(record):
-    print("Record", record)
-    return list(fetch_process_warc_records([record]))
-
-
-def run(lock: Lock):
-    input_queue = FSQueue(DATA_DIR, 'records', GzipJsonRowSerializer())
-    output_queue = FSQueue(DATA_DIR, 'search-items', GzipJsonRowSerializer())
-
-    while True:
-        with lock:
-            queue_item = input_queue.get()
-        if queue_item is None:
-            print("All finished, stopping:", os.getpid())
-            break
-        item_id, records = queue_item
-        print("Got item: ", item_id, os.getpid())
-        search_items = []
-        for record in records:
-            search_items += list(fetch_process_warc_records([record]))
-        if search_items:
-            output_queue.put(search_items)
-        input_queue.done(item_id)
-
-
-def run_multiprocessing():
-    input_queue = FSQueue(DATA_DIR, 'records', GzipJsonRowSerializer())
-    input_queue.unlock_all()
-    processes = []
-    lock = Lock()
-    for i in range(NUM_PROCESSES):
-        new_process = Process(target=run, args=(lock,))
-        new_process.start()
-        processes.append(new_process)
-
-    for running_process in processes:
-        running_process.join()
-
-
-if __name__ == '__main__':
-    run_multiprocessing()
diff --git a/mwmbl/indexer/extract_process.py b/mwmbl/indexer/extract_process.py
deleted file mode 100644
index 54eb247..0000000
--- a/mwmbl/indexer/extract_process.py
+++ /dev/null
@@ -1,137 +0,0 @@
-from io import BytesIO
-
-import boto3
-from justext import get_stoplist
-from justext.core import LENGTH_LOW_DEFAULT, LENGTH_HIGH_DEFAULT, STOPWORDS_LOW_DEFAULT, STOPWORDS_HIGH_DEFAULT, \
-    MAX_LINK_DENSITY_DEFAULT, MAX_HEADING_DISTANCE_DEFAULT, NO_HEADINGS_DEFAULT, DEFAULT_ENCODING, DEFAULT_ENC_ERRORS, \
-    preprocessor, html_to_dom, ParagraphMaker, classify_paragraphs, revise_paragraph_classification
-from langdetect import detect
-from lxml.etree import ParserError
-from warcio import ArchiveIterator
-
-MAX_URI_LENGTH = 150
-NUM_CHARS_TO_ANALYSE = 1000
-NUM_TITLE_CHARS = 65
-NUM_EXTRACT_CHARS = 155
-
-
-def fetch_process_warc_records(rows):
-    """Fetch all WARC records defined by filenames and offsets in rows,
-    parse the records and the contained HTML, split the text into words
-    and emit pairs """
-    s3client = boto3.client('s3')
-    for row in rows:
-        warc_path = row['warc_filename']
-        offset = int(row['warc_record_offset'])
-        length = int(row['warc_record_length'])
-        rangereq = 'bytes={}-{}'.format(offset, (offset+length-1))
-        response = s3client.get_object(Bucket='commoncrawl',
-                                       Key=warc_path,
-                                       Range=rangereq)
-        record_stream = BytesIO(response["Body"].read())
-        for record in ArchiveIterator(record_stream):
-            for result in process_record(record):
-                yield result
-
-
-def is_html(record):
-    """Return true if (detected) MIME type of a record is HTML"""
-    html_types = ['text/html', 'application/xhtml+xml']
-    if (('WARC-Identified-Payload-Type' in record.rec_headers) and
-            (record.rec_headers['WARC-Identified-Payload-Type'] in
-             html_types)):
-        return True
-    content_type = record.http_headers.get_header('content-type', None)
-    if content_type:
-        for html_type in html_types:
-            if html_type in content_type:
-                return True
-    return False
-
-
-def justext(html_text, stoplist, length_low=LENGTH_LOW_DEFAULT,
-            length_high=LENGTH_HIGH_DEFAULT, stopwords_low=STOPWORDS_LOW_DEFAULT,
-            stopwords_high=STOPWORDS_HIGH_DEFAULT, max_link_density=MAX_LINK_DENSITY_DEFAULT,
-            max_heading_distance=MAX_HEADING_DISTANCE_DEFAULT, no_headings=NO_HEADINGS_DEFAULT,
-            encoding=None, default_encoding=DEFAULT_ENCODING,
-            enc_errors=DEFAULT_ENC_ERRORS, preprocessor=preprocessor):
-    """
-    Converts an HTML page into a list of classified paragraphs. Each paragraph
-    is represented as instance of class ˙˙justext.paragraph.Paragraph˙˙.
-    """
-    dom = html_to_dom(html_text, default_encoding, encoding, enc_errors)
-    print("Parsed HTML")
-
-    try:
-        title = dom.find(".//title").text
-    except AttributeError:
-        title = None
-
-    preprocessed_dom = preprocessor(dom)
-
-    paragraphs = ParagraphMaker.make_paragraphs(preprocessed_dom)
-    print("Got paragraphs")
-
-    classify_paragraphs(paragraphs, stoplist, length_low, length_high,
-                        stopwords_low, stopwords_high, max_link_density, no_headings)
-    revise_paragraph_classification(paragraphs, max_heading_distance)
-
-    return paragraphs, title
-
-
-def process_record(record):
-    # print("Record", record.format, record.rec_type, record.rec_headers, record.raw_stream,
-    #       record.http_headers, record.content_type, record.length)
-
-    if record.rec_type != 'response':
-        # skip over WARC request or metadata records
-        return
-    if not is_html(record):
-        return
-
-    uri = record.rec_headers.get_header('WARC-Target-URI')
-    if len(uri) > MAX_URI_LENGTH:
-        print("URI too long", len(uri))
-        return
-
-    # rating = get_domain_rating(uri)
-    # print("Rating", rating)
-    # if rating is None:
-    #     return
-
-    content = record.content_stream().read().strip()
-    # print("Content", uri, content[:100])
-
-    if not content:
-        return
-
-    try:
-        all_paragraphs, full_title = justext(content, get_stoplist('English'))
-    except UnicodeDecodeError:
-        print("Unable to decode unicode")
-        return
-    except ParserError:
-        print("Unable to parse")
-        return
-
-    if full_title is None:
-        print("Missing title")
-        return
-
-    title = full_title[:NUM_TITLE_CHARS] + '…' \
-        if len(full_title) > NUM_TITLE_CHARS else full_title
-
-    text = '\n'.join([p.text for p in all_paragraphs
-                      if not p.is_boilerplate])[:NUM_CHARS_TO_ANALYSE]
-    print("Paragraphs", text)
-
-    if len(text) < NUM_EXTRACT_CHARS:
-        return
-
-    language = detect(text)
-    print("Got language", language)
-    if language != 'en':
-        return
-
-    extract = text[:NUM_EXTRACT_CHARS]
-    yield uri, title, extract
\ No newline at end of file
diff --git a/mwmbl/indexer/index_glob.py b/mwmbl/indexer/index_glob.py
deleted file mode 100644
index 9bd8b96..0000000
--- a/mwmbl/indexer/index_glob.py
+++ /dev/null
@@ -1,47 +0,0 @@
-import gzip
-from glob import glob
-
-import bs4
-from spacy.lang.en import English
-
-from .index import tokenize
-from mwmbl.tinysearchengine.indexer import TinyIndexer, NUM_PAGES, PAGE_SIZE
-from .paths import INDEX_PATH, CRAWL_GLOB
-
-
-def run():
-    # TODO: item_factory argument is unfilled.
-    indexer = TinyIndexer(INDEX_PATH, NUM_PAGES, PAGE_SIZE)
-    indexer.create()
-    nlp = English()
-    for path in glob(CRAWL_GLOB):
-        print("Path", path)
-        with gzip.open(path, 'rt') as html_file:
-            url = html_file.readline().strip()
-            content = html_file.read()
-
-        if indexer.document_indexed(url):
-            print("Page exists, skipping", url)
-            continue
-
-        cleaned_text = clean(content)
-        try:
-            title = bs4.BeautifulSoup(content, features="lxml").find('title').string
-        except AttributeError:
-            title = cleaned_text[:80]
-        tokens = tokenize(nlp, cleaned_text)
-        print("URL", url)
-        print("Tokens", tokens)
-        print("Title", title)
-        indexer.index(tokens, url, title)
-
-
-if __name__ == '__main__':
-    run()
-
-
-def clean(content):
-    text = justext.justext(content, justext.get_stoplist("English"))
-    pars = [par.text for par in text if not par.is_boilerplate]
-    cleaned_text = ' '.join(pars)
-    return cleaned_text
\ No newline at end of file
diff --git a/mwmbl/indexer/index_queue.py b/mwmbl/indexer/index_queue.py
deleted file mode 100644
index f048e28..0000000
--- a/mwmbl/indexer/index_queue.py
+++ /dev/null
@@ -1,31 +0,0 @@
-"""
-Index items in the file-system queue
-"""
-from spacy.lang.en import English
-
-from .fsqueue import FSQueue, ZstdJsonSerializer
-from .index import index_titles_urls_and_extracts
-from mwmbl.tinysearchengine.indexer import TinyIndexer, NUM_PAGES, PAGE_SIZE
-from .paths import DATA_DIR, DOMAINS_TITLES_QUEUE_NAME, INDEX_PATH
-
-
-def get_queue_items():
-    titles_queue = FSQueue(DATA_DIR, DOMAINS_TITLES_QUEUE_NAME, ZstdJsonSerializer())
-    titles_queue.unlock_all()
-    while True:
-        items_id, items = titles_queue.get()
-        for item in items:
-            if item['title'] is None:
-                continue
-            yield item['title'], item['url']
-
-
-def index_queue_items():
-    nlp = English()
-    with TinyIndexer(INDEX_PATH, NUM_PAGES, PAGE_SIZE) as indexer:
-        titles_and_urls = get_queue_items()
-        index_titles_urls_and_extracts(indexer, nlp, titles_and_urls)
-
-
-if __name__ == '__main__':
-    index_queue_items()
diff --git a/mwmbl/indexer/indexcc.py b/mwmbl/indexer/indexcc.py
deleted file mode 100644
index 4f68025..0000000
--- a/mwmbl/indexer/indexcc.py
+++ /dev/null
@@ -1,49 +0,0 @@
-"""
-Index data downloaded from Common Crawl
-"""
-import logging
-import sys
-from logging import getLogger
-
-import spacy
-
-from .fsqueue import FSQueue, GzipJsonRowSerializer, FSQueueError
-from .index import index_titles_urls_and_extracts
-from mwmbl.tinysearchengine.indexer import TinyIndexer, NUM_PAGES, PAGE_SIZE, Document
-from .paths import INDEX_PATH, DATA_DIR, COMMON_CRAWL_TERMS_PATH
-
-
-logging.basicConfig(level=logging.DEBUG, stream=sys.stdout)
-logger = getLogger(__name__)
-
-
-def index_common_craw_data():
-    nlp = spacy.load("en_core_web_sm")
-
-    with TinyIndexer(Document, INDEX_PATH, NUM_PAGES, PAGE_SIZE) as indexer:
-        titles_urls_and_extracts = get_common_crawl_titles_urls_and_extracts()
-        index_titles_urls_and_extracts(indexer, nlp, titles_urls_and_extracts, COMMON_CRAWL_TERMS_PATH)
-
-
-def get_common_crawl_titles_urls_and_extracts():
-    input_queue = FSQueue(DATA_DIR, 'search-items', GzipJsonRowSerializer())
-    input_queue.unlock_all()
-    while True:
-        try:
-            next_item = input_queue.get()
-        except FSQueueError as e:
-            logger.exception(f'Error with item {e.item_id}')
-            input_queue.error(e.item_id)
-            continue
-        if next_item is None:
-            logger.info('Not more items to process, stopping')
-            break
-        item_id, items = next_item
-        logger.info(f'Processing item {item_id}')
-        for url, title, extract in items:
-            yield title, url, extract
-        input_queue.done(item_id)
-
-
-if __name__ == '__main__':
-    index_common_craw_data()
diff --git a/analyse/top_links.py b/mwmbl/indexer/links.py
similarity index 100%
rename from analyse/top_links.py
rename to mwmbl/indexer/links.py
diff --git a/mwmbl/indexer/paths.py b/mwmbl/indexer/paths.py
index c372021..f8686a2 100644
--- a/mwmbl/indexer/paths.py
+++ b/mwmbl/indexer/paths.py
@@ -6,9 +6,6 @@ HOME = os.getenv('HOME')
 DATA_DIR = Path(os.environ['HOME']) / 'data' / 'tinysearch'
 COMMON_CRAWL_TERMS_PATH = DATA_DIR / 'common-craw-terms.csv'
 
-HN_TOP_PATH = os.path.join(DATA_DIR, 'hn-top.csv')
-CRAWL_PREFIX = 'crawl_'
-CRAWL_GLOB = os.path.join(DATA_DIR, f"{CRAWL_PREFIX}*")
 TEST_INDEX_PATH = os.path.join(DATA_DIR, 'index-test.tinysearch')
 TEST_TERMS_PATH = os.path.join(DATA_DIR, 'index-terms.csv')
 WIKI_DATA_PATH = os.path.join(DATA_DIR, 'enwiki-20210301-pages-articles1.xml-p1p41242.bz2')