Merge pull request #37 from mwmbl/index-mwmbl-crawl

Index mwmbl crawl
Daoud Clarke 2022-01-30 13:12:06 +00:00, committed by GitHub
commit 66696ad76b
GPG key ID: 4AEE18F83AFDEB23 (no known key found for this signature in database)
5 changed files with 71 additions and 8 deletions

View file

@@ -4,6 +4,7 @@ See how many unique URLs and root domains we have crawled.
 import glob
 import gzip
 import json
+from collections import defaultdict, Counter
 from urllib.parse import urlparse
 
 CRAWL_GLOB = "../../data/mwmbl/b2/*/*/*/*/*/*.json.gz"
@@ -12,24 +13,34 @@ CRAWL_GLOB = "../../data/mwmbl/b2/*/*/*/*/*/*.json.gz"
 def get_urls():
     for path in glob.glob(CRAWL_GLOB):
         data = json.load(gzip.open(path))
+        user = data['user_id_hash']
         for item in data['items']:
-            yield item['url']
+            yield user, item['url']
 
 
 def analyse_urls(urls):
-    url_set = set()
+    url_set = defaultdict(list)
     domains = set()
-    count = 0
-    for url in urls:
-        count += 1
-        url_set.add(url)
+    for user, url in urls:
+        url_set[url].append(user)
         parsed_url = urlparse(url)
         path = parsed_url.path.strip('/')
         if path == '':
             domains.add(parsed_url.netloc)
 
+    count = sum(len(x) for x in url_set.values())
     print("Root pages crawled", sorted(domains))
+    find_worst_pages(url_set)
     print(f"Got {len(url_set)} URLs and {len(domains)} root pages from {count} items")
 
+    url_list_size = len(json.dumps(list(url_set.keys())))
+    print("Length of all URLs", url_list_size)
+
+
+def find_worst_pages(url_set):
+    worst = sorted(((len(users), url) for url, users in url_set.items()), reverse=True)[:50]
+    for count, url in worst:
+        print("Worst", count, url, Counter(url_set[url]))
+
 
 def run():
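
As a quick illustration of the per-user bookkeeping introduced above: the defaultdict/Counter pattern records which users crawled each URL and then surfaces the most duplicated pages, mirroring find_worst_pages (the sample pairs below are invented):

    from collections import Counter, defaultdict

    # Hypothetical (user_id_hash, url) pairs, in the shape now yielded by get_urls()
    crawled = [
        ("user-a", "https://example.com/"),
        ("user-b", "https://example.com/"),
        ("user-a", "https://example.com/about"),
    ]

    url_set = defaultdict(list)
    for user, url in crawled:
        url_set[url].append(user)

    # Most-duplicated URLs first, as in find_worst_pages()
    worst = sorted(((len(users), url) for url, users in url_set.items()), reverse=True)[:50]
    for count, url in worst:
        print("Worst", count, url, Counter(url_set[url]))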

View file

@@ -58,6 +58,15 @@ class GzipJsonRowSerializer(Serializer):
         return [json.loads(line) for line in lines.strip().split('\n')]
 
 
+class GzipJsonBlobSerializer(Serializer):
+    def serialize(self, items: list[object]) -> bytes:
+        raise NotImplementedError("Serializer not needed - blob is generated by browser extension")
+
+    def deserialize(self, serialized_items: bytes) -> list[object]:
+        data = gzip.decompress(serialized_items).decode('utf8')
+        return json.loads(data)
+
+
 class FSQueue:
     def __init__(self, directory: Union[str, Path], name: str, serializer: Serializer):
         self.directory = str(directory)
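
A minimal round-trip sketch for the new GzipJsonBlobSerializer: since serialize() deliberately raises (blobs are produced by the browser extension), the gzipped JSON payload is built by hand here, and the payload fields are illustrative only (the module path matches the import used later in this PR):

    import gzip
    import json

    from mwmbl.indexer.fsqueue import GzipJsonBlobSerializer

    # Build a gzipped JSON blob of the kind the browser extension uploads (made-up content)
    payload = {"user_id_hash": "abc123", "items": [{"url": "https://example.com/"}]}
    blob = gzip.compress(json.dumps(payload).encode("utf8"))

    assert GzipJsonBlobSerializer().deserialize(blob) == payload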

View file

@@ -24,7 +24,8 @@ def is_content_token(nlp, token):
     return (lexeme.is_alpha or lexeme.is_digit) and not token.is_stop
 
 
-def tokenize(nlp, cleaned_text):
+def tokenize(nlp, input_text):
+    cleaned_text = input_text.encode('utf8', 'replace').decode('utf8')
     tokens = nlp.tokenizer(cleaned_text)
     content_tokens = [token for token in tokens[:NUM_INITIAL_TOKENS]
                       if is_content_token(nlp, token)]
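
The tokenize() change re-encodes the incoming text with errors='replace', so lone surrogates and other unencodable characters from crawled pages are swapped for '?' rather than failing downstream. A standalone sketch of the same cleaning step (a blank English pipeline is used here for brevity; the indexer itself loads "en_core_web_sm"):

    import spacy

    nlp = spacy.blank("en")  # enough to exercise the tokenizer

    raw = "best search engine\udcff ever"  # lone surrogate, as can appear in scraped text
    cleaned = raw.encode("utf8", "replace").decode("utf8")  # surrogate becomes '?'
    print([token.text for token in nlp.tokenizer(cleaned)])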

View file

@@ -1,5 +1,46 @@
 """
 Index data crawled through the Mwmbl crawler.
 """
+from logging import getLogger
+
+import spacy
+
+from mwmbl.indexer.fsqueue import FSQueue, GzipJsonBlobSerializer, FSQueueError
+from mwmbl.indexer.index import index_titles_urls_and_extracts
+from mwmbl.indexer.paths import INDEX_PATH, MWMBL_CRAWL_TERMS_PATH, DATA_DIR
+from mwmbl.tinysearchengine.indexer import TinyIndexer, Document, NUM_PAGES, PAGE_SIZE
+
+
+logger = getLogger(__name__)
+
+
+def index_mwmbl_crawl_data():
+    nlp = spacy.load("en_core_web_sm")
+    with TinyIndexer(Document, INDEX_PATH, NUM_PAGES, PAGE_SIZE) as indexer:
+        titles_urls_and_extracts = get_mwmbl_crawl_titles_urls_and_extracts()
+        index_titles_urls_and_extracts(indexer, nlp, titles_urls_and_extracts, MWMBL_CRAWL_TERMS_PATH)
+
+
+def get_mwmbl_crawl_titles_urls_and_extracts():
+    input_queue = FSQueue(DATA_DIR, 'mwmbl-search-items', GzipJsonBlobSerializer())
+    input_queue.unlock_all()
+
+    while True:
+        try:
+            next_item = input_queue.get()
+        except FSQueueError as e:
+            logger.exception(f'Error with item {e.item_id}')
+            input_queue.error(e.item_id)
+            continue
+        if next_item is None:
+            logger.info('No more items to process, stopping')
+            break
+
+        item_id, item_data = next_item
+        logger.info(f'Processing item {item_id}')
+        for item in item_data['items']:
+            yield item['title'], item['url'], item['extract']
+        input_queue.done(item_id)
+
+
+if __name__ == '__main__':
+    index_mwmbl_crawl_data()
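
For reference, each dequeued item is expected to decode to a dict with an 'items' list of title/url/extract records, which is all the generator above relies on (the values below are invented):

    # One decoded queue item, in the shape get_mwmbl_crawl_titles_urls_and_extracts() consumes
    item_data = {
        "items": [
            {"title": "Example Domain", "url": "https://example.com/", "extract": "This domain is for use in examples."},
        ],
    }

    for item in item_data["items"]:
        print(item["title"], item["url"], item["extract"])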

View file

@@ -5,6 +5,7 @@ HOME = os.getenv('HOME')
 DATA_DIR = Path(os.environ['HOME']) / 'data' / 'tinysearch'
 COMMON_CRAWL_TERMS_PATH = DATA_DIR / 'common-craw-terms.csv'
+MWMBL_CRAWL_TERMS_PATH = DATA_DIR / 'mwmbl-craw-terms.csv'
 
 HN_TOP_PATH = os.path.join(DATA_DIR, 'hn-top.csv')
 
 CRAWL_PREFIX = 'crawl_'
@@ -19,6 +20,6 @@ DOMAINS_QUEUE_NAME = 'domains-queue-fs'
 DOMAINS_TITLES_QUEUE_NAME = 'domains-title-queue-fs'
 
 DOMAINS_PATH = os.path.join(DATA_DIR, 'top10milliondomains.csv.gz')
 
-INDEX_PATH = Path(__file__).parent / 'data' / 'index.tinysearch'
+INDEX_PATH = Path(__file__).parent.parent.parent / 'data' / 'index.tinysearch'
 TOP_DOMAINS_JSON_PATH = DATA_DIR / 'hn-top-domains.json'
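
With paths.py living at mwmbl/indexer/paths.py (per the import in the indexing module above), the extra .parent hops move INDEX_PATH from the package's own data directory up to a repository-level one; a small sketch of the resolution (the "<repo>" placeholder is illustrative):

    from pathlib import Path

    # Assumed location of this file: <repo>/mwmbl/indexer/paths.py
    paths_py = Path("<repo>") / "mwmbl" / "indexer" / "paths.py"

    old_index = paths_py.parent / "data" / "index.tinysearch"                # <repo>/mwmbl/indexer/data/index.tinysearch
    new_index = paths_py.parent.parent.parent / "data" / "index.tinysearch"  # <repo>/data/index.tinysearch
    print(old_index, new_index, sep="\n")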