Merge pull request #89 from mwmbl/update-urls-queue-quickly

Update urls queue quickly
Daoud Clarke 2023-02-24 21:39:40 +00:00 committed by GitHub
commit 5616626fc1
13 changed files with 295 additions and 139 deletions

analyse/update_urls.py (new file, 26 lines)

@@ -0,0 +1,26 @@
import os
import pickle
from datetime import datetime
from pathlib import Path
from queue import Queue
from mwmbl.indexer.update_urls import record_urls_in_database
def run_update_urls_on_fixed_batches():
with open(Path(os.environ["HOME"]) / "data" / "mwmbl" / "hashed-batches.pickle", "rb") as file:
batches = pickle.load(file)
# print("Batches", batches[:3])
queue = Queue()
start = datetime.now()
record_urls_in_database(batches, queue)
total_time = (datetime.now() - start).total_seconds()
print("Total time:", total_time)
if __name__ == '__main__':
run_update_urls_on_fixed_batches()
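This new analysis script wall-clock times record_urls_in_database against a pickled fixture of hashed batches. As a hedged aside (not part of the commit), the same fixture could also be profiled per function with the standard library's cProfile; the snippet below is a sketch that assumes it is run from the analyse/ directory.

# Hypothetical profiling wrapper for the benchmark above; not part of this commit.
import cProfile
import pstats

from update_urls import run_update_urls_on_fixed_batches  # assumes the analyse/ directory is on sys.path

if __name__ == '__main__':
    profiler = cProfile.Profile()
    profiler.enable()
    run_update_urls_on_fixed_batches()
    profiler.disable()
    # Show the 20 slowest calls by cumulative time.
    pstats.Stats(profiler).sort_stats("cumulative").print_stats(20)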

analyse/url_queue.py (new file, 35 lines)

@@ -0,0 +1,35 @@
import logging
import os
import pickle
import sys
from datetime import datetime
from pathlib import Path
from queue import Queue
from mwmbl.url_queue import URLQueue
FORMAT = '%(levelname)s %(name)s %(asctime)s %(message)s'
logging.basicConfig(stream=sys.stdout, level=logging.DEBUG, format=FORMAT)
def run_url_queue():
data = pickle.load(open(Path(os.environ["HOME"]) / "data" / "mwmbl" / "found-urls.pickle", "rb"))
print("First URLs", [x.url for x in data[:1000]])
new_item_queue = Queue()
queued_batches = Queue()
queue = URLQueue(new_item_queue, queued_batches)
new_item_queue.put(data)
start = datetime.now()
queue.update()
total_time = (datetime.now() - start).total_seconds()
print(f"Total time: {total_time}")
if __name__ == '__main__':
run_url_queue()
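Both analysis scripts read pickled fixtures from ~/data/mwmbl. A minimal sketch (not part of the commit) for producing the found-urls.pickle fixture that analyse/url_queue.py expects, assuming the FoundURL(url, user_id_hash, score, status, timestamp) constructor used in the test at the end of this diff:

# Hypothetical fixture builder for analyse/url_queue.py; the values are made up.
import os
import pickle
from datetime import datetime
from pathlib import Path

from mwmbl.crawler.urls import FoundURL, URLStatus

def write_found_urls_fixture():
    found_urls = [
        FoundURL(f"https://example.com/page/{i}", "some-user-hash", float(i), URLStatus.NEW.value, datetime.utcnow())
        for i in range(1000)
    ]
    path = Path(os.environ["HOME"]) / "data" / "mwmbl" / "found-urls.pickle"
    path.parent.mkdir(parents=True, exist_ok=True)
    with open(path, "wb") as file:
        pickle.dump(found_urls, file)

if __name__ == '__main__':
    write_found_urls_fixture()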

Binary file not shown.

mwmbl/background.py

@@ -7,7 +7,7 @@ from time import sleep
from mwmbl.crawler.urls import URLDatabase
from mwmbl.database import Database
from mwmbl.indexer import index_batches, historical, update_urls
from mwmbl.indexer import index_batches, historical
from mwmbl.indexer.batch_cache import BatchCache
from mwmbl.indexer.paths import BATCH_DIR_NAME, INDEX_NAME
@@ -30,10 +30,6 @@ def run(data_path: str):
batch_cache.retrieve_batches(num_batches=10000)
except Exception:
logger.exception("Error retrieving batches")
try:
update_urls.run(batch_cache)
except Exception:
logger.exception("Error updating URLs")
try:
index_batches.run(batch_cache, index_path)
except Exception:

mwmbl/crawler/app.py

@@ -2,8 +2,7 @@ import gzip
import hashlib
import json
from datetime import datetime, timezone, date
from multiprocessing import Queue
from queue import Empty
from queue import Queue, Empty
from typing import Union
from uuid import uuid4
@@ -28,6 +27,7 @@ from mwmbl.settings import (
PUBLIC_USER_ID_LENGTH,
FILE_NAME_SUFFIX,
DATE_REGEX)
from mwmbl.url_queue import URLQueue
def get_bucket(name):
@@ -45,7 +45,7 @@ def upload(data: bytes, name: str):
last_batch = None
def get_router(batch_cache: BatchCache, url_queue: Queue):
def get_router(batch_cache: BatchCache, queued_batches: Queue):
router = APIRouter(prefix="/crawler", tags=["crawler"])
@router.post('/batches/')
@@ -103,7 +103,7 @@ def get_router(batch_cache: BatchCache, url_queue: Queue):
def request_new_batch(batch_request: NewBatchRequest) -> list[str]:
user_id_hash = _get_user_id_hash(batch_request)
try:
urls = url_queue.get(block=False)
urls = queued_batches.get(block=False)
except Empty:
return []

mwmbl/crawler/urls.py

@@ -64,23 +64,14 @@ class URLDatabase:
)
"""
index_sql = """
CREATE INDEX IF NOT EXISTS host_index
ON urls(substring(url FROM '.*://([^/]*)'), score)
"""
view_sql = """
CREATE OR REPLACE VIEW url_and_hosts AS SELECT *, substring(url FROM '.*://([^/]*)') AS host FROM urls
"""
with self.connection.cursor() as cursor:
cursor.execute(sql)
cursor.execute(index_sql)
cursor.execute(view_sql)
# cursor.execute(index_sql)
# cursor.execute(view_sql)
def update_found_urls(self, found_urls: list[FoundURL]):
def update_found_urls(self, found_urls: list[FoundURL]) -> list[FoundURL]:
if len(found_urls) == 0:
return
return []
get_urls_sql = """
SELECT url FROM urls
@@ -104,6 +95,7 @@ class URLDatabase:
updated = CASE
WHEN urls.status > excluded.status THEN urls.updated ELSE excluded.updated
END
RETURNING url, user_id_hash, score, status, updated
"""
input_urls = [x.url for x in found_urls]
@@ -111,6 +103,7 @@
with self.connection as connection:
with connection.cursor() as cursor:
logger.info(f"Input URLs: {len(input_urls)}")
cursor.execute(get_urls_sql, {'urls': tuple(input_urls)})
existing_urls = {x[0] for x in cursor.fetchall()}
new_urls = set(input_urls) - existing_urls
@@ -119,6 +112,7 @@
locked_urls = {x[0] for x in cursor.fetchall()}
urls_to_insert = new_urls | locked_urls
logger.info(f"URLs to insert: {len(urls_to_insert)}")
if len(urls_to_insert) != len(input_urls):
print(f"Only got {len(urls_to_insert)} instead of {len(input_urls)} - {len(new_urls)} new")
@@ -128,72 +122,11 @@
(found_url.url, found_url.status.value, found_url.user_id_hash, found_url.score, found_url.timestamp)
for found_url in sorted_urls if found_url.url in urls_to_insert]
execute_values(cursor, insert_sql, data)
def get_urls_for_crawling(self):
start = datetime.utcnow()
logger.info("Getting URLs for crawling")
work_mem = "SET work_mem = '512MB'"
select_sql = f"""
SELECT host, (array_agg(url order by score desc))[:{MAX_URLS_PER_TOP_DOMAIN}] FROM url_and_hosts
WHERE host IN %(domains)s
AND (status = {URLStatus.NEW.value} OR (
status = {URLStatus.ASSIGNED.value} AND updated < %(min_updated_date)s
))
GROUP BY host
"""
others_sql = f"""
SELECT DISTINCT ON (host) url FROM (
SELECT * FROM url_and_hosts
WHERE status = {URLStatus.NEW.value} OR (
status = {URLStatus.ASSIGNED.value} AND updated < %(min_updated_date)s
)
ORDER BY score DESC LIMIT {MAX_OTHER_DOMAINS}) u
ORDER BY host
"""
update_sql = f"""
UPDATE urls SET status = {URLStatus.QUEUED.value}, updated = %(now)s
WHERE url IN %(urls)s
"""
now = datetime.utcnow()
min_updated_date = now - timedelta(hours=REASSIGN_MIN_HOURS)
domain_sample = set(random.sample(DOMAINS.keys(), MAX_TOP_DOMAINS)) | CORE_DOMAINS
domains = tuple(domain_sample)
logger.info(f"Getting URLs for domains {domains}")
with self.connection.cursor() as cursor:
cursor.execute(work_mem)
cursor.execute(select_sql,
{'min_updated_date': min_updated_date, 'domains': domains})
agg_results = cursor.fetchall()
logger.info(f"Agg results: {agg_results}")
results = []
for host, urls in agg_results:
results += urls
logger.info(f"Got {len(results)} top domain results")
with self.connection.cursor() as cursor:
cursor.execute(others_sql, {'min_updated_date': min_updated_date})
other_results = cursor.fetchall()
other_results_list = [result[0] for result in other_results]
logger.info(f"Got {len(other_results_list)} results from all domains")
results += other_results_list
with self.connection.cursor() as cursor:
cursor.execute(update_sql,
{'now': now, 'urls': tuple(results)})
total_time_seconds = (datetime.now() - start).total_seconds()
logger.info(f"Got {len(results)} in {total_time_seconds} seconds")
random.shuffle(results)
return results
logger.info(f"Data: {len(data)}")
results = execute_values(cursor, insert_sql, data, fetch=True)
logger.info(f"Results: {len(results)}")
updated = [FoundURL(*result) for result in results]
return updated
def get_urls(self, status: URLStatus, num_urls: int):
sql = f"""
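The rewritten update_found_urls above relies on psycopg2's execute_values with fetch=True, so the RETURNING clause hands back the rows that were actually inserted or updated and they can be re-wrapped as FoundURL objects. A minimal sketch of that pattern in isolation, using a hypothetical table and connection string:

# Sketch of the execute_values + RETURNING pattern; the table, columns and DSN are hypothetical.
import psycopg2
from psycopg2.extras import execute_values

insert_sql = """
    INSERT INTO example_urls (url, score) VALUES %s
    ON CONFLICT (url) DO UPDATE SET score = GREATEST(example_urls.score, excluded.score)
    RETURNING url, score
"""

with psycopg2.connect("dbname=mwmbl") as connection:
    with connection.cursor() as cursor:
        rows = execute_values(cursor, insert_sql, [("https://example.com", 1.0)], fetch=True)
        print(rows)  # fetch=True returns the RETURNING rows as a list of tuples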

mwmbl/indexer/batch_cache.py

@@ -51,7 +51,7 @@ class BatchCache:
with Database() as db:
index_db = IndexDatabase(db.connection)
batches = index_db.get_batches_by_status(BatchStatus.REMOTE, num_batches)
print(f"Found {len(batches)} remote batches")
logger.info(f"Found {len(batches)} remote batches")
if len(batches) == 0:
return
urls = [batch.url for batch in batches]
@@ -60,7 +60,7 @@
total_processed = 0
for result in results:
total_processed += result
print("Processed batches with items:", total_processed)
logger.info(f"Processed batches with {total_processed} items")
index_db.update_batch_status(urls, BatchStatus.LOCAL)
def retrieve_batch(self, url):
@@ -68,7 +68,7 @@
try:
batch = HashedBatch.parse_obj(data)
except ValidationError:
print("Failed to validate batch", data)
logger.info(f"Failed to validate batch {data}")
return 0
if len(batch.items) > 0:
self.store(batch, url)
@@ -76,7 +76,7 @@
def store(self, batch, url):
path = self.get_path_from_url(url)
print(f"Storing local batch at {path}")
logger.debug(f"Storing local batch at {path}")
os.makedirs(path.parent, exist_ok=True)
with open(path, 'wb') as output_file:
data = gzip.compress(batch.json().encode('utf8'))

mwmbl/indexer/process_batch.py

@@ -10,7 +10,7 @@ logger = getLogger(__name__)
def run(batch_cache: BatchCache, start_status: BatchStatus, end_status: BatchStatus,
process: Callable[[Collection[HashedBatch]], None]):
process: Callable[[Collection[HashedBatch], ...], None], *args):
with Database() as db:
index_db = IndexDatabase(db.connection)
@@ -24,6 +24,10 @@ def run(batch_cache: BatchCache, start_status: BatchStatus, end_status: BatchSta
batch_data = batch_cache.get_cached([batch.url for batch in batches])
logger.info(f"Got {len(batch_data)} cached batches")
process(batch_data.values())
missing_batches = {batch.url for batch in batches} - batch_data.keys()
logger.info(f"Got {len(missing_batches)} missing batches")
index_db.update_batch_status(list(missing_batches), BatchStatus.REMOTE)
process(batch_data.values(), *args)
index_db.update_batch_status(list(batch_data.keys()), end_status)
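process_batch.run now accepts a variadic process callback and forwards any extra positional arguments to it, which is how update_urls threads the new_item_queue through. A stripped-down sketch of that forwarding pattern with hypothetical names:

# Hypothetical illustration of forwarding *args from a batch runner to its process callback.
from typing import Callable, Collection

def run_batches(batches: Collection[dict], process: Callable[..., None], *args) -> None:
    # Extra positional arguments given to run_batches are passed straight through to process.
    process(batches, *args)

def record(batches: Collection[dict], sink: list) -> None:
    sink.append(len(batches))

collected: list[int] = []
run_batches([{"url": "https://example.com"}], record, collected)
assert collected == [1]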

mwmbl/indexer/update_urls.py

@@ -1,6 +1,11 @@
import os
import pickle
from collections import defaultdict
from datetime import datetime, timezone, timedelta
from logging import getLogger
from multiprocessing import Queue
from pathlib import Path
from time import sleep
from typing import Iterable, Collection
from urllib.parse import urlparse
@@ -12,17 +17,29 @@ from mwmbl.indexer import process_batch
from mwmbl.indexer.batch_cache import BatchCache
from mwmbl.indexer.index_batches import get_url_error_status
from mwmbl.indexer.indexdb import BatchStatus
from mwmbl.indexer.paths import BATCH_DIR_NAME
from mwmbl.settings import UNKNOWN_DOMAIN_MULTIPLIER, EXCLUDED_DOMAINS, SCORE_FOR_SAME_DOMAIN, \
SCORE_FOR_DIFFERENT_DOMAIN, SCORE_FOR_ROOT_PATH, EXTRA_LINK_MULTIPLIER
from mwmbl.utils import get_domain
logger = getLogger(__name__)
def run(batch_cache: BatchCache):
process_batch.run(batch_cache, BatchStatus.LOCAL, BatchStatus.URLS_UPDATED, process=record_urls_in_database)
def update_urls_continuously(data_path: str, new_item_queue: Queue):
batch_cache = BatchCache(Path(data_path) / BATCH_DIR_NAME)
while True:
try:
run(batch_cache, new_item_queue)
except Exception:
logger.exception("Error updating URLs")
sleep(10)
def record_urls_in_database(batches: Collection[HashedBatch]):
def run(batch_cache: BatchCache, new_item_queue: Queue):
process_batch.run(batch_cache, BatchStatus.LOCAL, BatchStatus.URLS_UPDATED, record_urls_in_database, new_item_queue)
def record_urls_in_database(batches: Collection[HashedBatch], new_item_queue: Queue):
logger.info(f"Recording URLs in database for {len(batches)} batches")
with Database() as db:
url_db = URLDatabase(db.connection)
@@ -39,7 +56,11 @@ def record_urls_in_database(batches: Collection[HashedBatch]):
url_statuses[item.url] = get_url_error_status(item)
else:
url_statuses[item.url] = URLStatus.CRAWLED
crawled_page_domain = urlparse(item.url).netloc
try:
crawled_page_domain = get_domain(item.url)
except ValueError:
logger.info(f"Couldn't parse URL {item.url}")
continue
score_multiplier = 1 if crawled_page_domain in DOMAINS else UNKNOWN_DOMAIN_MULTIPLIER
for link in item.content.links:
process_link(batch, crawled_page_domain, link, score_multiplier, timestamp, url_scores,
@@ -53,7 +74,10 @@ def record_urls_in_database(batches: Collection[HashedBatch]):
found_urls = [FoundURL(url, url_users[url], url_scores[url], url_statuses[url], url_timestamps[url])
for url in url_scores.keys() | url_statuses.keys()]
url_db.update_found_urls(found_urls)
logger.info(f"Found URLs, {len(found_urls)}")
urls = url_db.update_found_urls(found_urls)
new_item_queue.put(urls)
logger.info(f"Put {len(urls)} new items in the URL queue")
def process_link(batch, crawled_page_domain, link, unknown_domain_multiplier, timestamp, url_scores, url_timestamps, url_users, is_extra: bool):

mwmbl/main.py

@@ -1,6 +1,5 @@
import argparse
import logging
import os
import sys
from multiprocessing import Process, Queue
from pathlib import Path
@@ -8,16 +7,19 @@ from pathlib import Path
import uvicorn
from fastapi import FastAPI
from mwmbl import background, url_queue
from mwmbl import background
from mwmbl.crawler import app as crawler
from mwmbl.indexer.batch_cache import BatchCache
from mwmbl.indexer.paths import INDEX_NAME, BATCH_DIR_NAME
from mwmbl.indexer.update_urls import update_urls_continuously
from mwmbl.tinysearchengine import search
from mwmbl.tinysearchengine.completer import Completer
from mwmbl.tinysearchengine.indexer import TinyIndex, Document, PAGE_SIZE
from mwmbl.tinysearchengine.rank import HeuristicRanker
from mwmbl.url_queue import update_queue_continuously
logging.basicConfig(stream=sys.stdout, level=logging.INFO)
FORMAT = '%(levelname)s %(name)s %(asctime)s %(message)s'
logging.basicConfig(stream=sys.stdout, level=logging.INFO, format=FORMAT)
MODEL_PATH = Path(__file__).parent / 'resources' / 'model.pickle'
@@ -47,11 +49,13 @@ def run():
print("Creating a new index")
TinyIndex.create(item_factory=Document, index_path=index_path, num_pages=args.num_pages, page_size=PAGE_SIZE)
queue = Queue()
new_item_queue = Queue()
queued_batches = Queue()
if args.background:
Process(target=background.run, args=(args.data,)).start()
Process(target=url_queue.run, args=(queue,)).start()
Process(target=update_queue_continuously, args=(new_item_queue, queued_batches,)).start()
Process(target=update_urls_continuously, args=(args.data, new_item_queue)).start()
completer = Completer()
@@ -67,7 +71,7 @@ def run():
app.include_router(search_router)
batch_cache = BatchCache(Path(args.data) / BATCH_DIR_NAME)
crawler_router = crawler.get_router(batch_cache, queue)
crawler_router = crawler.get_router(batch_cache, queued_batches)
app.include_router(crawler_router)
# Initialize uvicorn server using global app instance and server config params
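The entry point now wires two queues: update_urls_continuously puts updated FoundURLs onto new_item_queue, update_queue_continuously drains that into crawl batches on queued_batches, and the crawler router pops those batches for clients. A minimal sketch of the wiring without the web server (it still needs a configured database; the data path is a placeholder):

# Sketch of the background queue wiring introduced above; requires a working database.
from multiprocessing import Process, Queue

from mwmbl.indexer.update_urls import update_urls_continuously
from mwmbl.url_queue import update_queue_continuously

if __name__ == '__main__':
    new_item_queue = Queue()   # lists of FoundURL produced by the indexer
    queued_batches = Queue()   # lists of URLs ready to hand to crawlers

    Process(target=update_queue_continuously, args=(new_item_queue, queued_batches)).start()
    Process(target=update_urls_continuously, args=("./devdata", new_item_queue)).start()  # "./devdata" is a placeholder path

    urls = queued_batches.get()  # what request_new_batch does on the crawler API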

mwmbl/url_queue.py

@@ -1,48 +1,151 @@
import time
from collections import defaultdict
from dataclasses import dataclass
from datetime import datetime, timedelta
from logging import getLogger
from multiprocessing import Queue
from time import sleep
from queue import Empty
from typing import KeysView, Union
from mwmbl.crawler.urls import BATCH_SIZE, URLDatabase, URLStatus
from mwmbl.crawler.urls import BATCH_SIZE, URLDatabase, URLStatus, FoundURL, REASSIGN_MIN_HOURS
from mwmbl.database import Database
from mwmbl.utils import batch
from mwmbl.hn_top_domains_filtered import DOMAINS as TOP_DOMAINS
from mwmbl.settings import CORE_DOMAINS
from mwmbl.utils import batch, get_domain
logger = getLogger(__name__)
MAX_QUEUE_SIZE = 5000
MIN_QUEUE_SIZE = 1000
MAX_TOP_URLS = 100000
MAX_OTHER_URLS = 1000
MAX_URLS_PER_CORE_DOMAIN = 1000
MAX_URLS_PER_TOP_DOMAIN = 100
MAX_URLS_PER_OTHER_DOMAIN = 5
MAX_OTHER_DOMAINS = 10000
@dataclass
class URLScore:
url: str
score: float
def run(url_queue: Queue):
initialize_url_queue(url_queue)
class URLQueue:
def __init__(self, new_item_queue: Queue, queued_batches: Queue, min_top_domains: int = 5):
"""
new_item_queue: each item in the queue is a list of FoundURLs
queued_batches: each item in the queue is a list of URLs (strings)
"""
self._new_item_queue = new_item_queue
self._queued_batches = queued_batches
self._other_urls = defaultdict(list)
self._top_urls = defaultdict(list)
self._min_top_domains = min_top_domains
def initialize(self):
with Database() as db:
url_db = URLDatabase(db.connection)
urls = url_db.get_urls(URLStatus.QUEUED, MAX_QUEUE_SIZE * BATCH_SIZE)
self._queue_urls(urls)
logger.info(f"Initialized URL queue with {len(urls)} urls, current queue size: {self.num_queued_batches}")
def update(self):
num_processed = 0
while True:
try:
new_batch = self._new_item_queue.get_nowait()
num_processed += 1
except Empty:
break
self._process_found_urls(new_batch)
return num_processed
def _process_found_urls(self, found_urls: list[FoundURL]):
min_updated_date = datetime.utcnow() - timedelta(hours=REASSIGN_MIN_HOURS)
logger.info(f"Found URLS: {len(found_urls)}")
valid_urls = [found_url for found_url in found_urls if found_url.status == URLStatus.NEW.value or (
found_url.status == URLStatus.ASSIGNED.value and found_url.timestamp < min_updated_date)]
logger.info(f"Valid URLs: {len(valid_urls)}")
self._sort_urls(valid_urls)
logger.info(f"Queue size: {self.num_queued_batches}")
while self.num_queued_batches < MAX_QUEUE_SIZE and len(self._top_urls) > self._min_top_domains:
total_top_urls = sum(len(urls) for urls in self._top_urls.values())
logger.info(f"Total top URLs stored: {total_top_urls}")
total_other_urls = sum(len(urls) for urls in self._other_urls.values())
logger.info(f"Total other URLs stored: {total_other_urls}")
self._batch_urls()
logger.info(f"Queue size after batching: {self.num_queued_batches}")
def _sort_urls(self, valid_urls: list[FoundURL]):
for found_url in valid_urls:
try:
domain = get_domain(found_url.url)
except ValueError:
continue
url_store = self._top_urls if domain in TOP_DOMAINS else self._other_urls
url_store[domain].append(URLScore(found_url.url, found_url.score))
logger.info(f"URL store updated: {len(self._top_urls)} top domains, {len(self._other_urls)} other domains")
_sort_and_limit_urls(self._top_urls, MAX_TOP_URLS)
_sort_and_limit_urls(self._other_urls, MAX_OTHER_URLS)
# Keep only the top "other" domains, ranked by the top item for that domain
top_other_urls = sorted(self._other_urls.items(), key=lambda x: x[1][0].score, reverse=True)[:MAX_OTHER_DOMAINS]
self._other_urls = defaultdict(list, dict(top_other_urls))
def _batch_urls(self):
urls = []
logger.info("Adding core domains")
_add_urls(CORE_DOMAINS, self._top_urls, urls, MAX_URLS_PER_CORE_DOMAIN)
logger.info("Adding top domains")
_add_urls(TOP_DOMAINS.keys() - CORE_DOMAINS, self._top_urls, urls, MAX_URLS_PER_TOP_DOMAIN)
logger.info("Adding other domains")
_add_urls(self._other_urls.keys(), self._other_urls, urls, MAX_URLS_PER_OTHER_DOMAIN)
self._queue_urls(urls)
def _queue_urls(self, valid_urls: list[str]):
for url_batch in batch(valid_urls, BATCH_SIZE):
self._queued_batches.put(url_batch, block=False)
@property
def num_queued_batches(self) -> int:
return self._queued_batches.qsize()
@property
def num_top_domains(self) -> int:
return len(self._top_urls)
def _sort_and_limit_urls(domain_urls: dict[str, list[str]], max_urls: int):
for domain, urls in domain_urls.items():
domain_urls[domain] = sorted(urls, key=lambda url_score: url_score.score, reverse=True)[:max_urls]
def _add_urls(domains: Union[set[str], KeysView], domain_urls: dict[str, list[URLScore]], urls: list[str], max_urls: int):
for domain in list(domains & domain_urls.keys()):
new_urls = domain_urls[domain][:max_urls]
urls += [url_score.url for url_score in new_urls]
new_domain_urls = domain_urls[domain][max_urls:]
if len(new_domain_urls) > 0:
domain_urls[domain] = new_domain_urls
else:
del domain_urls[domain]
def update_queue_continuously(new_item_queue: Queue, queued_batches: Queue):
queue = URLQueue(new_item_queue, queued_batches)
queue.initialize()
while True:
update_url_queue(url_queue)
num_processed = queue.update()
logger.info(f"Queue update, num processed: {num_processed}, queue size: {queue.num_queued_batches}, num top "
f"domains: {queue.num_top_domains}")
if num_processed == 0:
time.sleep(5)
def update_url_queue(url_queue: Queue):
logger.info("Updating URL queue")
current_size = url_queue.qsize()
if current_size >= MIN_QUEUE_SIZE:
logger.info(f"Skipping queue update, current size {current_size}, sleeping for 10 seconds")
sleep(10)
return
with Database() as db:
url_db = URLDatabase(db.connection)
urls = url_db.get_urls_for_crawling()
queue_batches(url_queue, urls)
logger.info(f"Queued {len(urls)} urls, current queue size: {url_queue.qsize()}")
def initialize_url_queue(url_queue: Queue):
with Database() as db:
url_db = URLDatabase(db.connection)
urls = url_db.get_urls(URLStatus.QUEUED, MAX_QUEUE_SIZE * BATCH_SIZE)
queue_batches(url_queue, urls)
logger.info(f"Initialized URL queue with {len(urls)} urls, current queue size: {url_queue.qsize()}")
def queue_batches(url_queue, urls):
for url_batch in batch(urls, BATCH_SIZE):
url_queue.put(url_batch, block=False)
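The URLQueue above replaces the old polling functions: lists of FoundURLs arrive on new_item_queue, update() filters and sorts them into top and other domains, and _batch_urls pushes batches of at most BATCH_SIZE URLs onto queued_batches. A minimal in-memory sketch of that flow, mirroring the unit test at the end of this diff (no database is needed for update()):

from datetime import datetime
from queue import Queue

from mwmbl.crawler.urls import FoundURL, URLStatus
from mwmbl.url_queue import URLQueue

new_item_queue = Queue()
queued_batches = Queue()
queue = URLQueue(new_item_queue, queued_batches, min_top_domains=0)

# Feed one batch of found URLs and drain it into crawlable batches.
new_item_queue.put([FoundURL("https://google.com", "123", 10.0, URLStatus.NEW.value, datetime(2023, 1, 19))])
num_processed = queue.update()
print(num_processed, queued_batches.get(block=False))  # 1 ['https://google.com']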

mwmbl/utils.py

@@ -1,3 +1,8 @@
import re
DOMAIN_REGEX = re.compile(r".*://([^/]*)")
def batch(items: list, batch_size):
"""
Adapted from https://stackoverflow.com/a/8290508
@@ -5,3 +10,10 @@ def batch(items: list, batch_size):
length = len(items)
for ndx in range(0, length, batch_size):
yield items[ndx:min(ndx + batch_size, length)]
def get_domain(url):
results = DOMAIN_REGEX.match(url)
if results is None or len(results.groups()) == 0:
raise ValueError(f"Unable to parse domain from URL {url}")
return results.group(1)
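For reference, the two helpers in this module behave as follows; the assertions are a usage sketch, not part of the commit:

from mwmbl.utils import batch, get_domain

# batch() yields successive slices of at most batch_size items.
assert list(batch([1, 2, 3, 4, 5], 2)) == [[1, 2], [3, 4], [5]]

# get_domain() extracts the host portion, or raises ValueError when no scheme/host is present.
assert get_domain("https://example.com/some/page") == "example.com"
try:
    get_domain("not-a-url")
except ValueError:
    pass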

test/test_url_queue.py (new file, 19 lines)

@@ -0,0 +1,19 @@
from datetime import datetime
from queue import Queue
from mwmbl.crawler.urls import FoundURL, URLStatus
from mwmbl.url_queue import URLQueue
def test_url_queue_empties():
new_item_queue = Queue()
queued_batches = Queue()
url_queue = URLQueue(new_item_queue, queued_batches, min_top_domains=0)
new_item_queue.put([FoundURL("https://google.com", "123", 10.0, URLStatus.NEW.value, datetime(2023, 1, 19))])
url_queue.update()
items = queued_batches.get(block=False)
assert items == ["https://google.com"]
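A companion test one might add on top of the above, checking that URLs assigned recently are filtered out by the queue; this is a sketch under the same FoundURL signature, not part of the commit:

# Hypothetical companion test: recently assigned URLs should not be re-queued.
from datetime import datetime
from queue import Empty, Queue

from mwmbl.crawler.urls import FoundURL, URLStatus
from mwmbl.url_queue import URLQueue

def test_recently_assigned_urls_are_skipped():
    new_item_queue = Queue()
    queued_batches = Queue()
    url_queue = URLQueue(new_item_queue, queued_batches, min_top_domains=0)

    new_item_queue.put([FoundURL("https://google.com", "123", 10.0, URLStatus.ASSIGNED.value, datetime.utcnow())])
    url_queue.update()

    try:
        queued_batches.get(block=False)
        assert False, "expected the queue to stay empty"
    except Empty:
        pass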