Browse Source

Try and balance URLs before adding to queue

Daoud Clarke 2 years ago
parent
commit
2b36f2ccc1

BIN
devdata/index-v2.tinysearch


+ 3 - 2
mwmbl/background.py

@@ -2,6 +2,7 @@
 Script that updates data in a background process.
 """
 from logging import getLogger
+from multiprocessing import Queue
 from pathlib import Path
 from time import sleep
 
@@ -14,7 +15,7 @@ from mwmbl.indexer.paths import BATCH_DIR_NAME, INDEX_NAME
 logger = getLogger(__name__)
 
 
-def run(data_path: str):
+def run(data_path: str, new_item_queue: Queue):
     logger.info("Started background process")
 
     with Database() as db:
@@ -31,7 +32,7 @@ def run(data_path: str):
         # except Exception:
         #     logger.exception("Error retrieving batches")
         try:
-            update_urls.run(batch_cache)
+            update_urls.run(batch_cache, new_item_queue)
         except Exception:
             logger.exception("Error updating URLs")
         try:

+ 8 - 4
mwmbl/crawler/urls.py

@@ -80,7 +80,7 @@ class URLDatabase:
 
     def update_found_urls(self, found_urls: list[FoundURL]) -> list[FoundURL]:
         if len(found_urls) == 0:
-            return
+            return []
 
         get_urls_sql = """
           SELECT url FROM urls
@@ -104,7 +104,7 @@ class URLDatabase:
            updated = CASE
              WHEN urls.status > excluded.status THEN urls.updated ELSE excluded.updated
            END
