WIP: index continuously. Retrieve batches and store in Postgres

Daoud Clarke 2022-06-19 23:23:57 +01:00
parent b8b605daed
commit 9594915de1
7 changed files with 200 additions and 51 deletions


@@ -1,3 +0,0 @@
"""
Database interface for batches of crawled data.
"""


@@ -179,11 +179,19 @@ def check_public_user_id(public_user_id):

@router.get('/batches/{date_str}/users/{public_user_id}/batch/{batch_id}')
def get_batch_from_id(date_str, public_user_id, batch_id):
    url = get_batch_url(batch_id, date_str, public_user_id)
    data = json.loads(gzip.decompress(requests.get(url).content))
    return {
        'url': url,
        'batch': data,
    }


def get_batch_url(batch_id, date_str, public_user_id):
    check_date_str(date_str)
    check_public_user_id(public_user_id)
    url = f'{PUBLIC_URL_PREFIX}1/{VERSION}/{date_str}/1/{public_user_id}/{batch_id}{FILE_NAME_SUFFIX}'
    return url

@router.get('/latest-batch', response_model=list[HashedBatch])
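
The refactored endpoint now returns both the long-term storage URL and the decompressed batch. A rough client-side sketch; the host, router prefix, and IDs are made up for illustration:

import requests

# Made-up host, prefix, and identifiers; the real path depends on where
# the crawler router is mounted.
resp = requests.get('https://example.com/batches/2022-06-19/users/abc123/batch/xyz789')
payload = resp.json()
print(payload['url'])    # storage URL built by get_batch_url
print(payload['batch'])  # decompressed, parsed batch contents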


@@ -1,43 +0,0 @@
from datetime import date, datetime

import spacy

from mwmbl.crawler.app import get_user_id_hashes_for_date, get_batches_for_date_and_user, get_batch_from_id, \
    create_historical_batch, HashedBatch
from mwmbl.indexer.index import tokenize_document
from mwmbl.indexer.paths import INDEX_PATH
from mwmbl.tinysearchengine.indexer import TinyIndex, Document


def run(index_path):
    nlp = spacy.load("en_core_web_sm")
    date_str = str(date.today())
    users = get_user_id_hashes_for_date(date_str)
    print("Users", users)
    with TinyIndex(Document, index_path, 'w') as indexer:
        for user in users:
            batch_ids = get_batches_for_date_and_user(date_str, user)
            print("Batches", batch_ids)
            for batch_id in batch_ids["batch_ids"]:
                start = datetime.now()
                batch_dict = get_batch_from_id(date_str, user, batch_id)
                get_batch_time = datetime.now()
                print("Get batch time", get_batch_time - start)
                batch = HashedBatch.parse_obj(batch_dict)
                create_historical_batch(batch)
                create_historical_time = datetime.now()
                print("Create historical time", create_historical_time - get_batch_time)
                for item in batch.items:
                    if item.content is None:
                        continue
                    page = tokenize_document(item.url, item.content.title, item.content.extract, 1, nlp)
                    for token in page.tokens:
                        indexer.index(token, Document(url=page.url, title=page.title, extract=page.extract, score=page.score))
                tokenize_time = datetime.now()
                print("Tokenize time", tokenize_time - create_historical_time)


if __name__ == '__main__':
    run(INDEX_PATH)

mwmbl/historical.py Normal file

@@ -0,0 +1,54 @@
from datetime import date, datetime

import spacy

from mwmbl.crawler.app import get_user_id_hashes_for_date, get_batches_for_date_and_user, get_batch_from_id, \
    create_historical_batch, HashedBatch, get_batch_url
from mwmbl.database import Database
from mwmbl.indexdb import BatchInfo, BatchStatus, IndexDatabase
from mwmbl.indexer.index import tokenize_document
from mwmbl.indexer.paths import INDEX_PATH
from mwmbl.tinysearchengine.indexer import TinyIndex, Document


def run():
    date_str = str(date.today())
    users = get_user_id_hashes_for_date(date_str)
    print("Users", users)
    with Database() as db:
        index_db = IndexDatabase(db.connection)
        index_db.create_tables()

        for user in users:
            batches = get_batches_for_date_and_user(date_str, user)
            print("Batches", batches)
            batch_urls = [get_batch_url(batch_id, date_str, user) for batch_id in batches["batch_ids"]]
            infos = [BatchInfo(url, user, BatchStatus.REMOTE) for url in batch_urls]
            index_db.record_batches(infos)


def index_batches(index_path: str):
    # WIP: date_str, user, and batch_ids are not defined in this scope; this
    # loop is carried over from the old crawler/historical.py and still needs
    # to be wired up to read batch URLs back out of the database.
    nlp = spacy.load("en_core_web_sm")
    with TinyIndex(Document, index_path, 'w') as indexer:
        for batch_id in batch_ids["batch_ids"]:
            start = datetime.now()
            batch_dict = get_batch_from_id(date_str, user, batch_id)
            get_batch_time = datetime.now()
            print("Get batch time", get_batch_time - start)
            batch = HashedBatch.parse_obj(batch_dict['batch'])
            create_historical_batch(batch)
            create_historical_time = datetime.now()
            print("Create historical time", create_historical_time - get_batch_time)
            for item in batch.items:
                if item.content is None:
                    continue
                page = tokenize_document(item.url, item.content.title, item.content.extract, 1, nlp)
                for token in page.tokens:
                    indexer.index(token, Document(url=page.url, title=page.title, extract=page.extract, score=page.score))
            tokenize_time = datetime.now()
            print("Tokenize time", tokenize_time - create_historical_time)


if __name__ == '__main__':
    run()
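
index_batches above is left dangling by this WIP commit. A minimal sketch of how it could pull its work from the new batches table instead, using only get_batches_by_status from indexdb.py below; the helper name is hypothetical:

from mwmbl.database import Database
from mwmbl.indexdb import BatchStatus, IndexDatabase

def pending_batch_urls() -> list[str]:
    # Hypothetical helper: fetch the batch URLs that run() recorded as REMOTE,
    # so the indexing loop no longer depends on variables from run()'s scope.
    with Database() as db:
        index_db = IndexDatabase(db.connection)
        return [info.url for info in index_db.get_batches_by_status(BatchStatus.REMOTE)]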

mwmbl/indexdb.py Normal file

@@ -0,0 +1,83 @@
"""
Database interface for batches of crawled data.
"""
from dataclasses import dataclass
from enum import Enum

from psycopg2.extras import execute_values

from mwmbl.tinysearchengine.indexer import Document


class BatchStatus(Enum):
    REMOTE = 0   # The batch only exists in long-term storage
    LOCAL = 1    # We have a copy of the batch locally in PostgreSQL
    INDEXED = 2  # The batch has been indexed and the local data has been deleted


@dataclass
class BatchInfo:
    url: str
    user_id_hash: str
    status: BatchStatus


class IndexDatabase:
    def __init__(self, connection):
        self.connection = connection

    def create_tables(self):
        batches_sql = """
        CREATE TABLE IF NOT EXISTS batches (
            url VARCHAR PRIMARY KEY,
            user_id_hash VARCHAR NOT NULL,
            status INT NOT NULL
        )
        """

        documents_sql = """
        CREATE TABLE IF NOT EXISTS documents (
            url VARCHAR PRIMARY KEY,
            title VARCHAR NOT NULL,
            extract VARCHAR NOT NULL,
            score FLOAT NOT NULL
        )
        """

        with self.connection.cursor() as cursor:
            cursor.execute(batches_sql)
            cursor.execute(documents_sql)

    def record_batches(self, batch_infos: list[BatchInfo]):
        sql = """
        INSERT INTO batches (url, user_id_hash, status) VALUES %s
        ON CONFLICT (url) DO NOTHING
        """

        data = [(info.url, info.user_id_hash, info.status.value) for info in batch_infos]

        with self.connection.cursor() as cursor:
            execute_values(cursor, sql, data)

    def get_batches_by_status(self, status: BatchStatus) -> list[BatchInfo]:
        sql = """
        SELECT * FROM batches WHERE status = %(status)s LIMIT 1000
        """

        with self.connection.cursor() as cursor:
            cursor.execute(sql, {'status': status.value})
            results = cursor.fetchall()
            # Convert the stored INT back into the enum so BatchInfo.status
            # matches its declared type.
            return [BatchInfo(url, user_id_hash, BatchStatus(status)) for url, user_id_hash, status in results]

    def queue_documents(self, documents: list[Document]):
        sql = """
        INSERT INTO documents (url, title, extract, score)
        VALUES %s
        ON CONFLICT (url) DO NOTHING
        """

        # Sort by URL so concurrent bulk inserts take row locks in a consistent order
        sorted_documents = sorted(documents, key=lambda x: x.url)
        data = [(document.url, document.title, document.extract, document.score) for document in sorted_documents]

        with self.connection.cursor() as cursor:
            execute_values(cursor, sql, data)
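
A quick round-trip sketch of the interface above, assuming a reachable Postgres behind mwmbl.database.Database; the URL and user hash are made up:

from mwmbl.database import Database
from mwmbl.indexdb import BatchInfo, BatchStatus, IndexDatabase

with Database() as db:
    index_db = IndexDatabase(db.connection)
    index_db.create_tables()

    # Record a newly discovered batch; ON CONFLICT (url) DO NOTHING makes
    # recording the same batch twice a no-op.
    info = BatchInfo(url='https://example.com/some-batch.json.gz',  # made-up URL
                     user_id_hash='abc123',                         # made-up hash
                     status=BatchStatus.REMOTE)
    index_db.record_batches([info])

    # A retriever later claims everything still sitting in long-term storage
    remote = index_db.get_batches_by_status(BatchStatus.REMOTE)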


@@ -2,12 +2,10 @@ import argparse
import logging
from multiprocessing import Process

import pandas as pd
import uvicorn
from fastapi import FastAPI
from starlette.middleware.cors import CORSMiddleware

from mwmbl import historical
from mwmbl.crawler.app import router as crawler_router
from mwmbl.tinysearchengine import search
from mwmbl.tinysearchengine.completer import Completer
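
This hunk only shows the import moving from mwmbl.crawler.historical to the new top-level mwmbl.historical; the call site isn't visible here. Given the Process import above, the wiring presumably looks something like this sketch (an assumption, not shown in the diff):

from multiprocessing import Process

from mwmbl import historical

# Assumed call site: run the batch recorder in the background so it
# doesn't block the uvicorn server.
historical_process = Process(target=historical.run)
historical_process.start()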

mwmbl/retrieve.py Normal file

@@ -0,0 +1,52 @@
"""
Retrieve remote batches and store them in Postgres locally
"""
import gzip
import json
from multiprocessing.pool import ThreadPool
from time import sleep

import requests

from mwmbl.crawler.app import HashedBatch
from mwmbl.database import Database
from mwmbl.indexdb import IndexDatabase, BatchStatus
from mwmbl.tinysearchengine.indexer import Document

NUM_THREADS = 10


def retrieve_batches():
    with Database() as db:
        index_db = IndexDatabase(db.connection)
        index_db.create_tables()

        batches = index_db.get_batches_by_status(BatchStatus.REMOTE)
        print("Batches", batches)
        urls = [batch.url for batch in batches][:10]
        pool = ThreadPool(NUM_THREADS)
        results = pool.imap_unordered(retrieve_batch, urls)
        for result in results:
            # retrieve_batch returns None for now, so this just drains the pool
            print("Result", result)


def retrieve_batch(url):
    data = json.loads(gzip.decompress(requests.get(url).content))
    batch = HashedBatch.parse_obj(data)
    # TODO: get the score from the URLs database
    documents = [Document(item.content.title, item.url, item.content.extract, 1)
                 for item in batch.items if item.content is not None]
    with Database() as db:
        index_db = IndexDatabase(db.connection)
        # WIP: the batch status is never advanced past REMOTE, so the same
        # batches will be fetched again on the next pass.
        index_db.queue_documents(documents)


def run():
    while True:
        retrieve_batches()
        sleep(10)


if __name__ == '__main__':
    # WIP: runs a single pass; run() is the continuous loop
    retrieve_batches()
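
Taken together, the intended continuous pipeline pairs the two new modules. A sketch assuming Postgres is up and batches exist for today (the sequencing is an assumption; the WIP __main__ above still does a single pass rather than calling run()):

from mwmbl import historical, retrieve

# 1. Record today's batch URLs in the batches table with status REMOTE.
historical.run()

# 2. Loop forever: fetch up to 10 REMOTE batches per pass with NUM_THREADS
#    workers and queue their documents in the documents table.
retrieve.run()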