Browse Source

Use a filesystem-based queue

Daoud Clarke 4 years ago
parent
commit
cc841c8b7e
4 changed files with 127 additions and 20 deletions
  1. 19 13
      domains/domain_titles.py
  2. 9 5
      domains/queue_domains.py
  3. 97 0
      fsqueue.py
  4. 2 2
      paths.py

+ 19 - 13
domains/domain_titles.py

@@ -1,17 +1,15 @@
 """
 Retrieve titles for each domain in the list of top domains
 """
-import pickle
 from multiprocessing import Process
 from time import sleep
 from urllib.parse import urlsplit, urlunsplit
 
 import bs4
 import requests
-from persistqueue import SQLiteAckQueue
-
-from paths import DOMAINS_QUEUE_PATH, DOMAINS_TITLES_QUEUE_PATH
 
+from fsqueue import FSQueue, ZstdJsonSerializer
+from paths import DATA_DIR, DOMAINS_QUEUE_NAME, DOMAINS_TITLES_QUEUE_NAME
 
 NUM_PROCESSES = 10
 
@@ -35,21 +33,29 @@ def get_redirect_no_cookies(url, max_redirects=5):
 
 
 def get_domain_titles():
-    domains_queue = SQLiteAckQueue(DOMAINS_QUEUE_PATH)
-    titles_queue = SQLiteAckQueue(DOMAINS_TITLES_QUEUE_PATH, multithreading=True)
+    domains_queue = FSQueue(DATA_DIR, DOMAINS_QUEUE_NAME, ZstdJsonSerializer())
+    titles_queue = FSQueue(DATA_DIR, DOMAINS_TITLES_QUEUE_NAME, ZstdJsonSerializer())
     while True:
-        item = domains_queue.get()
+        items_id, items = domains_queue.get()
+        titles = retrieve_titles(items)
         # print("Item", item)
+        # print("Title", type(title))
+        # print("Title item", str(title_item))
+        # print("Dump", pickle.dumps(title_item))
+        titles_queue.put(titles)
+        domains_queue.done(items_id)
+        print("Done titles", len(titles))
+
+
def retrieve_titles(items):
    """
    Retrieve the page title for each (rank, domain) pair in *items*.

    Returns a list of dicts with keys: rank, domain, status, url, title.
    """
    results = []
    for rank, domain in items:
        print("Domain", domain, rank)
        status, title, url = retrieve_title(domain)
        results.append({
            'rank': rank,
            'domain': domain,
            'status': status,
            'url': url,
            'title': title,
        })
    return results
 
 
 def retrieve_title(domain):

+ 9 - 5
domains/queue_domains.py

@@ -4,9 +4,10 @@ Add domains to the queue to be retrieved
 import csv
 import gzip
 
-from persistqueue import SQLiteQueue, SQLiteAckQueue
+from fsqueue import FSQueue, ZstdJsonSerializer
+from paths import DOMAINS_PATH, DOMAINS_QUEUE_NAME, DATA_DIR
 
-from paths import DOMAINS_QUEUE_PATH, DOMAINS_PATH
+BATCH_SIZE = 10000
 
 
 def get_domains():
@@ -17,12 +18,15 @@ def get_domains():
 
 
 def queue_domains():
-    queue = SQLiteAckQueue(DOMAINS_QUEUE_PATH)
+    queue = FSQueue(DATA_DIR, DOMAINS_QUEUE_NAME, ZstdJsonSerializer())
     queued = 0
+    batch = []
     for rank, domain in get_domains():
-        queue.put((rank, domain))
+        batch.append((rank, domain))
         queued += 1
-        if queued % 1000 == 0:
+        if queued % BATCH_SIZE == 0:
+            queue.put(batch)
+            batch = []
             print("Queued:", queued)
 
 

+ 97 - 0
fsqueue.py

@@ -0,0 +1,97 @@
+"""
+Filesystem-based queue that uses os.rename as an atomic operation to ensure
+that items are handled correctly.
+"""
+
+import json
+import os
+from abc import ABC
+from enum import Enum
+from uuid import uuid4
+from pathlib import Path
+
+from zstandard import ZstdCompressor, ZstdDecompressor
+
+
class FSState(Enum):
    """Lifecycle states of a queue item; each value is a subdirectory name."""
    CREATING = 'creating'  # item file is being written; not yet visible to consumers
    READY = 'ready'        # item is available to be claimed
    LOCKED = 'locked'      # item has been claimed by a consumer
    DONE = 'done'          # item has been processed and acknowledged
