From 16a8356a23c0f0ff0a192f71a736fcc23c9420be Mon Sep 17 00:00:00 2001 From: Daoud Clarke Date: Sun, 12 Dec 2021 09:09:44 +0000 Subject: [PATCH] Run multiple processes in parallel --- extract_local.py | 29 ++++++++++++++++++++++++----- fsqueue.py | 13 +++++++++++-- 2 files changed, 35 insertions(+), 7 deletions(-) diff --git a/extract_local.py b/extract_local.py index 9e0446d..0aef839 100644 --- a/extract_local.py +++ b/extract_local.py @@ -2,7 +2,9 @@ import gzip import json import os from glob import glob +from multiprocessing import Process, Lock from pathlib import Path +from time import sleep from extract_process import fetch_process_warc_records from fsqueue import FSQueue, GzipJsonRowSerializer @@ -12,6 +14,8 @@ EXTRACTS_PATH = DATA_DIR / 'extracts' ARCHIVE_INFO_GLOB = 'outputs/records/*.gz' +NUM_PROCESSES = 8 + def get_records(): for path in glob(ARCHIVE_INFO_GLOB): @@ -25,17 +29,18 @@ def process(record): return list(fetch_process_warc_records([record])) -def run(): +def run(lock: Lock): input_queue = FSQueue(DATA_DIR, 'records', GzipJsonRowSerializer()) output_queue = FSQueue(DATA_DIR, 'search-items', GzipJsonRowSerializer()) - input_queue.unlock_all() - while True: - queue_item = input_queue.get() + with lock: + queue_item = input_queue.get() if queue_item is None: + print("All finished, stopping:", os.getpid()) break item_id, records = queue_item + print("Got item: ", item_id, os.getpid()) search_items = [] for record in records: search_items += list(fetch_process_warc_records([record])) @@ -44,5 +49,19 @@ def run(): input_queue.done(item_id) +def run_multiprocessing(): + input_queue = FSQueue(DATA_DIR, 'records', GzipJsonRowSerializer()) + input_queue.unlock_all() + processes = [] + lock = Lock() + for i in range(NUM_PROCESSES): + new_process = Process(target=run, args=(lock,)) + new_process.start() + processes.append(new_process) + + for running_process in processes: + running_process.join() + + if __name__ == '__main__': - run() + run_multiprocessing() diff --git a/fsqueue.py b/fsqueue.py index c90e349..018f62b 100644 --- a/fsqueue.py +++ b/fsqueue.py @@ -91,13 +91,22 @@ class FSQueue: Get the next priority item from the queue, returning the item ID and the object """ - paths = sorted(Path(self._get_dir(FSState.READY)).iterdir(), key=os.path.getmtime) + directory = self._get_dir(FSState.READY) + print("Getting directory", directory) + paths = list(Path(directory).iterdir()) + print("Top paths", paths[:10]) for path in paths: # Try and lock the file - self._move(path.name, FSState.READY, FSState.LOCKED) + try: + print("Moving file", path.name) + self._move(path.name, FSState.READY, FSState.LOCKED) + except FileNotFoundError: + print("File not found", path.name) + continue with open(self._get_path(FSState.LOCKED, path.name), 'rb') as item_file: + print("Opening file", path.name) return path.name, self.serializer.deserialize(item_file.read()) def done(self, item_id: str):