commit 0d33b4f68f
4 changed files with 86 additions and 23 deletions
@@ -6,6 +6,8 @@ from multiprocessing import Queue
from pathlib import Path
from time import sleep

from mwmbl.crawler.urls import URLDatabase
from mwmbl.database import Database
from mwmbl.indexer import index_batches, historical, update_urls
from mwmbl.indexer.batch_cache import BatchCache
from mwmbl.indexer.paths import BATCH_DIR_NAME, INDEX_NAME
@@ -15,10 +17,17 @@ logger = getLogger(__name__)


def run(data_path: str, url_queue: Queue):
    logger.info("Started background process")

    with Database() as db:
        url_db = URLDatabase(db.connection)
        url_db.create_tables()

    initialize_url_queue(url_queue)
    historical.run()
    index_path = Path(data_path) / INDEX_NAME
    batch_cache = BatchCache(Path(data_path) / BATCH_DIR_NAME)

    while True:
        try:
            update_url_queue(url_queue)
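For context, a minimal sketch of how a service might start this background worker. It assumes the module path mwmbl.background and an example data path, neither of which is shown in this diff:

# Hypothetical launcher, not part of this commit: runs run() in a separate
# process and shares the URL queue with the web application.
from multiprocessing import Process, Queue

from mwmbl import background

url_queue = Queue()
# "./devdata" is an arbitrary example data path.
background_process = Process(target=background.run, args=("./devdata", url_queue))
background_process.daemon = True
background_process.start()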
@@ -48,12 +48,6 @@ last_batch = None
def get_router(batch_cache: BatchCache, url_queue: Queue):
    router = APIRouter(prefix="/crawler", tags=["crawler"])

    @router.on_event("startup")
    async def on_startup():
        with Database() as db:
            url_db = URLDatabase(db.connection)
            return url_db.create_tables()

    @router.post('/batches/')
    def create_batch(batch: Batch):
        if len(batch.items) > MAX_BATCH_SIZE:
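A minimal usage sketch for the router factory above, assuming a FastAPI app plus the batch_cache and url_queue objects created elsewhere in the service (not shown in this diff):

# Hypothetical wiring, not part of this commit.
from fastapi import FastAPI

app = FastAPI()
app.include_router(get_router(batch_cache, url_queue))  # batch_cache and url_queue assumed to exist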
@@ -1,18 +1,26 @@
"""
Database storing info on URLs
"""
import random
from dataclasses import dataclass
from datetime import datetime, timedelta
from enum import Enum
from logging import getLogger

from psycopg2.extras import execute_values

from mwmbl.hn_top_domains_filtered import DOMAINS
from mwmbl.utils import batch

# Client has one hour to crawl a URL that has been assigned to them, or it will be reassigned
REASSIGN_MIN_HOURS = 5
BATCH_SIZE = 100
MAX_URLS_PER_TOP_DOMAIN = 100
MAX_TOP_DOMAINS = 500
MAX_OTHER_DOMAINS = 50000


logger = getLogger(__name__)


class URLStatus(Enum):
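To illustrate the sampling these constants drive, a small self-contained sketch; DOMAINS is stood in by a dummy dict here, and newer Python versions need the keys materialised as a list before sampling:

import random

MAX_TOP_DOMAINS = 500

# Stand-in for mwmbl.hn_top_domains_filtered.DOMAINS (domain -> score).
DOMAINS = {f"site{i}.example": 1.0 for i in range(10_000)}

# Each crawl batch picks a fresh random set of top domains.
domains = tuple(random.sample(list(DOMAINS.keys()), MAX_TOP_DOMAINS))
print(len(domains))  # 500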
@@ -43,6 +51,8 @@ class URLDatabase:
        self.connection = connection

    def create_tables(self):
        logger.info("Creating URL tables")

        sql = """
        CREATE TABLE IF NOT EXISTS urls (
            url VARCHAR PRIMARY KEY,
@@ -53,8 +63,19 @@ class URLDatabase:
        )
        """

        index_sql = """
        CREATE INDEX IF NOT EXISTS host_index
        ON urls(substring(url FROM '.*://([^/]*)'), score)
        """

        view_sql = """
        CREATE OR REPLACE VIEW url_and_hosts AS SELECT *, substring(url FROM '.*://([^/]*)') AS host FROM urls
        """

        with self.connection.cursor() as cursor:
            cursor.execute(sql)
            cursor.execute(index_sql)
            cursor.execute(view_sql)

    def update_found_urls(self, found_urls: list[FoundURL]):
        if len(found_urls) == 0:
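Both the index and the view extract the host with the POSIX regex '.*://([^/]*)'; an illustrative Python equivalent of that capture:

import re

# Same capture group as the Postgres substring() calls above.
HOST_PATTERN = re.compile(r".*://([^/]*)")

for url in ("https://en.wikipedia.org/wiki/Aardvark", "http://example.com"):
    print(HOST_PATTERN.match(url).group(1))
# -> en.wikipedia.org
# -> example.com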
@@ -109,27 +130,64 @@ class URLDatabase:
            execute_values(cursor, insert_sql, data)

    def get_urls_for_crawling(self, num_urls: int):
        sql = f"""
        UPDATE urls SET status = {URLStatus.QUEUED.value}, updated = %(now)s
        WHERE url IN (
            SELECT url FROM urls
            WHERE status IN ({URLStatus.NEW.value}) OR (
                status = {URLStatus.ASSIGNED.value} AND updated < %(min_updated_date)s
            )
            ORDER BY score DESC
            LIMIT %(num_urls)s
            FOR UPDATE SKIP LOCKED
        )
        RETURNING url
        """
        start = datetime.utcnow()
        logger.info("Getting URLs for crawling")

        work_mem = "SET work_mem = '512MB'"

        select_sql = f"""
        SELECT (array_agg(url order by score desc))[:{MAX_URLS_PER_TOP_DOMAIN}] FROM url_and_hosts
        WHERE host IN %(domains)s
        AND status IN ({URLStatus.NEW.value}) OR (
            status = {URLStatus.ASSIGNED.value} AND updated < %(min_updated_date)s
        )
        GROUP BY host
        """

        others_sql = f"""
        SELECT DISTINCT ON (host) url FROM (
            SELECT * FROM url_and_hosts
            WHERE status=0
            ORDER BY score DESC LIMIT {MAX_OTHER_DOMAINS}) u
        ORDER BY host
        """

        update_sql = f"""
        UPDATE urls SET status = {URLStatus.QUEUED.value}, updated = %(now)s
        WHERE url IN %(urls)s
        """

        now = datetime.utcnow()
        min_updated_date = now - timedelta(hours=REASSIGN_MIN_HOURS)
        domains = tuple(random.sample(DOMAINS.keys(), MAX_TOP_DOMAINS))
        logger.info(f"Getting URLs for domains {domains}")
        with self.connection.cursor() as cursor:
            cursor.execute(sql, {'min_updated_date': min_updated_date, 'now': now, 'num_urls': num_urls})
            results = cursor.fetchall()
            cursor.execute(work_mem)
            cursor.execute(select_sql,
                           {'min_updated_date': min_updated_date, 'domains': domains})
            agg_results = cursor.fetchall()

        return [result[0] for result in results]
        results = []
        for result in agg_results:
            results += result[0]
        logger.info(f"Got {len(results)} top domain results")

        with self.connection.cursor() as cursor:
            cursor.execute(others_sql)
            other_results = cursor.fetchall()
        other_results_list = [result[0] for result in other_results]
        logger.info(f"Got {len(other_results_list)} results from all domains")
        results += other_results_list

        with self.connection.cursor() as cursor:
            cursor.execute(update_sql,
                           {'now': now, 'urls': tuple(results)})

        total_time_seconds = (datetime.now() - start).total_seconds()
        logger.info(f"Got {len(results)} in {total_time_seconds} seconds")

        random.shuffle(results)
        return results

    def get_urls(self, status: URLStatus, num_urls: int):
        sql = f"""
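A minimal caller sketch, following the Database/URLDatabase pattern the diff itself uses in the background process; the num_urls value is only an example:

# Hypothetical caller, not part of this commit.
from mwmbl.crawler.urls import URLDatabase
from mwmbl.database import Database

with Database() as db:
    url_db = URLDatabase(db.connection)
    # Marks the selected rows QUEUED and returns a shuffled list of URLs drawn
    # from the sampled top domains plus one URL per other host.
    urls = url_db.get_urls_for_crawling(num_urls=1000)
print(len(urls))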
@@ -10,11 +10,13 @@ logger = getLogger(__name__)


MAX_QUEUE_SIZE = 5000
MIN_QUEUE_SIZE = 1000


def update_url_queue(url_queue: Queue):
    logger.info("Updating URL queue")
    current_size = url_queue.qsize()
    if current_size >= MAX_QUEUE_SIZE:
    if current_size >= MIN_QUEUE_SIZE:
        logger.info(f"Skipping queue update, current size {current_size}")
        return
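The refill path that runs when the queue is below MIN_QUEUE_SIZE is cut off in this hunk; a plausible sketch of it, assuming mwmbl.utils.batch(items, size) yields fixed-size chunks (the helper name refill_queue is hypothetical):

# Hypothetical refill logic; the real body is not shown in this hunk.
from multiprocessing import Queue

from mwmbl.crawler.urls import URLDatabase
from mwmbl.database import Database
from mwmbl.utils import batch

BATCH_SIZE = 100
MIN_QUEUE_SIZE = 1000
MAX_QUEUE_SIZE = 5000


def refill_queue(url_queue: Queue):
    if url_queue.qsize() >= MIN_QUEUE_SIZE:
        return
    with Database() as db:
        url_db = URLDatabase(db.connection)
        urls = url_db.get_urls_for_crawling(num_urls=MAX_QUEUE_SIZE)
    # Enqueue in fixed-size chunks so crawler clients receive whole batches.
    for url_batch in batch(urls, BATCH_SIZE):
        url_queue.put(url_batch)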