+
+
class Serializer(ABC):
    """Interface for converting queue items to and from bytes on disk."""

    def serialize(self, item) -> bytes:
        """Convert *item* to bytes. Subclasses must override."""
        # Raise rather than silently returning None: a subclass that forgets
        # to override would otherwise write empty/invalid queue files.
        raise NotImplementedError

    def deserialize(self, serialized_item: bytes):
        """Reconstruct an item from *serialized_item*. Subclasses must override."""
        raise NotImplementedError
+
+
class ZstdJsonSerializer(Serializer):
    """Serialize queue items as zstd-compressed UTF-8 JSON."""

    def __init__(self):
        self.compressor = ZstdCompressor()
        self.decompressor = ZstdDecompressor()

    def serialize(self, item) -> bytes:
        as_json = json.dumps(item)
        return self.compressor.compress(as_json.encode('utf8'))

    def deserialize(self, serialized_item: bytes):
        raw = self.decompressor.decompress(serialized_item)
        return json.loads(raw.decode('utf8'))
+
+
class FSQueue:
    """
    A queue of serialized items stored as files under
    ``<directory>/<name>/<state>/``, one file per item.  Items transition
    between state directories via ``os.rename``, which is atomic on POSIX
    filesystems, so multiple processes can share the same queue safely.
    """

    def __init__(self, directory: str, name: str, serializer: Serializer):
        self.directory = directory
        self.name = name
        self.serializer = serializer

        if not os.path.isdir(self.directory):
            raise ValueError("Given path is not a directory")

        if '/' in name:
            raise ValueError("Name should not contain '/'")

        os.makedirs(os.path.join(self.directory, self.name), exist_ok=True)
        for state in FSState:
            os.makedirs(self._get_dir(state), exist_ok=True)

    def _get_dir(self, state: FSState):
        """Directory holding all items currently in *state*."""
        return os.path.join(self.directory, self.name, state.value)

    def _get_path(self, state: FSState, name: str):
        """Full path of the item *name* in *state*."""
        return os.path.join(self._get_dir(state), name)

    def _move(self, name: str, old_state: FSState, new_state: FSState):
        """Atomically transition item *name* from one state to another."""
        os.rename(self._get_path(old_state, name), self._get_path(new_state, name))

    def put(self, item: object):
        """
        Push a new item into the ready state.

        The item is written under CREATING first and only then renamed into
        READY, so consumers never observe a partially written file.
        """
        item_id = str(uuid4())
        with open(self._get_path(FSState.CREATING, item_id), 'wb') as output_file:
            output_file.write(self.serializer.serialize(item))

        self._move(item_id, FSState.CREATING, FSState.READY)

    def get(self) -> (str, object):
        """
        Get the next priority item from the queue, returning the item ID and
        the deserialized object, or None if nothing is currently ready.

        Oldest items (by mtime) are tried first.
        """
        paths = sorted(Path(self._get_dir(FSState.READY)).iterdir(), key=os.path.getmtime)

        for path in paths:
            try:
                # Try and lock the file.  With several worker processes the
                # rename races: the loser gets FileNotFoundError because the
                # winner already moved the file out of READY.  Skip to the
                # next candidate instead of crashing.
                self._move(path.name, FSState.READY, FSState.LOCKED)
            except FileNotFoundError:
                continue

            with open(self._get_path(FSState.LOCKED, path.name), 'rb') as item_file:
                return path.name, self.serializer.deserialize(item_file.read())

        return None

    def done(self, item_id: str):
        """
        Mark a task/file as done (LOCKED -> DONE).

        Raises FileNotFoundError if *item_id* is not currently locked.
        """
        self._move(item_id, FSState.LOCKED, FSState.DONE)

+ 2 - 2
paths.py

@@ -10,6 +10,6 @@ TEST_INDEX_PATH = os.path.join(DATA_DIR, 'index-test.tinysearch')
 WIKI_DATA_PATH = os.path.join(DATA_DIR, 'enwiki-20210301-pages-articles1.xml-p1p41242.bz2')
 WIKI_TITLES_PATH = os.path.join(DATA_DIR, 'abstract-titles-sorted.txt.gz')
 
-DOMAINS_QUEUE_PATH = os.path.join(DATA_DIR, 'domains-queue')
-DOMAINS_TITLES_QUEUE_PATH = os.path.join(DATA_DIR, 'domains-title-queue')
# Queue directory names (used with FSQueue, rooted at DATA_DIR).
DOMAINS_QUEUE_NAME = 'domains-queue-fs'
DOMAINS_TITLES_QUEUE_NAME = 'domains-title-queue-fs'
# Source list of top domains; presumably gzipped CSV of (rank, domain) rows -- see queue_domains.
DOMAINS_PATH = os.path.join(DATA_DIR, 'top10milliondomains.csv.gz')