diff --git a/analyse/inspect_index.py b/analyse/inspect_index.py
index 18a5a96..00c0273 100644
--- a/analyse/inspect_index.py
+++ b/analyse/inspect_index.py
@@ -1,20 +1,50 @@
-from mwmbl.tinysearchengine.indexer import TinyIndex, NUM_PAGES, PAGE_SIZE, Document
+import logging
+import sys
+
+import spacy
+
+from mwmbl.indexer.index import tokenize_document
 from mwmbl.indexer.paths import INDEX_PATH
+from mwmbl.tinysearchengine.indexer import TinyIndex, Document
+
+
+logging.basicConfig(stream=sys.stdout, level=logging.DEBUG)
+nlp = spacy.load("en_core_web_sm")
+
+
+def store():
+    document = Document(
+        title='A nation in search of the new black | Theatre | The Guardian',
+        url='https://www.theguardian.com/stage/2007/nov/18/theatre',
+        extract="Topic-stuffed and talk-filled, Kwame Kwei-Armah's new play proves that issue-driven drama is (despite reports of its death) still being written and staged…",
+        score=1.0
+    )
+    with TinyIndex(Document, INDEX_PATH, 'w') as tiny_index:
+        tokenized = tokenize_document(document.url, document.title, document.extract, 1, nlp)
+        print("Tokenized", tokenized)
+        # for token in tokenized.tokens:
+        #
+        #     tiny_index.index(token, document)
 
 
 def get_items():
-    tiny_index = TinyIndex(Document, INDEX_PATH, NUM_PAGES, PAGE_SIZE)
-    items = tiny_index.retrieve('soup')
-    if items:
-        for item in items:
-            print("Items", item)
+    with TinyIndex(Document, INDEX_PATH) as tiny_index:
+        items = tiny_index.retrieve('search')
+        if items:
+            for item in items:
+                print("Items", item)
 
 
 def run():
-    tiny_index = TinyIndex(Document, INDEX_PATH, NUM_PAGES, PAGE_SIZE)
-    for i in range(100):
-        tiny_index.get_page(i)
+    with TinyIndex(Document, INDEX_PATH) as tiny_index:
+        for i in range(100000):
+            page = tiny_index.get_page(i)
+            for item in page:
+                if ' search' in item.title:
+                    print("Page", i, item)
 
 
 if __name__ == '__main__':
-    run()
+    # store()
+    # run()
+    get_items()
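A note on the retrieval being debugged here: TinyIndex has no posting lists, so retrieve() is a straight hash-to-page lookup. A minimal sketch of that mapping, mirroring TinyIndex.get_key_page_index() later in this diff (NUM_PAGES is the module default; a real lookup uses the page count stored in the index metadata):

    import mmh3

    NUM_PAGES = 5_120_000  # module default; a real index reads this from its metadata

    def key_page_index(key: str) -> int:
        # Unsigned murmur3 hash of the token, modulo the page count,
        # the same scheme as TinyIndex.get_key_page_index() below.
        return mmh3.hash(key, signed=False) % NUM_PAGES

    # The page that get_items() reads for the query 'search':
    print(key_page_index('search'))
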
+""" +import requests + +from mwmbl.crawler.batch import Batch, Item, ItemContent + + +URL = 'http://localhost:5000/crawler/batches/' + + +def run(): + batch = Batch(user_id='test_user_id111111111111111111111111', items=[Item( + url='https://www.theguardian.com/stage/2007/nov/18/theatre', + content=ItemContent( + title='A nation in search of the new black | Theatre | The Guardian', + extract="Topic-stuffed and talk-filled, Kwame Kwei-Armah's new play proves that issue-driven drama is (despite reports of its death) still being written and staged…", + links=[]), + timestamp=123456, + status=200, + )]) + result = requests.post(URL, data=batch.json()) + print("Result", result.content) + + +if __name__ == '__main__': + run() diff --git a/mwmbl/background.py b/mwmbl/background.py index 89f4829..09b92da 100644 --- a/mwmbl/background.py +++ b/mwmbl/background.py @@ -13,18 +13,18 @@ logger = getLogger(__name__) def run(index_path: str): - historical.run() + # historical.run() while True: - try: - retrieve_batches() - except Exception: - logger.exception("Error retrieving batches") + # try: + # retrieve_batches() + # except Exception: + # logger.exception("Error retrieving batches") try: run_preprocessing(index_path) except Exception: logger.exception("Error preprocessing") - try: - run_update(index_path) - except Exception: - logger.exception("Error running index update") + # try: + # run_update(index_path) + # except Exception: + # logger.exception("Error running index update") sleep(10) diff --git a/mwmbl/crawler/app.py b/mwmbl/crawler/app.py index c3e94bb..987bc52 100644 --- a/mwmbl/crawler/app.py +++ b/mwmbl/crawler/app.py @@ -277,7 +277,6 @@ def status(): def queue_batch(batch: HashedBatch): # TODO: get the score from the URLs database - # TODO: also queue documents for batches sent through the API documents = [Document(item.content.title, item.url, item.content.extract, 1) for item in batch.items if item.content is not None] with Database() as db: diff --git a/mwmbl/indexer/preprocess.py b/mwmbl/indexer/preprocess.py index fbe524c..88f267c 100644 --- a/mwmbl/indexer/preprocess.py +++ b/mwmbl/indexer/preprocess.py @@ -2,6 +2,7 @@ Preprocess local documents for indexing. 
""" import traceback +from logging import getLogger from time import sleep import spacy @@ -12,6 +13,9 @@ from mwmbl.indexer.index import tokenize_document from mwmbl.tinysearchengine.indexer import TinyIndex, Document +logger = getLogger(__name__) + + def run(index_path): while True: try: @@ -34,7 +38,9 @@ def run_preprocessing(index_path): with TinyIndex(Document, index_path, 'w') as indexer: for document in documents: tokenized = tokenize_document(document.url, document.title, document.extract, 1, nlp) + logger.debug(f"Tokenized: {tokenized}") page_indexes = [indexer.get_key_page_index(token) for token in tokenized.tokens] + logger.debug(f"Page indexes: {page_indexes}") index_db.queue_documents_for_page([(tokenized.url, i) for i in page_indexes]) diff --git a/mwmbl/indexer/update_pages.py b/mwmbl/indexer/update_pages.py index 56ac8d4..a11a216 100644 --- a/mwmbl/indexer/update_pages.py +++ b/mwmbl/indexer/update_pages.py @@ -30,6 +30,7 @@ def run_update(index_path): except ValueError: documents = documents[:len(documents)//2] if len(documents) == 0: + print("No more space") break print(f"Not enough space, adding {len(documents)}") index_db.clear_queued_documents_for_page(i) diff --git a/mwmbl/main.py b/mwmbl/main.py index f56c3da..3b5b19b 100644 --- a/mwmbl/main.py +++ b/mwmbl/main.py @@ -1,6 +1,7 @@ import argparse import logging import os +import sys from multiprocessing import Process import uvicorn @@ -14,7 +15,7 @@ from mwmbl.tinysearchengine.completer import Completer from mwmbl.tinysearchengine.indexer import TinyIndex, Document, NUM_PAGES, PAGE_SIZE from mwmbl.tinysearchengine.rank import HeuristicRanker -logging.basicConfig() +logging.basicConfig(stream=sys.stdout, level=logging.DEBUG) def setup_args(): diff --git a/mwmbl/tinysearchengine/indexer.py b/mwmbl/tinysearchengine/indexer.py index 00ebd9e..c68f96e 100644 --- a/mwmbl/tinysearchengine/indexer.py +++ b/mwmbl/tinysearchengine/indexer.py @@ -2,6 +2,7 @@ import json import os from dataclasses import astuple, dataclass, asdict from io import UnsupportedOperation +from logging import getLogger from mmap import mmap, PROT_READ, PROT_WRITE from typing import TypeVar, Generic, Callable, List @@ -16,6 +17,9 @@ NUM_PAGES = 5_120_000 PAGE_SIZE = 4096 +logger = getLogger(__name__) + + @dataclass class Document: title: str @@ -92,6 +96,7 @@ class TinyIndex(Generic[T]): self.page_size = metadata.page_size self.compressor = ZstdCompressor() self.decompressor = ZstdDecompressor() + logger.info(f"Loaded index with {self.num_pages} pages and {self.page_size} page size") self.index_file = None self.mmap = None @@ -107,13 +112,14 @@ class TinyIndex(Generic[T]): def retrieve(self, key: str) -> List[T]: index = self.get_key_page_index(key) + logger.debug(f"Retrieving index {index}") return self.get_page(index) def get_key_page_index(self, key) -> int: key_hash = mmh3.hash(key, signed=False) return key_hash % self.num_pages - def get_page(self, i): + def get_page(self, i) -> list[T]: """ Get the page at index i, decompress and deserialise it using JSON """ @@ -123,6 +129,7 @@ class TinyIndex(Generic[T]): def _get_page_tuples(self, i): page_data = self.mmap[i * self.page_size:(i + 1) * self.page_size] decompressed_data = self.decompressor.decompress(page_data) + # logger.debug(f"Decompressed data: {decompressed_data}") return json.loads(decompressed_data.decode('utf8')) def index(self, key: str, value: T):