-        RETURNING (url, user_id_hash, score, status, timestamp)
+        RETURNING url, user_id_hash, score, status, updated
         """
 
         input_urls = [x.url for x in found_urls]
@@ -112,6 +112,7 @@ class URLDatabase:
 
         with self.connection as connection:
             with connection.cursor() as cursor:
+                logger.info(f"Input URLs: {len(input_urls)}")
                 cursor.execute(get_urls_sql, {'urls': tuple(input_urls)})
                 existing_urls = {x[0] for x in cursor.fetchall()}
                 new_urls = set(input_urls) - existing_urls
@@ -120,6 +121,7 @@ class URLDatabase:
                 locked_urls = {x[0] for x in cursor.fetchall()}
 
                 urls_to_insert = new_urls | locked_urls
+                logger.info(f"URLs to insert: {len(urls_to_insert)}")
 
                 if len(urls_to_insert) != len(input_urls):
                     print(f"Only got {len(urls_to_insert)} instead of {len(input_urls)} - {len(new_urls)} new")
@@ -129,8 +131,10 @@ class URLDatabase:
                     (found_url.url, found_url.status.value, found_url.user_id_hash, found_url.score, found_url.timestamp)
                     for found_url in sorted_urls if found_url.url in urls_to_insert]
 
-                execute_values(cursor, insert_sql, data)
-                results = cursor.fetchall()
+                logger.info(f"Data: {len(data)}")
+                results = execute_values(cursor, insert_sql, data, fetch=True)
+                # results = cursor.fetchall()
+                logger.info(f"Results: {len(results)}")
                 updated = [FoundURL(*result) for result in results]
                 return updated
 

+ 3 - 3
mwmbl/indexer/process_batch.py

@@ -10,13 +10,13 @@ logger = getLogger(__name__)
 
 
 def run(batch_cache: BatchCache, start_status: BatchStatus, end_status: BatchStatus,
-        process: Callable[[Collection[HashedBatch]], None]):
+        process: Callable[[Collection[HashedBatch], ...], None], *args):
 
     with Database() as db:
         index_db = IndexDatabase(db.connection)
 
         logger.info(f"Getting batches with status {start_status}")
-        batches = index_db.get_batches_by_status(start_status, 10000)
+        batches = index_db.get_batches_by_status(start_status, 1000)
         logger.info(f"Got {len(batches)} batch urls")
         if len(batches) == 0:
             return
@@ -24,6 +24,6 @@ def run(batch_cache: BatchCache, start_status: BatchStatus, end_status: BatchSta
         batch_data = batch_cache.get_cached([batch.url for batch in batches])
         logger.info(f"Got {len(batch_data)} cached batches")
 
-        process(batch_data.values())
+        process(batch_data.values(), *args)
 
         index_db.update_batch_status(list(batch_data.keys()), end_status)

+ 9 - 4
mwmbl/indexer/update_urls.py

@@ -1,6 +1,7 @@
 from collections import defaultdict
 from datetime import datetime, timezone, timedelta
 from logging import getLogger
+from multiprocessing import Queue
 from typing import Iterable, Collection
 from urllib.parse import urlparse
 
@@ -18,11 +19,11 @@ from mwmbl.settings import UNKNOWN_DOMAIN_MULTIPLIER, EXCLUDED_DOMAINS, SCORE_FO
 logger = getLogger(__name__)
 
 
-def run(batch_cache: BatchCache):
-    process_batch.run(batch_cache, BatchStatus.LOCAL, BatchStatus.URLS_UPDATED, process=record_urls_in_database)
+def run(batch_cache: BatchCache, new_item_queue: Queue):
+    process_batch.run(batch_cache, BatchStatus.LOCAL, BatchStatus.URLS_UPDATED, record_urls_in_database, new_item_queue)
 
 
-def record_urls_in_database(batches: Collection[HashedBatch]):
+def record_urls_in_database(batches: Collection[HashedBatch], new_item_queue: Queue):
     logger.info(f"Recording URLs in database for {len(batches)} batches")
     with Database() as db:
         url_db = URLDatabase(db.connection)
@@ -53,7 +54,11 @@ def record_urls_in_database(batches: Collection[HashedBatch]):
         found_urls = [FoundURL(url, url_users[url], url_scores[url], url_statuses[url], url_timestamps[url])
                       for url in url_scores.keys() | url_statuses.keys()]
 
-        url_db.update_found_urls(found_urls)
+        # TODO: why does this number not match the new items number below? at all.
+        logger.info(f"Found URLs, {len(found_urls)}")
+        urls = url_db.update_found_urls(found_urls)
+        new_item_queue.put(urls)
+        logger.info(f"Put {len(urls)} new items in the URL queue")
 
 
 def process_link(batch, crawled_page_domain, link, unknown_domain_multiplier, timestamp, url_scores, url_timestamps, url_users, is_extra: bool):

+ 1 - 1
mwmbl/main.py

@@ -51,7 +51,7 @@ def run():
     queued_batches = Queue()
 
     if args.background:
-        Process(target=background.run, args=(args.data,)).start()
+        Process(target=background.run, args=(args.data, new_item_queue)).start()
         Process(target=update_queue_continuously, args=(new_item_queue, queued_batches,)).start()
 
     completer = Completer()

+ 80 - 7
mwmbl/url_queue.py

@@ -1,12 +1,19 @@
+import random
 import time
+from collections import defaultdict
+from dataclasses import dataclass
 from datetime import datetime, timedelta
 from logging import getLogger
 from multiprocessing import Queue
 from queue import Empty
 from time import sleep
+from typing import KeysView, Union
+from urllib.parse import urlparse
 
 from mwmbl.crawler.urls import BATCH_SIZE, URLDatabase, URLStatus, FoundURL, REASSIGN_MIN_HOURS
 from mwmbl.database import Database
+from mwmbl.hn_top_domains_filtered import DOMAINS as TOP_DOMAINS
+from mwmbl.settings import CORE_DOMAINS
 from mwmbl.utils import batch
 
 
@@ -14,7 +21,19 @@ logger = getLogger(__name__)
 
 
 MAX_QUEUE_SIZE = 5000
-MIN_QUEUE_SIZE = 1000
+
+MAX_TOP_URLS = 100000
+MAX_OTHER_URLS = 1000
+MAX_URLS_PER_CORE_DOMAIN = 1000
+MAX_URLS_PER_TOP_DOMAIN = 100
+MAX_URLS_PER_OTHER_DOMAIN = 5
+MAX_OTHER_DOMAINS = 10000
+
+
+@dataclass
+class URLScore:
+    url: str
+    score: float
 
 
 class URLQueue:
@@ -25,6 +44,8 @@ class URLQueue:
         """
         self._new_item_queue = new_item_queue
         self._queued_batches = queued_batches
+        self._other_urls = defaultdict(list)
+        self._top_urls = defaultdict(list)
 
     def initialize(self):
         with Database() as db:
@@ -41,16 +62,51 @@ class URLQueue:
                 num_processed += 1
             except Empty:
                 break
-            self.process_found_urls(new_batch)
+            self._process_found_urls(new_batch)
         return num_processed
 
-    def process_found_urls(self, found_urls: list[FoundURL]):
+    def _process_found_urls(self, found_urls: list[FoundURL]):
         min_updated_date = datetime.utcnow() - timedelta(hours=REASSIGN_MIN_HOURS)
 
