Split out URL updating from indexing

Daoud Clarke 2022-08-26 22:20:35 +01:00
parent f4fb9f831a
commit cf253ae524
6 changed files with 119 additions and 77 deletions

View file

@@ -6,7 +6,7 @@ from multiprocessing import Queue
from pathlib import Path
from time import sleep
from mwmbl.indexer import index_batches, historical
from mwmbl.indexer import index_batches, historical, update_urls
from mwmbl.indexer.batch_cache import BatchCache
from mwmbl.indexer.paths import BATCH_DIR_NAME, INDEX_NAME
from mwmbl.url_queue import update_url_queue, initialize_url_queue
@@ -28,6 +28,10 @@ def run(data_path: str, url_queue: Queue):
            batch_cache.retrieve_batches(num_batches=10000)
        except Exception:
            logger.exception("Error retrieving batches")
        try:
            update_urls.run(batch_cache)
        except Exception:
            logger.exception("Error updating URLs")
        try:
            index_batches.run(batch_cache, index_path)
        except Exception:
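
The loop body now gives URL updating its own guarded stage between batch retrieval and indexing, so a failure in one stage is logged without blocking the next. A minimal sketch of that pattern, assuming the names used in the hunk above (run_stage itself is a hypothetical helper, not part of the commit):

from logging import getLogger
from typing import Callable

logger = getLogger(__name__)


def run_stage(name: str, stage: Callable[[], None]) -> None:
    # Run one background stage; log failures instead of raising so later stages still run.
    try:
        stage()
    except Exception:
        logger.exception(f"Error {name}")


# Hypothetical wiring mirroring the loop body above:
# run_stage("retrieving batches", lambda: batch_cache.retrieve_batches(num_batches=10000))
# run_stage("updating URLs", lambda: update_urls.run(batch_cache))
# run_stage("indexing batches", lambda: index_batches.run(batch_cache, index_path))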

View file

@@ -2,32 +2,25 @@
Index batches that are stored locally.
"""
from collections import defaultdict
from datetime import datetime, timezone, timedelta
from logging import getLogger
from typing import Iterable
from urllib.parse import urlparse
from typing import Collection, Iterable
import spacy
from mwmbl.indexer import process_batch
from spacy import Language
from mwmbl.crawler.batch import HashedBatch, Item
from mwmbl.crawler.urls import URLDatabase, URLStatus, FoundURL
from mwmbl.crawler.urls import URLDatabase, URLStatus
from mwmbl.database import Database
from mwmbl.hn_top_domains_filtered import DOMAINS
from mwmbl.indexer.batch_cache import BatchCache
from mwmbl.indexer.index import tokenize_document
from mwmbl.indexer.indexdb import BatchStatus, IndexDatabase
from mwmbl.settings import UNKNOWN_DOMAIN_MULTIPLIER, SCORE_FOR_SAME_DOMAIN, SCORE_FOR_DIFFERENT_DOMAIN, \
SCORE_FOR_ROOT_PATH
from mwmbl.indexer.indexdb import BatchStatus
from mwmbl.tinysearchengine.indexer import Document, TinyIndex
logger = getLogger(__name__)
EXCLUDED_DOMAINS = {'web.archive.org'}
def get_documents_from_batches(batches: Iterable[HashedBatch]) -> Iterable[tuple[str, str, str]]:
def get_documents_from_batches(batches: Collection[HashedBatch]) -> Iterable[tuple[str, str, str]]:
    for batch in batches:
        for item in batch.items:
            if item.content is not None:
@@ -35,28 +28,18 @@ def get_documents_from_batches(batches: Iterable[HashedBatch]) -> Iterable[tuple[str, str, str]]:
def run(batch_cache: BatchCache, index_path: str):
    nlp = spacy.load("en_core_web_sm")
    with Database() as db:
        index_db = IndexDatabase(db.connection)
        logger.info("Getting local batches")
        batches = index_db.get_batches_by_status(BatchStatus.LOCAL, 10000)
        logger.info(f"Got {len(batches)} batch urls")
        if len(batches) == 0:
            return
    def process(batches: Collection[HashedBatch]):
        with Database() as db:
            nlp = spacy.load("en_core_web_sm")
            url_db = URLDatabase(db.connection)
            index_batches(batches, index_path, nlp, url_db)
            logger.info("Indexed pages")
        batch_data = batch_cache.get_cached([batch.url for batch in batches])
        logger.info(f"Got {len(batch_data)} cached batches")
        record_urls_in_database(batch_data.values())
        url_db = URLDatabase(db.connection)
        index_batches(batch_data.values(), index_path, nlp, url_db)
        logger.info("Indexed pages")
        index_db.update_batch_status([batch.url for batch in batches], BatchStatus.INDEXED)
    process_batch.run(batch_cache, BatchStatus.URLS_UPDATED, BatchStatus.INDEXED, process)
def index_batches(batch_data: Iterable[HashedBatch], index_path: str, nlp: Language, url_db: URLDatabase):
def index_batches(batch_data: Collection[HashedBatch], index_path: str, nlp: Language, url_db: URLDatabase):
    document_tuples = list(get_documents_from_batches(batch_data))
    urls = [url for title, url, extract in document_tuples]
    logger.info(f"Got {len(urls)} document tuples")
@@ -108,49 +91,6 @@ def get_url_error_status(item: Item):
    return URLStatus.ERROR_OTHER
def record_urls_in_database(batches: Iterable[HashedBatch]):
    with Database() as db:
        url_db = URLDatabase(db.connection)
        url_scores = defaultdict(float)
        url_users = {}
        url_timestamps = {}
        url_statuses = defaultdict(lambda: URLStatus.NEW)
        for batch in batches:
            for item in batch.items:
                timestamp = get_datetime_from_timestamp(item.timestamp / 1000.0)
                url_timestamps[item.url] = timestamp
                url_users[item.url] = batch.user_id_hash
                if item.content is None:
                    url_statuses[item.url] = get_url_error_status(item)
                else:
                    url_statuses[item.url] = URLStatus.CRAWLED
                    crawled_page_domain = urlparse(item.url).netloc
                    score_multiplier = 1 if crawled_page_domain in DOMAINS else UNKNOWN_DOMAIN_MULTIPLIER
                    for link in item.content.links:
                        parsed_link = urlparse(link)
                        if parsed_link.netloc in EXCLUDED_DOMAINS:
                            continue
                        score = SCORE_FOR_SAME_DOMAIN if parsed_link.netloc == crawled_page_domain else SCORE_FOR_DIFFERENT_DOMAIN
                        url_scores[link] += score * score_multiplier
                        url_users[link] = batch.user_id_hash
                        url_timestamps[link] = timestamp
                        domain = f'{parsed_link.scheme}://{parsed_link.netloc}/'
                        url_scores[domain] += SCORE_FOR_ROOT_PATH * score_multiplier
                        url_users[domain] = batch.user_id_hash
                        url_timestamps[domain] = timestamp
        found_urls = [FoundURL(url, url_users[url], url_scores[url], url_statuses[url], url_timestamps[url])
                      for url in url_scores.keys() | url_statuses.keys()]
        url_db.update_found_urls(found_urls)
def get_datetime_from_timestamp(timestamp: float) -> datetime:
    batch_datetime = datetime(1970, 1, 1, tzinfo=timezone.utc) + timedelta(seconds=timestamp)
    return batch_datetime
# TODO: clean unicode at some point
def clean_unicode(s: str) -> str:
    return s.encode('utf-8', 'ignore').decode('utf-8')
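
Since the hunk above interleaves the removed and added bodies of run(), here is the new run() in index_batches.py assembled from the added lines alone (imports as in the file):

def run(batch_cache: BatchCache, index_path: str):
    def process(batches: Collection[HashedBatch]):
        with Database() as db:
            nlp = spacy.load("en_core_web_sm")
            url_db = URLDatabase(db.connection)
            index_batches(batches, index_path, nlp, url_db)
            logger.info("Indexed pages")

    process_batch.run(batch_cache, BatchStatus.URLS_UPDATED, BatchStatus.INDEXED, process)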

View file

@@ -8,9 +8,10 @@ from psycopg2.extras import execute_values
class BatchStatus(Enum):
    REMOTE = 0  # The batch only exists in long term storage
    LOCAL = 1  # We have a copy of the batch locally in Postgresql
    INDEXED = 2
    REMOTE = 0  # The batch only exists in long term storage
    LOCAL = 10  # We have a copy of the batch locally in Postgresql
    URLS_UPDATED = 20  # We've updated URLs from the batch
    INDEXED = 30  # The batch has been indexed
@dataclass
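
The renumbering to 0, 10, 20, 30 presumably leaves room for further intermediate statuses; a batch now advances LOCAL -> URLS_UPDATED -> INDEXED. A small sketch of the two transitions driven by process_batch.run (the PIPELINE list is illustrative, not part of the commit):

from mwmbl.indexer.indexdb import BatchStatus

# Illustrative only: the status each stage reads and the status it writes.
PIPELINE = [
    (BatchStatus.LOCAL, BatchStatus.URLS_UPDATED),    # update_urls.run(batch_cache)
    (BatchStatus.URLS_UPDATED, BatchStatus.INDEXED),  # index_batches.run(batch_cache, index_path)
]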

View file

@@ -0,0 +1,29 @@
from logging import getLogger
from typing import Callable, Collection
from mwmbl.crawler.batch import HashedBatch
from mwmbl.database import Database
from mwmbl.indexer.batch_cache import BatchCache
from mwmbl.indexer.indexdb import BatchStatus, IndexDatabase
logger = getLogger(__name__)
def run(batch_cache: BatchCache, start_status: BatchStatus, end_status: BatchStatus,
        process: Callable[[Collection[HashedBatch]], None]):
    with Database() as db:
        index_db = IndexDatabase(db.connection)
        logger.info(f"Getting batches with status {start_status}")
        batches = index_db.get_batches_by_status(start_status, 10000)
        logger.info(f"Got {len(batches)} batch urls")
        if len(batches) == 0:
            return
        batch_data = batch_cache.get_cached([batch.url for batch in batches])
        logger.info(f"Got {len(batch_data)} cached batches")
        process(batch_data.values())
        index_db.update_batch_status([batch.url for batch in batches], end_status)
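
process_batch.run factors out the shared pattern: load up to 10,000 batch records at start_status, fetch their cached payloads, hand them to the callback, then mark them with end_status. The two call sites added in this commit (module paths inferred from the imports):

# mwmbl/indexer/update_urls.py: record URLs and link scores, then mark batches URLS_UPDATED.
process_batch.run(batch_cache, BatchStatus.LOCAL, BatchStatus.URLS_UPDATED, process=record_urls_in_database)

# mwmbl/indexer/index_batches.py: index the pages, then mark batches INDEXED.
process_batch.run(batch_cache, BatchStatus.URLS_UPDATED, BatchStatus.INDEXED, process)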

View file

@@ -0,0 +1,67 @@
from collections import defaultdict
from datetime import datetime, timezone, timedelta
from logging import getLogger
from typing import Iterable, Collection
from urllib.parse import urlparse
from mwmbl.crawler.batch import HashedBatch
from mwmbl.crawler.urls import URLDatabase, URLStatus, FoundURL
from mwmbl.database import Database
from mwmbl.hn_top_domains_filtered import DOMAINS
from mwmbl.indexer import process_batch
from mwmbl.indexer.batch_cache import BatchCache
from mwmbl.indexer.index_batches import get_url_error_status
from mwmbl.indexer.indexdb import BatchStatus
from mwmbl.settings import UNKNOWN_DOMAIN_MULTIPLIER, EXCLUDED_DOMAINS, SCORE_FOR_SAME_DOMAIN, \
SCORE_FOR_DIFFERENT_DOMAIN, SCORE_FOR_ROOT_PATH
logger = getLogger(__name__)
def run(batch_cache: BatchCache):
    process_batch.run(batch_cache, BatchStatus.LOCAL, BatchStatus.URLS_UPDATED, process=record_urls_in_database)
def record_urls_in_database(batches: Collection[HashedBatch]):
    logger.info(f"Recording URLs in database for {len(batches)} batches")
    with Database() as db:
        url_db = URLDatabase(db.connection)
        url_scores = defaultdict(float)
        url_users = {}
        url_timestamps = {}
        url_statuses = defaultdict(lambda: URLStatus.NEW)
        for batch in batches:
            for item in batch.items:
                timestamp = get_datetime_from_timestamp(item.timestamp / 1000.0)
                url_timestamps[item.url] = timestamp
                url_users[item.url] = batch.user_id_hash
                if item.content is None:
                    url_statuses[item.url] = get_url_error_status(item)
                else:
                    url_statuses[item.url] = URLStatus.CRAWLED
                    crawled_page_domain = urlparse(item.url).netloc
                    score_multiplier = 1 if crawled_page_domain in DOMAINS else UNKNOWN_DOMAIN_MULTIPLIER
                    for link in item.content.links:
                        parsed_link = urlparse(link)
                        if parsed_link.netloc in EXCLUDED_DOMAINS:
                            continue
                        score = SCORE_FOR_SAME_DOMAIN if parsed_link.netloc == crawled_page_domain else SCORE_FOR_DIFFERENT_DOMAIN
                        url_scores[link] += score * score_multiplier
                        url_users[link] = batch.user_id_hash
                        url_timestamps[link] = timestamp
                        domain = f'{parsed_link.scheme}://{parsed_link.netloc}/'
                        url_scores[domain] += SCORE_FOR_ROOT_PATH * score_multiplier
                        url_users[domain] = batch.user_id_hash
                        url_timestamps[domain] = timestamp
        found_urls = [FoundURL(url, url_users[url], url_scores[url], url_statuses[url], url_timestamps[url])
                      for url in url_scores.keys() | url_statuses.keys()]
        url_db.update_found_urls(found_urls)
def get_datetime_from_timestamp(timestamp: float) -> datetime:
    batch_datetime = datetime(1970, 1, 1, tzinfo=timezone.utc) + timedelta(seconds=timestamp)
    return batch_datetime
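
Item timestamps arrive in milliseconds, hence the division by 1000.0 before conversion; get_datetime_from_timestamp adds the seconds to the Unix epoch and so matches datetime.fromtimestamp(timestamp, tz=timezone.utc). A self-contained check (the example epoch value is illustrative):

from datetime import datetime, timezone, timedelta


def get_datetime_from_timestamp(timestamp: float) -> datetime:
    # Copied from above so the check runs standalone.
    return datetime(1970, 1, 1, tzinfo=timezone.utc) + timedelta(seconds=timestamp)


# An item crawled at Unix time 1661548835000 ms:
assert get_datetime_from_timestamp(1661548835000 / 1000.0) == datetime(2022, 8, 26, 21, 20, 35, tzinfo=timezone.utc)
assert get_datetime_from_timestamp(1661548835.0) == datetime.fromtimestamp(1661548835.0, tz=timezone.utc)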

View file

@@ -27,3 +27,4 @@ SCORE_FOR_ROOT_PATH = 0.1
SCORE_FOR_DIFFERENT_DOMAIN = 1.0
SCORE_FOR_SAME_DOMAIN = 0.01
UNKNOWN_DOMAIN_MULTIPLIER = 0.001
EXCLUDED_DOMAINS = {'web.archive.org'}
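
As a worked example of how record_urls_in_database above combines these constants: a crawled page on an unknown domain (not in DOMAINS) gets score_multiplier = UNKNOWN_DOMAIN_MULTIPLIER = 0.001, while a page on a known domain gets 1. The per-link increments then look like this (variable names are illustrative):

UNKNOWN_DOMAIN_MULTIPLIER = 0.001
SCORE_FOR_ROOT_PATH = 0.1
SCORE_FOR_DIFFERENT_DOMAIN = 1.0
SCORE_FOR_SAME_DOMAIN = 0.01

score_multiplier = UNKNOWN_DOMAIN_MULTIPLIER  # crawled page's domain is not in DOMAINS

same_domain_link = SCORE_FOR_SAME_DOMAIN * score_multiplier            # 0.00001 added to the linked URL
different_domain_link = SCORE_FOR_DIFFERENT_DOMAIN * score_multiplier  # 0.001 added to the linked URL
root_path_bonus = SCORE_FOR_ROOT_PATH * score_multiplier               # 0.0001 added to the linked domain's root URL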