Output processed items to an output queue

This commit is contained in:
Daoud Clarke 2021-12-11 17:18:00 +00:00
parent c46257c6d1
commit 34dc50a6ed

View file

@ -1,9 +1,7 @@
import gzip
import json
import multiprocessing
import os
from glob import glob
from itertools import islice
from pathlib import Path
from extract_process import fetch_process_warc_records
@ -28,17 +26,22 @@ def process(record):
def run():
queue = FSQueue(DATA_DIR, 'records', GzipJsonRowSerializer())
path, records = queue.get()
for record in records:
result = process(record)
print("Result", result)
input_queue = FSQueue(DATA_DIR, 'records', GzipJsonRowSerializer())
output_queue = FSQueue(DATA_DIR, 'search-items', GzipJsonRowSerializer())
input_queue.unlock_all()
# with gzip.open(EXTRACTS_PATH / 'data.json.gz', 'wt') as output_file:
# for row in processed:
# output_file.write(json.dumps(row) + '\n')
# print("Processed", row)
while True:
queue_item = input_queue.get()
if queue_item is None:
break
item_id, records = queue_item
search_items = []
for record in records:
search_items += list(fetch_process_warc_records([record]))
if search_items:
output_queue.put(search_items)
input_queue.done(item_id)
if __name__ == '__main__':