Daoud Clarke 2 лет назад
Родитель
Сommit
603fcd4eb2
4 измененных файлов с 65 добавлено и 38 удалено
  1. 5 5
      mwmbl/background.py
  2. 4 4
      mwmbl/crawler/app.py
  3. 5 3
      mwmbl/main.py
  4. 51 26
      mwmbl/url_queue.py

+ 5 - 5
mwmbl/background.py

@@ -21,15 +21,15 @@ def run(data_path: str):
         url_db = URLDatabase(db.connection)
         url_db.create_tables()
 
-    historical.run()
+    # historical.run()
     index_path = Path(data_path) / INDEX_NAME
     batch_cache = BatchCache(Path(data_path) / BATCH_DIR_NAME)
 
     while True:
-        try:
-            batch_cache.retrieve_batches(num_batches=10000)
-        except Exception:
-            logger.exception("Error retrieving batches")
+        # try:
+        #     batch_cache.retrieve_batches(num_batches=10000)
+        # except Exception:
+        #     logger.exception("Error retrieving batches")
         try:
             update_urls.run(batch_cache)
         except Exception:

+ 4 - 4
mwmbl/crawler/app.py

@@ -2,8 +2,7 @@ import gzip
 import hashlib
 import json
 from datetime import datetime, timezone, date
-from multiprocessing import Queue
-from queue import Empty
+from queue import Queue, Empty
 from typing import Union
 from uuid import uuid4
 
@@ -28,6 +27,7 @@ from mwmbl.settings import (
     PUBLIC_USER_ID_LENGTH,
     FILE_NAME_SUFFIX,
     DATE_REGEX)
+from mwmbl.url_queue import URLQueue
 
 
 def get_bucket(name):
@@ -45,7 +45,7 @@ def upload(data: bytes, name: str):
 last_batch = None
 
 
-def get_router(batch_cache: BatchCache, url_queue: Queue):
+def get_router(batch_cache: BatchCache, queued_batches: Queue):
     router = APIRouter(prefix="/crawler", tags=["crawler"])
 
     @router.post('/batches/')
@@ -103,7 +103,7 @@ def get_router(batch_cache: BatchCache, url_queue: Queue):
     def request_new_batch(batch_request: NewBatchRequest) -> list[str]:
         user_id_hash = _get_user_id_hash(batch_request)
         try:
-            urls = url_queue.get(block=False)
+            urls = queued_batches.get(block=False)
         except Empty:
             return []
 

+ 5 - 3
mwmbl/main.py

@@ -16,6 +16,7 @@ from mwmbl.tinysearchengine import search
 from mwmbl.tinysearchengine.completer import Completer
 from mwmbl.tinysearchengine.indexer import TinyIndex, Document, PAGE_SIZE
 from mwmbl.tinysearchengine.rank import HeuristicRanker
+from mwmbl.url_queue import URLQueue, update_queue_continuously
 
 logging.basicConfig(stream=sys.stdout, level=logging.INFO)
 
@@ -46,11 +47,12 @@ def run():
         print("Creating a new index")
         TinyIndex.create(item_factory=Document, index_path=index_path, num_pages=args.num_pages, page_size=PAGE_SIZE)
 
-    queue = Queue()
+    new_item_queue = Queue()
+    queued_batches = Queue()
 
     if args.background:
         Process(target=background.run, args=(args.data,)).start()
-        Process(target=url_queue.run, args=(queue,)).start()
+        Process(target=update_queue_continuously, args=(new_item_queue, queued_batches,)).start()
 
     completer = Completer()
 
@@ -66,7 +68,7 @@ def run():
         app.include_router(search_router)
 
         batch_cache = BatchCache(Path(args.data) / BATCH_DIR_NAME)
-        crawler_router = crawler.get_router(batch_cache, queue)
+        crawler_router = crawler.get_router(batch_cache, queued_batches)
         app.include_router(crawler_router)
 
         # Initialize uvicorn server using global app instance and server config params

+ 51 - 26
mwmbl/url_queue.py

