diff --git a/analyse/update_urls.py b/analyse/update_urls.py
new file mode 100644
index 0000000..0655df7
--- /dev/null
+++ b/analyse/update_urls.py
@@ -0,0 +1,26 @@
+import os
+import pickle
+from datetime import datetime
+from pathlib import Path
+from queue import Queue
+
+from mwmbl.indexer.update_urls import record_urls_in_database
+
+
+def run_update_urls_on_fixed_batches():
+    with open(Path(os.environ["HOME"]) / "data" / "mwmbl" / "hashed-batches.pickle", "rb") as file:
+        batches = pickle.load(file)
+
+    # print("Batches", batches[:3])
+
+    queue = Queue()
+
+    start = datetime.now()
+    record_urls_in_database(batches, queue)
+    total_time = (datetime.now() - start).total_seconds()
+
+    print("Total time:", total_time)
+
+
+if __name__ == '__main__':
+    run_update_urls_on_fixed_batches()
diff --git a/devdata/index-v2.tinysearch b/devdata/index-v2.tinysearch
index fe10556..70a7c69 100644
Binary files a/devdata/index-v2.tinysearch and b/devdata/index-v2.tinysearch differ
diff --git a/mwmbl/background.py b/mwmbl/background.py
index b76dac7..f10f0d2 100644
--- a/mwmbl/background.py
+++ b/mwmbl/background.py
@@ -2,39 +2,34 @@
 Script that updates data in a background process.
 """
 from logging import getLogger
-from multiprocessing import Queue
 from pathlib import Path
 from time import sleep
 
 from mwmbl.crawler.urls import URLDatabase
 from mwmbl.database import Database
-from mwmbl.indexer import index_batches, historical, update_urls
+from mwmbl.indexer import index_batches, historical
 from mwmbl.indexer.batch_cache import BatchCache
 from mwmbl.indexer.paths import BATCH_DIR_NAME, INDEX_NAME
 
 logger = getLogger(__name__)
 
 
-def run(data_path: str, new_item_queue: Queue):
+def run(data_path: str):
     logger.info("Started background process")
 
     with Database() as db:
         url_db = URLDatabase(db.connection)
         url_db.create_tables()
 
-    # historical.run()
+    historical.run()
     index_path = Path(data_path) / INDEX_NAME
     batch_cache = BatchCache(Path(data_path) / BATCH_DIR_NAME)
 
     while True:
-        # try:
-        #     batch_cache.retrieve_batches(num_batches=10000)
-        # except Exception:
-        #     logger.exception("Error retrieving batches")
         try:
-            update_urls.run(batch_cache, new_item_queue)
+            batch_cache.retrieve_batches(num_batches=10000)
         except Exception:
-            logger.exception("Error updating URLs")
+            logger.exception("Error retrieving batches")
         try:
             index_batches.run(batch_cache, index_path)
         except Exception:
logger.info("Getting URLs for crawling") - - work_mem = "SET work_mem = '512MB'" - - select_sql = f""" - SELECT host, (array_agg(url order by score desc))[:{MAX_URLS_PER_TOP_DOMAIN}] FROM url_and_hosts - WHERE host IN %(domains)s - AND (status = {URLStatus.NEW.value} OR ( - status = {URLStatus.ASSIGNED.value} AND updated < %(min_updated_date)s - )) - GROUP BY host - """ - - others_sql = f""" - SELECT DISTINCT ON (host) url FROM ( - SELECT * FROM url_and_hosts - WHERE status = {URLStatus.NEW.value} OR ( - status = {URLStatus.ASSIGNED.value} AND updated < %(min_updated_date)s - ) - ORDER BY score DESC LIMIT {MAX_OTHER_DOMAINS}) u - ORDER BY host - """ - - update_sql = f""" - UPDATE urls SET status = {URLStatus.QUEUED.value}, updated = %(now)s - WHERE url IN %(urls)s - """ - - now = datetime.utcnow() - min_updated_date = now - timedelta(hours=REASSIGN_MIN_HOURS) - domain_sample = set(random.sample(DOMAINS.keys(), MAX_TOP_DOMAINS)) | CORE_DOMAINS - domains = tuple(domain_sample) - logger.info(f"Getting URLs for domains {domains}") - with self.connection.cursor() as cursor: - cursor.execute(work_mem) - cursor.execute(select_sql, - {'min_updated_date': min_updated_date, 'domains': domains}) - agg_results = cursor.fetchall() - logger.info(f"Agg results: {agg_results}") - - results = [] - for host, urls in agg_results: - results += urls - - logger.info(f"Got {len(results)} top domain results") - - with self.connection.cursor() as cursor: - cursor.execute(others_sql, {'min_updated_date': min_updated_date}) - other_results = cursor.fetchall() - other_results_list = [result[0] for result in other_results] - logger.info(f"Got {len(other_results_list)} results from all domains") - results += other_results_list - - with self.connection.cursor() as cursor: - cursor.execute(update_sql, - {'now': now, 'urls': tuple(results)}) - - total_time_seconds = (datetime.now() - start).total_seconds() - logger.info(f"Got {len(results)} in {total_time_seconds} seconds") - - random.shuffle(results) - return results - def get_urls(self, status: URLStatus, num_urls: int): sql = f""" SELECT url FROM urls diff --git a/mwmbl/indexer/batch_cache.py b/mwmbl/indexer/batch_cache.py index 8081c28..3001e81 100644 --- a/mwmbl/indexer/batch_cache.py +++ b/mwmbl/indexer/batch_cache.py @@ -51,7 +51,7 @@ class BatchCache: with Database() as db: index_db = IndexDatabase(db.connection) batches = index_db.get_batches_by_status(BatchStatus.REMOTE, num_batches) - print(f"Found {len(batches)} remote batches") + logger.info(f"Found {len(batches)} remote batches") if len(batches) == 0: return urls = [batch.url for batch in batches] @@ -60,7 +60,7 @@ class BatchCache: total_processed = 0 for result in results: total_processed += result - print("Processed batches with items:", total_processed) + logger.info(f"Processed batches with {total_processed} items") index_db.update_batch_status(urls, BatchStatus.LOCAL) def retrieve_batch(self, url): @@ -68,7 +68,7 @@ class BatchCache: try: batch = HashedBatch.parse_obj(data) except ValidationError: - print("Failed to validate batch", data) + logger.info(f"Failed to validate batch {data}") return 0 if len(batch.items) > 0: self.store(batch, url) @@ -76,7 +76,7 @@ class BatchCache: def store(self, batch, url): path = self.get_path_from_url(url) - print(f"Storing local batch at {path}") + logger.debug(f"Storing local batch at {path}") os.makedirs(path.parent, exist_ok=True) with open(path, 'wb') as output_file: data = gzip.compress(batch.json().encode('utf8')) diff --git a/mwmbl/indexer/update_urls.py 
diff --git a/mwmbl/indexer/update_urls.py b/mwmbl/indexer/update_urls.py
index 4ad299d..4c85ba2 100644
--- a/mwmbl/indexer/update_urls.py
+++ b/mwmbl/indexer/update_urls.py
@@ -1,7 +1,11 @@
+import os
+import pickle
 from collections import defaultdict
 from datetime import datetime, timezone, timedelta
 from logging import getLogger
 from multiprocessing import Queue
+from pathlib import Path
+from time import sleep
 from typing import Iterable, Collection
 from urllib.parse import urlparse
 
@@ -13,12 +17,24 @@ from mwmbl.indexer import process_batch
 from mwmbl.indexer.batch_cache import BatchCache
 from mwmbl.indexer.index_batches import get_url_error_status
 from mwmbl.indexer.indexdb import BatchStatus
+from mwmbl.indexer.paths import BATCH_DIR_NAME
 from mwmbl.settings import UNKNOWN_DOMAIN_MULTIPLIER, EXCLUDED_DOMAINS, SCORE_FOR_SAME_DOMAIN, \
     SCORE_FOR_DIFFERENT_DOMAIN, SCORE_FOR_ROOT_PATH, EXTRA_LINK_MULTIPLIER
+from mwmbl.utils import get_domain
 
 logger = getLogger(__name__)
 
 
+def update_urls_continuously(data_path: str, new_item_queue: Queue):
+    batch_cache = BatchCache(Path(data_path) / BATCH_DIR_NAME)
+    while True:
+        try:
+            run(batch_cache, new_item_queue)
+        except Exception:
+            logger.exception("Error updating URLs")
+        sleep(10)
+
+
 def run(batch_cache: BatchCache, new_item_queue: Queue):
     process_batch.run(batch_cache, BatchStatus.LOCAL, BatchStatus.URLS_UPDATED, record_urls_in_database, new_item_queue)
 
@@ -40,7 +56,7 @@ def record_urls_in_database(batches: Collection[HashedBatch], new_item_queue: Qu
                     url_statuses[item.url] = get_url_error_status(item)
                 else:
                     url_statuses[item.url] = URLStatus.CRAWLED
-                    crawled_page_domain = urlparse(item.url).netloc
+                    crawled_page_domain = get_domain(item.url)
                     score_multiplier = 1 if crawled_page_domain in DOMAINS else UNKNOWN_DOMAIN_MULTIPLIER
                     for link in item.content.links:
                         process_link(batch, crawled_page_domain, link, score_multiplier, timestamp, url_scores,
@@ -54,7 +70,6 @@ def record_urls_in_database(batches: Collection[HashedBatch], new_item_queue: Qu
         found_urls = [FoundURL(url, url_users[url], url_scores[url], url_statuses[url], url_timestamps[url])
                       for url in url_scores.keys() | url_statuses.keys()]
 
-        # TODO: why does this number not match the new items number below? at all.
         logger.info(f"Found URLs, {len(found_urls)}")
         urls = url_db.update_found_urls(found_urls)
         new_item_queue.put(urls)
logger.info(f"Found URLs, {len(found_urls)}") urls = url_db.update_found_urls(found_urls) new_item_queue.put(urls) diff --git a/mwmbl/main.py b/mwmbl/main.py index dedc0aa..81b2191 100644 --- a/mwmbl/main.py +++ b/mwmbl/main.py @@ -11,6 +11,7 @@ from mwmbl import background from mwmbl.crawler import app as crawler from mwmbl.indexer.batch_cache import BatchCache from mwmbl.indexer.paths import INDEX_NAME, BATCH_DIR_NAME +from mwmbl.indexer.update_urls import update_urls_continuously from mwmbl.tinysearchengine import search from mwmbl.tinysearchengine.completer import Completer from mwmbl.tinysearchengine.indexer import TinyIndex, Document, PAGE_SIZE @@ -18,7 +19,7 @@ from mwmbl.tinysearchengine.rank import HeuristicRanker from mwmbl.url_queue import update_queue_continuously FORMAT = '%(levelname)s %(name)s %(asctime)s %(message)s' -logging.basicConfig(stream=sys.stdout, level=logging.DEBUG, format=FORMAT) +logging.basicConfig(stream=sys.stdout, level=logging.INFO, format=FORMAT) MODEL_PATH = Path(__file__).parent / 'resources' / 'model.pickle' @@ -51,8 +52,9 @@ def run(): queued_batches = Queue() if args.background: - Process(target=background.run, args=(args.data, new_item_queue)).start() + Process(target=background.run, args=(args.data,)).start() Process(target=update_queue_continuously, args=(new_item_queue, queued_batches,)).start() + Process(target=update_urls_continuously, args=(args.data, new_item_queue)).start() completer = Completer() diff --git a/mwmbl/url_queue.py b/mwmbl/url_queue.py index d1fd047..ff6b374 100644 --- a/mwmbl/url_queue.py +++ b/mwmbl/url_queue.py @@ -1,25 +1,17 @@ -import os -import pickle -import random -import re import time from collections import defaultdict from dataclasses import dataclass from datetime import datetime, timedelta from logging import getLogger from multiprocessing import Queue -from pathlib import Path from queue import Empty -from time import sleep from typing import KeysView, Union -from urllib.parse import urlparse from mwmbl.crawler.urls import BATCH_SIZE, URLDatabase, URLStatus, FoundURL, REASSIGN_MIN_HOURS from mwmbl.database import Database from mwmbl.hn_top_domains_filtered import DOMAINS as TOP_DOMAINS from mwmbl.settings import CORE_DOMAINS -from mwmbl.utils import batch - +from mwmbl.utils import batch, get_domain logger = getLogger(__name__) @@ -33,8 +25,6 @@ MAX_URLS_PER_TOP_DOMAIN = 100 MAX_URLS_PER_OTHER_DOMAIN = 5 MAX_OTHER_DOMAINS = 10000 -DOMAIN_REGEX = re.compile(r".*://([^/]*)") - @dataclass class URLScore: @@ -72,7 +62,7 @@ class URLQueue: return num_processed def _process_found_urls(self, found_urls: list[FoundURL]): - logger.info("Processing found URLs") + logger.info(f"Processing found URLs: {found_urls[:1000]}") # with open(Path(os.environ["HOME"]) / "data" / "mwmbl" / "found-urls.pickle", "wb") as output_file: # pickle.dump(found_urls, output_file) # logger.info("Dumped") @@ -98,7 +88,7 @@ class URLQueue: def _sort_urls(self, valid_urls: list[FoundURL]): for found_url in valid_urls: - domain = DOMAIN_REGEX.search(found_url.url)[0] + domain = get_domain(found_url.url) url_store = self._top_urls if domain in TOP_DOMAINS else self._other_urls url_store[domain].append(URLScore(found_url.url, found_url.score)) @@ -126,9 +116,13 @@ class URLQueue: self._queued_batches.put(url_batch, block=False) @property - def num_queued_batches(self): + def num_queued_batches(self) -> int: return self._queued_batches.qsize() + @property + def num_top_domains(self) -> int: + return len(self._top_urls) + def 
diff --git a/mwmbl/url_queue.py b/mwmbl/url_queue.py
index d1fd047..ff6b374 100644
--- a/mwmbl/url_queue.py
+++ b/mwmbl/url_queue.py
@@ -1,25 +1,17 @@
-import os
-import pickle
-import random
-import re
 import time
 from collections import defaultdict
 from dataclasses import dataclass
 from datetime import datetime, timedelta
 from logging import getLogger
 from multiprocessing import Queue
-from pathlib import Path
 from queue import Empty
-from time import sleep
 from typing import KeysView, Union
-from urllib.parse import urlparse
 
 from mwmbl.crawler.urls import BATCH_SIZE, URLDatabase, URLStatus, FoundURL, REASSIGN_MIN_HOURS
 from mwmbl.database import Database
 from mwmbl.hn_top_domains_filtered import DOMAINS as TOP_DOMAINS
 from mwmbl.settings import CORE_DOMAINS
-from mwmbl.utils import batch
-
+from mwmbl.utils import batch, get_domain
 
 logger = getLogger(__name__)
 
@@ -33,8 +25,6 @@ MAX_URLS_PER_TOP_DOMAIN = 100
 MAX_URLS_PER_OTHER_DOMAIN = 5
 MAX_OTHER_DOMAINS = 10000
 
-DOMAIN_REGEX = re.compile(r".*://([^/]*)")
-
 
 @dataclass
 class URLScore:
@@ -72,7 +62,7 @@ class URLQueue:
         return num_processed
 
     def _process_found_urls(self, found_urls: list[FoundURL]):
-        logger.info("Processing found URLs")
+        logger.info(f"Processing found URLs: {found_urls[:1000]}")
         # with open(Path(os.environ["HOME"]) / "data" / "mwmbl" / "found-urls.pickle", "wb") as output_file:
         #     pickle.dump(found_urls, output_file)
         # logger.info("Dumped")
@@ -98,7 +88,7 @@ class URLQueue:
 
     def _sort_urls(self, valid_urls: list[FoundURL]):
         for found_url in valid_urls:
-            domain = DOMAIN_REGEX.search(found_url.url)[0]
+            domain = get_domain(found_url.url)
             url_store = self._top_urls if domain in TOP_DOMAINS else self._other_urls
             url_store[domain].append(URLScore(found_url.url, found_url.score))
 
@@ -126,9 +116,13 @@ class URLQueue:
             self._queued_batches.put(url_batch, block=False)
 
     @property
-    def num_queued_batches(self):
+    def num_queued_batches(self) -> int:
         return self._queued_batches.qsize()
 
+    @property
+    def num_top_domains(self) -> int:
+        return len(self._top_urls)
+
 
 def _sort_and_limit_urls(domain_urls: dict[str, list[str]], max_urls: int):
     for domain, urls in domain_urls.items():
@@ -151,7 +145,8 @@ def update_queue_continuously(new_item_queue: Queue, queued_batches: Queue):
     queue.initialize()
     while True:
         num_processed = queue.update()
-        logger.info(f"Queue update, num processed: {num_processed}, queue size: {queue.num_queued_batches}")
+        logger.info(f"Queue update, num processed: {num_processed}, queue size: {queue.num_queued_batches}, num top "
+                    f"domains: {queue.num_top_domains}")
         if num_processed == 0:
             time.sleep(5)
diff --git a/mwmbl/utils.py b/mwmbl/utils.py
index c8083af..f98ae61 100644
--- a/mwmbl/utils.py
+++ b/mwmbl/utils.py
@@ -1,3 +1,8 @@
+import re
+
+DOMAIN_REGEX = re.compile(r".*://([^/]*)")
+
+
 def batch(items: list, batch_size):
     """
     Adapted from https://stackoverflow.com/a/8290508
@@ -5,3 +10,8 @@ def batch(items: list, batch_size):
     length = len(items)
     for ndx in range(0, length, batch_size):
         yield items[ndx:min(ndx + batch_size, length)]
+
+
+def get_domain(url):
+    domain = DOMAIN_REGEX.search(url)[0]
+    return domain
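For reference, here is how the two helpers in `mwmbl/utils.py` behave after this change. The values in the comments are worked examples, not output taken from the project.

```python
from mwmbl.utils import batch, get_domain

# DOMAIN_REGEX captures everything between "://" and the next "/";
# get_domain returns match[0], i.e. the full match including the scheme.
print(get_domain("https://example.com/some/page"))  # https://example.com

# batch() yields successive slices of at most batch_size items.
print(list(batch([1, 2, 3, 4, 5], 2)))  # [[1, 2], [3, 4], [5]]
```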