diff --git a/mwmbl/background.py b/mwmbl/background.py
index 23b0a98..2bc087a 100644
--- a/mwmbl/background.py
+++ b/mwmbl/background.py
@@ -21,15 +21,15 @@ def run(data_path: str):
         url_db = URLDatabase(db.connection)
         url_db.create_tables()
 
-    historical.run()
+    # historical.run()
     index_path = Path(data_path) / INDEX_NAME
     batch_cache = BatchCache(Path(data_path) / BATCH_DIR_NAME)
 
     while True:
-        try:
-            batch_cache.retrieve_batches(num_batches=10000)
-        except Exception:
-            logger.exception("Error retrieving batches")
+        # try:
+        #     batch_cache.retrieve_batches(num_batches=10000)
+        # except Exception:
+        #     logger.exception("Error retrieving batches")
         try:
             update_urls.run(batch_cache)
         except Exception:
diff --git a/mwmbl/crawler/app.py b/mwmbl/crawler/app.py
index c113438..c24cb00 100644
--- a/mwmbl/crawler/app.py
+++ b/mwmbl/crawler/app.py
@@ -2,8 +2,7 @@ import gzip
 import hashlib
 import json
 from datetime import datetime, timezone, date
-from multiprocessing import Queue
-from queue import Empty
+from queue import Queue, Empty
 from typing import Union
 from uuid import uuid4
 
@@ -28,6 +27,7 @@ from mwmbl.settings import (
     PUBLIC_USER_ID_LENGTH,
     FILE_NAME_SUFFIX,
     DATE_REGEX)
+from mwmbl.url_queue import URLQueue
 
 
 def get_bucket(name):
@@ -45,7 +45,7 @@ def upload(data: bytes, name: str):
 last_batch = None
 
 
-def get_router(batch_cache: BatchCache, url_queue: Queue):
+def get_router(batch_cache: BatchCache, queued_batches: Queue):
     router = APIRouter(prefix="/crawler", tags=["crawler"])
 
     @router.post('/batches/')
@@ -103,7 +103,7 @@ def get_router(batch_cache: BatchCache, url_queue: Queue):
     def request_new_batch(batch_request: NewBatchRequest) -> list[str]:
         user_id_hash = _get_user_id_hash(batch_request)
         try:
-            urls = url_queue.get(block=False)
+            urls = queued_batches.get(block=False)
         except Empty:
             return []
 
diff --git a/mwmbl/main.py b/mwmbl/main.py
index 8b14393..d4d59a9 100644
--- a/mwmbl/main.py
+++ b/mwmbl/main.py
@@ -16,6 +16,7 @@ from mwmbl.tinysearchengine import search
 from mwmbl.tinysearchengine.completer import Completer
 from mwmbl.tinysearchengine.indexer import TinyIndex, Document, PAGE_SIZE
 from mwmbl.tinysearchengine.rank import HeuristicRanker
+from mwmbl.url_queue import URLQueue, update_queue_continuously
 
 logging.basicConfig(stream=sys.stdout, level=logging.INFO)
 
@@ -46,11 +47,12 @@ def run():
         print("Creating a new index")
         TinyIndex.create(item_factory=Document, index_path=index_path, num_pages=args.num_pages, page_size=PAGE_SIZE)
 
-    queue = Queue()
+    new_item_queue = Queue()
+    queued_batches = Queue()
 
     if args.background:
         Process(target=background.run, args=(args.data,)).start()
-        Process(target=url_queue.run, args=(queue,)).start()
+        Process(target=update_queue_continuously, args=(new_item_queue, queued_batches,)).start()
 
     completer = Completer()
 
@@ -66,7 +68,7 @@ def run():
     app.include_router(search_router)
 
     batch_cache = BatchCache(Path(args.data) / BATCH_DIR_NAME)
-    crawler_router = crawler.get_router(batch_cache, queue)
+    crawler_router = crawler.get_router(batch_cache, queued_batches)
     app.include_router(crawler_router)
 
     # Initialize uvicorn server using global app instance and server config params
diff --git a/mwmbl/url_queue.py b/mwmbl/url_queue.py
index 316dae2..8930407 100644
--- a/mwmbl/url_queue.py
+++ b/mwmbl/url_queue.py
@@ -1,8 +1,11 @@
+import time
+from datetime import datetime, timedelta
 from logging import getLogger
 from multiprocessing import Queue
+from queue import Empty
 from time import sleep
 
-from mwmbl.crawler.urls import BATCH_SIZE, URLDatabase, URLStatus
+from mwmbl.crawler.urls import BATCH_SIZE, URLDatabase, URLStatus, FoundURL, REASSIGN_MIN_HOURS
 from mwmbl.database import Database
 from mwmbl.utils import batch
 
@@ -14,35 +17,57 @@ MAX_QUEUE_SIZE = 5000
 MIN_QUEUE_SIZE = 1000
 
 
-def run(url_queue: Queue):
-    initialize_url_queue(url_queue)
+class URLQueue:
+    def __init__(self, new_item_queue: Queue, queued_batches: Queue):
+        """
+        new_item_queue: each item in the queue is a list of FoundURLs
+        queued_batches: each item in the queue is a list of URLs (strings)
+        """
+        self._new_item_queue = new_item_queue
+        self._queued_batches = queued_batches
+
+    def initialize(self):
+        with Database() as db:
+            url_db = URLDatabase(db.connection)
+            urls = url_db.get_urls(URLStatus.QUEUED, MAX_QUEUE_SIZE * BATCH_SIZE)
+            self._queue_urls(urls)
+            logger.info(f"Initialized URL queue with {len(urls)} urls, current queue size: {self.num_queued_batches}")
+
+    def update(self):
+        num_processed = 0
+        while True:
+            try:
+                new_batch = self._new_item_queue.get_nowait()
+                num_processed += 1
+            except Empty:
+                break
+            self.process_found_urls(new_batch)
+        return num_processed
+
+    def process_found_urls(self, found_urls: list[FoundURL]):
+        min_updated_date = datetime.utcnow() - timedelta(hours=REASSIGN_MIN_HOURS)
+
+        valid_urls = [found_url.url for found_url in found_urls if found_url.status == URLStatus.NEW or (
+            found_url.status == URLStatus.ASSIGNED and found_url.timestamp < min_updated_date)]
+
+        self._queue_urls(valid_urls)
+
+    def _queue_urls(self, valid_urls: list[str]):
+        for url_batch in batch(valid_urls, BATCH_SIZE):
+            self._queued_batches.put(url_batch, block=False)
+
+    @property
+    def num_queued_batches(self):
+        return self._queued_batches.qsize()
+
+
+def update_queue_continuously(new_item_queue: Queue, queued_batches: Queue):
+    queue = URLQueue(new_item_queue, queued_batches)
+    queue.initialize()
     while True:
-        update_url_queue(url_queue)
+        num_processed = queue.update()
+        logger.info(f"Queue update, num processed: {num_processed}, queue size: {queue.num_queued_batches}")
+        if num_processed == 0:
+            time.sleep(5)
 
 
-def update_url_queue(url_queue: Queue):
-    logger.info("Updating URL queue")
-    current_size = url_queue.qsize()
-    if current_size >= MIN_QUEUE_SIZE:
-        logger.info(f"Skipping queue update, current size {current_size}, sleeping for 10 seconds")
-        sleep(10)
-        return
-
-    with Database() as db:
-        url_db = URLDatabase(db.connection)
-        urls = url_db.get_urls_for_crawling()
-        queue_batches(url_queue, urls)
-        logger.info(f"Queued {len(urls)} urls, current queue size: {url_queue.qsize()}")
-
-
-def initialize_url_queue(url_queue: Queue):
-    with Database() as db:
-        url_db = URLDatabase(db.connection)
-        urls = url_db.get_urls(URLStatus.QUEUED, MAX_QUEUE_SIZE * BATCH_SIZE)
-        queue_batches(url_queue, urls)
-        logger.info(f"Initialized URL queue with {len(urls)} urls, current queue size: {url_queue.qsize()}")
-
-
-def queue_batches(url_queue, urls):
-    for url_batch in batch(urls, BATCH_SIZE):
-        url_queue.put(url_batch, block=False)
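
Note (not part of the patch): a minimal sketch of how the new URLQueue could be exercised on its own, assuming mwmbl is importable and that FoundURL exposes the url, status and timestamp fields read by process_found_urls. FakeFoundURL is a hypothetical stand-in so the example does not depend on FoundURL's real constructor; plain queue.Queue objects replace the multiprocessing queues that main.py wires up, and initialize() is skipped because it needs a live URL database.

# Illustrative sketch only, not part of the patch.
from collections import namedtuple
from datetime import datetime
from queue import Queue

from mwmbl.crawler.urls import URLStatus
from mwmbl.url_queue import URLQueue

# Hypothetical stand-in carrying only the fields process_found_urls reads.
FakeFoundURL = namedtuple("FakeFoundURL", ["url", "status", "timestamp"])

new_item_queue = Queue()    # producer side: each item is a list of found URLs
queued_batches = Queue()    # consumer side: each item is a list of URL strings

queue = URLQueue(new_item_queue, queued_batches)

# A producer pushes a list of newly discovered URLs:
new_item_queue.put([FakeFoundURL("https://example.com/", URLStatus.NEW, datetime.utcnow())])

# update() drains new_item_queue, keeps NEW (and stale ASSIGNED) URLs,
# and re-batches them onto queued_batches:
assert queue.update() == 1

# A crawler-facing consumer (request_new_batch) pops one batch of URL strings:
print(queued_batches.get(block=False))   # a batch containing "https://example.com/"

The sketch reflects the push model this patch moves to: instead of update_url_queue polling the database on a timer, a producer puts lists of FoundURLs onto new_item_queue, and URLQueue re-batches the acceptable ones onto queued_batches for request_new_batch to hand out to crawlers.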