diff --git a/analyse/url_queue.py b/analyse/url_queue.py
new file mode 100644
index 0000000..70dedee
--- /dev/null
+++ b/analyse/url_queue.py
@@ -0,0 +1,35 @@
+import logging
+import os
+import pickle
+import sys
+from datetime import datetime
+from pathlib import Path
+from queue import Queue
+
+from mwmbl.url_queue import URLQueue
+
+FORMAT = '%(levelname)s %(name)s %(asctime)s %(message)s'
+logging.basicConfig(stream=sys.stdout, level=logging.DEBUG, format=FORMAT)
+
+
+def run_url_queue():
+    data = pickle.load(open(Path(os.environ["HOME"]) / "data" / "mwmbl" / "found-urls.pickle", "rb"))
+    print("First URLs", [x.url for x in data[:1000]])
+
+    new_item_queue = Queue()
+    queued_batches = Queue()
+    queue = URLQueue(new_item_queue, queued_batches)
+
+    new_item_queue.put(data)
+
+    start = datetime.now()
+    queue.update()
+    total_time = (datetime.now() - start).total_seconds()
+    print(f"Total time: {total_time}")
+
+
+
+
+
+if __name__ == '__main__':
+    run_url_queue()
diff --git a/devdata/index-v2.tinysearch b/devdata/index-v2.tinysearch
index d361a8d..fe10556 100644
Binary files a/devdata/index-v2.tinysearch and b/devdata/index-v2.tinysearch differ
diff --git a/mwmbl/indexer/process_batch.py b/mwmbl/indexer/process_batch.py
index 0c7ce02..9011b87 100644
--- a/mwmbl/indexer/process_batch.py
+++ b/mwmbl/indexer/process_batch.py
@@ -24,6 +24,10 @@ def run(batch_cache: BatchCache, start_status: BatchStatus, end_status: BatchSta
     batch_data = batch_cache.get_cached([batch.url for batch in batches])
     logger.info(f"Got {len(batch_data)} cached batches")
 
+    missing_batches = {batch.url for batch in batches} - batch_data.keys()
+    logger.info(f"Got {len(missing_batches)} missing batches")
+    index_db.update_batch_status(list(missing_batches), BatchStatus.REMOTE)
+
     process(batch_data.values(), *args)
 
     index_db.update_batch_status(list(batch_data.keys()), end_status)
diff --git a/mwmbl/main.py b/mwmbl/main.py
index 37c6eb6..dedc0aa 100644
--- a/mwmbl/main.py
+++ b/mwmbl/main.py
@@ -1,6 +1,5 @@
 import argparse
 import logging
-import os
 import sys
 from multiprocessing import Process, Queue
 from pathlib import Path
@@ -8,7 +7,7 @@ from pathlib import Path
 import uvicorn
 from fastapi import FastAPI
 
-from mwmbl import background, url_queue
+from mwmbl import background
 from mwmbl.crawler import app as crawler
 from mwmbl.indexer.batch_cache import BatchCache
 from mwmbl.indexer.paths import INDEX_NAME, BATCH_DIR_NAME
@@ -16,9 +15,10 @@ from mwmbl.tinysearchengine import search
 from mwmbl.tinysearchengine.completer import Completer
 from mwmbl.tinysearchengine.indexer import TinyIndex, Document, PAGE_SIZE
 from mwmbl.tinysearchengine.rank import HeuristicRanker
-from mwmbl.url_queue import URLQueue, update_queue_continuously
+from mwmbl.url_queue import update_queue_continuously
 
-logging.basicConfig(stream=sys.stdout, level=logging.INFO)
+FORMAT = '%(levelname)s %(name)s %(asctime)s %(message)s'
+logging.basicConfig(stream=sys.stdout, level=logging.DEBUG, format=FORMAT)
 
 
 MODEL_PATH = Path(__file__).parent / 'resources' / 'model.pickle'
diff --git a/mwmbl/url_queue.py b/mwmbl/url_queue.py
index 6404d5d..d1fd047 100644
--- a/mwmbl/url_queue.py
+++ b/mwmbl/url_queue.py
@@ -1,10 +1,14 @@
+import os
+import pickle
 import random
+import re
 import time
 from collections import defaultdict
 from dataclasses import dataclass
 from datetime import datetime, timedelta
 from logging import getLogger
 from multiprocessing import Queue
+from pathlib import Path
 from queue import Empty
 from time import sleep
 from typing import KeysView, Union
@@ -29,6 +33,8 @@ MAX_URLS_PER_TOP_DOMAIN = 100
 MAX_URLS_PER_OTHER_DOMAIN = 5
 MAX_OTHER_DOMAINS = 10000
 
+DOMAIN_REGEX = re.compile(r".*://([^/]*)")
+
 
 @dataclass
 class URLScore:
@@ -66,6 +72,11 @@ class URLQueue:
         return num_processed
 
     def _process_found_urls(self, found_urls: list[FoundURL]):
+        logger.info("Processing found URLs")
+        # with open(Path(os.environ["HOME"]) / "data" / "mwmbl" / "found-urls.pickle", "wb") as output_file:
+        #     pickle.dump(found_urls, output_file)
+        # logger.info("Dumped")
+
         min_updated_date = datetime.utcnow() - timedelta(hours=REASSIGN_MIN_HOURS)
 
         logger.info(f"Found URLS: {len(found_urls)}")
@@ -87,10 +98,12 @@ class URLQueue:
 
     def _sort_urls(self, valid_urls: list[FoundURL]):
         for found_url in valid_urls:
-            domain = urlparse(found_url.url).hostname
+            domain = DOMAIN_REGEX.search(found_url.url)[0]
             url_store = self._top_urls if domain in TOP_DOMAINS else self._other_urls
             url_store[domain].append(URLScore(found_url.url, found_url.score))
 
+        logger.info(f"URL store updated: {len(self._top_urls)} top domains, {len(self._other_urls)} other domains")
+
         _sort_and_limit_urls(self._top_urls, MAX_TOP_URLS)
         _sort_and_limit_urls(self._other_urls, MAX_OTHER_URLS)
 
@@ -125,7 +138,6 @@ def _sort_and_limit_urls(domain_urls: dict[str, list[str]], max_urls: int):
 def _add_urls(domains: Union[set[str], KeysView], domain_urls: dict[str, list[URLScore]], urls: list[str], max_urls: int):
     for domain in list(domains & domain_urls.keys()):
         new_urls = domain_urls[domain][:max_urls]
-        logger.info(f"Adding URLs {new_urls}")
         urls += [url_score.url for url_score in new_urls]
         new_domain_urls = domain_urls[domain][max_urls:]
         if len(new_domain_urls) > 0:
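
Note (illustrative sketch, not part of the patch): the new _sort_urls hunk indexes the regex match with [0], which is the full match including the scheme, while group [1] holds the captured hostname that the replaced urlparse(...).hostname call used to return. A minimal standalone example using DOMAIN_REGEX exactly as defined above:

    import re
    from urllib.parse import urlparse

    DOMAIN_REGEX = re.compile(r".*://([^/]*)")

    url = "https://example.com/some/path"
    match = DOMAIN_REGEX.search(url)
    print(match[0])                # 'https://example.com' -- full match, as used in the hunk
    print(match[1])                # 'example.com' -- captured hostname only
    print(urlparse(url).hostname)  # 'example.com' -- behaviour before this patch

If the URL stores are meant to stay keyed by bare hostname (so membership checks against TOP_DOMAINS keep matching), group [1] is the closer equivalent of the previous behaviour.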