-        valid_urls = [found_url.url for found_url in found_urls if found_url.status == URLStatus.NEW or (
-                found_url.status == URLStatus.ASSIGNED and found_url.timestamp < min_updated_date)]
-
-        self._queue_urls(valid_urls)
+        logger.info(f"Found URLS: {len(found_urls)}")
+        valid_urls = [found_url for found_url in found_urls if found_url.status == URLStatus.NEW.value or (
+                found_url.status == URLStatus.ASSIGNED.value and found_url.timestamp < min_updated_date)]
+        logger.info(f"Valid URLs: {len(valid_urls)}")
+
+        self._sort_urls(valid_urls)
+        logger.info(f"Queue size: {self.num_queued_batches}")
+        while self.num_queued_batches < MAX_QUEUE_SIZE and len(self._top_urls) > 0:
+            total_top_urls = sum(len(urls) for urls in self._top_urls.values())
+            logger.info(f"Total top URLs stored: {total_top_urls}")
+
+            total_other_urls = sum(len(urls) for urls in self._other_urls.values())
+            logger.info(f"Total other URLs stored: {total_other_urls}")
+
+            self._batch_urls()
+            logger.info(f"Queue size after batching: {self.num_queued_batches}")
+
+    def _sort_urls(self, valid_urls: list[FoundURL]):
+        for found_url in valid_urls:
+            domain = urlparse(found_url.url).hostname
+            url_store = self._top_urls if domain in TOP_DOMAINS else self._other_urls
+            url_store[domain].append(URLScore(found_url.url, found_url.score))
+
+        _sort_and_limit_urls(self._top_urls, MAX_TOP_URLS)
+        _sort_and_limit_urls(self._other_urls, MAX_OTHER_URLS)
+
+        # Keep only the top "other" domains, ranked by the top item for that domain
+        top_other_urls = sorted(self._other_urls.items(), key=lambda x: x[1][0].score, reverse=True)[:MAX_OTHER_DOMAINS]
+        self._other_urls = dict(top_other_urls)
+
+    def _batch_urls(self):
+        urls = []
+        logger.info("Adding core domains")
+        _add_urls(CORE_DOMAINS, self._top_urls, urls, MAX_URLS_PER_CORE_DOMAIN)
+        logger.info("Adding top domains")
+        _add_urls(TOP_DOMAINS.keys() - CORE_DOMAINS, self._top_urls, urls, MAX_URLS_PER_TOP_DOMAIN)
+        logger.info("Adding other domains")
+        _add_urls(self._other_urls.keys(), self._other_urls, urls, MAX_URLS_PER_OTHER_DOMAIN)
+        self._queue_urls(urls)
 
     def _queue_urls(self, valid_urls: list[str]):
         for url_batch in batch(valid_urls, BATCH_SIZE):
@@ -61,6 +117,23 @@ class URLQueue:
         return self._queued_batches.qsize()
 
 
+def _sort_and_limit_urls(domain_urls: dict[str, list[str]], max_urls: int):
+    for domain, urls in domain_urls.items():
+        domain_urls[domain] = sorted(urls, key=lambda url_score: url_score.score, reverse=True)[:max_urls]
+
+
+def _add_urls(domains: Union[set[str], KeysView], domain_urls: dict[str, list[URLScore]], urls: list[str], max_urls: int):
+    for domain in list(domains & domain_urls.keys()):
+        new_urls = domain_urls[domain][:max_urls]
+        logger.info(f"Adding URLs {new_urls}")
+        urls += [url_score.url for url_score in new_urls]
+        new_domain_urls = domain_urls[domain][max_urls:]
+        if len(new_domain_urls) > 0:
+            domain_urls[domain] = new_domain_urls
+        else:
+            del domain_urls[domain]
+
+
 def update_queue_continuously(new_item_queue: Queue, queued_batches: Queue):
     queue = URLQueue(new_item_queue, queued_batches)
     queue.initialize()

+ 19 - 0
test/test_url_queue.py

@@ -0,0 +1,19 @@
+from datetime import datetime
+from queue import Queue
+
+from mwmbl.crawler.urls import FoundURL, URLStatus
+from mwmbl.url_queue import URLQueue
+
+
+def test_url_queue_empties():
+    new_item_queue = Queue()
+    queued_batches = Queue()
+
+    url_queue = URLQueue(new_item_queue, queued_batches)
+    new_item_queue.put([FoundURL("https://google.com", "123", 10.0, URLStatus.NEW.value, datetime(2023, 1, 19))])
+
+    url_queue.update()
+
+    items = queued_batches.get(block=False)
+
+    assert items == ["https://google.com"]