From 34dc50a6ed26be523903dc167ca8d45293f84d13 Mon Sep 17 00:00:00 2001 From: Daoud Clarke Date: Sat, 11 Dec 2021 17:18:00 +0000 Subject: [PATCH] Output processed items to an output queue --- extract_local.py | 25 ++++++++++++++----------- 1 file changed, 14 insertions(+), 11 deletions(-) diff --git a/extract_local.py b/extract_local.py index 045b5f0..9e0446d 100644 --- a/extract_local.py +++ b/extract_local.py @@ -1,9 +1,7 @@ import gzip import json -import multiprocessing import os from glob import glob -from itertools import islice from pathlib import Path from extract_process import fetch_process_warc_records @@ -28,17 +26,22 @@ def process(record): def run(): - queue = FSQueue(DATA_DIR, 'records', GzipJsonRowSerializer()) - path, records = queue.get() - for record in records: - result = process(record) - print("Result", result) + input_queue = FSQueue(DATA_DIR, 'records', GzipJsonRowSerializer()) + output_queue = FSQueue(DATA_DIR, 'search-items', GzipJsonRowSerializer()) + input_queue.unlock_all() - # with gzip.open(EXTRACTS_PATH / 'data.json.gz', 'wt') as output_file: - # for row in processed: - # output_file.write(json.dumps(row) + '\n') - # print("Processed", row) + while True: + queue_item = input_queue.get() + if queue_item is None: + break + item_id, records = queue_item + search_items = [] + for record in records: + search_items += list(fetch_process_warc_records([record])) + if search_items: + output_queue.put(search_items) + input_queue.done(item_id) if __name__ == '__main__':