diff --git a/analyse/add_term_info.py b/analyse/add_term_info.py
new file mode 100644
index 0000000..9cf9795
--- /dev/null
+++ b/analyse/add_term_info.py
@@ -0,0 +1,51 @@
+"""
+Investigate adding term information to the database.
+
+How much extra space will it take?
+"""
+import os
+from pathlib import Path
+from random import Random
+
+import numpy as np
+from scipy.stats import sem
+
+from mwmbl.tinysearchengine.indexer import TinyIndex, Document, _trim_items_to_page, astuple
+
+from zstandard import ZstdCompressor
+
+from mwmbl.utils import add_term_info
+
+random = Random(1)
+
+INDEX_PATH = Path(__file__).parent.parent / "devdata" / "index-v2.tinysearch"
+
+
+def run():
+    compressor = ZstdCompressor()
+    with TinyIndex(Document, INDEX_PATH) as index:
+        # Get some random integers between 0 and index.num_pages:
+        pages = random.sample(range(index.num_pages), 10000)
+
+        old_sizes = []
+        new_sizes = []
+
+        for i in pages:
+            page = index.get_page(i)
+            term_documents = []
+            for document in page:
+                term_document = add_term_info(document, index, i)
+                term_documents.append(term_document)
+
+            value_tuples = [astuple(value) for value in term_documents]
+            num_fitting, compressed = _trim_items_to_page(compressor, index.page_size, value_tuples)
+
+            new_sizes.append(num_fitting)
+            old_sizes.append(len(page))
+
+    print("Old sizes mean", np.mean(old_sizes), sem(old_sizes))
+    print("New sizes mean", np.mean(new_sizes), sem(new_sizes))
+
+
+if __name__ == '__main__':
+    run()
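The script above samples 10,000 pages, re-tokenizes every document on each page to attach its term (via `add_term_info`), recompresses the page contents, and compares how many documents still fit with the current page occupancy. Below is a minimal sketch of the underlying fit check, assuming the page payload is JSON-serialised and zstd-compressed as `_trim_items_to_page` appears to do; the helper name `fits_on_page`, the sample tuple, and the 4096-byte default (taken from the `PAGE_SIZE` constant in the deleted `analyse/index_local.py`) are illustrative, not part of this change.

```python
import json

from zstandard import ZstdCompressor

PAGE_SIZE = 4096  # illustrative; matches PAGE_SIZE in the deleted analyse/index_local.py


def fits_on_page(items: list, page_size: int = PAGE_SIZE) -> bool:
    """Illustrative check: do the serialised items compress to within one index page?"""
    serialised = json.dumps(items).encode("utf8")
    return len(ZstdCompressor().compress(serialised)) <= page_size


# Each document tuple gains one extra string field (its term), so fewer documents
# compress into a page; the script reports the old and new mean counts per page.
print(fits_on_page([["title", "https://example.com", "extract", 1.0, "term"]]))
```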
diff --git a/analyse/index_local.py b/analyse/index_local.py
deleted file mode 100644
index 334868d..0000000
--- a/analyse/index_local.py
+++ /dev/null
@@ -1,57 +0,0 @@
-"""
-Index batches stored locally on the filesystem for the purpose of evaluation.
-"""
-import glob
-import gzip
-import json
-import logging
-import os
-import sys
-from datetime import datetime
-
-import spacy
-
-from mwmbl.crawler import HashedBatch
-from mwmbl.crawler.urls import URLDatabase
-from mwmbl.database import Database
-from mwmbl.indexer import index_batches
-from mwmbl.tinysearchengine import TinyIndex, Document
-
-LOCAL_BATCHES_PATH = f'{os.environ["HOME"]}/data/mwmbl/file/**/*.json.gz'
-NUM_BATCHES = 10000
-EVALUATE_INDEX_PATH = f'{os.environ["HOME"]}/data/mwmbl/evaluate-index.tinysearch'
-NUM_PAGES = 1_024_000
-PAGE_SIZE = 4096
-
-
-logging.basicConfig(stream=sys.stdout, level=logging.DEBUG)
-
-
-def get_batches():
-    for path in sorted(glob.glob(LOCAL_BATCHES_PATH, recursive=True))[:NUM_BATCHES]:
-        data = json.load(gzip.open(path))
-        yield HashedBatch.parse_obj(data)
-
-
-def run():
-    try:
-        os.remove(EVALUATE_INDEX_PATH)
-    except FileNotFoundError:
-        pass
-    TinyIndex.create(item_factory=Document, index_path=EVALUATE_INDEX_PATH, num_pages=NUM_PAGES, page_size=PAGE_SIZE)
-
-    batches = get_batches()
-
-    start = datetime.now()
-    with Database() as db:
-        nlp = spacy.load("en_core_web_sm")
-        url_db = URLDatabase(db.connection)
-        index_batches(batches, EVALUATE_INDEX_PATH, nlp, url_db)
-    end = datetime.now()
-
-    total_time = (end - start).total_seconds()
-    print("total_seconds:", total_time)
-
-
-if __name__ == '__main__':
-    run()
diff --git a/analyse/inspect_index.py b/analyse/inspect_index.py
deleted file mode 100644
index c48ad22..0000000
--- a/analyse/inspect_index.py
+++ /dev/null
@@ -1,60 +0,0 @@
-import logging
-import sys
-
-import numpy as np
-import spacy
-
-from analyse.index_local import EVALUATE_INDEX_PATH
-from mwmbl.indexer import tokenize_document
-from mwmbl.indexer import INDEX_PATH
-from mwmbl.tinysearchengine import TinyIndex, Document
-
-
-logging.basicConfig(stream=sys.stdout, level=logging.DEBUG)
-nlp = spacy.load("en_core_web_sm")
-
-
-def store():
-    document = Document(
-        title='A nation in search of the new black | Theatre | The Guardian',
-        url='https://www.theguardian.com/stage/2007/nov/18/theatre',
-        extract="Topic-stuffed and talk-filled, Kwame Kwei-Armah's new play proves that issue-driven drama is (despite reports of its death) still being written and staged…",
-        score=1.0
-    )
-    with TinyIndex(Document, INDEX_PATH, 'w') as tiny_index:
-        tokenized = tokenize_document(document.url, document.title, document.extract, 1, nlp)
-        print("Tokenized", tokenized)
-        # for token in tokenized.tokens:
-        #
-        #     tiny_index.index(token, document)
-
-
-def get_items():
-    with TinyIndex(Document, INDEX_PATH) as tiny_index:
-        items = tiny_index.retrieve('wikipedia')
-        if items:
-            for item in items:
-                print("Items", item)
-
-
-def run(index_path):
-    with TinyIndex(Document, index_path) as tiny_index:
-        sizes = {}
-        for i in range(tiny_index.num_pages):
-            page = tiny_index.get_page(i)
-            if page:
-                sizes[i] = len(page)
-            if len(page) > 50:
-                print("Page", len(page), page)
-            # for item in page:
-            #     if ' search' in item.title:
-            #         print("Page", i, item)
-        print("Max", max(sizes.values()))
-        print("Top", sorted(sizes.values())[-100:])
-        print("Mean", np.mean(list(sizes.values())))
-
-
-if __name__ == '__main__':
-    # store()
-    run(EVALUATE_INDEX_PATH)
-    # get_items()
diff --git a/devdata/index-v2.tinysearch b/devdata/index-v2.tinysearch
index ebf9ba9..d7a3690 100644
Binary files a/devdata/index-v2.tinysearch and b/devdata/index-v2.tinysearch differ
diff --git a/mwmbl/apps.py b/mwmbl/apps.py
index 30a95a7..d19c46d 100644
--- a/mwmbl/apps.py
+++ b/mwmbl/apps.py
@@ -6,6 +6,10 @@ from pathlib import Path
 from django.apps import AppConfig
 from django.conf import settings
 
+from mwmbl.crawler.urls import URLDatabase
+from mwmbl.database import Database
+from mwmbl.indexer.indexdb import IndexDatabase
+
 
 class MwmblConfig(AppConfig):
     name = "mwmbl"
@@ -31,6 +35,12 @@
             TinyIndex.create(item_factory=Document, index_path=index_path, num_pages=settings.NUM_PAGES,
                              page_size=PAGE_SIZE)
 
+        with Database() as db:
+            url_db = URLDatabase(db.connection)
+            url_db.create_tables()
+            index_db = IndexDatabase(db.connection)
+            index_db.create_tables()
+
         if settings.RUN_BACKGROUND_PROCESSES:
             new_item_queue = Queue()
             Process(target=background.run, args=(settings.DATA_PATH,)).start()
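With this change, `ready()` creates the crawler URL tables and the index database tables before any background process or request handler touches them. A minimal sketch of that startup step in isolation, assuming the `create_tables()` methods are idempotent so repeated app restarts are safe; the wrapper name `ensure_tables` is hypothetical and not part of this change.

```python
from mwmbl.crawler.urls import URLDatabase
from mwmbl.database import Database
from mwmbl.indexer.indexdb import IndexDatabase


def ensure_tables() -> None:
    """Hypothetical wrapper for the table creation now done in MwmblConfig.ready()."""
    with Database() as db:
        URLDatabase(db.connection).create_tables()
        IndexDatabase(db.connection).create_tables()
```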
""" -from logging import getLogger +import logging +import sys +from logging import getLogger, basicConfig from pathlib import Path from time import sleep @@ -11,6 +13,8 @@ from mwmbl.indexer import index_batches, historical from mwmbl.indexer.batch_cache import BatchCache from mwmbl.indexer.paths import BATCH_DIR_NAME, INDEX_NAME + +basicConfig(stream=sys.stdout, level=logging.INFO) logger = getLogger(__name__) diff --git a/mwmbl/indexer/index.py b/mwmbl/indexer/index.py index fb61405..ce94c5c 100644 --- a/mwmbl/indexer/index.py +++ b/mwmbl/indexer/index.py @@ -49,7 +49,7 @@ def prepare_url_for_tokenizing(url: str): def get_pages(nlp, titles_urls_and_extracts, link_counts) -> Iterable[TokenizedDocument]: for i, (title_cleaned, url, extract) in enumerate(titles_urls_and_extracts): score = link_counts.get(url, DEFAULT_SCORE) - yield tokenize_document(url, title_cleaned, extract, score, nlp) + yield tokenize_document(url, title_cleaned, extract, score) if i % 1000 == 0: print("Processed", i) @@ -61,7 +61,7 @@ def get_index_tokens(tokens): return set(first_tokens + bigrams) -def tokenize_document(url, title_cleaned, extract, score, nlp): +def tokenize_document(url, title_cleaned, extract, score): title_tokens = tokenize(title_cleaned) prepared_url = prepare_url_for_tokenizing(unquote(url)) url_tokens = tokenize(prepared_url) diff --git a/mwmbl/indexer/index_batches.py b/mwmbl/indexer/index_batches.py index a6e0488..248b749 100644 --- a/mwmbl/indexer/index_batches.py +++ b/mwmbl/indexer/index_batches.py @@ -16,6 +16,7 @@ from mwmbl.indexer.batch_cache import BatchCache from mwmbl.indexer.index import tokenize_document from mwmbl.indexer.indexdb import BatchStatus from mwmbl.tinysearchengine.indexer import Document, TinyIndex +from mwmbl.utils import add_term_info, add_term_infos logger = getLogger(__name__) @@ -31,22 +32,20 @@ def run(batch_cache: BatchCache, index_path: str): def process(batches: Collection[HashedBatch]): with Database() as db: - nlp = spacy.load("en_core_web_sm") url_db = URLDatabase(db.connection) - index_batches(batches, index_path, nlp, url_db) + index_batches(batches, index_path, url_db) logger.info("Indexed pages") process_batch.run(batch_cache, BatchStatus.URLS_UPDATED, BatchStatus.INDEXED, process) -def index_batches(batch_data: Collection[HashedBatch], index_path: str, nlp: Language, url_db: URLDatabase): +def index_batches(batch_data: Collection[HashedBatch], index_path: str, url_db: URLDatabase): document_tuples = list(get_documents_from_batches(batch_data)) urls = [url for title, url, extract in document_tuples] - logger.info(f"Got {len(urls)} document tuples") url_scores = url_db.get_url_scores(urls) - logger.info(f"Got {len(url_scores)} scores") + logger.info(f"Indexing {len(urls)} document tuples and {len(url_scores)} URL scores") documents = [Document(title, url, extract, url_scores.get(url, 1.0)) for title, url, extract in document_tuples] - page_documents = preprocess_documents(documents, index_path, nlp) + page_documents = preprocess_documents(documents, index_path) index_pages(index_path, page_documents) @@ -58,24 +57,27 @@ def index_pages(index_path, page_documents): seen_urls = set() seen_titles = set() sorted_documents = sorted(documents + existing_documents, key=lambda x: x.score, reverse=True) - for document in sorted_documents: + # TODO: for now we add the term here, until all the documents in the index have terms + sorted_documents_with_terms = add_term_infos(sorted_documents, indexer, page) + for document in sorted_documents_with_terms: if 
diff --git a/mwmbl/indexer/index_batches.py b/mwmbl/indexer/index_batches.py
index a6e0488..248b749 100644
--- a/mwmbl/indexer/index_batches.py
+++ b/mwmbl/indexer/index_batches.py
@@ -16,6 +16,7 @@ from mwmbl.indexer.batch_cache import BatchCache
 from mwmbl.indexer.index import tokenize_document
 from mwmbl.indexer.indexdb import BatchStatus
 from mwmbl.tinysearchengine.indexer import Document, TinyIndex
+from mwmbl.utils import add_term_info, add_term_infos
 
 
 logger = getLogger(__name__)
@@ -31,22 +32,20 @@ def run(batch_cache: BatchCache, index_path: str):
 
     def process(batches: Collection[HashedBatch]):
         with Database() as db:
-            nlp = spacy.load("en_core_web_sm")
             url_db = URLDatabase(db.connection)
-            index_batches(batches, index_path, nlp, url_db)
+            index_batches(batches, index_path, url_db)
             logger.info("Indexed pages")
 
     process_batch.run(batch_cache, BatchStatus.URLS_UPDATED, BatchStatus.INDEXED, process)
 
 
-def index_batches(batch_data: Collection[HashedBatch], index_path: str, nlp: Language, url_db: URLDatabase):
+def index_batches(batch_data: Collection[HashedBatch], index_path: str, url_db: URLDatabase):
     document_tuples = list(get_documents_from_batches(batch_data))
     urls = [url for title, url, extract in document_tuples]
-    logger.info(f"Got {len(urls)} document tuples")
     url_scores = url_db.get_url_scores(urls)
-    logger.info(f"Got {len(url_scores)} scores")
+    logger.info(f"Indexing {len(urls)} document tuples and {len(url_scores)} URL scores")
     documents = [Document(title, url, extract, url_scores.get(url, 1.0)) for title, url, extract in document_tuples]
-    page_documents = preprocess_documents(documents, index_path, nlp)
+    page_documents = preprocess_documents(documents, index_path)
     index_pages(index_path, page_documents)
 
 
@@ -58,24 +57,27 @@ def index_pages(index_path, page_documents):
             seen_urls = set()
             seen_titles = set()
             sorted_documents = sorted(documents + existing_documents, key=lambda x: x.score, reverse=True)
-            for document in sorted_documents:
+            # TODO: for now we add the term here, until all the documents in the index have terms
+            sorted_documents_with_terms = add_term_infos(sorted_documents, indexer, page)
+            for document in sorted_documents_with_terms:
                 if document.title in seen_titles or document.url in seen_urls:
                     continue
                 new_documents.append(document)
                 seen_urls.add(document.url)
                 seen_titles.add(document.title)
+            logger.info(f"Storing {len(new_documents)} documents for page {page}, originally {len(existing_documents)}")
             indexer.store_in_page(page, new_documents)
 
 
-def preprocess_documents(documents, index_path, nlp):
+def preprocess_documents(documents, index_path):
     page_documents = defaultdict(list)
     with TinyIndex(Document, index_path, 'w') as indexer:
         for document in documents:
-            tokenized = tokenize_document(document.url, document.title, document.extract, document.score, nlp)
-            # logger.debug(f"Tokenized: {tokenized}")
-            page_indexes = [indexer.get_key_page_index(token) for token in tokenized.tokens]
-            for page in page_indexes:
-                page_documents[page].append(document)
+            tokenized = tokenize_document(document.url, document.title, document.extract, document.score)
+            for token in tokenized.tokens:
+                page = indexer.get_key_page_index(token)
+                term_document = Document(document.title, document.url, document.extract, document.score, token)
+                page_documents[page].append(term_document)
     print(f"Preprocessed for {len(page_documents)} pages")
     return page_documents
 
diff --git a/mwmbl/indexer/update_urls.py b/mwmbl/indexer/update_urls.py
index 69605ba..fc8c2d0 100644
--- a/mwmbl/indexer/update_urls.py
+++ b/mwmbl/indexer/update_urls.py
@@ -86,7 +86,7 @@ def record_urls_in_database(batches: Collection[HashedBatch], new_item_queue: Qu
 def process_link(user_id_hash, crawled_page_domain, link, unknown_domain_multiplier, timestamp, url_scores, url_timestamps, url_users, is_extra: bool, blacklist_domains):
     parsed_link = urlparse(link)
     if is_domain_blacklisted(parsed_link.netloc, blacklist_domains):
-        logger.info(f"Excluding link for blacklisted domain: {parsed_link}")
+        logger.debug(f"Excluding link for blacklisted domain: {parsed_link}")
         return
 
     extra_multiplier = EXTRA_LINK_MULTIPLIER if is_extra else 1.0
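`preprocess_documents` now fans each crawled document out into one `Document` per indexable token, storing that token in the new `term` field, so every entry on a page records the term it was indexed under; this per-token duplication is exactly the space cost that `analyse/add_term_info.py` measures. Below is a sketch of the fan-out in isolation, assuming the positional `Document(title, url, extract, score, term)` constructor used above; the function name `bucket_by_page` is illustrative only.

```python
from collections import defaultdict

from mwmbl.indexer.index import tokenize_document
from mwmbl.tinysearchengine.indexer import Document, TinyIndex


def bucket_by_page(documents: list[Document], index: TinyIndex) -> dict[int, list[Document]]:
    """Illustrative fan-out: one term-annotated copy of each document per token, keyed by page."""
    page_documents = defaultdict(list)
    for document in documents:
        # Re-tokenize to find every term the document should be retrievable under.
        tokenized = tokenize_document(document.url, document.title, document.extract, document.score)
        for token in tokenized.tokens:
            page = index.get_key_page_index(token)
            # Attach the token as the document's term for this particular page.
            page_documents[page].append(
                Document(document.title, document.url, document.extract, document.score, token))
    return dict(page_documents)
```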
diff --git a/mwmbl/platform/curate.py b/mwmbl/platform/curate.py
index b9aedaa..fb83d28 100644
--- a/mwmbl/platform/curate.py
+++ b/mwmbl/platform/curate.py
@@ -1,3 +1,4 @@
+from logging import getLogger
 from typing import Any
 from urllib.parse import parse_qs
 
@@ -9,11 +10,15 @@ from mwmbl.platform.data import CurateBegin, CurateMove, CurateDelete, CurateAdd
     make_curation_type
 from mwmbl.tinysearchengine.indexer import TinyIndex, Document
 from mwmbl.tokenizer import tokenize
+from mwmbl.utils import add_term_info, add_term_infos
 
 RESULT_URL = "https://mwmbl.org/?q="
 
 MAX_CURATED_SCORE = 1_111_111.0
 
 
+logger = getLogger(__name__)
+
+
 def create_router(index_path: str) -> Router:
     router = Router(tags=["user"])
@@ -58,20 +63,24 @@
                 raise ValueError(f"Should be one query value in the URL: {curation.url}")
             query = queries[0]
-            print("Query", query)
             tokens = tokenize(query)
-            print("Tokens", tokens)
             term = " ".join(tokens)
-            print("Key", term)
             documents = [
                 Document(result.title, result.url, result.extract, MAX_CURATED_SCORE - i, term, result.curated)
                 for i, result in enumerate(curation.results)
             ]
+
             page_index = indexer.get_key_page_index(term)
-            print("Page index", page_index)
-            print("Storing documents", documents)
-            indexer.store_in_page(page_index, documents)
+            existing_documents_no_terms = indexer.get_page(page_index)
+            existing_documents = add_term_infos(existing_documents_no_terms, indexer, page_index)
+            other_documents = [doc for doc in existing_documents if doc.term != term]
+            logger.info(f"Found {len(other_documents)} other documents for term {term} at page {page_index} "
+                        f"with terms { {doc.term for doc in other_documents} }")
+
+            all_documents = documents + other_documents
+            logger.info(f"Storing {len(all_documents)} documents at page {page_index}")
+            indexer.store_in_page(page_index, all_documents)
 
         return {"curation": "ok"}
diff --git a/mwmbl/tinysearchengine/indexer.py b/mwmbl/tinysearchengine/indexer.py
index ffe508b..4605247 100644
--- a/mwmbl/tinysearchengine/indexer.py
+++ b/mwmbl/tinysearchengine/indexer.py
@@ -79,6 +79,7 @@ class TinyIndexMetadata:
         values = json.loads(data[constant_length:].decode('utf8'))
         return TinyIndexMetadata(**values)
 
+
 # Find the optimal amount of data that fits onto a page
 # We do this by leveraging binary search to quickly find the index where:
 # - index+1 cannot fit onto a page
@@ -106,10 +107,12 @@ def _binary_search_fitting_size(compressor: ZstdCompressor, page_size: int, item
         # No better match, use our index
         return mid, compressed_data
 
+
 def _trim_items_to_page(compressor: ZstdCompressor, page_size: int, items:list[T]):
     # Find max number of items that fit on a page
     return _binary_search_fitting_size(compressor, page_size, items, 0, len(items))
 
+
 def _get_page_data(compressor: ZstdCompressor, page_size: int, items: list[T]):
     num_fitting, serialised_data = _trim_items_to_page(compressor, page_size, items)
 
@@ -186,7 +189,6 @@ class TinyIndex(Generic[T]):
         except ZstdError:
             logger.exception(f"Error decompressing page data, content: {page_data}")
             return []
-        # logger.debug(f"Decompressed data: {decompressed_data}")
         return json.loads(decompressed_data.decode('utf8'))
 
     def store_in_page(self, page_index: int, values: list[T]):
diff --git a/mwmbl/utils.py b/mwmbl/utils.py
index 98e15e3..12cc051 100644
--- a/mwmbl/utils.py
+++ b/mwmbl/utils.py
@@ -1,5 +1,8 @@
 import re
 
+from mwmbl.indexer.index import tokenize_document
+from mwmbl.tinysearchengine.indexer import Document, TinyIndex
+
 DOMAIN_REGEX = re.compile(r".*://([^/]*)")
 
 
@@ -17,3 +20,23 @@ def get_domain(url):
     if results is None or len(results.groups()) == 0:
         raise ValueError(f"Unable to parse domain from URL {url}")
     return results.group(1)
+
+
+def add_term_info(document: Document, index: TinyIndex, page_index: int):
+    tokenized = tokenize_document(document.url, document.title, document.extract, document.score)
+    for token in tokenized.tokens:
+        token_page_index = index.get_key_page_index(token)
+        if token_page_index == page_index:
+            return Document(document.title, document.url, document.extract, document.score, token)
+    raise ValueError("Could not find token in page index")
+
+
+def add_term_infos(documents: list[Document], index: TinyIndex, page_index: int):
+    for document in documents:
+        if document.term is not None:
+            yield document
+            continue
+        try:
+            yield add_term_info(document, index, page_index)
+        except ValueError:
+            continue
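`add_term_info` recovers a term for a legacy document by re-tokenizing it and returning a copy tagged with the first token that hashes to the document's own page; `add_term_infos` passes through documents that already carry a term and silently drops any whose tokens no longer map to that page. A sketch of how an existing index could be backfilled with terms using these helpers, assuming it is acceptable to rewrite every page in place; the string path below is illustrative and mirrors the dev index used by the analysis script.

```python
from mwmbl.tinysearchengine.indexer import Document, TinyIndex
from mwmbl.utils import add_term_infos

INDEX_PATH = "devdata/index-v2.tinysearch"  # illustrative path, as used by analyse/add_term_info.py

with TinyIndex(Document, INDEX_PATH, 'w') as index:
    for page_index in range(index.num_pages):
        documents = index.get_page(page_index)
        if not documents:
            continue
        # add_term_infos is a generator: it passes through documents that already have
        # a term and re-derives the term for those that do not.
        with_terms = list(add_term_infos(documents, index, page_index))
        index.store_in_page(page_index, with_terms)
```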