diff --git a/fsqueue.py b/fsqueue.py
index 018f62b..d81d869 100644
--- a/fsqueue.py
+++ b/fsqueue.py
@@ -14,11 +14,18 @@
 from pathlib import Path
 
 from zstandard import ZstdCompressor, ZstdDecompressor
 
 
+class FSQueueError(Exception):
+    def __init__(self, item_id, message):
+        super().__init__(message)
+        self.item_id = item_id
+
+
 class FSState(Enum):
     CREATING = 'creating'
     READY = 'ready'
     LOCKED = 'locked'
     DONE = 'done'
+    ERROR = 'error'
 
 
 class Serializer(ABC):
@@ -107,15 +114,24 @@ class FSQueue:
         with open(self._get_path(FSState.LOCKED, path.name), 'rb') as item_file:
             print("Opening file", path.name)
-            return path.name, self.serializer.deserialize(item_file.read())
+            try:
+                return path.name, self.serializer.deserialize(item_file.read())
+            except Exception as e:
+                raise FSQueueError(path.name, 'Error deserializing item') from e
 
     def done(self, item_id: str):
         """
         Mark a task/file as done
         """
         self._move(item_id, FSState.LOCKED, FSState.DONE)
 
+    def error(self, item_id: str):
+        """
+        Mark a task/file as in error state
+        """
+        self._move(item_id, FSState.LOCKED, FSState.ERROR)
+
     def unlock_all(self):
         paths = sorted(Path(self._get_dir(FSState.LOCKED)).iterdir(), key=os.path.getmtime)
diff --git a/index.py b/index.py
index 5ab8313..8db9b4e 100644
--- a/index.py
+++ b/index.py
@@ -16,8 +16,11 @@
 import mmh3
 import pandas as pd
 from zstandard import ZstdCompressor, ZstdDecompressor, ZstdError
 
-NUM_PAGES = 8192
-PAGE_SIZE = 512
+# NUM_PAGES = 8192
+# PAGE_SIZE = 512
+NUM_PAGES = 25600
+PAGE_SIZE = 4096
+
 
 NUM_INITIAL_TOKENS = 50
diff --git a/indexcc.py b/indexcc.py
index 574be5a..efd6299 100644
--- a/indexcc.py
+++ b/indexcc.py
@@ -1,14 +1,21 @@
 """
 Index data downloaded from Common Crawl
 """
+import logging
+import sys
+from logging import getLogger
+
 import spacy
 
-from fsqueue import FSQueue, GzipJsonRowSerializer
+from fsqueue import FSQueue, GzipJsonRowSerializer, FSQueueError
 from index import TinyIndexer, index_titles_and_urls, PAGE_SIZE, NUM_PAGES, Document
 from paths import INDEX_PATH, DATA_DIR, COMMON_CRAWL_TERMS_PATH
 
+logging.basicConfig(level=logging.DEBUG, stream=sys.stdout)
+logger = getLogger(__name__)
+
+
 def index_common_craw_data():
     nlp = spacy.load("en_core_web_sm")
@@ -19,13 +26,22 @@
 def get_common_crawl_titles_and_urls():
     input_queue = FSQueue(DATA_DIR, 'search-items', GzipJsonRowSerializer())
+    input_queue.unlock_all()
     while True:
-        next_item = input_queue.get()
+        try:
+            next_item = input_queue.get()
+        except FSQueueError as e:
+            logger.exception(f'Error with item {e.item_id}')
+            input_queue.error(e.item_id)
+            continue
         if next_item is None:
+            logger.info('No more items to process, stopping')
             break
         item_id, items = next_item
+        logger.info(f'Processing item {item_id}')
         for url, title, extract in items:
             yield title, url
+        input_queue.done(item_id)
 
 
 if __name__ == '__main__':