@@ -1,8 +1,11 @@
+import time
+from datetime import datetime, timedelta
 from logging import getLogger
 from multiprocessing import Queue
+from queue import Empty
 from time import sleep
 
-from mwmbl.crawler.urls import BATCH_SIZE, URLDatabase, URLStatus
+from mwmbl.crawler.urls import BATCH_SIZE, URLDatabase, URLStatus, FoundURL, REASSIGN_MIN_HOURS
 from mwmbl.database import Database
 from mwmbl.utils import batch
 
@@ -14,35 +17,57 @@ MAX_QUEUE_SIZE = 5000
 MIN_QUEUE_SIZE = 1000
 
 
-def run(url_queue: Queue):
-    initialize_url_queue(url_queue)
-    while True:
-        update_url_queue(url_queue)
+class URLQueue:
+    def __init__(self, new_item_queue: Queue, queued_batches: Queue):
+        """
+        new_item_queue: each item in the queue is a list of FoundURLs
+        queued_batches: each item in the queue is a list of URLs (strings)
+        """
+        self._new_item_queue = new_item_queue
+        self._queued_batches = queued_batches
+
+    def initialize(self):
+        with Database() as db:
+            url_db = URLDatabase(db.connection)
+            urls = url_db.get_urls(URLStatus.QUEUED, MAX_QUEUE_SIZE * BATCH_SIZE)
+            self._queue_urls(urls)
+            logger.info(f"Initialized URL queue with {len(urls)} urls, current queue size: {self.num_queued_batches}")
+
+    def update(self):
+        num_processed = 0
+        while True:
+            try:
+                new_batch = self._new_item_queue.get_nowait()
+                num_processed += 1
+            except Empty:
+                break
+            self.process_found_urls(new_batch)
+        return num_processed
 
+    def process_found_urls(self, found_urls: list[FoundURL]):
+        min_updated_date = datetime.utcnow() - timedelta(hours=REASSIGN_MIN_HOURS)
 
-def update_url_queue(url_queue: Queue):
-    logger.info("Updating URL queue")
-    current_size = url_queue.qsize()
-    if current_size >= MIN_QUEUE_SIZE:
-        logger.info(f"Skipping queue update, current size {current_size}, sleeping for 10 seconds")
-        sleep(10)
-        return
+        valid_urls = [found_url.url for found_url in found_urls if found_url.status == URLStatus.NEW or (
+                found_url.status == URLStatus.ASSIGNED and found_url.timestamp < min_updated_date)]
 
-    with Database() as db:
-        url_db = URLDatabase(db.connection)
-        urls = url_db.get_urls_for_crawling()
-        queue_batches(url_queue, urls)
-        logger.info(f"Queued {len(urls)} urls, current queue size: {url_queue.qsize()}")
+        self._queue_urls(valid_urls)
 
+    def _queue_urls(self, valid_urls: list[str]):
+        for url_batch in batch(valid_urls, BATCH_SIZE):
+            self._queued_batches.put(url_batch, block=False)
 
-def initialize_url_queue(url_queue: Queue):
-    with Database() as db:
-        url_db = URLDatabase(db.connection)
-        urls = url_db.get_urls(URLStatus.QUEUED, MAX_QUEUE_SIZE * BATCH_SIZE)
-        queue_batches(url_queue, urls)
-        logger.info(f"Initialized URL queue with {len(urls)} urls, current queue size: {url_queue.qsize()}")
+    @property
+    def num_queued_batches(self):
+        return self._queued_batches.qsize()
+
+
+def update_queue_continuously(new_item_queue: Queue, queued_batches: Queue):
+    queue = URLQueue(new_item_queue, queued_batches)
+    queue.initialize()
+    while True:
+        num_processed = queue.update()
+        logger.info(f"Queue update, num processed: {num_processed}, queue size: {queue.num_queued_batches}")
+        if num_processed == 0:
+            time.sleep(5)
 
 
-def queue_batches(url_queue, urls):
-    for url_batch in batch(urls, BATCH_SIZE):
-        url_queue.put(url_batch, block=False)