diff --git a/mwmbl/crawler/urls.py b/mwmbl/crawler/urls.py
index 9236ca8..6c16e35 100644
--- a/mwmbl/crawler/urls.py
+++ b/mwmbl/crawler/urls.py
@@ -1,6 +1,7 @@
 """
 Database storing info on URLs
 """
+import random
 from dataclasses import dataclass
 from datetime import datetime, timedelta
 from enum import Enum
@@ -130,35 +131,39 @@ class URLDatabase:
         start = datetime.utcnow()
         logger.info("Getting URLs for crawling")

+        work_mem = "SET work_mem = '512MB'"
+
         sql = f"""
-        UPDATE urls SET status = {URLStatus.QUEUED.value}, updated = %(now)s
-        WHERE url IN (
-            SELECT url FROM url_and_hosts
-            WHERE host = %(domain)s
-            AND status IN ({URLStatus.NEW.value}) OR (
-                status = {URLStatus.ASSIGNED.value} AND updated < %(min_updated_date)s
-            )
-            ORDER BY score DESC
-            LIMIT {MAX_TOP_DOMAIN_URLS}
-            )
-        )
-        RETURNING url
-        """
+        UPDATE urls SET status = {URLStatus.QUEUED.value}, updated = %(now)s
+        WHERE url IN (
+            SELECT url FROM (
+                SELECT url, host, score, rank() OVER (PARTITION BY host ORDER BY score DESC) AS pos
+                FROM url_and_hosts
+                WHERE host IN %(domains)s
+                AND status IN ({URLStatus.NEW.value}) OR (
+                    status = {URLStatus.ASSIGNED.value} AND updated < %(min_updated_date)s
+                )
+            ) u
+            WHERE pos < {MAX_TOP_DOMAIN_URLS}
+        )
+        RETURNING url
+        """

         now = datetime.utcnow()
         min_updated_date = now - timedelta(hours=REASSIGN_MIN_HOURS)
-        domains = tuple(DOMAINS.keys())
-
-        results = []
-        for domain in domains:
-            with self.connection.cursor() as cursor:
-                cursor.execute(sql, {'min_updated_date': min_updated_date, 'now': now, 'domain': domain})
-                domain_results = cursor.fetchall()
-                results += [result[0] for result in domain_results]
+        domains = tuple(random.sample(DOMAINS.keys(), 100))
+        logger.info(f"Getting URLs for domains {domains}")
+        with self.connection.cursor() as cursor:
+            cursor.execute(work_mem)
+            cursor.execute(sql,
+                           {'min_updated_date': min_updated_date, 'now': now, 'num_urls': num_urls, 'domains': domains})
+            results = cursor.fetchall()
+            results = [result[0] for result in results]

         total_time_seconds = (datetime.now() - start).total_seconds()
         logger.info(f"Got {len(results)} in {total_time_seconds} seconds")
+        random.shuffle(results)

         return results

     def get_urls(self, status: URLStatus, num_urls: int):
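
A note on the batched query this patch introduces: the per-domain loop (one UPDATE per domain, each with its own ORDER BY ... LIMIT) is replaced by a single UPDATE that caps URLs per host with rank() OVER (PARTITION BY host ORDER BY score DESC). Below is a minimal, self-contained sketch of that top-N-per-group technique, runnable against an in-memory SQLite database (window functions need SQLite >= 3.25); the table and column names mirror the diff, and the sample rows are invented, whereas the real code runs against PostgreSQL.

# Sketch of the top-N-per-group pattern used in the new query.
import sqlite3

MAX_TOP_DOMAIN_URLS = 3  # stand-in for the constant referenced in the diff

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE url_and_hosts (url TEXT, host TEXT, score REAL);
    INSERT INTO url_and_hosts VALUES
        ('https://a.com/1', 'a.com', 9.0),
        ('https://a.com/2', 'a.com', 7.0),
        ('https://a.com/3', 'a.com', 5.0),
        ('https://a.com/4', 'a.com', 3.0),
        ('https://b.com/1', 'b.com', 8.0),
        ('https://b.com/2', 'b.com', 6.0);
""")

# rank() restarts at 1 within each host partition, so a single filter on the
# computed position caps how many URLs any one domain can contribute.
rows = conn.execute(f"""
    SELECT url FROM (
        SELECT url, host,
               rank() OVER (PARTITION BY host ORDER BY score DESC) AS pos
        FROM url_and_hosts
    ) u
    WHERE pos < {MAX_TOP_DOMAIN_URLS}
    ORDER BY url
""").fetchall()

print([r[0] for r in rows])
# ['https://a.com/1', 'https://a.com/2', 'https://b.com/1', 'https://b.com/2']

Two boundary details worth checking against intent: WHERE pos < {MAX_TOP_DOMAIN_URLS} admits at most MAX_TOP_DOMAIN_URLS - 1 URLs per host, where the old LIMIT {MAX_TOP_DOMAIN_URLS} admitted MAX_TOP_DOMAIN_URLS (pos <= would preserve the old cap); and rank() assigns equal positions to tied scores, so a host with many equal-score URLs near the cutoff can exceed the cap, which row_number() would not.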
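
A second point, carried over from the old query rather than introduced here: in SQL, AND binds tighter than OR, so "WHERE host IN %(domains)s AND status IN (...) OR (status = ... AND updated < ...)" is evaluated as (host matches AND status is NEW) OR (status is ASSIGNED and stale). The host restriction does not apply to the second branch, so stale ASSIGNED URLs from any host can be queued. If that is unintended, a parenthesised status test confines both branches to the sampled hosts. A hedged sketch follows; NEW and ASSIGNED are assumed stand-in values for the real URLStatus enum, and the other identifiers follow the diff.

# Fully parenthesised WHERE clause; NEW = 0 and ASSIGNED = 2 are assumptions.
NEW, ASSIGNED = 0, 2

where_clause = f"""
    WHERE host IN %(domains)s
    AND (
        status IN ({NEW})
        OR (status = {ASSIGNED} AND updated < %(min_updated_date)s)
    )
"""
print(where_clause)

Incidentally, "host IN %(domains)s" works here because psycopg2 adapts a Python tuple to a parenthesised value list.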
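
Two smaller observations on the Python side. random.sample(DOMAINS.keys(), 100) hands a dict view to random.sample(); the implicit set-to-sequence conversion it relies on was deprecated in Python 3.9 and removed in 3.11, where this raises TypeError, and the call also raises ValueError if DOMAINS holds fewer than 100 entries. A defensive sketch, with DOMAINS as invented sample data standing in for the real constant:

# Minimal sketch, assuming DOMAINS is a dict keyed by hostname as in the diff.
import random

DOMAINS = {f"host{i}.example": 1.0 for i in range(500)}  # invented sample data

# list() keeps the call working on Python 3.11+, and min() avoids the
# ValueError raised when the population is smaller than the sample size.
domains = tuple(random.sample(list(DOMAINS.keys()), min(100, len(DOMAINS))))

Separately, the params dict now passes num_urls even though the SQL never references %(num_urls)s; psycopg2 ignores unused keys with named placeholders, so this is harmless but dead. And since a plain SET work_mem persists for the rest of the session, SET LOCAL work_mem = '512MB' issued inside the transaction would confine the memory bump to this statement if the connection is reused elsewhere.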