Optimise URL update

Daoud Clarke 2023-01-22 20:28:18 +00:00
parent 66700f8a3e
commit 77e39b4a89
9 changed files with 86 additions and 108 deletions

analyse/update_urls.py (new file, 26 lines)

@@ -0,0 +1,26 @@
import os
import pickle
from datetime import datetime
from pathlib import Path
from queue import Queue
from mwmbl.indexer.update_urls import record_urls_in_database
def run_update_urls_on_fixed_batches():
with open(Path(os.environ["HOME"]) / "data" / "mwmbl" / "hashed-batches.pickle", "rb") as file:
batches = pickle.load(file)
# print("Batches", batches[:3])
queue = Queue()
start = datetime.now()
record_urls_in_database(batches, queue)
total_time = (datetime.now() - start).total_seconds()
print("Total time:", total_time)
if __name__ == '__main__':
run_update_urls_on_fixed_batches()
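
The script above replays record_urls_in_database against a pickled list of HashedBatch objects. A minimal sketch of how such a fixture could be produced, assuming the batches are already held in memory (the helper name is illustrative, not part of this commit):

# Hypothetical helper for writing the fixture that the benchmark script reads.
# Assumes `batches` is a list of HashedBatch objects obtained elsewhere.
import os
import pickle
from pathlib import Path


def save_fixture(batches: list) -> Path:
    """Pickle a list of HashedBatch objects to the path the benchmark expects."""
    fixture_path = Path(os.environ["HOME"]) / "data" / "mwmbl" / "hashed-batches.pickle"
    fixture_path.parent.mkdir(parents=True, exist_ok=True)
    with open(fixture_path, "wb") as output_file:
        pickle.dump(batches, output_file)
    return fixture_path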

Binary file not shown.


@@ -2,39 +2,34 @@
Script that updates data in a background process.
"""
from logging import getLogger
from multiprocessing import Queue
from pathlib import Path
from time import sleep
from mwmbl.crawler.urls import URLDatabase
from mwmbl.database import Database
from mwmbl.indexer import index_batches, historical, update_urls
from mwmbl.indexer import index_batches, historical
from mwmbl.indexer.batch_cache import BatchCache
from mwmbl.indexer.paths import BATCH_DIR_NAME, INDEX_NAME
logger = getLogger(__name__)
def run(data_path: str, new_item_queue: Queue):
def run(data_path: str):
logger.info("Started background process")
with Database() as db:
url_db = URLDatabase(db.connection)
url_db.create_tables()
# historical.run()
historical.run()
index_path = Path(data_path) / INDEX_NAME
batch_cache = BatchCache(Path(data_path) / BATCH_DIR_NAME)
while True:
# try:
# batch_cache.retrieve_batches(num_batches=10000)
# except Exception:
# logger.exception("Error retrieving batches")
try:
update_urls.run(batch_cache, new_item_queue)
batch_cache.retrieve_batches(num_batches=10000)
except Exception:
logger.exception("Error updating URLs")
logger.exception("Error retrieving batches")
try:
index_batches.run(batch_cache, index_path)
except Exception:
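
Taken together, these changes strip URL updating out of the background process: it now only creates the URL tables, runs the historical indexer, and loops over batch retrieval and indexing. A rough reconstruction of mwmbl/background.py after this commit, assembled from the hunk above (the final exception message and the sleep at the end of the loop are assumptions, since the hunk is truncated):

# Rough reconstruction; not a verbatim copy of the committed file.
from logging import getLogger
from pathlib import Path
from time import sleep

from mwmbl.crawler.urls import URLDatabase
from mwmbl.database import Database
from mwmbl.indexer import index_batches, historical
from mwmbl.indexer.batch_cache import BatchCache
from mwmbl.indexer.paths import BATCH_DIR_NAME, INDEX_NAME

logger = getLogger(__name__)


def run(data_path: str):
    logger.info("Started background process")

    with Database() as db:
        url_db = URLDatabase(db.connection)
        url_db.create_tables()

    historical.run()
    index_path = Path(data_path) / INDEX_NAME
    batch_cache = BatchCache(Path(data_path) / BATCH_DIR_NAME)

    while True:
        try:
            batch_cache.retrieve_batches(num_batches=10000)
        except Exception:
            logger.exception("Error retrieving batches")
        try:
            index_batches.run(batch_cache, index_path)
        except Exception:
            logger.exception("Error indexing batches")  # assumed message; hunk is truncated here
        sleep(10)  # assumed interval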


@@ -64,19 +64,19 @@ class URLDatabase:
)
"""
index_sql = """
CREATE INDEX IF NOT EXISTS host_index
ON urls(substring(url FROM '.*://([^/]*)'), score)
"""
view_sql = """
CREATE OR REPLACE VIEW url_and_hosts AS SELECT *, substring(url FROM '.*://([^/]*)') AS host FROM urls
"""
# index_sql = """
# CREATE INDEX IF NOT EXISTS host_index
# ON urls(substring(url FROM '.*://([^/]*)'), score)
# """
#
# view_sql = """
# CREATE OR REPLACE VIEW url_and_hosts AS SELECT *, substring(url FROM '.*://([^/]*)') AS host FROM urls
# """
with self.connection.cursor() as cursor:
cursor.execute(sql)
cursor.execute(index_sql)
cursor.execute(view_sql)
# cursor.execute(index_sql)
# cursor.execute(view_sql)
def update_found_urls(self, found_urls: list[FoundURL]) -> list[FoundURL]:
if len(found_urls) == 0:
@@ -138,71 +138,6 @@ class URLDatabase:
updated = [FoundURL(*result) for result in results]
return updated
def get_urls_for_crawling(self):
start = datetime.utcnow()
logger.info("Getting URLs for crawling")
work_mem = "SET work_mem = '512MB'"
select_sql = f"""
SELECT host, (array_agg(url order by score desc))[:{MAX_URLS_PER_TOP_DOMAIN}] FROM url_and_hosts
WHERE host IN %(domains)s
AND (status = {URLStatus.NEW.value} OR (
status = {URLStatus.ASSIGNED.value} AND updated < %(min_updated_date)s
))
GROUP BY host
"""
others_sql = f"""
SELECT DISTINCT ON (host) url FROM (
SELECT * FROM url_and_hosts
WHERE status = {URLStatus.NEW.value} OR (
status = {URLStatus.ASSIGNED.value} AND updated < %(min_updated_date)s
)
ORDER BY score DESC LIMIT {MAX_OTHER_DOMAINS}) u
ORDER BY host
"""
update_sql = f"""
UPDATE urls SET status = {URLStatus.QUEUED.value}, updated = %(now)s
WHERE url IN %(urls)s
"""
now = datetime.utcnow()
min_updated_date = now - timedelta(hours=REASSIGN_MIN_HOURS)
domain_sample = set(random.sample(DOMAINS.keys(), MAX_TOP_DOMAINS)) | CORE_DOMAINS
domains = tuple(domain_sample)
logger.info(f"Getting URLs for domains {domains}")
with self.connection.cursor() as cursor:
cursor.execute(work_mem)
cursor.execute(select_sql,
{'min_updated_date': min_updated_date, 'domains': domains})
agg_results = cursor.fetchall()
logger.info(f"Agg results: {agg_results}")
results = []
for host, urls in agg_results:
results += urls
logger.info(f"Got {len(results)} top domain results")
with self.connection.cursor() as cursor:
cursor.execute(others_sql, {'min_updated_date': min_updated_date})
other_results = cursor.fetchall()
other_results_list = [result[0] for result in other_results]
logger.info(f"Got {len(other_results_list)} results from all domains")
results += other_results_list
with self.connection.cursor() as cursor:
cursor.execute(update_sql,
{'now': now, 'urls': tuple(results)})
total_time_seconds = (datetime.now() - start).total_seconds()
logger.info(f"Got {len(results)} in {total_time_seconds} seconds")
random.shuffle(results)
return results
def get_urls(self, status: URLStatus, num_urls: int):
sql = f"""
SELECT url FROM urls
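
get_urls_for_crawling and the host index/view are dropped here; per-host selection now happens in the in-memory URL queue (see the url_queue.py changes further down). For reference, a plain-Python sketch of what the removed select_sql computed, using a hypothetical helper and (host, url, score) tuples in place of database rows (the NEW/stale-ASSIGNED status filter is omitted):

from collections import defaultdict

MAX_URLS_PER_TOP_DOMAIN = 100


def top_urls_per_host(rows: list[tuple[str, str, float]]) -> dict[str, list[str]]:
    """For each host, keep at most MAX_URLS_PER_TOP_DOMAIN URLs, best score first."""
    by_host: dict[str, list[tuple[float, str]]] = defaultdict(list)
    for host, url, score in rows:
        by_host[host].append((score, url))
    return {
        host: [url for _, url in sorted(scored, reverse=True)[:MAX_URLS_PER_TOP_DOMAIN]]
        for host, scored in by_host.items()
    }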


@@ -51,7 +51,7 @@ class BatchCache:
with Database() as db:
index_db = IndexDatabase(db.connection)
batches = index_db.get_batches_by_status(BatchStatus.REMOTE, num_batches)
print(f"Found {len(batches)} remote batches")
logger.info(f"Found {len(batches)} remote batches")
if len(batches) == 0:
return
urls = [batch.url for batch in batches]
@@ -60,7 +60,7 @@
total_processed = 0
for result in results:
total_processed += result
print("Processed batches with items:", total_processed)
logger.info(f"Processed batches with {total_processed} items")
index_db.update_batch_status(urls, BatchStatus.LOCAL)
def retrieve_batch(self, url):
@@ -68,7 +68,7 @@
try:
batch = HashedBatch.parse_obj(data)
except ValidationError:
print("Failed to validate batch", data)
logger.info(f"Failed to validate batch {data}")
return 0
if len(batch.items) > 0:
self.store(batch, url)
@@ -76,7 +76,7 @@
def store(self, batch, url):
path = self.get_path_from_url(url)
print(f"Storing local batch at {path}")
logger.debug(f"Storing local batch at {path}")
os.makedirs(path.parent, exist_ok=True)
with open(path, 'wb') as output_file:
data = gzip.compress(batch.json().encode('utf8'))
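
The prints in BatchCache become calls on a module-level logger. Since the root log level switches to INFO later in this diff, the logger.debug line in store() will be filtered out by default; assuming the module defines logger = getLogger(__name__) like the other modules here, it can be re-enabled per module:

import logging

# "mwmbl.indexer.batch_cache" is inferred from the import paths used elsewhere
# in this diff; adjust if the module lives somewhere else.
logging.getLogger("mwmbl.indexer.batch_cache").setLevel(logging.DEBUG)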


@@ -1,7 +1,11 @@
import os
import pickle
from collections import defaultdict
from datetime import datetime, timezone, timedelta
from logging import getLogger
from multiprocessing import Queue
from pathlib import Path
from time import sleep
from typing import Iterable, Collection
from urllib.parse import urlparse
@@ -13,12 +17,24 @@ from mwmbl.indexer import process_batch
from mwmbl.indexer.batch_cache import BatchCache
from mwmbl.indexer.index_batches import get_url_error_status
from mwmbl.indexer.indexdb import BatchStatus
from mwmbl.indexer.paths import BATCH_DIR_NAME
from mwmbl.settings import UNKNOWN_DOMAIN_MULTIPLIER, EXCLUDED_DOMAINS, SCORE_FOR_SAME_DOMAIN, \
SCORE_FOR_DIFFERENT_DOMAIN, SCORE_FOR_ROOT_PATH, EXTRA_LINK_MULTIPLIER
from mwmbl.utils import get_domain
logger = getLogger(__name__)
def update_urls_continuously(data_path: str, new_item_queue: Queue):
batch_cache = BatchCache(Path(data_path) / BATCH_DIR_NAME)
while True:
try:
run(batch_cache, new_item_queue)
except Exception:
logger.exception("Error updating URLs")
sleep(10)
def run(batch_cache: BatchCache, new_item_queue: Queue):
process_batch.run(batch_cache, BatchStatus.LOCAL, BatchStatus.URLS_UPDATED, record_urls_in_database, new_item_queue)
@@ -40,7 +56,7 @@ def record_urls_in_database(batches: Collection[HashedBatch], new_item_queue: Qu
url_statuses[item.url] = get_url_error_status(item)
else:
url_statuses[item.url] = URLStatus.CRAWLED
crawled_page_domain = urlparse(item.url).netloc
crawled_page_domain = get_domain(item.url)
score_multiplier = 1 if crawled_page_domain in DOMAINS else UNKNOWN_DOMAIN_MULTIPLIER
for link in item.content.links:
process_link(batch, crawled_page_domain, link, score_multiplier, timestamp, url_scores,
@@ -54,7 +70,6 @@ def record_urls_in_database(batches: Collection[HashedBatch], new_item_queue: Qu
found_urls = [FoundURL(url, url_users[url], url_scores[url], url_statuses[url], url_timestamps[url])
for url in url_scores.keys() | url_statuses.keys()]
# TODO: why does this number not match the new items number below? at all.
logger.info(f"Found URLs, {len(found_urls)}")
urls = url_db.update_found_urls(found_urls)
new_item_queue.put(urls)


@@ -11,6 +11,7 @@ from mwmbl import background
from mwmbl.crawler import app as crawler
from mwmbl.indexer.batch_cache import BatchCache
from mwmbl.indexer.paths import INDEX_NAME, BATCH_DIR_NAME
from mwmbl.indexer.update_urls import update_urls_continuously
from mwmbl.tinysearchengine import search
from mwmbl.tinysearchengine.completer import Completer
from mwmbl.tinysearchengine.indexer import TinyIndex, Document, PAGE_SIZE
@@ -18,7 +19,7 @@ from mwmbl.tinysearchengine.rank import HeuristicRanker
from mwmbl.url_queue import update_queue_continuously
FORMAT = '%(levelname)s %(name)s %(asctime)s %(message)s'
logging.basicConfig(stream=sys.stdout, level=logging.DEBUG, format=FORMAT)
logging.basicConfig(stream=sys.stdout, level=logging.INFO, format=FORMAT)
MODEL_PATH = Path(__file__).parent / 'resources' / 'model.pickle'
@@ -51,8 +52,9 @@ def run():
queued_batches = Queue()
if args.background:
Process(target=background.run, args=(args.data, new_item_queue)).start()
Process(target=background.run, args=(args.data,)).start()
Process(target=update_queue_continuously, args=(new_item_queue, queued_batches,)).start()
Process(target=update_urls_continuously, args=(args.data, new_item_queue)).start()
completer = Completer()
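
Condensed, the process wiring after this change looks like the sketch below: the background process only retrieves and indexes batches, update_urls_continuously records crawl results and pushes updated URLs onto new_item_queue, and update_queue_continuously turns those into crawl batches on queued_batches. The wrapper function name is illustrative; argument parsing and the web app are omitted.

from multiprocessing import Process, Queue

from mwmbl import background
from mwmbl.indexer.update_urls import update_urls_continuously
from mwmbl.url_queue import update_queue_continuously


def start_background_processes(data_path: str) -> Queue:
    new_item_queue = Queue()   # lists of updated FoundURLs from update_urls_continuously
    queued_batches = Queue()   # batches of URLs ready to hand out to crawlers

    # Retrieve remote batches and index them.
    Process(target=background.run, args=(data_path,)).start()
    # Feed updated URLs into the in-memory URL queue and emit crawl batches.
    Process(target=update_queue_continuously, args=(new_item_queue, queued_batches)).start()
    # Record crawled batches in the URL database and publish updated URLs.
    Process(target=update_urls_continuously, args=(data_path, new_item_queue)).start()

    return queued_batches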


@@ -1,25 +1,17 @@
import os
import pickle
import random
import re
import time
from collections import defaultdict
from dataclasses import dataclass
from datetime import datetime, timedelta
from logging import getLogger
from multiprocessing import Queue
from pathlib import Path
from queue import Empty
from time import sleep
from typing import KeysView, Union
from urllib.parse import urlparse
from mwmbl.crawler.urls import BATCH_SIZE, URLDatabase, URLStatus, FoundURL, REASSIGN_MIN_HOURS
from mwmbl.database import Database
from mwmbl.hn_top_domains_filtered import DOMAINS as TOP_DOMAINS
from mwmbl.settings import CORE_DOMAINS
from mwmbl.utils import batch
from mwmbl.utils import batch, get_domain
logger = getLogger(__name__)
@@ -33,8 +25,6 @@ MAX_URLS_PER_TOP_DOMAIN = 100
MAX_URLS_PER_OTHER_DOMAIN = 5
MAX_OTHER_DOMAINS = 10000
DOMAIN_REGEX = re.compile(r".*://([^/]*)")
@dataclass
class URLScore:
@@ -72,7 +62,7 @@ class URLQueue:
return num_processed
def _process_found_urls(self, found_urls: list[FoundURL]):
logger.info("Processing found URLs")
logger.info(f"Processing found URLs: {found_urls[:1000]}")
# with open(Path(os.environ["HOME"]) / "data" / "mwmbl" / "found-urls.pickle", "wb") as output_file:
# pickle.dump(found_urls, output_file)
# logger.info("Dumped")
@@ -98,7 +88,7 @@ class URLQueue:
def _sort_urls(self, valid_urls: list[FoundURL]):
for found_url in valid_urls:
domain = DOMAIN_REGEX.search(found_url.url)[0]
domain = get_domain(found_url.url)
url_store = self._top_urls if domain in TOP_DOMAINS else self._other_urls
url_store[domain].append(URLScore(found_url.url, found_url.score))
@@ -126,9 +116,13 @@ class URLQueue:
self._queued_batches.put(url_batch, block=False)
@property
def num_queued_batches(self):
def num_queued_batches(self) -> int:
return self._queued_batches.qsize()
@property
def num_top_domains(self) -> int:
return len(self._top_urls)
def _sort_and_limit_urls(domain_urls: dict[str, list[str]], max_urls: int):
for domain, urls in domain_urls.items():
@@ -151,7 +145,8 @@ def update_queue_continuously(new_item_queue: Queue, queued_batches: Queue):
queue.initialize()
while True:
num_processed = queue.update()
logger.info(f"Queue update, num processed: {num_processed}, queue size: {queue.num_queued_batches}")
logger.info(f"Queue update, num processed: {num_processed}, queue size: {queue.num_queued_batches}, num top "
f"domains: {queue.num_top_domains}")
if num_processed == 0:
time.sleep(5)
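
URLQueue keeps publishing URL batches onto queued_batches with put(block=False). A small sketch of the consuming side, assuming the consumer (presumably the crawler app) should treat an empty queue as "no work yet"; the helper is hypothetical:

from multiprocessing import Queue
from queue import Empty


def get_next_batch(queued_batches: Queue) -> list[str]:
    """Pull one batch of URLs for a crawler, or an empty list if none is queued yet."""
    try:
        return queued_batches.get(block=False)
    except Empty:
        return []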


@@ -1,3 +1,8 @@
import re
DOMAIN_REGEX = re.compile(r".*://([^/]*)")
def batch(items: list, batch_size):
"""
Adapted from https://stackoverflow.com/a/8290508
@@ -5,3 +10,8 @@ def batch(items: list, batch_size):
length = len(items)
for ndx in range(0, length, batch_size):
yield items[ndx:min(ndx + batch_size, length)]
def get_domain(url):
domain = DOMAIN_REGEX.search(url)[0]
return domain
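
get_domain uses the same pattern that URLQueue previously applied inline and that the dropped Postgres index used. Note that search(url)[0] is the entire match, so the returned value keeps the scheme; a quick check:

import re

DOMAIN_REGEX = re.compile(r".*://([^/]*)")

match = DOMAIN_REGEX.search("https://en.wikipedia.org/wiki/Mwmbl")
print(match[0])        # https://en.wikipedia.org
print(match.group(1))  # en.wikipedia.org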