Add an error state

Daoud Clarke 2021-12-14 19:59:31 +00:00
parent 2844c1df75
commit 869127c6ec
3 changed files with 40 additions and 6 deletions


@@ -14,11 +14,18 @@ from pathlib import Path
 from zstandard import ZstdCompressor, ZstdDecompressor
 
 
+class FSQueueError(Exception):
+    def __init__(self, item_id, message):
+        super().__init__(message)
+        self.item_id = item_id
+
+
 class FSState(Enum):
     CREATING = 'creating'
     READY = 'ready'
     LOCKED = 'locked'
     DONE = 'done'
+    ERROR = 'error'
 
 
 class Serializer(ABC):
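
The new exception carries the failing item's id so the caller can still act on that item; a minimal illustration of the intended use, with the id made up for the example (the actual wrapping happens in FSQueue.get in the next hunk):

# Minimal illustration (not project code): FSQueueError keeps the id of the
# item that failed, and the message passed to Exception becomes str(e).
try:
    raise FSQueueError('item-123', 'Error deserializing item')
except FSQueueError as e:
    print(e.item_id)   # 'item-123' -- what callers pass to FSQueue.error() below
    print(e)           # 'Error deserializing item'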
@@ -107,15 +114,23 @@ class FSQueue:
         with open(self._get_path(FSState.LOCKED, path.name), 'rb') as item_file:
             print("Opening file", path.name)
-            return path.name, self.serializer.deserialize(item_file.read())
+            try:
+                return path.name, self.serializer.deserialize(item_file.read())
+            except Exception as e:
+                raise FSQueueError(path.name, 'Error deserializing item') from e
 
     def done(self, item_id: str):
         """
         Mark a task/file as done
         """
         self._move(item_id, FSState.LOCKED, FSState.DONE)
 
+    def error(self, item_id: str):
+        """
+        Mark a task/file as in error state
+        """
+        self._move(item_id, FSState.LOCKED, FSState.ERROR)
+
     def unlock_all(self):
         paths = sorted(Path(self._get_dir(FSState.LOCKED)).iterdir(), key=os.path.getmtime)
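
Taken together, get/done/error imply a consumer contract: wrap get(), and on FSQueueError park the offending item in the error state and move on. A minimal sketch of that pattern, assuming only the FSQueue API visible in this diff; the loop body is a placeholder, and the indexing script in the last file of this commit follows the same shape:

# Sketch of the calling pattern the new API implies (hypothetical consumer;
# the imports mirror the indexing script changed later in this commit).
from fsqueue import FSQueue, GzipJsonRowSerializer, FSQueueError
from paths import DATA_DIR

queue = FSQueue(DATA_DIR, 'search-items', GzipJsonRowSerializer())
queue.unlock_all()
while True:
    try:
        item = queue.get()            # may now raise FSQueueError on a corrupt item
    except FSQueueError as e:
        queue.error(e.item_id)        # LOCKED -> ERROR: keep the bad file aside
        continue
    if item is None:
        break                         # queue drained
    item_id, rows = item
    for row in rows:                  # placeholder for real processing
        pass
    queue.done(item_id)               # LOCKED -> DONE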


@@ -16,8 +16,11 @@ import mmh3
 import pandas as pd
 from zstandard import ZstdCompressor, ZstdDecompressor, ZstdError
 
-NUM_PAGES = 8192
-PAGE_SIZE = 512
+# NUM_PAGES = 8192
+# PAGE_SIZE = 512
+NUM_PAGES = 25600
+PAGE_SIZE = 4096
 
 NUM_INITIAL_TOKENS = 50
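
These two constants together fix the total size of the on-disk index; assuming, as the names suggest, an index of NUM_PAGES fixed-size pages of PAGE_SIZE bytes each, the change grows it from roughly 4 MiB to 100 MiB:

# Back-of-the-envelope size implied by the constants (assumption: the index
# is NUM_PAGES fixed-size pages of PAGE_SIZE bytes each).
old_size = 8192 * 512      # = 4,194,304 bytes, about 4 MiB
new_size = 25600 * 4096    # = 104,857,600 bytes, exactly 100 MiB
print(old_size, new_size)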


@@ -1,14 +1,21 @@
 """
 Index data downloaded from Common Crawl
 """
+import logging
+import sys
+from logging import getLogger
+
 import spacy
 
-from fsqueue import FSQueue, GzipJsonRowSerializer
+from fsqueue import FSQueue, GzipJsonRowSerializer, FSQueueError
 from index import TinyIndexer, index_titles_and_urls, PAGE_SIZE, NUM_PAGES, Document
 from paths import INDEX_PATH, DATA_DIR, COMMON_CRAWL_TERMS_PATH
 
+logging.basicConfig(level=logging.DEBUG, stream=sys.stdout)
+logger = getLogger(__name__)
+
 
 def index_common_craw_data():
     nlp = spacy.load("en_core_web_sm")
@@ -19,13 +26,22 @@ def index_common_craw_data():
 def get_common_crawl_titles_and_urls():
     input_queue = FSQueue(DATA_DIR, 'search-items', GzipJsonRowSerializer())
     input_queue.unlock_all()
 
     while True:
-        next_item = input_queue.get()
+        try:
+            next_item = input_queue.get()
+        except FSQueueError as e:
+            logger.exception(f'Error with item {e.item_id}')
+            input_queue.error(e.item_id)
+            continue
+
         if next_item is None:
             logger.info('Not more items to process, stopping')
             break
 
         item_id, items = next_item
         logger.info(f'Processing item {item_id}')
         for url, title, extract in items:
             yield title, url
         input_queue.done(item_id)
 
 
 if __name__ == '__main__':
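
One reason the raise ... from e in FSQueue.get pairs well with logger.exception here: the original deserialization error stays attached as the new exception's __cause__, so both tracebacks end up in the log. A standalone illustration, not project code, using a stand-in exception with the same shape as FSQueueError:

# Standalone demo of exception chaining plus logger.exception.
import logging

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)


class DemoQueueError(Exception):
    def __init__(self, item_id, message):
        super().__init__(message)
        self.item_id = item_id


try:
    try:
        raise ValueError('corrupt gzip payload')   # stand-in for a deserialize failure
    except Exception as e:
        raise DemoQueueError('item-123', 'Error deserializing item') from e
except DemoQueueError as e:
    logger.exception(f'Error with item {e.item_id}')   # logs DemoQueueError and its ValueError cause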