Update URL queue separately from the other background process to speed it up
parent 7bd12c1ead
commit d347a17d63

3 changed files with 15 additions and 17 deletions
mwmbl/background.py
@@ -2,7 +2,6 @@
 Script that updates data in a background process.
 """
 from logging import getLogger
-from multiprocessing import Queue
 from pathlib import Path
 from time import sleep
 
@@ -11,32 +10,22 @@ from mwmbl.database import Database
 from mwmbl.indexer import index_batches, historical, update_urls
 from mwmbl.indexer.batch_cache import BatchCache
 from mwmbl.indexer.paths import BATCH_DIR_NAME, INDEX_NAME
-from mwmbl.url_queue import update_url_queue, initialize_url_queue
 
 logger = getLogger(__name__)
 
 
-def run(data_path: str, url_queue: Queue):
+def run(data_path: str):
     logger.info("Started background process")
 
     with Database() as db:
         url_db = URLDatabase(db.connection)
         url_db.create_tables()
 
-    initialize_url_queue(url_queue)
-    try:
-        update_url_queue(url_queue)
-    except Exception:
-        logger.exception("Error updating URL queue")
     historical.run()
     index_path = Path(data_path) / INDEX_NAME
     batch_cache = BatchCache(Path(data_path) / BATCH_DIR_NAME)
 
     while True:
-        try:
-            update_url_queue(url_queue)
-        except Exception:
-            logger.exception("Error updating URL queue")
         try:
             batch_cache.retrieve_batches(num_batches=10000)
         except Exception:
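Splitting queue maintenance out of this process means a long batch-retrieval and indexing pass can no longer delay URL queue top-ups, which is the speed-up the commit title refers to, and a failing queue update can no longer stall indexing. What the slimmed-down loop keeps is the error-isolation idiom already used around batch retrieval: wrap each periodic step in try/except and log with logger.exception so one failure does not kill the process. A minimal, self-contained sketch of that idiom (do_periodic_work is a hypothetical placeholder, not an mwmbl function):

from logging import basicConfig, getLogger
from time import sleep

basicConfig(level="INFO")
logger = getLogger(__name__)


def do_periodic_work():
    # Hypothetical placeholder for a step like batch_cache.retrieve_batches(...).
    raise RuntimeError("transient failure")


def run_forever():
    while True:
        try:
            do_periodic_work()
        except Exception:
            # Log the full traceback, then keep the background loop alive.
            logger.exception("Error in periodic work")
        sleep(10)


if __name__ == "__main__":
    run_forever()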
mwmbl/main.py
@@ -8,7 +8,7 @@ from pathlib import Path
 import uvicorn
 from fastapi import FastAPI
 
-from mwmbl import background
+from mwmbl import background, url_queue
 from mwmbl.crawler import app as crawler
 from mwmbl.indexer.batch_cache import BatchCache
 from mwmbl.indexer.paths import INDEX_NAME, BATCH_DIR_NAME
@@ -46,10 +46,11 @@ def run():
         print("Creating a new index")
         TinyIndex.create(item_factory=Document, index_path=index_path, num_pages=args.num_pages, page_size=PAGE_SIZE)
 
-    url_queue = Queue()
+    queue = Queue()
 
     if args.background:
-        Process(target=background.run, args=(args.data, url_queue)).start()
+        Process(target=background.run, args=(args.data,)).start()
+        Process(target=url_queue.run, args=(queue,)).start()
 
     completer = Completer()
 
@@ -65,7 +66,7 @@ def run():
     app.include_router(search_router)
 
     batch_cache = BatchCache(Path(args.data) / BATCH_DIR_NAME)
-    crawler_router = crawler.get_router(batch_cache, url_queue)
+    crawler_router = crawler.get_router(batch_cache, queue)
     app.include_router(crawler_router)
 
     # Initialize uvicorn server using global app instance and server config params
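The net effect in the entry point is a new process topology: instead of one background worker that both indexed batches and maintained the URL queue, the server now starts two workers that share a single multiprocessing.Queue, and the crawler router keeps reading from that same queue. A self-contained sketch of the layout, with stand-in worker bodies rather than mwmbl's real background.run and url_queue.run:

from multiprocessing import Process, Queue
from time import sleep


def index_worker(data_path: str):
    # Stand-in for background.run: retrieve and index batches on its own cadence.
    while True:
        print(f"indexing batches under {data_path}")
        sleep(2)


def queue_worker(url_queue: Queue):
    # Stand-in for url_queue.run: keep the shared queue of URLs topped up.
    while True:
        if url_queue.qsize() < 5:  # qsize() is approximate, as in the real code
            url_queue.put("https://example.com/")
        sleep(1)


if __name__ == "__main__":
    queue = Queue()
    Process(target=index_worker, args=("./data",)).start()
    Process(target=queue_worker, args=(queue,)).start()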
mwmbl/url_queue.py
@@ -1,5 +1,6 @@
 from logging import getLogger
 from multiprocessing import Queue
+from time import sleep
 
 from mwmbl.crawler.urls import BATCH_SIZE, URLDatabase, URLStatus
 from mwmbl.database import Database
@@ -13,11 +14,18 @@ MAX_QUEUE_SIZE = 5000
 MIN_QUEUE_SIZE = 1000
 
 
+def run(url_queue: Queue):
+    initialize_url_queue(url_queue)
+    while True:
+        update_url_queue(url_queue)
+
+
 def update_url_queue(url_queue: Queue):
     logger.info("Updating URL queue")
     current_size = url_queue.qsize()
     if current_size >= MIN_QUEUE_SIZE:
-        logger.info(f"Skipping queue update, current size {current_size}")
+        logger.info(f"Skipping queue update, current size {current_size}, sleeping for 10 seconds")
+        sleep(10)
         return
 
     with Database() as db:
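The queue updater also gains a back-off: when qsize() reports the queue is still at or above MIN_QUEUE_SIZE, it logs, sleeps for 10 seconds and returns without querying the database, so the new dedicated process does not spin against an already full queue. A self-contained sketch of that throttling pattern (fetch_new_urls is a hypothetical stand-in for the URL database query):

from logging import basicConfig, getLogger
from multiprocessing import Queue
from time import sleep

basicConfig(level="INFO")
logger = getLogger(__name__)

MIN_QUEUE_SIZE = 1000


def fetch_new_urls() -> list[str]:
    # Hypothetical stand-in for querying URLDatabase for URLs to crawl.
    return [f"https://example.com/page/{i}" for i in range(100)]


def update_url_queue(url_queue: Queue):
    current_size = url_queue.qsize()  # approximate, but good enough for throttling
    if current_size >= MIN_QUEUE_SIZE:
        logger.info(f"Skipping queue update, current size {current_size}, sleeping for 10 seconds")
        sleep(10)
        return
    for url in fetch_new_urls():
        url_queue.put(url)
    logger.info(f"Queue topped up, size now roughly {url_queue.qsize()}")


if __name__ == "__main__":
    update_url_queue(Queue())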