diff --git a/devdata/index-v2.tinysearch b/devdata/index-v2.tinysearch
index 2c2df96..d361a8d 100644
Binary files a/devdata/index-v2.tinysearch and b/devdata/index-v2.tinysearch differ
diff --git a/mwmbl/background.py b/mwmbl/background.py
index 2bc087a..b76dac7 100644
--- a/mwmbl/background.py
+++ b/mwmbl/background.py
@@ -2,6 +2,7 @@
 Script that updates data in a background process.
 """
 from logging import getLogger
+from multiprocessing import Queue
 from pathlib import Path
 from time import sleep
 
@@ -14,7 +15,7 @@ from mwmbl.indexer.paths import BATCH_DIR_NAME, INDEX_NAME
 logger = getLogger(__name__)
 
 
-def run(data_path: str):
+def run(data_path: str, new_item_queue: Queue):
     logger.info("Started background process")
 
     with Database() as db:
@@ -31,7 +32,7 @@ def run(data_path: str):
         # except Exception:
         #     logger.exception("Error retrieving batches")
         try:
-            update_urls.run(batch_cache)
+            update_urls.run(batch_cache, new_item_queue)
         except Exception:
             logger.exception("Error updating URLs")
         try:
diff --git a/mwmbl/crawler/urls.py b/mwmbl/crawler/urls.py
index b2eb786..2a33148 100644
--- a/mwmbl/crawler/urls.py
+++ b/mwmbl/crawler/urls.py
@@ -80,7 +80,7 @@ class URLDatabase:
 
     def update_found_urls(self, found_urls: list[FoundURL]) -> list[FoundURL]:
         if len(found_urls) == 0:
-            return
+            return []
 
         get_urls_sql = """
           SELECT url FROM urls
@@ -104,7 +104,7 @@ class URLDatabase:
             updated = CASE
               WHEN urls.status > excluded.status THEN urls.updated ELSE excluded.updated
             END
-        RETURNING (url, user_id_hash, score, status, timestamp)
+        RETURNING url, user_id_hash, score, status, updated
         """
 
         input_urls = [x.url for x in found_urls]
@@ -112,6 +112,7 @@ class URLDatabase:
 
         with self.connection as connection:
             with connection.cursor() as cursor:
+                logger.info(f"Input URLs: {len(input_urls)}")
                 cursor.execute(get_urls_sql, {'urls': tuple(input_urls)})
                 existing_urls = {x[0] for x in cursor.fetchall()}
                 new_urls = set(input_urls) - existing_urls
@@ -120,6 +121,7 @@ class URLDatabase:
                 locked_urls = {x[0] for x in cursor.fetchall()}
 
                 urls_to_insert = new_urls | locked_urls
+                logger.info(f"URLs to insert: {len(urls_to_insert)}")
 
                 if len(urls_to_insert) != len(input_urls):
                     print(f"Only got {len(urls_to_insert)} instead of {len(input_urls)} - {len(new_urls)} new")
@@ -129,8 +131,10 @@ class URLDatabase:
                     (found_url.url, found_url.status.value, found_url.user_id_hash, found_url.score, found_url.timestamp)
                     for found_url in sorted_urls if found_url.url in urls_to_insert]
 
-                execute_values(cursor, insert_sql, data)
-                results = cursor.fetchall()
+                logger.info(f"Data: {len(data)}")
+                results = execute_values(cursor, insert_sql, data, fetch=True)
+                # results = cursor.fetchall()
+                logger.info(f"Results: {len(results)}")
                 updated = [FoundURL(*result) for result in results]
                 return updated
 
diff --git a/mwmbl/indexer/process_batch.py b/mwmbl/indexer/process_batch.py
index f8a67bd..0c7ce02 100644
--- a/mwmbl/indexer/process_batch.py
+++ b/mwmbl/indexer/process_batch.py
@@ -10,13 +10,13 @@ logger = getLogger(__name__)
 
 
 def run(batch_cache: BatchCache, start_status: BatchStatus, end_status: BatchStatus,
-        process: Callable[[Collection[HashedBatch]], None]):
+        process: Callable[[Collection[HashedBatch], ...], None], *args):
 
     with Database() as db:
         index_db = IndexDatabase(db.connection)
 
         logger.info(f"Getting batches with status {start_status}")
-        batches = index_db.get_batches_by_status(start_status, 10000)
+        batches = index_db.get_batches_by_status(start_status, 1000)
         logger.info(f"Got {len(batches)} batch urls")
         if len(batches) == 0:
             return
@@ -24,6 +24,6 @@ def run(batch_cache: BatchCache, start_status: BatchStatus, end_status: BatchSta
 
         batch_data = batch_cache.get_cached([batch.url for batch in batches])
         logger.info(f"Got {len(batch_data)} cached batches")
-        process(batch_data.values())
+        process(batch_data.values(), *args)
 
         index_db.update_batch_status(list(batch_data.keys()), end_status)
diff --git a/mwmbl/indexer/update_urls.py b/mwmbl/indexer/update_urls.py
index 2f57215..4ad299d 100644
--- a/mwmbl/indexer/update_urls.py
+++ b/mwmbl/indexer/update_urls.py
@@ -1,6 +1,7 @@
 from collections import defaultdict
 from datetime import datetime, timezone, timedelta
 from logging import getLogger
+from multiprocessing import Queue
 from typing import Iterable, Collection
 from urllib.parse import urlparse
 
@@ -18,11 +19,11 @@ from mwmbl.settings import UNKNOWN_DOMAIN_MULTIPLIER, EXCLUDED_DOMAINS, SCORE_FO
 logger = getLogger(__name__)
 
 
-def run(batch_cache: BatchCache):
-    process_batch.run(batch_cache, BatchStatus.LOCAL, BatchStatus.URLS_UPDATED, process=record_urls_in_database)
+def run(batch_cache: BatchCache, new_item_queue: Queue):
+    process_batch.run(batch_cache, BatchStatus.LOCAL, BatchStatus.URLS_UPDATED, record_urls_in_database, new_item_queue)
 
 
-def record_urls_in_database(batches: Collection[HashedBatch]):
+def record_urls_in_database(batches: Collection[HashedBatch], new_item_queue: Queue):
     logger.info(f"Recording URLs in database for {len(batches)} batches")
     with Database() as db:
         url_db = URLDatabase(db.connection)
@@ -53,7 +54,11 @@ def record_urls_in_database(batches: Collection[HashedBatch]):
     found_urls = [FoundURL(url, url_users[url], url_scores[url], url_statuses[url], url_timestamps[url])
                   for url in url_scores.keys() | url_statuses.keys()]
 
-    url_db.update_found_urls(found_urls)
+    # TODO: why does this number not match the new items number below? at all.
+    logger.info(f"Found URLs, {len(found_urls)}")
+    urls = url_db.update_found_urls(found_urls)
+    new_item_queue.put(urls)
+    logger.info(f"Put {len(urls)} new items in the URL queue")
 
 
 def process_link(batch, crawled_page_domain, link, unknown_domain_multiplier, timestamp, url_scores, url_timestamps, url_users, is_extra: bool):
diff --git a/mwmbl/main.py b/mwmbl/main.py
index d4d59a9..37c6eb6 100644
--- a/mwmbl/main.py
+++ b/mwmbl/main.py
@@ -51,7 +51,7 @@ def run():
     queued_batches = Queue()
 
     if args.background:
-        Process(target=background.run, args=(args.data,)).start()
+        Process(target=background.run, args=(args.data, new_item_queue)).start()
     Process(target=update_queue_continuously, args=(new_item_queue, queued_batches,)).start()
 
     completer = Completer()
diff --git a/mwmbl/url_queue.py b/mwmbl/url_queue.py
index 8930407..6404d5d 100644
--- a/mwmbl/url_queue.py
+++ b/mwmbl/url_queue.py
@@ -1,12 +1,19 @@
+import random
 import time
+from collections import defaultdict
+from dataclasses import dataclass
 from datetime import datetime, timedelta
 from logging import getLogger
 from multiprocessing import Queue
 from queue import Empty
 from time import sleep
+from typing import KeysView, Union
+from urllib.parse import urlparse
 
 from mwmbl.crawler.urls import BATCH_SIZE, URLDatabase, URLStatus, FoundURL, REASSIGN_MIN_HOURS
 from mwmbl.database import Database
+from mwmbl.hn_top_domains_filtered import DOMAINS as TOP_DOMAINS
+from mwmbl.settings import CORE_DOMAINS
 from mwmbl.utils import batch
 
 
@@ -14,7 +21,19 @@ logger = getLogger(__name__)
 
 
 MAX_QUEUE_SIZE = 5000
-MIN_QUEUE_SIZE = 1000
+
+MAX_TOP_URLS = 100000
+MAX_OTHER_URLS = 1000
+MAX_URLS_PER_CORE_DOMAIN = 1000
+MAX_URLS_PER_TOP_DOMAIN = 100
+MAX_URLS_PER_OTHER_DOMAIN = 5
+MAX_OTHER_DOMAINS = 10000
+
+
+@dataclass
+class URLScore:
+    url: str
+    score: float
 
 
 class URLQueue:
@@ -25,6 +44,8 @@ class URLQueue:
         """
         self._new_item_queue = new_item_queue
         self._queued_batches = queued_batches
+        self._other_urls = defaultdict(list)
+        self._top_urls = defaultdict(list)
 
     def initialize(self):
         with Database() as db:
@@ -41,16 +62,51 @@ class URLQueue:
                 num_processed += 1
             except Empty:
                 break
-            self.process_found_urls(new_batch)
+            self._process_found_urls(new_batch)
         return num_processed
 
-    def process_found_urls(self, found_urls: list[FoundURL]):
+    def _process_found_urls(self, found_urls: list[FoundURL]):
         min_updated_date = datetime.utcnow() - timedelta(hours=REASSIGN_MIN_HOURS)
-        valid_urls = [found_url.url for found_url in found_urls if found_url.status == URLStatus.NEW or (
-                found_url.status == URLStatus.ASSIGNED and found_url.timestamp < min_updated_date)]
+        logger.info(f"Found URLS: {len(found_urls)}")
+        valid_urls = [found_url for found_url in found_urls if found_url.status == URLStatus.NEW.value or (
+                found_url.status == URLStatus.ASSIGNED.value and found_url.timestamp < min_updated_date)]
+        logger.info(f"Valid URLs: {len(valid_urls)}")
 
-        self._queue_urls(valid_urls)
+        self._sort_urls(valid_urls)
+        logger.info(f"Queue size: {self.num_queued_batches}")
+        while self.num_queued_batches < MAX_QUEUE_SIZE and len(self._top_urls) > 0:
+            total_top_urls = sum(len(urls) for urls in self._top_urls.values())
+            logger.info(f"Total top URLs stored: {total_top_urls}")
+
+            total_other_urls = sum(len(urls) for urls in self._other_urls.values())
+            logger.info(f"Total other URLs stored: {total_other_urls}")
+
+            self._batch_urls()
+            logger.info(f"Queue size after batching: {self.num_queued_batches}")
+
+    def _sort_urls(self, valid_urls: list[FoundURL]):
+        for found_url in valid_urls:
+            domain = urlparse(found_url.url).hostname
+            url_store = self._top_urls if domain in TOP_DOMAINS else self._other_urls
+            url_store[domain].append(URLScore(found_url.url, found_url.score))
+
+        _sort_and_limit_urls(self._top_urls, MAX_TOP_URLS)
+        _sort_and_limit_urls(self._other_urls, MAX_OTHER_URLS)
+
+        # Keep only the top "other" domains, ranked by the top item for that domain
+        top_other_urls = sorted(self._other_urls.items(), key=lambda x: x[1][0].score, reverse=True)[:MAX_OTHER_DOMAINS]
+        self._other_urls = dict(top_other_urls)
+
+    def _batch_urls(self):
+        urls = []
+        logger.info("Adding core domains")
+        _add_urls(CORE_DOMAINS, self._top_urls, urls, MAX_URLS_PER_CORE_DOMAIN)
+        logger.info("Adding top domains")
+        _add_urls(TOP_DOMAINS.keys() - CORE_DOMAINS, self._top_urls, urls, MAX_URLS_PER_TOP_DOMAIN)
+        logger.info("Adding other domains")
+        _add_urls(self._other_urls.keys(), self._other_urls, urls, MAX_URLS_PER_OTHER_DOMAIN)
+        self._queue_urls(urls)
 
 
     def _queue_urls(self, valid_urls: list[str]):
        for url_batch in batch(valid_urls, BATCH_SIZE):
@@ -61,6 +117,23 @@ class URLQueue:
         return self._queued_batches.qsize()
 
 
+def _sort_and_limit_urls(domain_urls: dict[str, list[str]], max_urls: int):
+    for domain, urls in domain_urls.items():
+        domain_urls[domain] = sorted(urls, key=lambda url_score: url_score.score, reverse=True)[:max_urls]
+
+
+def _add_urls(domains: Union[set[str], KeysView], domain_urls: dict[str, list[URLScore]], urls: list[str], max_urls: int):
+    for domain in list(domains & domain_urls.keys()):
+        new_urls = domain_urls[domain][:max_urls]
+        logger.info(f"Adding URLs {new_urls}")
+        urls += [url_score.url for url_score in new_urls]
+        new_domain_urls = domain_urls[domain][max_urls:]
+        if len(new_domain_urls) > 0:
+            domain_urls[domain] = new_domain_urls
+        else:
+            del domain_urls[domain]
+
+
 def update_queue_continuously(new_item_queue: Queue, queued_batches: Queue):
     queue = URLQueue(new_item_queue, queued_batches)
     queue.initialize()
diff --git a/test/test_url_queue.py b/test/test_url_queue.py
new file mode 100644
index 0000000..05561ba
--- /dev/null
+++ b/test/test_url_queue.py
@@ -0,0 +1,19 @@
+from datetime import datetime
+from queue import Queue
+
+from mwmbl.crawler.urls import FoundURL, URLStatus
+from mwmbl.url_queue import URLQueue
+
+
+def test_url_queue_empties():
+    new_item_queue = Queue()
+    queued_batches = Queue()
+
+    url_queue = URLQueue(new_item_queue, queued_batches)
+    new_item_queue.put([FoundURL("https://google.com", "123", 10.0, URLStatus.NEW.value, datetime(2023, 1, 19))])
+
+    url_queue.update()
+
+    items = queued_batches.get(block=False)
+
+    assert items == ["https://google.com"]
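
Not part of the patch above, only a rough sketch of a follow-up test that could cover the new per-domain cap. It assumes the hypothetical domain some-unknown-domain.example is not in TOP_DOMAINS (so its URLs land in _other_urls) and, like the patch's own test, relies on google.com being in TOP_DOMAINS; note that _process_found_urls only calls _batch_urls while _top_urls is non-empty, so a batch of purely unknown-domain URLs would stay buffered until a top-domain URL arrives.

from datetime import datetime
from queue import Queue

from mwmbl.crawler.urls import FoundURL, URLStatus
from mwmbl.url_queue import URLQueue, MAX_URLS_PER_OTHER_DOMAIN


def test_url_queue_caps_urls_per_other_domain():
    new_item_queue = Queue()
    queued_batches = Queue()
    url_queue = URLQueue(new_item_queue, queued_batches)

    # One URL on a top domain so that batching runs at all, plus more URLs on a
    # made-up "other" domain than MAX_URLS_PER_OTHER_DOMAIN allows per batch.
    found_urls = [FoundURL("https://google.com", "123", 10.0, URLStatus.NEW.value, datetime(2023, 1, 19))]
    found_urls += [
        FoundURL(f"https://some-unknown-domain.example/page-{i}", "123", float(i), URLStatus.NEW.value, datetime(2023, 1, 19))
        for i in range(MAX_URLS_PER_OTHER_DOMAIN + 2)
    ]
    new_item_queue.put(found_urls)

    url_queue.update()

    batch_urls = queued_batches.get(block=False)
    # Top-domain URL comes first, then at most MAX_URLS_PER_OTHER_DOMAIN
    # URLs from the unknown domain; the rest stay buffered for later batches.
    assert batch_urls[0] == "https://google.com"
    assert len(batch_urls) == 1 + MAX_URLS_PER_OTHER_DOMAIN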