From d347a17d634773b827f67615f4b37770eb956eb9 Mon Sep 17 00:00:00 2001
From: Daoud Clarke
Date: Mon, 9 Jan 2023 20:50:28 +0000
Subject: [PATCH] Update URL queue separately from the other background
 process to speed it up

---
 mwmbl/background.py | 13 +------------
 mwmbl/main.py       |  9 +++++----
 mwmbl/url_queue.py  | 10 +++++++++-
 3 files changed, 15 insertions(+), 17 deletions(-)

diff --git a/mwmbl/background.py b/mwmbl/background.py
index 98e8eb5..23b0a98 100644
--- a/mwmbl/background.py
+++ b/mwmbl/background.py
@@ -2,7 +2,6 @@
 Script that updates data in a background process.
 """
 from logging import getLogger
-from multiprocessing import Queue
 from pathlib import Path
 from time import sleep
 
@@ -11,32 +10,22 @@ from mwmbl.database import Database
 from mwmbl.indexer import index_batches, historical, update_urls
 from mwmbl.indexer.batch_cache import BatchCache
 from mwmbl.indexer.paths import BATCH_DIR_NAME, INDEX_NAME
-from mwmbl.url_queue import update_url_queue, initialize_url_queue
 
 logger = getLogger(__name__)
 
 
-def run(data_path: str, url_queue: Queue):
+def run(data_path: str):
     logger.info("Started background process")
 
     with Database() as db:
         url_db = URLDatabase(db.connection)
         url_db.create_tables()
 
-    initialize_url_queue(url_queue)
-    try:
-        update_url_queue(url_queue)
-    except Exception:
-        logger.exception("Error updating URL queue")
     historical.run()
     index_path = Path(data_path) / INDEX_NAME
     batch_cache = BatchCache(Path(data_path) / BATCH_DIR_NAME)
 
     while True:
-        try:
-            update_url_queue(url_queue)
-        except Exception:
-            logger.exception("Error updating URL queue")
         try:
             batch_cache.retrieve_batches(num_batches=10000)
         except Exception:
diff --git a/mwmbl/main.py b/mwmbl/main.py
index fd93824..8b14393 100644
--- a/mwmbl/main.py
+++ b/mwmbl/main.py
@@ -8,7 +8,7 @@ from pathlib import Path
 import uvicorn
 from fastapi import FastAPI
 
-from mwmbl import background
+from mwmbl import background, url_queue
 from mwmbl.crawler import app as crawler
 from mwmbl.indexer.batch_cache import BatchCache
 from mwmbl.indexer.paths import INDEX_NAME, BATCH_DIR_NAME
@@ -46,10 +46,11 @@ def run():
             print("Creating a new index")
             TinyIndex.create(item_factory=Document, index_path=index_path, num_pages=args.num_pages, page_size=PAGE_SIZE)
 
-    url_queue = Queue()
+    queue = Queue()
 
     if args.background:
-        Process(target=background.run, args=(args.data, url_queue)).start()
+        Process(target=background.run, args=(args.data,)).start()
+        Process(target=url_queue.run, args=(queue,)).start()
 
     completer = Completer()
 
@@ -65,7 +66,7 @@ def run():
     app.include_router(search_router)
 
     batch_cache = BatchCache(Path(args.data) / BATCH_DIR_NAME)
-    crawler_router = crawler.get_router(batch_cache, url_queue)
+    crawler_router = crawler.get_router(batch_cache, queue)
     app.include_router(crawler_router)
 
     # Initialize uvicorn server using global app instance and server config params
diff --git a/mwmbl/url_queue.py b/mwmbl/url_queue.py
index e4124ec..316dae2 100644
--- a/mwmbl/url_queue.py
+++ b/mwmbl/url_queue.py
@@ -1,5 +1,6 @@
 from logging import getLogger
 from multiprocessing import Queue
+from time import sleep
 
 from mwmbl.crawler.urls import BATCH_SIZE, URLDatabase, URLStatus
 from mwmbl.database import Database
@@ -13,11 +14,18 @@ MAX_QUEUE_SIZE = 5000
 MIN_QUEUE_SIZE = 1000
 
 
+def run(url_queue: Queue):
+    initialize_url_queue(url_queue)
+    while True:
+        update_url_queue(url_queue)
+
+
 def update_url_queue(url_queue: Queue):
     logger.info("Updating URL queue")
     current_size = url_queue.qsize()
     if current_size >= MIN_QUEUE_SIZE:
-        logger.info(f"Skipping queue update, current size {current_size}")
+        logger.info(f"Skipping queue update, current size {current_size}, sleeping for 10 seconds")
+        sleep(10)
         return
 
     with Database() as db: