Cache batches; start a background process

Daoud Clarke 2022-06-27 23:44:25 +01:00
parent ff2312a5ca
commit 1457cba2c2
5 changed files with 61 additions and 25 deletions

mwmbl/background.py (new file)

@@ -0,0 +1,10 @@
"""
Script that updates data in a background process.
"""
from mwmbl.indexer import historical
from mwmbl.indexer.retrieve import retrieve_batches
def run():
historical.run()
retrieve_batches()
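
The new entry point just chains the two existing indexer steps so they can run off the main web process. The last hunk in this commit starts it with multiprocessing; a minimal sketch of that launch, using only names that appear in this diff:

    # Sketch: how the server process starts the background script (mirrors the main-process change below)
    from multiprocessing import Process

    from mwmbl import background

    Process(target=background.run).start()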

mwmbl/crawler/app.py

@@ -92,7 +92,7 @@ def create_batch(batch: Batch):
data = gzip.compress(hashed_batch.json().encode('utf8'))
upload(data, filename)
_record_urls_in_database(batch, user_id_hash, now)
record_urls_in_database(batch, user_id_hash, now)
global last_batch
last_batch = hashed_batch
@@ -124,10 +124,10 @@ def create_historical_batch(batch: HashedBatch):
"""
user_id_hash = batch.user_id_hash
batch_datetime = datetime(1970, 1, 1, tzinfo=timezone.utc) + timedelta(seconds=batch.timestamp)
_record_urls_in_database(batch, user_id_hash, batch_datetime)
record_urls_in_database(batch, user_id_hash, batch_datetime)
def _record_urls_in_database(batch: Union[Batch, HashedBatch], user_id_hash: str, timestamp: datetime):
def record_urls_in_database(batch: Union[Batch, HashedBatch], user_id_hash: str, timestamp: datetime):
with Database() as db:
url_db = URLDatabase(db.connection)
url_scores = defaultdict(float)
@@ -157,12 +157,36 @@ def _record_urls_in_database(batch: Union[Batch, HashedBatch], user_id_hash: str
# - load some historical data as a starting point
def get_batches_for_date(date_str):
check_date_str(date_str)
prefix = f'1/{VERSION}/{date_str}/1/'
cache_filename = prefix + 'batches.json.gz'
cache_url = PUBLIC_URL_PREFIX + cache_filename
try:
cached_batches = json.loads(gzip.decompress(requests.get(cache_url).content))
print(f"Got cached batches for {date_str}")
return cached_batches
except gzip.BadGzipFile:
pass
batches = get_batches_for_prefix(prefix)
result = {'batch_urls': [f'{PUBLIC_URL_PREFIX}{batch}' for batch in sorted(batches)]}
data = gzip.compress(json.dumps(result).encode('utf8'))
upload(data, cache_filename)
print(f"Cached batches for {date_str} in {PUBLIC_URL_PREFIX}{cache_filename}")
return result
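
get_batches_for_date now memoises the bucket listing: the first request for a date uploads a gzipped JSON index to 1/{VERSION}/{date_str}/1/batches.json.gz, and later requests read it straight from the public URL (a missing cache object yields a non-gzip body, so gzip.decompress raises BadGzipFile and the listing is rebuilt). A minimal consumer-side sketch, with the prefix and version passed in as parameters since their values are module constants not shown in this diff:

    # Sketch: reading the cached batch index for one day, shaped like the 'result' dict written above
    import gzip
    import json

    import requests

    def fetch_cached_batch_urls(public_url_prefix: str, version: str, date_str: str) -> list[str]:
        cache_url = f"{public_url_prefix}1/{version}/{date_str}/1/batches.json.gz"
        payload = json.loads(gzip.decompress(requests.get(cache_url).content))
        return payload["batch_urls"]  # full batch URLs, sorted, as written by get_batches_for_date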
def get_user_id_hash_from_url(url):
return url.split('/')[9]
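
The hard-coded index 9 in get_user_id_hash_from_url depends on the URL shape built above: it assumes PUBLIC_URL_PREFIX has the form https://<host>/<segment>/<segment>/, followed by the 1/{VERSION}/{date_str}/1/{user_id_hash}/... key. A worked example with an invented prefix (only the layout, not the values, comes from this diff):

    # Illustration: why index 9 is the user id hash (host and bucket path are made up)
    url = "https://example-host/file/example-bucket/1/v1/2022-06-27/1/0123abcduserhash/some-batch.json.gz"
    parts = url.split('/')
    # parts[0:5] -> ['https:', '', 'example-host', 'file', 'example-bucket']
    # parts[5:9] -> ['1', 'v1', '2022-06-27', '1']
    parts[9]      # -> '0123abcduserhash'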
@router.get('/batches/{date_str}/users/{public_user_id}')
def get_batches_for_date_and_user(date_str, public_user_id):
check_date_str(date_str)
check_public_user_id(public_user_id)
prefix = f'1/{VERSION}/{date_str}/1/{public_user_id}/'
return get_batches_for_prefix(prefix)
return get_batch_ids_for_prefix(prefix)
def check_public_user_id(public_user_id):
@@ -197,14 +221,20 @@ def get_batch_id_from_file_name(file_name: str):
return file_name[:-len(FILE_NAME_SUFFIX)]
def get_batch_ids_for_prefix(prefix):
filenames = get_batches_for_prefix(prefix)
filename_endings = sorted(filename.rsplit('/', 1)[1] for filename in filenames)
results = {'batch_ids': [get_batch_id_from_file_name(name) for name in filename_endings]}
return results
def get_batches_for_prefix(prefix):
s3 = boto3.resource('s3', endpoint_url=ENDPOINT_URL, aws_access_key_id=KEY_ID,
aws_secret_access_key=APPLICATION_KEY)
bucket = s3.Bucket(BUCKET_NAME)
items = bucket.objects.filter(Prefix=prefix)
file_names = sorted(item.key.rsplit('/', 1)[1] for item in items)
results = {'batch_ids': [get_batch_id_from_file_name(name) for name in file_names]}
return results
filenames = [item.key for item in items]
return filenames
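
With this split, get_batches_for_prefix returns the raw object keys (reused by both the new date-level cache and get_batch_ids_for_prefix), while get_batch_ids_for_prefix keeps the response shape the per-user endpoint returned before. Roughly, with invented values and 'v1' standing in for VERSION:

    # Illustrative return shapes after the refactor (values invented, shapes taken from the code above)
    get_batches_for_prefix("1/v1/2022-06-27/1/")
    # -> ['1/v1/2022-06-27/1/<user_id_hash>/<batch_id>.json.gz', ...]   raw S3 keys

    get_batch_ids_for_prefix("1/v1/2022-06-27/1/<user_id_hash>/")
    # -> {'batch_ids': ['<batch_id>', ...]}   same shape as before, still used by the per-user endpoint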
@router.get('/batches/{date_str}/users')

mwmbl/indexer/historical.py

@@ -1,15 +1,9 @@
from datetime import date, datetime, timedelta
from datetime import date, timedelta
import spacy
from mwmbl.crawler.app import get_user_id_hashes_for_date, get_batches_for_date_and_user, get_batch_from_id, \
create_historical_batch, get_batch_url
from mwmbl.crawler.batch import HashedBatch
from mwmbl.crawler.app import get_user_id_hashes_for_date, get_batches_for_date_and_user, get_batch_url, \
get_batches_for_date, get_user_id_hash_from_url
from mwmbl.database import Database
from mwmbl.indexer.indexdb import BatchInfo, BatchStatus, IndexDatabase
from mwmbl.indexer.index import tokenize_document
from mwmbl.tinysearchengine.indexer import TinyIndex, Document
DAYS = 10
@@ -17,17 +11,14 @@ DAYS = 10
def run():
for day in range(DAYS):
date_str = str(date.today() - timedelta(days=day))
users = get_user_id_hashes_for_date(date_str)
print(f"Got {len(users)} for day {date_str}")
with Database() as db:
index_db = IndexDatabase(db.connection)
index_db.create_tables()
for user in users:
batches = get_batches_for_date_and_user(date_str, user)
print("Historical batches for user", user, len(batches))
batch_urls = [get_batch_url(batch_id, date_str, user) for batch_id in batches["batch_ids"]]
infos = [BatchInfo(url, user, BatchStatus.REMOTE) for url in batch_urls]
index_db.record_batches(infos)
batches = get_batches_for_date(date_str)
batch_urls = batches['batch_urls']
print("Historical batches for date", date_str, len(batch_urls))
infos = [BatchInfo(url, get_user_id_hash_from_url(url), BatchStatus.REMOTE) for url in batch_urls]
index_db.record_batches(infos)
if __name__ == '__main__':

mwmbl/indexer/retrieve.py

@@ -9,6 +9,7 @@ from time import sleep
import requests
from mwmbl.crawler.app import create_historical_batch
from mwmbl.crawler.batch import HashedBatch
from mwmbl.database import Database
from mwmbl.indexer.indexdb import IndexDatabase, BatchStatus
@@ -39,12 +40,14 @@ def retrieve_batch(url):
data = json.loads(gzip.decompress(retry_requests.get(url).content))
batch = HashedBatch.parse_obj(data)
print(f"Retrieved batch with {len(batch.items)} items")
create_historical_batch(batch)
queue_batch(batch)
return len(batch.items)
def queue_batch(batch: HashedBatch):
# TODO get the score from the URLs database
# TODO: get the score from the URLs database
# TODO: also queue documents for batches sent through the API
documents = [Document(item.content.title, item.url, item.content.extract, 1)
for item in batch.items if item.content is not None]
with Database() as db:

mwmbl/main.py

@@ -6,6 +6,7 @@ from multiprocessing import Process
import uvicorn
from fastapi import FastAPI
from mwmbl import background
from mwmbl.indexer import historical, retrieve, preprocess, update_pages
from mwmbl.crawler.app import router as crawler_router
from mwmbl.tinysearchengine import search
@@ -40,6 +41,7 @@ def run():
print("Creating a new index")
TinyIndex.create(item_factory=Document, index_path=args.index, num_pages=NUM_PAGES, page_size=PAGE_SIZE)
Process(target=background.run).start()
# Process(target=historical.run).start()
# Process(target=retrieve.run).start()
# Process(target=preprocess.run, args=(args.index,)).start()