瀏覽代碼

Run multiple processes in parallel

Daoud Clarke 3 年之前
父節點
當前提交
16a8356a23
共有 2 個文件被更改,包括 35 次插入和 7 次刪除
  1. +24 −5
      extract_local.py
  2. +11 −2
      fsqueue.py

+ 24 - 5
extract_local.py

@@ -2,7 +2,9 @@ import gzip
 import json
 import json
 import os
 import os
 from glob import glob
 from glob import glob
+from multiprocessing import Process, Lock
 from pathlib import Path
 from pathlib import Path
+from time import sleep
 
 
 from extract_process import fetch_process_warc_records
 from extract_process import fetch_process_warc_records
 from fsqueue import FSQueue, GzipJsonRowSerializer
 from fsqueue import FSQueue, GzipJsonRowSerializer
@@ -12,6 +14,8 @@ EXTRACTS_PATH = DATA_DIR / 'extracts'
 
 
 ARCHIVE_INFO_GLOB = 'outputs/records/*.gz'
 ARCHIVE_INFO_GLOB = 'outputs/records/*.gz'
 
 
+NUM_PROCESSES = 8
+
 
 
 def get_records():
 def get_records():
     for path in glob(ARCHIVE_INFO_GLOB):
     for path in glob(ARCHIVE_INFO_GLOB):
@@ -25,17 +29,18 @@ def process(record):
     return list(fetch_process_warc_records([record]))
     return list(fetch_process_warc_records([record]))
 
 
 
 
-def run():
+def run(lock: Lock):
     input_queue = FSQueue(DATA_DIR, 'records', GzipJsonRowSerializer())
     input_queue = FSQueue(DATA_DIR, 'records', GzipJsonRowSerializer())
     output_queue = FSQueue(DATA_DIR, 'search-items', GzipJsonRowSerializer())
     output_queue = FSQueue(DATA_DIR, 'search-items', GzipJsonRowSerializer())
 
 
-    input_queue.unlock_all()
-
     while True:
     while True:
-        queue_item = input_queue.get()
+        with lock:
+            queue_item = input_queue.get()
         if queue_item is None:
         if queue_item is None:
+            print("All finished, stopping:", os.getpid())
             break
             break
         item_id, records = queue_item
         item_id, records = queue_item
+        print("Got item: ", item_id, os.getpid())
         search_items = []
         search_items = []
         for record in records:
         for record in records:
             search_items += list(fetch_process_warc_records([record]))
             search_items += list(fetch_process_warc_records([record]))
@@ -44,5 +49,19 @@ def run():
         input_queue.done(item_id)
         input_queue.done(item_id)
 
 
 
 
def run_multiprocessing():
    """Unlock stale queue items, then fan out NUM_PROCESSES worker processes.

    Each worker executes ``run`` and shares a single multiprocessing Lock so
    only one process pulls from the input queue at a time.  Blocks until
    every worker has exited.
    """
    # Clear leftover locks from a previous (possibly crashed) run before
    # the workers start competing for items.
    FSQueue(DATA_DIR, 'records', GzipJsonRowSerializer()).unlock_all()

    shared_lock = Lock()
    workers = [Process(target=run, args=(shared_lock,)) for _ in range(NUM_PROCESSES)]
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()
+
+
 if __name__ == '__main__':
 if __name__ == '__main__':
-    run()
+    run_multiprocessing()

+ 11 - 2
fsqueue.py

@@ -91,13 +91,22 @@ class FSQueue:
         Get the next priority item from the queue, returning the item ID and the object
         Get the next priority item from the queue, returning the item ID and the object
         """
         """
 
 
-        paths = sorted(Path(self._get_dir(FSState.READY)).iterdir(), key=os.path.getmtime)
+        directory = self._get_dir(FSState.READY)
+        print("Getting directory", directory)
+        paths = list(Path(directory).iterdir())
+        print("Top paths", paths[:10])
 
 
         for path in paths:
         for path in paths:
             # Try and lock the file
             # Try and lock the file
-            self._move(path.name, FSState.READY, FSState.LOCKED)
+            try:
+                print("Moving file", path.name)
+                self._move(path.name, FSState.READY, FSState.LOCKED)
+            except FileNotFoundError:
+                print("File not found", path.name)
+                continue
 
 
             with open(self._get_path(FSState.LOCKED, path.name), 'rb') as item_file:
             with open(self._get_path(FSState.LOCKED, path.name), 'rb') as item_file:
+                print("Opening file", path.name)
                 return path.name, self.serializer.deserialize(item_file.read())
                 return path.name, self.serializer.deserialize(item_file.read())
 
 
     def done(self, item_id: str):
     def done(self, item_id: str):