def get_domain_titles():
    """Worker loop: repeatedly take a batch of (rank, domain) pairs off the
    domains queue, fetch each domain's title, push the results onto the
    titles queue, and only then acknowledge the batch.

    Results are queued before the batch is marked done so that a crash
    between the two steps causes re-processing rather than data loss.
    """
    domains_queue = FSQueue(DATA_DIR, DOMAINS_QUEUE_NAME, ZstdJsonSerializer())
    titles_queue = FSQueue(DATA_DIR, DOMAINS_TITLES_QUEUE_NAME, ZstdJsonSerializer())
    while True:
        # NOTE(review): FSQueue.get() returns None once the queue is empty,
        # which makes this unpacking raise TypeError — confirm workers are
        # only started with a populated queue, or handle the None case.
        items_id, items = domains_queue.get()
        titles = retrieve_titles(items)
        titles_queue.put(titles)
        domains_queue.done(items_id)
        print("Done titles", len(titles))


def retrieve_titles(items):
    """Fetch the title for each (rank, domain) pair in *items*.

    Returns a list of dicts with keys: rank, domain, status, url, title.
    """
    titles = []
    for rank, domain in items:
        print("Domain", domain, rank)
        status, title, url = retrieve_title(domain)
        # One flat record per domain, ready for JSON serialization.
        titles.append(dict(rank=rank, domain=domain, status=status, url=url, title=title))
    return titles
def queue_domains():
    """Read (rank, domain) pairs from get_domains() and enqueue them onto the
    domains FSQueue in batches of BATCH_SIZE items.

    Bug fix: the original only called queue.put() on exact BATCH_SIZE
    boundaries, silently dropping the trailing partial batch (any remainder
    of ``queued % BATCH_SIZE`` domains was never enqueued).
    """
    queue = FSQueue(DATA_DIR, DOMAINS_QUEUE_NAME, ZstdJsonSerializer())
    queued = 0
    batch = []
    for rank, domain in get_domains():
        batch.append((rank, domain))
        queued += 1
        if queued % BATCH_SIZE == 0:
            queue.put(batch)
            batch = []
            print("Queued:", queued)
    # Flush the final partial batch, if any.
    if batch:
        queue.put(batch)
        print("Queued:", queued)
+""" + +import json +import os +from abc import ABC +from enum import Enum +from uuid import uuid4 +from pathlib import Path + +from zstandard import ZstdCompressor, ZstdDecompressor + + +class FSState(Enum): + CREATING = 'creating' + READY = 'ready' + LOCKED = 'locked' + DONE = 'done' + + +class Serializer(ABC): + def serialize(self, item) -> bytes: + pass + + def deserialize(self, serialized_item: bytes): + pass + + +class ZstdJsonSerializer(Serializer): + def __init__(self): + self.compressor = ZstdCompressor() + self.decompressor = ZstdDecompressor() + + def serialize(self, item) -> bytes: + return self.compressor.compress(json.dumps(item).encode('utf8')) + + def deserialize(self, serialized_item: bytes): + return json.loads(self.decompressor.decompress(serialized_item).decode('utf8')) + + +class FSQueue: + def __init__(self, directory: str, name: str, serializer: Serializer): + self.directory = directory + self.name = name + self.serializer = serializer + + if not os.path.isdir(self.directory): + raise ValueError("Given path is not a directory") + + if '/' in name: + raise ValueError("Name should not contain '/'") + + os.makedirs(os.path.join(self.directory, self.name), exist_ok=True) + for state in FSState: + os.makedirs(self._get_dir(state), exist_ok=True) + + def _get_dir(self, state: FSState): + return os.path.join(self.directory, self.name, state.value) + + def _get_path(self, state: FSState, name: str): + return os.path.join(self._get_dir(state), name) + + def _move(self, name: str, old_state: FSState, new_state: FSState): + os.rename(self._get_path(old_state, name), self._get_path(new_state, name)) + + def put(self, item: object): + """ + Push a new item into the ready state + """ + item_id = str(uuid4()) + with open(self._get_path(FSState.CREATING, item_id), 'wb') as output_file: + output_file.write(self.serializer.serialize(item)) + + self._move(item_id, FSState.CREATING, FSState.READY) + + def get(self) -> (str, object): + """ + Get the next priority 
item from the queue, returning the item ID and the object + """ + + paths = sorted(Path(self._get_dir(FSState.READY)).iterdir(), key=os.path.getmtime) + + for path in paths: + # Try and lock the file + self._move(path.name, FSState.READY, FSState.LOCKED) + + with open(self._get_path(FSState.LOCKED, path.name), 'rb') as item_file: + return path.name, self.serializer.deserialize(item_file.read()) + + def done(self, item_id: str): + """ + Mark a task/file as done + """ + + self._move(item_id, FSState.LOCKED, FSState.DONE) diff --git a/paths.py b/paths.py index 4c19dcc..4b39f20 100644 --- a/paths.py +++ b/paths.py @@ -10,6 +10,6 @@ TEST_INDEX_PATH = os.path.join(DATA_DIR, 'index-test.tinysearch') WIKI_DATA_PATH = os.path.join(DATA_DIR, 'enwiki-20210301-pages-articles1.xml-p1p41242.bz2') WIKI_TITLES_PATH = os.path.join(DATA_DIR, 'abstract-titles-sorted.txt.gz') -DOMAINS_QUEUE_PATH = os.path.join(DATA_DIR, 'domains-queue') -DOMAINS_TITLES_QUEUE_PATH = os.path.join(DATA_DIR, 'domains-title-queue') +DOMAINS_QUEUE_NAME = 'domains-queue-fs' +DOMAINS_TITLES_QUEUE_NAME = 'domains-title-queue-fs' DOMAINS_PATH = os.path.join(DATA_DIR, 'top10milliondomains.csv.gz')