From b1eea2457f8026bb9dea84ff06ecfec27245784e Mon Sep 17 00:00:00 2001 From: Daoud Clarke Date: Mon, 22 Aug 2022 22:47:42 +0100 Subject: [PATCH 1/6] Script to index local batch for evaluation --- analyse/index_local.py | 51 ++++++++++++++++++++++++++++++++++ analyse/inspect_index.py | 26 +++++++++++------ mwmbl/indexer/index_batches.py | 25 +++++++++-------- 3 files changed, 82 insertions(+), 20 deletions(-) create mode 100644 analyse/index_local.py diff --git a/analyse/index_local.py b/analyse/index_local.py new file mode 100644 index 0000000..9db9dbf --- /dev/null +++ b/analyse/index_local.py @@ -0,0 +1,51 @@ +""" +Index batches stored locally on the filesystem for the purpose of evaluation. +""" +import glob +import gzip +import json +import logging +import os +import sys +from pathlib import Path + +import spacy + +from mwmbl.crawler.batch import HashedBatch +from mwmbl.crawler.urls import URLDatabase +from mwmbl.database import Database +from mwmbl.indexer.index_batches import index_batches +from mwmbl.tinysearchengine.indexer import TinyIndex, Document + +LOCAL_BATCHES_PATH = f'{os.environ["HOME"]}/data/mwmbl/file/**/*.json.gz' +NUM_BATCHES = 10000 +EVALUATE_INDEX_PATH = f'{os.environ["HOME"]}/data/mwmbl/evaluate-index.tinysearch' +NUM_PAGES = 1_024_000 +PAGE_SIZE = 4096 + + +logging.basicConfig(stream=sys.stdout, level=logging.DEBUG) + + +def get_batches(): + for path in sorted(glob.glob(LOCAL_BATCHES_PATH, recursive=True))[:NUM_BATCHES]: + data = json.load(gzip.open(path)) + yield HashedBatch.parse_obj(data) + + +def run(): + try: + os.remove(EVALUATE_INDEX_PATH) + except FileNotFoundError: + pass + TinyIndex.create(item_factory=Document, index_path=EVALUATE_INDEX_PATH, num_pages=NUM_PAGES, page_size=PAGE_SIZE) + + batches = get_batches() + with Database() as db: + nlp = spacy.load("en_core_web_sm") + url_db = URLDatabase(db.connection) + index_batches(batches, EVALUATE_INDEX_PATH, nlp, url_db) + + +if __name__ == '__main__': + run() diff --git a/analyse/inspect_index.py b/analyse/inspect_index.py index 2ebebe7..20b0619 100644 --- a/analyse/inspect_index.py +++ b/analyse/inspect_index.py @@ -1,8 +1,10 @@ import logging import sys +import numpy as np import spacy +from analyse.index_local import EVALUATE_INDEX_PATH from mwmbl.indexer.index import tokenize_document from mwmbl.indexer.paths import INDEX_PATH from mwmbl.tinysearchengine.indexer import TinyIndex, Document @@ -35,16 +37,24 @@ def get_items(): print("Items", item) -def run(): - with TinyIndex(Document, INDEX_PATH) as tiny_index: - for i in range(100000): +def run(index_path): + with TinyIndex(Document, index_path) as tiny_index: + sizes = {} + for i in range(tiny_index.num_pages): page = tiny_index.get_page(i) - for item in page: - if ' search' in item.title: - print("Page", i, item) + if page: + sizes[i] = len(page) + if len(page) > 50: + print("Page", len(page), page) + # for item in page: + # if ' search' in item.title: + # print("Page", i, item) + print("Max", max(sizes.values())) + print("Top", sorted(sizes.values())[-100:]) + print("Mean", np.mean(list(sizes.values()))) if __name__ == '__main__': # store() - # run() - get_items() + run(EVALUATE_INDEX_PATH) + # get_items() diff --git a/mwmbl/indexer/index_batches.py b/mwmbl/indexer/index_batches.py index 6d28586..7c85da2 100644 --- a/mwmbl/indexer/index_batches.py +++ b/mwmbl/indexer/index_batches.py @@ -8,6 +8,7 @@ from typing import Iterable from urllib.parse import urlparse import spacy +from spacy import Language from mwmbl.crawler.batch import HashedBatch, Item 
from mwmbl.crawler.urls import URLDatabase, URLStatus, FoundURL @@ -49,23 +50,23 @@ def run(batch_cache: BatchCache, index_path: str): record_urls_in_database(batch_data.values()) - document_tuples = list(get_documents_from_batches(batch_data.values())) - urls = [url for title, url, extract in document_tuples] - - logger.info(f"Got {len(urls)} document tuples") - url_db = URLDatabase(db.connection) - url_scores = url_db.get_url_scores(urls) - - logger.info(f"Got {len(url_scores)} scores") - documents = [Document(title, url, extract, url_scores.get(url, 1.0)) for title, url, extract in document_tuples] - - page_documents = preprocess_documents(documents, index_path, nlp) - index_pages(index_path, page_documents) + index_batches(batch_data.values(), index_path, nlp, url_db) logger.info("Indexed pages") index_db.update_batch_status([batch.url for batch in batches], BatchStatus.INDEXED) +def index_batches(batch_data: Iterable[HashedBatch], index_path: str, nlp: Language, url_db: URLDatabase): + document_tuples = list(get_documents_from_batches(batch_data)) + urls = [url for title, url, extract in document_tuples] + logger.info(f"Got {len(urls)} document tuples") + url_scores = url_db.get_url_scores(urls) + logger.info(f"Got {len(url_scores)} scores") + documents = [Document(title, url, extract, url_scores.get(url, 1.0)) for title, url, extract in document_tuples] + page_documents = preprocess_documents(documents, index_path, nlp) + index_pages(index_path, page_documents) + + def index_pages(index_path, page_documents): with TinyIndex(Document, index_path, 'w') as indexer: for page, documents in page_documents.items(): From 4779371cf37786b9d98cb9d943a21f2cdb7b0b69 Mon Sep 17 00:00:00 2001 From: Daoud Clarke Date: Tue, 23 Aug 2022 21:57:38 +0100 Subject: [PATCH 2/6] Use a custom tokenizer --- analyse/index_local.py | 7 ++++++ mwmbl/indexer/index.py | 39 +++++++++++++++++++++---------- mwmbl/tinysearchengine/indexer.py | 6 ++++- mwmbl/tinysearchengine/rank.py | 3 ++- 4 files changed, 41 insertions(+), 14 deletions(-) diff --git a/analyse/index_local.py b/analyse/index_local.py index 9db9dbf..24628a5 100644 --- a/analyse/index_local.py +++ b/analyse/index_local.py @@ -8,6 +8,7 @@ import logging import os import sys from pathlib import Path +from datetime import datetime import spacy @@ -41,10 +42,16 @@ def run(): TinyIndex.create(item_factory=Document, index_path=EVALUATE_INDEX_PATH, num_pages=NUM_PAGES, page_size=PAGE_SIZE) batches = get_batches() + + start = datetime.now() with Database() as db: nlp = spacy.load("en_core_web_sm") url_db = URLDatabase(db.connection) index_batches(batches, EVALUATE_INDEX_PATH, nlp, url_db) + end = datetime.now() + + total_time = (end - start).total_seconds() + print("total_seconds:", total_time) if __name__ == '__main__': diff --git a/mwmbl/indexer/index.py b/mwmbl/indexer/index.py index 45793ee..67ae164 100644 --- a/mwmbl/indexer/index.py +++ b/mwmbl/indexer/index.py @@ -16,20 +16,35 @@ HTTPS_START = 'https://' BATCH_SIZE = 100 -def is_content_token(nlp, token): - lexeme = nlp.vocab[token.orth] - return (lexeme.is_alpha or lexeme.is_digit) and not token.is_stop +STOPWORDS = set("0,1,2,3,4,5,6,7,8,9,a,A,about,above,across,after,again,against,all,almost,alone,along,already,also," \ + "although,always,am,among,an,and,another,any,anyone,anything,anywhere,are,aren't,around,as,at,b,B,back," \ + "be,became,because,become,becomes,been,before,behind,being,below,between,both,but,by,c,C,can,cannot,can't," \ + 
"could,couldn't,d,D,did,didn't,do,does,doesn't,doing,done,don't,down,during,e,E,each,either,enough,even," \ + "ever,every,everyone,everything,everywhere,f,F,few,find,first,for,four,from,full,further,g,G,get,give,go," \ + "h,H,had,hadn't,has,hasn't,have,haven't,having,he,he'd,he'll,her,here,here's,hers,herself,he's,him," \ + "himself,his,how,however,how's,i,I,i'd,if,i'll,i'm,in,interest,into,is,isn't,it,it's,its,itself,i've," \ + "j,J,k,K,keep,l,L,last,least,less,let's,m,M,made,many,may,me,might,more,most,mostly,much,must,mustn't," \ + "my,myself,n,N,never,next,no,nobody,noone,nor,not,nothing,now,nowhere,o,O,of,off,often,on,once,one,only," \ + "or,other,others,ought,our,ours,ourselves,out,over,own,p,P,part,per,perhaps,put,q,Q,r,R,rather,s,S,same," \ + "see,seem,seemed,seeming,seems,several,shan't,she,she'd,she'll,she's,should,shouldn't,show,side,since,so," \ + "some,someone,something,somewhere,still,such,t,T,take,than,that,that's,the,their,theirs,them,themselves," \ + "then,there,therefore,there's,these,they,they'd,they'll,they're,they've,this,those,though,three,through," \ + "thus,to,together,too,toward,two,u,U,under,until,up,upon,us,v,V,very,w,W,was,wasn't,we,we'd,we'll,well," \ + "we're,were,weren't,we've,what,what's,when,when's,where,where's,whether,which,while,who,whole,whom,who's," \ + "whose,why,why's,will,with,within,without,won't,would,wouldn't,x,X,y,Y,yet,you,you'd,you'll,your,you're," \ + "yours,yourself,yourselves,you've,z,Z".split(',')) -def tokenize(nlp, input_text): +def tokenize(input_text): cleaned_text = input_text.encode('utf8', 'replace').decode('utf8') - tokens = nlp.tokenizer(cleaned_text) + tokens = cleaned_text.lower().replace('.', ' ').replace(',', ' ').split() + # tokens = nlp.tokenizer(cleaned_text) if input_text.endswith('…'): # Discard the last two tokens since there will likely be a word cut in two tokens = tokens[:-2] - content_tokens = [token for token in tokens if is_content_token(nlp, token)] - lowered = {nlp.vocab[token.orth].text.lower() for token in content_tokens} - return lowered + content_tokens = [token for token in tokens if not token in STOPWORDS] + # lowered = {nlp.vocab[token.orth].text.lower() for token in content_tokens} + return content_tokens def prepare_url_for_tokenizing(url: str): @@ -53,12 +68,12 @@ def get_pages(nlp, titles_urls_and_extracts, link_counts) -> Iterable[TokenizedD def tokenize_document(url, title_cleaned, extract, score, nlp): - title_tokens = tokenize(nlp, title_cleaned) + title_tokens = tokenize(title_cleaned) prepared_url = prepare_url_for_tokenizing(unquote(url)) - url_tokens = tokenize(nlp, prepared_url) - extract_tokens = tokenize(nlp, extract) + url_tokens = tokenize(prepared_url) + extract_tokens = tokenize(extract) # print("Extract tokens", extract_tokens) - tokens = title_tokens | url_tokens | extract_tokens + tokens = set(title_tokens) | set(url_tokens) | set(extract_tokens) document = TokenizedDocument(tokens=list(tokens), url=url, title=title_cleaned, extract=extract, score=score) return document diff --git a/mwmbl/tinysearchengine/indexer.py b/mwmbl/tinysearchengine/indexer.py index 405cc17..253faab 100644 --- a/mwmbl/tinysearchengine/indexer.py +++ b/mwmbl/tinysearchengine/indexer.py @@ -1,6 +1,6 @@ import json import os -from dataclasses import astuple, dataclass, asdict +from dataclasses import dataclass, asdict from io import UnsupportedOperation, BytesIO from logging import getLogger from mmap import mmap, PROT_READ, PROT_WRITE @@ -20,6 +20,10 @@ PAGE_SIZE = 4096 logger = getLogger(__name__) +def 
astuple(dc): + return tuple(dc.__dict__.values()) + + @dataclass class Document: title: str diff --git a/mwmbl/tinysearchengine/rank.py b/mwmbl/tinysearchengine/rank.py index 75246d8..68562d7 100644 --- a/mwmbl/tinysearchengine/rank.py +++ b/mwmbl/tinysearchengine/rank.py @@ -5,6 +5,7 @@ from logging import getLogger from operator import itemgetter from urllib.parse import urlparse +from mwmbl.indexer.index import tokenize from mwmbl.tinysearchengine.completer import Completer from mwmbl.hn_top_domains_filtered import DOMAINS from mwmbl.tinysearchengine.indexer import TinyIndex, Document @@ -171,7 +172,7 @@ class Ranker: return [q, urls + completed] def get_results(self, q): - terms = [x.lower() for x in q.replace('.', ' ').split()] + terms = tokenize(q) is_complete = q.endswith(' ') if len(terms) > 0 and not is_complete: completions = self.completer.complete(terms[-1]) From 578b70560937da34c92b208569c70e7f38825386 Mon Sep 17 00:00:00 2001 From: Daoud Clarke Date: Tue, 23 Aug 2022 22:06:43 +0100 Subject: [PATCH 3/6] Don't replace full stops and commas --- mwmbl/indexer/index.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mwmbl/indexer/index.py b/mwmbl/indexer/index.py index 67ae164..fb52fe1 100644 --- a/mwmbl/indexer/index.py +++ b/mwmbl/indexer/index.py @@ -37,7 +37,7 @@ STOPWORDS = set("0,1,2,3,4,5,6,7,8,9,a,A,about,above,across,after,again,against, def tokenize(input_text): cleaned_text = input_text.encode('utf8', 'replace').decode('utf8') - tokens = cleaned_text.lower().replace('.', ' ').replace(',', ' ').split() + tokens = cleaned_text.lower().split() # tokens = nlp.tokenizer(cleaned_text) if input_text.endswith('…'): # Discard the last two tokens since there will likely be a word cut in two From 619b6c3a932e8ae4fb8e59515c438776d0e77904 Mon Sep 17 00:00:00 2001 From: Daoud Clarke Date: Wed, 24 Aug 2022 21:08:33 +0100 Subject: [PATCH 4/6] Don't remove stopwords --- mwmbl/indexer/index.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/mwmbl/indexer/index.py b/mwmbl/indexer/index.py index fb52fe1..52ada57 100644 --- a/mwmbl/indexer/index.py +++ b/mwmbl/indexer/index.py @@ -38,13 +38,12 @@ STOPWORDS = set("0,1,2,3,4,5,6,7,8,9,a,A,about,above,across,after,again,against, def tokenize(input_text): cleaned_text = input_text.encode('utf8', 'replace').decode('utf8') tokens = cleaned_text.lower().split() - # tokens = nlp.tokenizer(cleaned_text) if input_text.endswith('…'): # Discard the last two tokens since there will likely be a word cut in two tokens = tokens[:-2] - content_tokens = [token for token in tokens if not token in STOPWORDS] - # lowered = {nlp.vocab[token.orth].text.lower() for token in content_tokens} - return content_tokens + # content_tokens = [token for token in tokens if not token in STOPWORDS] + # return content_tokens + return tokens def prepare_url_for_tokenizing(url: str): From f4fb9f831a13f3c442960e4999d796f9931ce58a Mon Sep 17 00:00:00 2001 From: Daoud Clarke Date: Fri, 26 Aug 2022 17:20:11 +0100 Subject: [PATCH 5/6] Use terms and bigrams from the beginning of the string only --- mwmbl/indexer/index.py | 26 ++++++++++++++------------ mwmbl/tinysearchengine/rank.py | 18 ++++++++++-------- mwmbl/tokenizer.py | 13 +++++++++++++ 3 files changed, 37 insertions(+), 20 deletions(-) create mode 100644 mwmbl/tokenizer.py diff --git a/mwmbl/indexer/index.py b/mwmbl/indexer/index.py index 52ada57..1dfa3af 100644 --- a/mwmbl/indexer/index.py +++ b/mwmbl/indexer/index.py @@ -8,12 +8,15 @@ from urllib.parse import unquote import pandas 
as pd from mwmbl.tinysearchengine.indexer import Document, TokenizedDocument, TinyIndex +from mwmbl.tokenizer import tokenize, get_bigrams DEFAULT_SCORE = 0 HTTP_START = 'http://' HTTPS_START = 'https://' BATCH_SIZE = 100 +NUM_FIRST_TOKENS = 3 +NUM_BIGRAMS = 5 STOPWORDS = set("0,1,2,3,4,5,6,7,8,9,a,A,about,above,across,after,again,against,all,almost,alone,along,already,also," \ @@ -35,17 +38,6 @@ STOPWORDS = set("0,1,2,3,4,5,6,7,8,9,a,A,about,above,across,after,again,against, "yours,yourself,yourselves,you've,z,Z".split(',')) -def tokenize(input_text): - cleaned_text = input_text.encode('utf8', 'replace').decode('utf8') - tokens = cleaned_text.lower().split() - if input_text.endswith('…'): - # Discard the last two tokens since there will likely be a word cut in two - tokens = tokens[:-2] - # content_tokens = [token for token in tokens if not token in STOPWORDS] - # return content_tokens - return tokens - - def prepare_url_for_tokenizing(url: str): if url.startswith(HTTP_START): url = url[len(HTTP_START):] @@ -66,13 +58,23 @@ def get_pages(nlp, titles_urls_and_extracts, link_counts) -> Iterable[TokenizedD print("Processed", i) +def get_index_tokens(tokens): + first_tokens = tokens[:NUM_FIRST_TOKENS] + bigrams = get_bigrams(NUM_BIGRAMS, tokens) + return set(first_tokens + bigrams) + + def tokenize_document(url, title_cleaned, extract, score, nlp): title_tokens = tokenize(title_cleaned) prepared_url = prepare_url_for_tokenizing(unquote(url)) url_tokens = tokenize(prepared_url) extract_tokens = tokenize(extract) # print("Extract tokens", extract_tokens) - tokens = set(title_tokens) | set(url_tokens) | set(extract_tokens) + tokens = get_index_tokens(title_tokens) | get_index_tokens(url_tokens) | get_index_tokens(extract_tokens) + # doc = Document(title_cleaned, url, extract, score) + # token_scores = {token: score_result([token], doc, True) for token in tokens} + # high_scoring_tokens = [k for k, v in token_scores.items() if v > 0.5] + # print("High scoring", len(high_scoring_tokens), token_scores, doc) document = TokenizedDocument(tokens=list(tokens), url=url, title=title_cleaned, extract=extract, score=score) return document diff --git a/mwmbl/tinysearchengine/rank.py b/mwmbl/tinysearchengine/rank.py index 68562d7..59d8f70 100644 --- a/mwmbl/tinysearchengine/rank.py +++ b/mwmbl/tinysearchengine/rank.py @@ -5,7 +5,7 @@ from logging import getLogger from operator import itemgetter from urllib.parse import urlparse -from mwmbl.indexer.index import tokenize +from mwmbl.tokenizer import tokenize, get_bigrams from mwmbl.tinysearchengine.completer import Completer from mwmbl.hn_top_domains_filtered import DOMAINS from mwmbl.tinysearchengine.indexer import TinyIndex, Document @@ -35,7 +35,7 @@ def _get_query_regex(terms, is_complete, is_url): return pattern -def _score_result(terms: list[str], result: Document, is_complete: bool): +def score_result(terms: list[str], result: Document, is_complete: bool): features = get_features(terms, result.title, result.url, result.extract, result.score, is_complete) length_penalty = math.e ** (-LENGTH_PENALTY * len(result.url)) @@ -116,7 +116,7 @@ def order_results(terms: list[str], results: list[Document], is_complete: bool) if len(results) == 0: return [] - results_and_scores = [(_score_result(terms, result, is_complete), result) for result in results] + results_and_scores = [(score_result(terms, result, is_complete), result) for result in results] ordered_results = sorted(results_and_scores, key=itemgetter(0), reverse=True) filtered_results = [result for score, 
result in ordered_results if score > SCORE_THRESHOLD] return filtered_results @@ -181,16 +181,18 @@ class Ranker: completions = [] retrieval_terms = set(terms) + bigrams = set(get_bigrams(len(terms), terms)) + pages = [] seen_items = set() - for term in retrieval_terms: + for term in retrieval_terms | bigrams: items = self.tiny_index.retrieve(term) if items is not None: for item in items: - if term in item.title.lower() or term in item.extract.lower(): - if item.title not in seen_items: - pages.append(item) - seen_items.add(item.title) + # if term in item.title.lower() or term in item.extract.lower(): + if item.title not in seen_items: + pages.append(item) + seen_items.add(item.title) ordered_results = self.order_results(terms, pages, is_complete) return ordered_results, terms, completions diff --git a/mwmbl/tokenizer.py b/mwmbl/tokenizer.py new file mode 100644 index 0000000..c695a0d --- /dev/null +++ b/mwmbl/tokenizer.py @@ -0,0 +1,13 @@ +def tokenize(input_text): + cleaned_text = input_text.encode('utf8', 'replace').decode('utf8') + tokens = cleaned_text.lower().split() + if input_text.endswith('…'): + # Discard the last two tokens since there will likely be a word cut in two + tokens = tokens[:-2] + return tokens + + +def get_bigrams(num_bigrams, tokens): + num_bigrams = min(num_bigrams, len(tokens) - 1) + bigrams = [f'{tokens[i]} {tokens[i + 1]}' for i in range(num_bigrams)] + return bigrams From cf253ae524971612ecb4c655f5a277efdd0023b6 Mon Sep 17 00:00:00 2001 From: Daoud Clarke Date: Fri, 26 Aug 2022 22:20:35 +0100 Subject: [PATCH 6/6] Split out URL updating from indexing --- mwmbl/background.py | 6 ++- mwmbl/indexer/index_batches.py | 86 +++++----------------------------- mwmbl/indexer/indexdb.py | 7 +-- mwmbl/indexer/process_batch.py | 29 ++++++++++++ mwmbl/indexer/update_urls.py | 67 ++++++++++++++++++++++++++ mwmbl/settings.py | 1 + 6 files changed, 119 insertions(+), 77 deletions(-) create mode 100644 mwmbl/indexer/process_batch.py create mode 100644 mwmbl/indexer/update_urls.py diff --git a/mwmbl/background.py b/mwmbl/background.py index b1218d4..cdb7489 100644 --- a/mwmbl/background.py +++ b/mwmbl/background.py @@ -6,7 +6,7 @@ from multiprocessing import Queue from pathlib import Path from time import sleep -from mwmbl.indexer import index_batches, historical +from mwmbl.indexer import index_batches, historical, update_urls from mwmbl.indexer.batch_cache import BatchCache from mwmbl.indexer.paths import BATCH_DIR_NAME, INDEX_NAME from mwmbl.url_queue import update_url_queue, initialize_url_queue @@ -28,6 +28,10 @@ def run(data_path: str, url_queue: Queue): batch_cache.retrieve_batches(num_batches=10000) except Exception: logger.exception("Error retrieving batches") + try: + update_urls.run(batch_cache) + except Exception: + logger.exception("Error updating URLs") try: index_batches.run(batch_cache, index_path) except Exception: diff --git a/mwmbl/indexer/index_batches.py b/mwmbl/indexer/index_batches.py index 7c85da2..24880bb 100644 --- a/mwmbl/indexer/index_batches.py +++ b/mwmbl/indexer/index_batches.py @@ -2,32 +2,25 @@ Index batches that are stored locally. 
""" from collections import defaultdict -from datetime import datetime, timezone, timedelta from logging import getLogger -from typing import Iterable -from urllib.parse import urlparse +from typing import Collection, Iterable import spacy +from mwmbl.indexer import process_batch from spacy import Language from mwmbl.crawler.batch import HashedBatch, Item -from mwmbl.crawler.urls import URLDatabase, URLStatus, FoundURL +from mwmbl.crawler.urls import URLDatabase, URLStatus from mwmbl.database import Database -from mwmbl.hn_top_domains_filtered import DOMAINS from mwmbl.indexer.batch_cache import BatchCache from mwmbl.indexer.index import tokenize_document -from mwmbl.indexer.indexdb import BatchStatus, IndexDatabase -from mwmbl.settings import UNKNOWN_DOMAIN_MULTIPLIER, SCORE_FOR_SAME_DOMAIN, SCORE_FOR_DIFFERENT_DOMAIN, \ - SCORE_FOR_ROOT_PATH +from mwmbl.indexer.indexdb import BatchStatus from mwmbl.tinysearchengine.indexer import Document, TinyIndex logger = getLogger(__name__) -EXCLUDED_DOMAINS = {'web.archive.org'} - - -def get_documents_from_batches(batches: Iterable[HashedBatch]) -> Iterable[tuple[str, str, str]]: +def get_documents_from_batches(batches: Collection[HashedBatch]) -> Iterable[tuple[str, str, str]]: for batch in batches: for item in batch.items: if item.content is not None: @@ -35,28 +28,18 @@ def get_documents_from_batches(batches: Iterable[HashedBatch]) -> Iterable[tuple def run(batch_cache: BatchCache, index_path: str): - nlp = spacy.load("en_core_web_sm") - with Database() as db: - index_db = IndexDatabase(db.connection) - logger.info("Getting local batches") - batches = index_db.get_batches_by_status(BatchStatus.LOCAL, 10000) - logger.info(f"Got {len(batches)} batch urls") - if len(batches) == 0: - return + def process(batches: Collection[HashedBatch]): + with Database() as db: + nlp = spacy.load("en_core_web_sm") + url_db = URLDatabase(db.connection) + index_batches(batches, index_path, nlp, url_db) + logger.info("Indexed pages") - batch_data = batch_cache.get_cached([batch.url for batch in batches]) - logger.info(f"Got {len(batch_data)} cached batches") - - record_urls_in_database(batch_data.values()) - - url_db = URLDatabase(db.connection) - index_batches(batch_data.values(), index_path, nlp, url_db) - logger.info("Indexed pages") - index_db.update_batch_status([batch.url for batch in batches], BatchStatus.INDEXED) + process_batch.run(batch_cache, BatchStatus.URLS_UPDATED, BatchStatus.INDEXED, process) -def index_batches(batch_data: Iterable[HashedBatch], index_path: str, nlp: Language, url_db: URLDatabase): +def index_batches(batch_data: Collection[HashedBatch], index_path: str, nlp: Language, url_db: URLDatabase): document_tuples = list(get_documents_from_batches(batch_data)) urls = [url for title, url, extract in document_tuples] logger.info(f"Got {len(urls)} document tuples") @@ -108,49 +91,6 @@ def get_url_error_status(item: Item): return URLStatus.ERROR_OTHER -def record_urls_in_database(batches: Iterable[HashedBatch]): - with Database() as db: - url_db = URLDatabase(db.connection) - url_scores = defaultdict(float) - url_users = {} - url_timestamps = {} - url_statuses = defaultdict(lambda: URLStatus.NEW) - for batch in batches: - for item in batch.items: - timestamp = get_datetime_from_timestamp(item.timestamp / 1000.0) - url_timestamps[item.url] = timestamp - url_users[item.url] = batch.user_id_hash - if item.content is None: - url_statuses[item.url] = get_url_error_status(item) - else: - url_statuses[item.url] = URLStatus.CRAWLED - crawled_page_domain = 
urlparse(item.url).netloc
-                    score_multiplier = 1 if crawled_page_domain in DOMAINS else UNKNOWN_DOMAIN_MULTIPLIER
-                    for link in item.content.links:
-                        parsed_link = urlparse(link)
-                        if parsed_link.netloc in EXCLUDED_DOMAINS:
-                            continue
-
-                        score = SCORE_FOR_SAME_DOMAIN if parsed_link.netloc == crawled_page_domain else SCORE_FOR_DIFFERENT_DOMAIN
-                        url_scores[link] += score * score_multiplier
-                        url_users[link] = batch.user_id_hash
-                        url_timestamps[link] = timestamp
-                        domain = f'{parsed_link.scheme}://{parsed_link.netloc}/'
-                        url_scores[domain] += SCORE_FOR_ROOT_PATH * score_multiplier
-                        url_users[domain] = batch.user_id_hash
-                        url_timestamps[domain] = timestamp
-
-        found_urls = [FoundURL(url, url_users[url], url_scores[url], url_statuses[url], url_timestamps[url])
-                      for url in url_scores.keys() | url_statuses.keys()]
-
-        url_db.update_found_urls(found_urls)
-
-
-def get_datetime_from_timestamp(timestamp: float) -> datetime:
-    batch_datetime = datetime(1970, 1, 1, tzinfo=timezone.utc) + timedelta(seconds=timestamp)
-    return batch_datetime
-
-
 # TODO: clean unicode at some point
 def clean_unicode(s: str) -> str:
     return s.encode('utf-8', 'ignore').decode('utf-8')
\ No newline at end of file
diff --git a/mwmbl/indexer/indexdb.py b/mwmbl/indexer/indexdb.py
index 8bff0ec..b31f097 100644
--- a/mwmbl/indexer/indexdb.py
+++ b/mwmbl/indexer/indexdb.py
@@ -8,9 +8,10 @@ from psycopg2.extras import execute_values
 
 
 class BatchStatus(Enum):
-    REMOTE = 0  # The batch only exists in long term storage
-    LOCAL = 1  # We have a copy of the batch locally in Postgresql
-    INDEXED = 2
+    REMOTE = 0         # The batch only exists in long term storage
+    LOCAL = 10         # We have a copy of the batch locally in Postgresql
+    URLS_UPDATED = 20  # We've updated URLs from the batch
+    INDEXED = 30       # The batch has been indexed
 
 
 @dataclass
diff --git a/mwmbl/indexer/process_batch.py b/mwmbl/indexer/process_batch.py
new file mode 100644
index 0000000..f9e12a7
--- /dev/null
+++ b/mwmbl/indexer/process_batch.py
@@ -0,0 +1,29 @@
+from logging import getLogger
+from typing import Callable, Collection
+
+from mwmbl.crawler.batch import HashedBatch
+from mwmbl.database import Database
+from mwmbl.indexer.batch_cache import BatchCache
+from mwmbl.indexer.indexdb import BatchStatus, IndexDatabase
+
+logger = getLogger(__name__)
+
+
+def run(batch_cache: BatchCache, start_status: BatchStatus, end_status: BatchStatus,
+        process: Callable[[Collection[HashedBatch]], None]):
+
+    with Database() as db:
+        index_db = IndexDatabase(db.connection)
+
+        logger.info(f"Getting batches with status {start_status}")
+        batches = index_db.get_batches_by_status(start_status, 10000)
+        logger.info(f"Got {len(batches)} batch urls")
+        if len(batches) == 0:
+            return
+
+        batch_data = batch_cache.get_cached([batch.url for batch in batches])
+        logger.info(f"Got {len(batch_data)} cached batches")
+
+        process(batch_data.values())
+
+        index_db.update_batch_status([batch.url for batch in batches], end_status)
diff --git a/mwmbl/indexer/update_urls.py b/mwmbl/indexer/update_urls.py
new file mode 100644
index 0000000..a93084a
--- /dev/null
+++ b/mwmbl/indexer/update_urls.py
@@ -0,0 +1,67 @@
+from collections import defaultdict
+from datetime import datetime, timezone, timedelta
+from logging import getLogger
+from typing import Iterable, Collection
+from urllib.parse import urlparse
+
+from mwmbl.crawler.batch import HashedBatch
+from mwmbl.crawler.urls import URLDatabase, URLStatus, FoundURL
+from mwmbl.database import Database
+from mwmbl.hn_top_domains_filtered import DOMAINS
+from 
mwmbl.indexer import process_batch +from mwmbl.indexer.batch_cache import BatchCache +from mwmbl.indexer.index_batches import get_url_error_status +from mwmbl.indexer.indexdb import BatchStatus +from mwmbl.settings import UNKNOWN_DOMAIN_MULTIPLIER, EXCLUDED_DOMAINS, SCORE_FOR_SAME_DOMAIN, \ + SCORE_FOR_DIFFERENT_DOMAIN, SCORE_FOR_ROOT_PATH + + +logger = getLogger(__name__) + + +def run(batch_cache: BatchCache): + process_batch.run(batch_cache, BatchStatus.LOCAL, BatchStatus.URLS_UPDATED, process=record_urls_in_database) + + +def record_urls_in_database(batches: Collection[HashedBatch]): + logger.info(f"Recording URLs in database for {len(batches)} batches") + with Database() as db: + url_db = URLDatabase(db.connection) + url_scores = defaultdict(float) + url_users = {} + url_timestamps = {} + url_statuses = defaultdict(lambda: URLStatus.NEW) + for batch in batches: + for item in batch.items: + timestamp = get_datetime_from_timestamp(item.timestamp / 1000.0) + url_timestamps[item.url] = timestamp + url_users[item.url] = batch.user_id_hash + if item.content is None: + url_statuses[item.url] = get_url_error_status(item) + else: + url_statuses[item.url] = URLStatus.CRAWLED + crawled_page_domain = urlparse(item.url).netloc + score_multiplier = 1 if crawled_page_domain in DOMAINS else UNKNOWN_DOMAIN_MULTIPLIER + for link in item.content.links: + parsed_link = urlparse(link) + if parsed_link.netloc in EXCLUDED_DOMAINS: + continue + + score = SCORE_FOR_SAME_DOMAIN if parsed_link.netloc == crawled_page_domain else SCORE_FOR_DIFFERENT_DOMAIN + url_scores[link] += score * score_multiplier + url_users[link] = batch.user_id_hash + url_timestamps[link] = timestamp + domain = f'{parsed_link.scheme}://{parsed_link.netloc}/' + url_scores[domain] += SCORE_FOR_ROOT_PATH * score_multiplier + url_users[domain] = batch.user_id_hash + url_timestamps[domain] = timestamp + + found_urls = [FoundURL(url, url_users[url], url_scores[url], url_statuses[url], url_timestamps[url]) + for url in url_scores.keys() | url_statuses.keys()] + + url_db.update_found_urls(found_urls) + + +def get_datetime_from_timestamp(timestamp: float) -> datetime: + batch_datetime = datetime(1970, 1, 1, tzinfo=timezone.utc) + timedelta(seconds=timestamp) + return batch_datetime diff --git a/mwmbl/settings.py b/mwmbl/settings.py index 83b8032..8b63bb5 100644 --- a/mwmbl/settings.py +++ b/mwmbl/settings.py @@ -27,3 +27,4 @@ SCORE_FOR_ROOT_PATH = 0.1 SCORE_FOR_DIFFERENT_DOMAIN = 1.0 SCORE_FOR_SAME_DOMAIN = 0.01 UNKNOWN_DOMAIN_MULTIPLIER = 0.001 +EXCLUDED_DOMAINS = {'web.archive.org'}
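
For reference, the indexing-time token selection introduced in PATCH 5/6 can be illustrated with a minimal standalone sketch. The function bodies below are copied from mwmbl/tokenizer.py and mwmbl/indexer/index.py as added above, with explanatory comments added; the example title and the printed result shown in the comment are illustrative assumptions, not output taken from the repository.

NUM_FIRST_TOKENS = 3
NUM_BIGRAMS = 5


def tokenize(input_text):
    cleaned_text = input_text.encode('utf8', 'replace').decode('utf8')
    tokens = cleaned_text.lower().split()
    if input_text.endswith('…'):
        # Discard the last two tokens since there will likely be a word cut in two
        tokens = tokens[:-2]
    return tokens


def get_bigrams(num_bigrams, tokens):
    # Only bigrams from the start of the token sequence, capped at num_bigrams
    num_bigrams = min(num_bigrams, len(tokens) - 1)
    bigrams = [f'{tokens[i]} {tokens[i + 1]}' for i in range(num_bigrams)]
    return bigrams


def get_index_tokens(tokens):
    # Each field contributes its first few unigrams plus its leading bigrams
    first_tokens = tokens[:NUM_FIRST_TOKENS]
    bigrams = get_bigrams(NUM_BIGRAMS, tokens)
    return set(first_tokens + bigrams)


if __name__ == '__main__':
    title = "Mwmbl: a free, open-source search engine"  # hypothetical example title
    print(get_index_tokens(tokenize(title)))
    # Expected keys (set order varies):
    # {'mwmbl:', 'a', 'free,', 'mwmbl: a', 'a free,', 'free, open-source',
    #  'open-source search', 'search engine'}

On the query side, rank.py tokenizes the query the same way and adds the query's bigrams to the retrieval terms, so a short query can match either a leading unigram or a leading bigram of an indexed title, URL or extract.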