From 7dae39b78048e8bbcf03303769b9dbc2362b8e7d Mon Sep 17 00:00:00 2001
From: Daoud Clarke
Date: Sat, 31 Dec 2022 13:32:15 +0000
Subject: [PATCH 1/6] WIP: improve method of getting URLs for crawling

---
 mwmbl/background.py   | 12 +++++++++-
 mwmbl/crawler/app.py  |  6 -----
 mwmbl/crawler/urls.py | 51 ++++++++++++++++++++++++++++++++++---------
 mwmbl/url_queue.py    |  4 +++-
 4 files changed, 55 insertions(+), 18 deletions(-)

diff --git a/mwmbl/background.py b/mwmbl/background.py
index cdb7489..f686bee 100644
--- a/mwmbl/background.py
+++ b/mwmbl/background.py
@@ -6,6 +6,8 @@ from multiprocessing import Queue
 from pathlib import Path
 from time import sleep

+from mwmbl.crawler.urls import URLDatabase
+from mwmbl.database import Database
 from mwmbl.indexer import index_batches, historical, update_urls
 from mwmbl.indexer.batch_cache import BatchCache
 from mwmbl.indexer.paths import BATCH_DIR_NAME, INDEX_NAME
@@ -15,15 +17,23 @@ logger = getLogger(__name__)


 def run(data_path: str, url_queue: Queue):
+    logger.info("Started background process")
+
+    with Database() as db:
+        url_db = URLDatabase(db.connection)
+        url_db.create_tables()
+
     initialize_url_queue(url_queue)
-    historical.run()
+    # historical.run()
     index_path = Path(data_path) / INDEX_NAME
     batch_cache = BatchCache(Path(data_path) / BATCH_DIR_NAME)
+
     while True:
         try:
             update_url_queue(url_queue)
         except Exception:
             logger.exception("Error updating URL queue")
+            return
         try:
             batch_cache.retrieve_batches(num_batches=10000)
         except Exception:
diff --git a/mwmbl/crawler/app.py b/mwmbl/crawler/app.py
index c7aa9de..e283cfc 100644
--- a/mwmbl/crawler/app.py
+++ b/mwmbl/crawler/app.py
@@ -48,12 +48,6 @@ last_batch = None
 def get_router(batch_cache: BatchCache, url_queue: Queue):
     router = APIRouter(prefix="/crawler", tags=["crawler"])

-    @router.on_event("startup")
-    async def on_startup():
-        with Database() as db:
-            url_db = URLDatabase(db.connection)
-            return url_db.create_tables()
-
    @router.post('/batches/')
     def create_batch(batch: Batch):
         if len(batch.items) > MAX_BATCH_SIZE:
diff --git a/mwmbl/crawler/urls.py b/mwmbl/crawler/urls.py
index 02ef6f3..d5ae18b 100644
--- a/mwmbl/crawler/urls.py
+++ b/mwmbl/crawler/urls.py
@@ -4,15 +4,20 @@ Database storing info on URLs
 from dataclasses import dataclass
 from datetime import datetime, timedelta
 from enum import Enum
+from logging import getLogger

 from psycopg2.extras import execute_values
-
+from mwmbl.hn_top_domains_filtered import DOMAINS
 # Client has one hour to crawl a URL that has been assigned to them, or it will be reassigned
 from mwmbl.utils import batch

 REASSIGN_MIN_HOURS = 5
 BATCH_SIZE = 100
+MAX_TOP_DOMAIN_URLS = 10
+
+
+logger = getLogger(__name__)


 class URLStatus(Enum):
@@ -43,6 +48,8 @@ class URLDatabase:
         self.connection = connection

     def create_tables(self):
+        logger.info("Creating URL tables")
+
         sql = """
         CREATE TABLE IF NOT EXISTS urls (
             url VARCHAR PRIMARY KEY,
@@ -53,8 +60,19 @@
         )
         """

+        index_sql = """
+        CREATE INDEX IF NOT EXISTS host_index
+        ON urls(substring(url FROM '.*://([^/]*)'), score)
+        """
+
+        view_sql = """
+        CREATE OR REPLACE VIEW url_and_hosts AS SELECT *, substring(url FROM '.*://([^/]*)') AS host FROM urls
+        """
+
         with self.connection.cursor() as cursor:
             cursor.execute(sql)
+            cursor.execute(index_sql)
+            cursor.execute(view_sql)

     def update_found_urls(self, found_urls: list[FoundURL]):
         if len(found_urls) == 0:
@@ -109,27 +127,40 @@
                 execute_values(cursor, insert_sql, data)

     def get_urls_for_crawling(self, num_urls: int):
+        start = datetime.utcnow()
+        logger.info("Getting URLs for crawling")
+
+        work_mem = "SET work_mem = '512MB'"
+
         sql = f"""
         UPDATE urls SET status = {URLStatus.QUEUED.value}, updated = %(now)s
         WHERE url IN (
-            SELECT url FROM urls
-            WHERE status IN ({URLStatus.NEW.value}) OR (
-                status = {URLStatus.ASSIGNED.value} AND updated < %(min_updated_date)s
-            )
-            ORDER BY score DESC
-            LIMIT %(num_urls)s
-            FOR UPDATE SKIP LOCKED
+            SELECT url FROM (
+                SELECT url, host, score, rank() OVER (PARTITION BY host ORDER BY score DESC) AS pos
+                FROM url_and_hosts
+                WHERE host IN %(domains)s
+                AND status IN ({URLStatus.NEW.value}) OR (
+                    status = {URLStatus.ASSIGNED.value} AND updated < %(min_updated_date)s
+                )
+            ) u
+            WHERE pos < {MAX_TOP_DOMAIN_URLS}
         )
         RETURNING url
         """

         now = datetime.utcnow()
         min_updated_date = now - timedelta(hours=REASSIGN_MIN_HOURS)
+        domains = tuple(DOMAINS.keys())
         with self.connection.cursor() as cursor:
-            cursor.execute(sql, {'min_updated_date': min_updated_date, 'now': now, 'num_urls': num_urls})
+            cursor.execute(work_mem)
+            cursor.execute(sql, {'min_updated_date': min_updated_date, 'now': now, 'num_urls': num_urls, 'domains': domains})
             results = cursor.fetchall()

-        return [result[0] for result in results]
+        total_time_seconds = (datetime.now() - start).total_seconds()
+        results = [result[0] for result in results]
+        logger.info(f"Got {len(results)} in {total_time_seconds} seconds")
+
+        return results

     def get_urls(self, status: URLStatus, num_urls: int):
         sql = f"""
diff --git a/mwmbl/url_queue.py b/mwmbl/url_queue.py
index 96deb5d..da3f75d 100644
--- a/mwmbl/url_queue.py
+++ b/mwmbl/url_queue.py
@@ -10,11 +10,13 @@ logger = getLogger(__name__)


 MAX_QUEUE_SIZE = 5000
+MIN_QUEUE_SIZE = 1000


 def update_url_queue(url_queue: Queue):
+    logger.info("Updating URL queue")
     current_size = url_queue.qsize()
-    if current_size >= MAX_QUEUE_SIZE:
+    if current_size >= MIN_QUEUE_SIZE:
         logger.info(f"Skipping queue update, current size {current_size}")
         return
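Note on the window-function query above: the per-host cap comes from rank() partitioning by the host that the url_and_hosts view extracts. A minimal standalone sketch of the same idea, using sqlite3 (window functions need SQLite 3.25+) and an invented toy table; the real query also filters by crawl status and the top-domain list, and the strict pos < N comparison keeps at most N - 1 URLs per host.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE urls (url TEXT PRIMARY KEY, host TEXT, score REAL);
    INSERT INTO urls VALUES
        ('https://a.com/1', 'a.com', 9.0),
        ('https://a.com/2', 'a.com', 7.0),
        ('https://a.com/3', 'a.com', 5.0),
        ('https://b.com/1', 'b.com', 8.0);
""")

MAX_TOP_DOMAIN_URLS = 3  # the patch uses 10

# Rank URLs within each host by descending score, then keep ranks below the cap
rows = conn.execute(f"""
    SELECT url FROM (
        SELECT url, rank() OVER (PARTITION BY host ORDER BY score DESC) AS pos
        FROM urls
    ) u
    WHERE pos < {MAX_TOP_DOMAIN_URLS}
""").fetchall()

print([r[0] for r in rows])  # two best a.com URLs plus the single b.com URL
```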
logger.info("Getting URLs for crawling") + + work_mem = "SET work_mem = '512MB'" + sql = f""" UPDATE urls SET status = {URLStatus.QUEUED.value}, updated = %(now)s WHERE url IN ( - SELECT url FROM urls - WHERE status IN ({URLStatus.NEW.value}) OR ( - status = {URLStatus.ASSIGNED.value} AND updated < %(min_updated_date)s - ) - ORDER BY score DESC - LIMIT %(num_urls)s - FOR UPDATE SKIP LOCKED + SELECT url FROM ( + SELECT url, host, score, rank() OVER (PARTITION BY host ORDER BY score DESC) AS pos + FROM url_and_hosts + WHERE host IN %(domains)s + AND status IN ({URLStatus.NEW.value}) OR ( + status = {URLStatus.ASSIGNED.value} AND updated < %(min_updated_date)s + ) + ) u + WHERE pos < {MAX_TOP_DOMAIN_URLS} ) RETURNING url """ now = datetime.utcnow() min_updated_date = now - timedelta(hours=REASSIGN_MIN_HOURS) + domains = tuple(DOMAINS.keys()) with self.connection.cursor() as cursor: - cursor.execute(sql, {'min_updated_date': min_updated_date, 'now': now, 'num_urls': num_urls}) + cursor.execute(work_mem) + cursor.execute(sql, {'min_updated_date': min_updated_date, 'now': now, 'num_urls': num_urls, 'domains': domains}) results = cursor.fetchall() - return [result[0] for result in results] + total_time_seconds = (datetime.now() - start).total_seconds() + results = [result[0] for result in results] + logger.info(f"Got {len(results)} in {total_time_seconds} seconds") + + return results def get_urls(self, status: URLStatus, num_urls: int): sql = f""" diff --git a/mwmbl/url_queue.py b/mwmbl/url_queue.py index 96deb5d..da3f75d 100644 --- a/mwmbl/url_queue.py +++ b/mwmbl/url_queue.py @@ -10,11 +10,13 @@ logger = getLogger(__name__) MAX_QUEUE_SIZE = 5000 +MIN_QUEUE_SIZE = 1000 def update_url_queue(url_queue: Queue): + logger.info("Updating URL queue") current_size = url_queue.qsize() - if current_size >= MAX_QUEUE_SIZE: + if current_size >= MIN_QUEUE_SIZE: logger.info(f"Skipping queue update, current size {current_size}") return From ea16e7b5cd0207816b62e9b0f2f11b6bf884d374 Mon Sep 17 00:00:00 2001 From: Daoud Clarke Date: Sat, 31 Dec 2022 13:37:40 +0000 Subject: [PATCH 2/6] WIP: improve method of getting URLs for crawling --- mwmbl/crawler/urls.py | 25 ++++++++++++------------- 1 file changed, 12 insertions(+), 13 deletions(-) diff --git a/mwmbl/crawler/urls.py b/mwmbl/crawler/urls.py index d5ae18b..9236ca8 100644 --- a/mwmbl/crawler/urls.py +++ b/mwmbl/crawler/urls.py @@ -130,20 +130,17 @@ class URLDatabase: start = datetime.utcnow() logger.info("Getting URLs for crawling") - work_mem = "SET work_mem = '512MB'" - sql = f""" UPDATE urls SET status = {URLStatus.QUEUED.value}, updated = %(now)s WHERE url IN ( - SELECT url FROM ( - SELECT url, host, score, rank() OVER (PARTITION BY host ORDER BY score DESC) AS pos - FROM url_and_hosts - WHERE host IN %(domains)s + SELECT url FROM url_and_hosts + WHERE host = %(domain)s AND status IN ({URLStatus.NEW.value}) OR ( status = {URLStatus.ASSIGNED.value} AND updated < %(min_updated_date)s ) - ) u - WHERE pos < {MAX_TOP_DOMAIN_URLS} + ORDER BY score DESC + LIMIT {MAX_TOP_DOMAIN_URLS} + ) ) RETURNING url """ @@ -151,13 +148,15 @@ class URLDatabase: now = datetime.utcnow() min_updated_date = now - timedelta(hours=REASSIGN_MIN_HOURS) domains = tuple(DOMAINS.keys()) - with self.connection.cursor() as cursor: - cursor.execute(work_mem) - cursor.execute(sql, {'min_updated_date': min_updated_date, 'now': now, 'num_urls': num_urls, 'domains': domains}) - results = cursor.fetchall() + + results = [] + for domain in domains: + with self.connection.cursor() as cursor: + 
From 36af579f7cec61bc45bae1a8a8f947e4e8a1a810 Mon Sep 17 00:00:00 2001
From: Daoud Clarke
Date: Sat, 31 Dec 2022 17:04:38 +0000
Subject: [PATCH 3/6] Sample domains

---
 mwmbl/crawler/urls.py | 47 ++++++++++++++++++++++++-------------------
 1 file changed, 26 insertions(+), 21 deletions(-)

diff --git a/mwmbl/crawler/urls.py b/mwmbl/crawler/urls.py
index 9236ca8..6c16e35 100644
--- a/mwmbl/crawler/urls.py
+++ b/mwmbl/crawler/urls.py
@@ -1,6 +1,7 @@
 """
 Database storing info on URLs
 """
+import random
 from dataclasses import dataclass
 from datetime import datetime, timedelta
 from enum import Enum
@@ -130,35 +131,39 @@ class URLDatabase:
         start = datetime.utcnow()
         logger.info("Getting URLs for crawling")

+        work_mem = "SET work_mem = '512MB'"
+
         sql = f"""
-        UPDATE urls SET status = {URLStatus.QUEUED.value}, updated = %(now)s
-        WHERE url IN (
-            SELECT url FROM url_and_hosts
-            WHERE host = %(domain)s
-                AND status IN ({URLStatus.NEW.value}) OR (
-                    status = {URLStatus.ASSIGNED.value} AND updated < %(min_updated_date)s
-                )
-            ORDER BY score DESC
-            LIMIT {MAX_TOP_DOMAIN_URLS}
-            )
-        )
-        RETURNING url
-        """
+            UPDATE urls SET status = {URLStatus.QUEUED.value}, updated = %(now)s
+            WHERE url IN (
+                SELECT url FROM (
+                    SELECT url, host, score, rank() OVER (PARTITION BY host ORDER BY score DESC) AS pos
+                    FROM url_and_hosts
+                    WHERE host IN %(domains)s
+                    AND status IN ({URLStatus.NEW.value}) OR (
+                        status = {URLStatus.ASSIGNED.value} AND updated < %(min_updated_date)s
+                    )
+                ) u
+                WHERE pos < {MAX_TOP_DOMAIN_URLS}
+            )
+            RETURNING url
+            """

         now = datetime.utcnow()
         min_updated_date = now - timedelta(hours=REASSIGN_MIN_HOURS)
-        domains = tuple(DOMAINS.keys())
-
-        results = []
-        for domain in domains:
-            with self.connection.cursor() as cursor:
-                cursor.execute(sql, {'min_updated_date': min_updated_date, 'now': now, 'domain': domain})
-                domain_results = cursor.fetchall()
-                results += [result[0] for result in domain_results]
+        domains = tuple(random.sample(DOMAINS.keys(), 100))
+        logger.info(f"Getting URLs for domains {domains}")
+        with self.connection.cursor() as cursor:
+            cursor.execute(work_mem)
+            cursor.execute(sql,
+                           {'min_updated_date': min_updated_date, 'now': now, 'num_urls': num_urls, 'domains': domains})
+            results = cursor.fetchall()

+        results = [result[0] for result in results]
         total_time_seconds = (datetime.now() - start).total_seconds()
         logger.info(f"Got {len(results)} in {total_time_seconds} seconds")

+        random.shuffle(results)
         return results
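Patch 3 above samples the top-domain dictionary and shuffles the final list so one host does not dominate a crawl batch. One portability note worth sketching: random.sample() only accepted dict views (as sets) with a DeprecationWarning on Python 3.9 and 3.10, and rejects them from 3.11, so a list() conversion is needed there. DOMAINS here is an invented stand-in for mwmbl.hn_top_domains_filtered.DOMAINS.

```python
import random

# Invented stand-in: the real DOMAINS maps far more hostnames to scores
DOMAINS = {"example.org": 1.0, "example.net": 0.9, "example.com": 0.8}

# list() keeps this working on Python 3.11+, where sampling a dict view raises
domains = tuple(random.sample(list(DOMAINS.keys()), 2))  # the patch samples 100

urls = [f"https://{host}/page{i}" for host in domains for i in range(3)]
random.shuffle(urls)  # interleave hosts so a batch is not all one domain
print(domains, urls)
```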
From 77f08d8f0a91e360f66bd17a0cd82553dfcbc2c3 Mon Sep 17 00:00:00 2001
From: Daoud Clarke
Date: Sat, 31 Dec 2022 22:25:05 +0000
Subject: [PATCH 4/6] Update URL status

---
 mwmbl/crawler/urls.py | 39 +++++++++++++++++++++++----------------
 1 file changed, 23 insertions(+), 16 deletions(-)

diff --git a/mwmbl/crawler/urls.py b/mwmbl/crawler/urls.py
index 6c16e35..f3d9388 100644
--- a/mwmbl/crawler/urls.py
+++ b/mwmbl/crawler/urls.py
@@ -133,33 +133,40 @@ class URLDatabase:

         work_mem = "SET work_mem = '512MB'"

-        sql = f"""
-            UPDATE urls SET status = {URLStatus.QUEUED.value}, updated = %(now)s
-            WHERE url IN (
-                SELECT url FROM (
-                    SELECT url, host, score, rank() OVER (PARTITION BY host ORDER BY score DESC) AS pos
-                    FROM url_and_hosts
-                    WHERE host IN %(domains)s
+        select_sql = f"""
+            SELECT (array_agg(url order by score desc))[:100] FROM url_and_hosts
+            WHERE host IN %(domains)s
                     AND status IN ({URLStatus.NEW.value}) OR (
                         status = {URLStatus.ASSIGNED.value} AND updated < %(min_updated_date)s
                     )
-                ) u
-                WHERE pos < {MAX_TOP_DOMAIN_URLS}
-            )
-            RETURNING url
+            GROUP BY host
+        """
+
+        update_sql = f"""
+            UPDATE urls SET status = {URLStatus.QUEUED.value}, updated = %(now)s
+            WHERE url IN %(urls)s
             """

         now = datetime.utcnow()
         min_updated_date = now - timedelta(hours=REASSIGN_MIN_HOURS)
-        domains = tuple(random.sample(DOMAINS.keys(), 100))
+        domains = tuple(random.sample(DOMAINS.keys(), 500))
+        # domains = tuple(DOMAINS.keys())
         logger.info(f"Getting URLs for domains {domains}")
         with self.connection.cursor() as cursor:
             cursor.execute(work_mem)
-            cursor.execute(sql,
-                           {'min_updated_date': min_updated_date, 'now': now, 'num_urls': num_urls, 'domains': domains})
-            results = cursor.fetchall()
+            cursor.execute(select_sql,
+                           {'min_updated_date': min_updated_date, 'domains': domains})
+            agg_results = cursor.fetchall()
+
+        results = []
+        for result in agg_results:
+            results += result[0]
+
+        with self.connection.cursor() as cursor:
+            cursor.execute(update_sql,
+                           {'now': now, 'urls': tuple(results)})
+        print("Results", agg_results)

-        results = [result[0] for result in results]
         total_time_seconds = (datetime.now() - start).total_seconds()
         logger.info(f"Got {len(results)} in {total_time_seconds} seconds")
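Patch 4 above replaces the window function with array_agg plus an array slice, grouped by host, and moves the status change into a separate UPDATE keyed on the collected URLs. A pure-Python rendering of what the select computes, on invented rows, to pin down the intended semantics; the literal [:100] slice only gets its named constant in the next patch.

```python
import heapq
from collections import defaultdict

MAX_URLS_PER_TOP_DOMAIN = 2  # the patch hard-codes 100 at this point

# Invented rows standing in for the url_and_hosts view: (url, host, score)
rows = [
    ("https://a.com/1", "a.com", 9.0),
    ("https://a.com/2", "a.com", 7.0),
    ("https://a.com/3", "a.com", 5.0),
    ("https://b.com/1", "b.com", 8.0),
]

by_host = defaultdict(list)
for url, host, score in rows:
    by_host[host].append((score, url))

results = []
for host, scored in by_host.items():
    # Same selection as (array_agg(url ORDER BY score DESC))[:N] ... GROUP BY host
    results += [url for _, url in heapq.nlargest(MAX_URLS_PER_TOP_DOMAIN, scored)]

print(results)  # the real code then marks exactly these URLs as QUEUED
```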
From d9cd3c585b8d3e192a39980293778692011c0f79 Mon Sep 17 00:00:00 2001
From: Daoud Clarke
Date: Sat, 31 Dec 2022 22:51:00 +0000
Subject: [PATCH 5/6] Get results from other domains

---
 mwmbl/crawler/urls.py | 26 +++++++++++++++++++++-----
 1 file changed, 21 insertions(+), 5 deletions(-)

diff --git a/mwmbl/crawler/urls.py b/mwmbl/crawler/urls.py
index f3d9388..fb02d6e 100644
--- a/mwmbl/crawler/urls.py
+++ b/mwmbl/crawler/urls.py
@@ -15,7 +15,9 @@ from mwmbl.utils import batch

 REASSIGN_MIN_HOURS = 5
 BATCH_SIZE = 100
-MAX_TOP_DOMAIN_URLS = 10
+MAX_URLS_PER_TOP_DOMAIN = 100
+MAX_TOP_DOMAINS = 500
+MAX_OTHER_DOMAINS = 50000


 logger = getLogger(__name__)
@@ -134,38 +136,52 @@ class URLDatabase:
         work_mem = "SET work_mem = '512MB'"

         select_sql = f"""
-            SELECT (array_agg(url order by score desc))[:100] FROM url_and_hosts
+            SELECT (array_agg(url order by score desc))[:{MAX_URLS_PER_TOP_DOMAIN}] FROM url_and_hosts
             WHERE host IN %(domains)s
                     AND status IN ({URLStatus.NEW.value}) OR (
                         status = {URLStatus.ASSIGNED.value} AND updated < %(min_updated_date)s
                     )
             GROUP BY host
         """

+        others_sql = f"""
+            SELECT DISTINCT ON (host) url FROM (
+            SELECT * FROM url_and_hosts
+            WHERE status=0
+            ORDER BY score DESC LIMIT {MAX_OTHER_DOMAINS}) u
+            ORDER BY host
+        """
+
         update_sql = f"""
             UPDATE urls SET status = {URLStatus.QUEUED.value}, updated = %(now)s
             WHERE url IN %(urls)s
             """

         now = datetime.utcnow()
         min_updated_date = now - timedelta(hours=REASSIGN_MIN_HOURS)
-        domains = tuple(random.sample(DOMAINS.keys(), 500))
-        # domains = tuple(DOMAINS.keys())
+        domains = tuple(random.sample(DOMAINS.keys(), MAX_TOP_DOMAINS))
         logger.info(f"Getting URLs for domains {domains}")
         with self.connection.cursor() as cursor:
             cursor.execute(work_mem)
             cursor.execute(select_sql,
                            {'min_updated_date': min_updated_date, 'domains': domains})
             agg_results = cursor.fetchall()

         results = []
         for result in agg_results:
             results += result[0]
+        logger.info(f"Got {len(results)} top domain results")
+
+        with self.connection.cursor() as cursor:
+            cursor.execute(others_sql)
+            other_results = cursor.fetchall()
+        other_results_list = [result[0] for result in other_results]
+        logger.info(f"Got {len(other_results_list)} results from all domains")
+        results += other_results_list

         with self.connection.cursor() as cursor:
             cursor.execute(update_sql,
                            {'now': now, 'urls': tuple(results)})
-        print("Results", agg_results)

         total_time_seconds = (datetime.now() - start).total_seconds()
         logger.info(f"Got {len(results)} in {total_time_seconds} seconds")

From a86e172bf3b93574c1fd2fe2e41413e7074f528c Mon Sep 17 00:00:00 2001
From: Daoud Clarke
Date: Sat, 31 Dec 2022 22:52:17 +0000
Subject: [PATCH 6/6] Reinstate background tasks

---
 mwmbl/background.py | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/mwmbl/background.py b/mwmbl/background.py
index f686bee..f1c1695 100644
--- a/mwmbl/background.py
+++ b/mwmbl/background.py
@@ -24,7 +24,7 @@ def run(data_path: str, url_queue: Queue):
         url_db.create_tables()

     initialize_url_queue(url_queue)
-    # historical.run()
+    historical.run()
     index_path = Path(data_path) / INDEX_NAME
     batch_cache = BatchCache(Path(data_path) / BATCH_DIR_NAME)
@@ -33,7 +33,6 @@ def run(data_path: str, url_queue: Queue):
             update_url_queue(url_queue)
         except Exception:
             logger.exception("Error updating URL queue")
-            return
         try:
             batch_cache.retrieve_batches(num_batches=10000)
         except Exception:
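A closing note on patch 5's others_sql: DISTINCT ON (host) keeps one row per host, and because the outer query orders only by host, PostgreSQL makes no guarantee about which row survives; the apparent intent is one well-scored URL per host drawn from the MAX_OTHER_DOMAINS highest-scoring NEW rows. A dict-based sketch of that selection on invented data, keeping the first row seen per host to mirror the inner ORDER BY score DESC.

```python
# Rows as the inner query would return them: sorted by score DESC and
# capped at MAX_OTHER_DOMAINS (50,000 in the patch); data is invented.
top_scored = [
    ("https://a.com/1", "a.com"),
    ("https://b.com/1", "b.com"),
    ("https://a.com/2", "a.com"),
    ("https://c.com/1", "c.com"),
]

one_per_host = {}
for url, host in top_scored:
    one_per_host.setdefault(host, url)  # first (best-scored) URL per host wins

print(list(one_per_host.values()))  # breadth: one URL for every host seen
```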