Remove unused code

Daoud Clarke 2022-02-23 21:59:24 +00:00
parent d19e0e51f7
commit 00d18c3474
12 changed files with 1 addition and 483 deletions

View file

@@ -39,4 +39,5 @@ COPY data /app/data
COPY config /app/config
# Using the mwmbl-tinysearchengine binary/entrypoint which comes packaged with mwmbl
# TODO: fix the arguments for the recent changes
CMD ["/venv/bin/mwmbl-tinysearchengine", "--config", "config/tinysearchengine.yaml"]

View file

@@ -1,15 +0,0 @@
#!/bin/bash -xe

# Presumably to peel back any pre-installed copies of numpy (EMR images often
# ship more than one) before the pinned version below is installed.
sudo python3 -m pip uninstall numpy -y
sudo python3 -m pip uninstall numpy -y
sudo python3 -m pip uninstall numpy -y

sudo python3 -m pip install boto3==1.19.7 botocore==1.22.7 jusText==3.0.0 langdetect==1.0.9 \
    lxml==4.6.3 numpy==1.21.3 pandas==1.2.5 pyarrow==6.0.0 spacy==2.3.5 \
    warcio==1.7.4 zstandard==0.16.0

sudo python3 -m spacy download en_core_web_sm

echo "========================"
echo "Normal python pip freeze"
python3 -m pip freeze

View file

@@ -1,45 +0,0 @@
"""
Crawl the web
"""
import gzip
import hashlib
import os
import sys
from traceback import print_tb, print_exc

import pandas as pd
import requests

from .paths import DATA_DIR, HN_TOP_PATH, CRAWL_PREFIX


def crawl():
    data = pd.read_csv(HN_TOP_PATH)

    for url in data['url']:
        filename = hashlib.md5(url.encode('utf8')).hexdigest()
        path = os.path.join(DATA_DIR, f"{CRAWL_PREFIX}{filename}.html.gz")
        if os.path.isfile(path):
            print("Path already exists, skipping", url)
            continue

        print("Fetching", url)
        try:
            html = fetch(url)
        except Exception:
            print_exc(file=sys.stderr)
            print("Unable to fetch", url)
            continue

        with gzip.open(path, 'wt') as output:
            output.write(url + '\n')
            output.write(html)


def fetch(url):
    page_data = requests.get(url, timeout=10)
    return page_data.text


if __name__ == '__main__':
    crawl()
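
For reference, the files written above can be read back the same way the local indexer later in this commit does it: the first line of each gzip file is the source URL and the remainder is the raw HTML. A minimal sketch (the function name is ours, not the project's):

import gzip

def read_crawl_file(path):
    # crawl() writes the URL on the first line, then the fetched HTML.
    with gzip.open(path, 'rt') as crawl_file:
        url = crawl_file.readline().strip()
        html = crawl_file.read()
    return url, html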

View file

@@ -1,20 +0,0 @@
cat hn-top-domains-filtered.py extract.py > runextract.py
aws s3 cp runextract.py s3://tinysearch/code/
aws s3 cp bootstrap.sh s3://tinysearch/code/
aws emr create-cluster \
--applications Name=Spark Name=Zeppelin \
--ec2-attributes '{"InstanceProfile":"EMR_EC2_DefaultRole","SubnetId":"subnet-03c33360c68f73a48"}' \
--service-role EMR_DefaultRole \
--enable-debugging \
--release-label emr-5.33.1 \
--log-uri 's3n://tinysearch/pyspark-logs/' \
--bootstrap-actions '{"Path": "s3://tinysearch/code/bootstrap.sh"}' \
--steps '[{"Args":["spark-submit","--deploy-mode","cluster","s3n://tinysearch/code/runextract.py"],"Type":"CUSTOM_JAR","ActionOnFailure":"CONTINUE","Jar":"command-runner.jar","Properties":"","Name":"Spark application"}]' \
--name 'TinySearch' \
--instance-groups '[{"InstanceCount":2,"EbsConfiguration":{"EbsBlockDeviceConfigs":[{"VolumeSpecification":{"SizeInGB":32,"VolumeType":"gp2"},"VolumesPerInstance":1}]},"InstanceGroupType":"CORE","InstanceType":"m4.large","Name":"Core Instance Group"},{"InstanceCount":1,"EbsConfiguration":{"EbsBlockDeviceConfigs":[{"VolumeSpecification":{"SizeInGB":32,"VolumeType":"gp2"},"VolumesPerInstance":1}]},"InstanceGroupType":"MASTER","InstanceType":"m4.large","Name":"Master Instance Group"}]' \
--configurations '[{"Classification":"spark","Properties":{}}]' \
--scale-down-behavior TERMINATE_AT_TASK_COMPLETION --region us-east-1 \
--auto-terminate
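
For readers who prefer Python to the aws CLI, roughly the same job flow can be submitted with boto3. This is an untested sketch that mirrors the flags above (EBS volumes, debugging and the Spark classification are omitted for brevity); it is not something this repository shipped:

import boto3

emr = boto3.client('emr', region_name='us-east-1')
emr.run_job_flow(
    Name='TinySearch',
    ReleaseLabel='emr-5.33.1',
    LogUri='s3n://tinysearch/pyspark-logs/',
    Applications=[{'Name': 'Spark'}, {'Name': 'Zeppelin'}],
    ServiceRole='EMR_DefaultRole',
    JobFlowRole='EMR_EC2_DefaultRole',
    BootstrapActions=[{
        'Name': 'Install dependencies',
        'ScriptBootstrapAction': {'Path': 's3://tinysearch/code/bootstrap.sh'},
    }],
    Steps=[{
        'Name': 'Spark application',
        'ActionOnFailure': 'CONTINUE',
        'HadoopJarStep': {
            'Jar': 'command-runner.jar',
            'Args': ['spark-submit', '--deploy-mode', 'cluster',
                     's3n://tinysearch/code/runextract.py'],
        },
    }],
    Instances={
        'Ec2SubnetId': 'subnet-03c33360c68f73a48',
        'KeepJobFlowAliveWhenNoSteps': False,  # mirrors --auto-terminate
        'InstanceGroups': [
            {'Name': 'Master Instance Group', 'InstanceRole': 'MASTER',
             'InstanceType': 'm4.large', 'InstanceCount': 1},
            {'Name': 'Core Instance Group', 'InstanceRole': 'CORE',
             'InstanceType': 'm4.large', 'InstanceCount': 2},
        ],
    },
)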

View file

@@ -1,73 +0,0 @@
"""
Extract content from HTML files and store it as compressed JSON
"""
from urllib.parse import urlparse

from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, LongType, IntegerType


RECORDS_PATH = 's3://tinysearch/outputs/records'
OUTPUT_PATH = 's3://tinysearch/outputs/index'

index_schema = StructType([
    StructField("term_hash", LongType(), False),
    StructField("data", StringType(), False),
    StructField("top", StringType(), False),
])

output_schema = StructType([
    StructField("uri", StringType(), False),
    StructField("title", StringType(), False),
    StructField("extract", StringType(), False),
])

record_schema = StructType([
    StructField("url", StringType(), False),
    StructField("warc_filename", StringType(), False),
    StructField("warc_record_offset", IntegerType(), False),
    StructField("warc_record_length", IntegerType(), False),
])

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()


def run():
    # sqlc = SQLContext(sparkContext=spark)
    df = spark.read.load('s3://commoncrawl/cc-index/table/cc-main/warc/')
    df.createOrReplaceTempView('ccindex')
    # url_host_name is selected here because the DOMAINS filter below needs it
    sqldf = spark.sql('''SELECT url, url_host_name, warc_filename, warc_record_offset,
                                warc_record_length
                         FROM ccindex
                         WHERE crawl = 'CC-MAIN-2021-43'
                         AND subset = 'warc'
                      ''')
    sqldf = sqldf.sample(fraction=0.01)
    sqldf = sqldf.filter(col('url_host_name').isin(list(DOMAINS.keys())))
    # print("Got rows", sqldf.take(10))
    # print("Num rows", sqldf.count())

    sqldf.write.option('compression', 'gzip').format('json').mode('overwrite').save(RECORDS_PATH)

    # warc_recs = sqldf.select("url", "warc_filename", "warc_record_offset", "warc_record_length").rdd
    # rdd = warc_recs.mapPartitions(fetch_process_warc_records)
    # output = sqlc.createDataFrame(rdd, schema=output_schema)
    # output.write.option('compression', 'gzip').format('json').mode('overwrite').save(OUTPUT_PATH)


def get_domain_rating(url):
    domain = urlparse(url).netloc
    return DOMAINS.get(domain)


if __name__ == '__main__':
    run()
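
Note that DOMAINS is never defined in this file: the submit script above concatenates hn-top-domains-filtered.py in front of extract.py to produce runextract.py, so the combined module starts with a domain-to-score dictionary. Its exact contents are not shown in this commit; the shape is presumably something like:

# Hypothetical shape of hn-top-domains-filtered.py (values invented here);
# get_domain_rating() looks hostnames up in this dict, and run() filters on its keys.
DOMAINS = {
    'github.com': 0.98,
    'en.wikipedia.org': 0.95,
    'arstechnica.com': 0.62,
}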

View file

@@ -1,63 +0,0 @@
import gzip
import json
import os
from glob import glob
from multiprocessing import Process, Lock

from .extract_process import fetch_process_warc_records
from .fsqueue import FSQueue, GzipJsonRowSerializer
from .paths import DATA_DIR


ARCHIVE_INFO_GLOB = 'outputs/records/*.gz'

NUM_PROCESSES = 8


def get_records():
    for path in glob(ARCHIVE_INFO_GLOB):
        with gzip.open(path) as data_file:
            for line in data_file:
                yield json.loads(line)


def process(record):
    print("Record", record)
    return list(fetch_process_warc_records([record]))


def run(lock: Lock):
    input_queue = FSQueue(DATA_DIR, 'records', GzipJsonRowSerializer())
    output_queue = FSQueue(DATA_DIR, 'search-items', GzipJsonRowSerializer())

    while True:
        with lock:
            queue_item = input_queue.get()
        if queue_item is None:
            print("All finished, stopping:", os.getpid())
            break
        item_id, records = queue_item
        print("Got item: ", item_id, os.getpid())

        search_items = []
        for record in records:
            search_items += list(fetch_process_warc_records([record]))

        if search_items:
            output_queue.put(search_items)
        input_queue.done(item_id)


def run_multiprocessing():
    input_queue = FSQueue(DATA_DIR, 'records', GzipJsonRowSerializer())
    input_queue.unlock_all()

    processes = []
    lock = Lock()
    for i in range(NUM_PROCESSES):
        new_process = Process(target=run, args=(lock,))
        new_process.start()
        processes.append(new_process)

    for running_process in processes:
        running_process.join()


if __name__ == '__main__':
    run_multiprocessing()
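
The get_records() and process() helpers above are not called from run_multiprocessing(); they suggest the 'records' queue is seeded from the gzipped Spark output downloaded locally. A plausible sketch of that step, assuming FSQueue.put() accepts a list of rows as the other call sites do (the function name and batch size are ours):

def seed_queue(batch_size=1000):
    # Push Common Crawl index rows from outputs/records/*.gz into the
    # 'records' FSQueue that run() consumes.
    input_queue = FSQueue(DATA_DIR, 'records', GzipJsonRowSerializer())
    batch = []
    for record in get_records():
        batch.append(record)
        if len(batch) >= batch_size:
            input_queue.put(batch)
            batch = []
    if batch:
        input_queue.put(batch)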

View file

@@ -1,137 +0,0 @@
from io import BytesIO

import boto3
from justext import get_stoplist
from justext.core import LENGTH_LOW_DEFAULT, LENGTH_HIGH_DEFAULT, STOPWORDS_LOW_DEFAULT, STOPWORDS_HIGH_DEFAULT, \
    MAX_LINK_DENSITY_DEFAULT, MAX_HEADING_DISTANCE_DEFAULT, NO_HEADINGS_DEFAULT, DEFAULT_ENCODING, DEFAULT_ENC_ERRORS, \
    preprocessor, html_to_dom, ParagraphMaker, classify_paragraphs, revise_paragraph_classification
from langdetect import detect
from lxml.etree import ParserError
from warcio import ArchiveIterator

MAX_URI_LENGTH = 150
NUM_CHARS_TO_ANALYSE = 1000
NUM_TITLE_CHARS = 65
NUM_EXTRACT_CHARS = 155


def fetch_process_warc_records(rows):
    """Fetch all WARC records defined by filenames and offsets in rows,
    parse the records and the contained HTML, split the text into words
    and emit pairs <word, 1>"""
    s3client = boto3.client('s3')
    for row in rows:
        warc_path = row['warc_filename']
        offset = int(row['warc_record_offset'])
        length = int(row['warc_record_length'])
        rangereq = 'bytes={}-{}'.format(offset, (offset+length-1))
        response = s3client.get_object(Bucket='commoncrawl',
                                       Key=warc_path,
                                       Range=rangereq)
        record_stream = BytesIO(response["Body"].read())
        for record in ArchiveIterator(record_stream):
            for result in process_record(record):
                yield result


def is_html(record):
    """Return true if (detected) MIME type of a record is HTML"""
    html_types = ['text/html', 'application/xhtml+xml']
    if (('WARC-Identified-Payload-Type' in record.rec_headers) and
            (record.rec_headers['WARC-Identified-Payload-Type'] in html_types)):
        return True
    content_type = record.http_headers.get_header('content-type', None)
    if content_type:
        for html_type in html_types:
            if html_type in content_type:
                return True
    return False


def justext(html_text, stoplist, length_low=LENGTH_LOW_DEFAULT,
            length_high=LENGTH_HIGH_DEFAULT, stopwords_low=STOPWORDS_LOW_DEFAULT,
            stopwords_high=STOPWORDS_HIGH_DEFAULT, max_link_density=MAX_LINK_DENSITY_DEFAULT,
            max_heading_distance=MAX_HEADING_DISTANCE_DEFAULT, no_headings=NO_HEADINGS_DEFAULT,
            encoding=None, default_encoding=DEFAULT_ENCODING,
            enc_errors=DEFAULT_ENC_ERRORS, preprocessor=preprocessor):
    """
    Converts an HTML page into a list of classified paragraphs. Each paragraph
    is represented as an instance of class ``justext.paragraph.Paragraph``.
    """
    dom = html_to_dom(html_text, default_encoding, encoding, enc_errors)
    print("Parsed HTML")

    try:
        title = dom.find(".//title").text
    except AttributeError:
        title = None

    preprocessed_dom = preprocessor(dom)

    paragraphs = ParagraphMaker.make_paragraphs(preprocessed_dom)
    print("Got paragraphs")

    classify_paragraphs(paragraphs, stoplist, length_low, length_high,
                        stopwords_low, stopwords_high, max_link_density, no_headings)
    revise_paragraph_classification(paragraphs, max_heading_distance)

    return paragraphs, title


def process_record(record):
    # print("Record", record.format, record.rec_type, record.rec_headers, record.raw_stream,
    #       record.http_headers, record.content_type, record.length)
    if record.rec_type != 'response':
        # skip over WARC request or metadata records
        return
    if not is_html(record):
        return

    uri = record.rec_headers.get_header('WARC-Target-URI')
    if len(uri) > MAX_URI_LENGTH:
        print("URI too long", len(uri))
        return

    # rating = get_domain_rating(uri)
    # print("Rating", rating)
    # if rating is None:
    #     return

    content = record.content_stream().read().strip()
    # print("Content", uri, content[:100])

    if not content:
        return

    try:
        all_paragraphs, full_title = justext(content, get_stoplist('English'))
    except UnicodeDecodeError:
        print("Unable to decode unicode")
        return
    except ParserError:
        print("Unable to parse")
        return

    if full_title is None:
        print("Missing title")
        return

    title = full_title[:NUM_TITLE_CHARS] + '…' \
        if len(full_title) > NUM_TITLE_CHARS else full_title

    text = '\n'.join([p.text for p in all_paragraphs
                      if not p.is_boilerplate])[:NUM_CHARS_TO_ANALYSE]
    print("Paragraphs", text)
    if len(text) < NUM_EXTRACT_CHARS:
        return

    language = detect(text)
    print("Got language", language)
    if language != 'en':
        return

    extract = text[:NUM_EXTRACT_CHARS]
    yield uri, title, extract
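
A quick way to exercise the pipeline above is to feed it a single row shaped like the record_schema from the Spark job (the values below are made up; a real warc_filename and offset come from the Common Crawl index):

rows = [{
    'url': 'https://example.com/',
    'warc_filename': 'crawl-data/CC-MAIN-2021-43/segments/example.warc.gz',
    'warc_record_offset': 1234,
    'warc_record_length': 5678,
}]
for uri, title, extract in fetch_process_warc_records(rows):
    print(uri, title, extract)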

View file

@@ -1,47 +0,0 @@
import gzip
from glob import glob

import bs4
import justext
from spacy.lang.en import English

from .index import tokenize
from mwmbl.tinysearchengine.indexer import TinyIndexer, NUM_PAGES, PAGE_SIZE
from .paths import INDEX_PATH, CRAWL_GLOB


def run():
    # TODO: item_factory argument is unfilled.
    indexer = TinyIndexer(INDEX_PATH, NUM_PAGES, PAGE_SIZE)
    indexer.create()
    nlp = English()
    for path in glob(CRAWL_GLOB):
        print("Path", path)
        with gzip.open(path, 'rt') as html_file:
            url = html_file.readline().strip()
            content = html_file.read()

        if indexer.document_indexed(url):
            print("Page exists, skipping", url)
            continue

        cleaned_text = clean(content)
        try:
            title = bs4.BeautifulSoup(content, features="lxml").find('title').string
        except AttributeError:
            title = cleaned_text[:80]
        tokens = tokenize(nlp, cleaned_text)
        print("URL", url)
        print("Tokens", tokens)
        print("Title", title)
        indexer.index(tokens, url, title)


def clean(content):
    text = justext.justext(content, justext.get_stoplist("English"))
    pars = [par.text for par in text if not par.is_boilerplate]
    cleaned_text = ' '.join(pars)
    return cleaned_text


if __name__ == '__main__':
    run()
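
The TODO above presumably refers to the item factory that the newer indexer API expects as its first argument, as used by the Common Crawl indexer later in this commit; a sketch of the likely fix:

# Assumed fix for the TODO: pass the Document factory, matching the
# TinyIndexer(Document, INDEX_PATH, NUM_PAGES, PAGE_SIZE) call in the
# Common Crawl indexer below.
from mwmbl.tinysearchengine.indexer import TinyIndexer, Document, NUM_PAGES, PAGE_SIZE
indexer = TinyIndexer(Document, INDEX_PATH, NUM_PAGES, PAGE_SIZE)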

View file

@@ -1,31 +0,0 @@
"""
Index items in the file-system queue
"""
from spacy.lang.en import English

from .fsqueue import FSQueue, ZstdJsonSerializer
from .index import index_titles_urls_and_extracts
from mwmbl.tinysearchengine.indexer import TinyIndexer, NUM_PAGES, PAGE_SIZE
from .paths import DATA_DIR, DOMAINS_TITLES_QUEUE_NAME, INDEX_PATH


def get_queue_items():
    titles_queue = FSQueue(DATA_DIR, DOMAINS_TITLES_QUEUE_NAME, ZstdJsonSerializer())
    titles_queue.unlock_all()
    while True:
        queue_item = titles_queue.get()
        if queue_item is None:
            # No more items in the queue
            break
        items_id, items = queue_item
        for item in items:
            if item['title'] is None:
                continue
            yield item['title'], item['url']
        titles_queue.done(items_id)


def index_queue_items():
    nlp = English()
    with TinyIndexer(INDEX_PATH, NUM_PAGES, PAGE_SIZE) as indexer:
        titles_and_urls = get_queue_items()
        index_titles_urls_and_extracts(indexer, nlp, titles_and_urls)


if __name__ == '__main__':
    index_queue_items()

View file

@@ -1,49 +0,0 @@
"""
Index data downloaded from Common Crawl
"""
import logging
import sys
from logging import getLogger

import spacy

from .fsqueue import FSQueue, GzipJsonRowSerializer, FSQueueError
from .index import index_titles_urls_and_extracts
from mwmbl.tinysearchengine.indexer import TinyIndexer, NUM_PAGES, PAGE_SIZE, Document
from .paths import INDEX_PATH, DATA_DIR, COMMON_CRAWL_TERMS_PATH


logging.basicConfig(level=logging.DEBUG, stream=sys.stdout)
logger = getLogger(__name__)


def index_common_craw_data():
    nlp = spacy.load("en_core_web_sm")

    with TinyIndexer(Document, INDEX_PATH, NUM_PAGES, PAGE_SIZE) as indexer:
        titles_urls_and_extracts = get_common_crawl_titles_urls_and_extracts()
        index_titles_urls_and_extracts(indexer, nlp, titles_urls_and_extracts, COMMON_CRAWL_TERMS_PATH)


def get_common_crawl_titles_urls_and_extracts():
    input_queue = FSQueue(DATA_DIR, 'search-items', GzipJsonRowSerializer())
    input_queue.unlock_all()
    while True:
        try:
            next_item = input_queue.get()
        except FSQueueError as e:
            logger.exception(f'Error with item {e.item_id}')
            input_queue.error(e.item_id)
            continue
        if next_item is None:
            logger.info('No more items to process, stopping')
            break
        item_id, items = next_item
        logger.info(f'Processing item {item_id}')
        for url, title, extract in items:
            yield title, url, extract
        input_queue.done(item_id)


if __name__ == '__main__':
    index_common_craw_data()

View file

@@ -6,9 +6,6 @@ HOME = os.getenv('HOME')
DATA_DIR = Path(os.environ['HOME']) / 'data' / 'tinysearch'
COMMON_CRAWL_TERMS_PATH = DATA_DIR / 'common-craw-terms.csv'
HN_TOP_PATH = os.path.join(DATA_DIR, 'hn-top.csv')
CRAWL_PREFIX = 'crawl_'
CRAWL_GLOB = os.path.join(DATA_DIR, f"{CRAWL_PREFIX}*")
TEST_INDEX_PATH = os.path.join(DATA_DIR, 'index-test.tinysearch')
TEST_TERMS_PATH = os.path.join(DATA_DIR, 'index-terms.csv')
WIKI_DATA_PATH = os.path.join(DATA_DIR, 'enwiki-20210301-pages-articles1.xml-p1p41242.bz2')