Index batches in memory

Daoud Clarke 2022-07-24 15:44:01 +01:00
parent 1bceeae3df
commit 6209382d76
11 changed files with 217 additions and 402 deletions

View file

@@ -5,11 +5,9 @@ from logging import getLogger
from pathlib import Path
from time import sleep
from mwmbl.indexer import historical, index_batches
from mwmbl.indexer import index_batches, historical
from mwmbl.indexer.batch_cache import BatchCache
from mwmbl.indexer.paths import INDEX_PATH, BATCH_DIR_NAME
from mwmbl.indexer.preprocess import run_preprocessing
from mwmbl.indexer.update_pages import run_update
logger = getLogger(__name__)
@@ -20,7 +18,7 @@ def run(data_path: str):
batch_cache = BatchCache(Path(data_path) / BATCH_DIR_NAME)
while True:
try:
batch_cache.retrieve_batches(1)
batch_cache.retrieve_batches(1000)
except Exception:
logger.exception("Error retrieving batches")
try:

View file

@@ -1,10 +1,8 @@
import gzip
import hashlib
import json
from collections import defaultdict
from datetime import datetime, timezone, timedelta, date
from datetime import datetime, timezone, date
from typing import Union
from urllib.parse import urlparse
from uuid import uuid4
import boto3
@@ -12,9 +10,9 @@ import requests
from fastapi import HTTPException, APIRouter
from mwmbl.crawler.batch import Batch, NewBatchRequest, HashedBatch
from mwmbl.crawler.urls import URLDatabase, FoundURL, URLStatus
from mwmbl.crawler.urls import URLDatabase
from mwmbl.database import Database
from mwmbl.hn_top_domains_filtered import DOMAINS
from mwmbl.indexer.batch_cache import BatchCache
from mwmbl.indexer.indexdb import IndexDatabase, BatchInfo, BatchStatus
from mwmbl.settings import (
ENDPOINT_URL,
@@ -25,18 +23,11 @@ from mwmbl.settings import (
USER_ID_LENGTH,
VERSION,
PUBLIC_URL_PREFIX,
UNKNOWN_DOMAIN_MULTIPLIER,
SCORE_FOR_SAME_DOMAIN,
SCORE_FOR_DIFFERENT_DOMAIN,
SCORE_FOR_ROOT_PATH,
PUBLIC_USER_ID_LENGTH,
FILE_NAME_SUFFIX,
DATE_REGEX)
from mwmbl.tinysearchengine.indexer import Document
router = APIRouter(prefix="/crawler", tags=["crawler"])
def get_bucket(name):
s3 = boto3.resource('s3', endpoint_url=ENDPOINT_URL, aws_access_key_id=KEY_ID,
aws_secret_access_key=APPLICATION_KEY)
@@ -52,167 +43,118 @@ def upload(data: bytes, name: str):
last_batch = None
@router.on_event("startup")
async def on_startup():
with Database() as db:
url_db = URLDatabase(db.connection)
return url_db.create_tables()
def get_router(batch_cache: BatchCache):
router = APIRouter(prefix="/crawler", tags=["crawler"])
@router.on_event("startup")
async def on_startup():
with Database() as db:
url_db = URLDatabase(db.connection)
return url_db.create_tables()
@router.post('/batches/')
def create_batch(batch: Batch):
if len(batch.items) > MAX_BATCH_SIZE:
raise HTTPException(400, f"Batch size too large (maximum {MAX_BATCH_SIZE}), got {len(batch.items)}")
@router.post('/batches/')
def create_batch(batch: Batch):
if len(batch.items) > MAX_BATCH_SIZE:
raise HTTPException(400, f"Batch size too large (maximum {MAX_BATCH_SIZE}), got {len(batch.items)}")
if len(batch.user_id) != USER_ID_LENGTH:
raise HTTPException(400, f"User ID length is incorrect, should be {USER_ID_LENGTH} characters")
if len(batch.user_id) != USER_ID_LENGTH:
raise HTTPException(400, f"User ID length is incorrect, should be {USER_ID_LENGTH} characters")
if len(batch.items) == 0:
return {
'status': 'ok',
}
user_id_hash = _get_user_id_hash(batch)
now = datetime.now(timezone.utc)
seconds = (now - datetime(now.year, now.month, now.day, tzinfo=timezone.utc)).seconds
# How to pad a string with zeros: https://stackoverflow.com/a/39402910
# Maximum seconds in a day is 60*60*24 = 86400, so 5 digits is enough
padded_seconds = str(seconds).zfill(5)
# See discussion here: https://stackoverflow.com/a/13484764
uid = str(uuid4())[:8]
filename = f'1/{VERSION}/{now.date()}/1/{user_id_hash}/{padded_seconds}__{uid}.json.gz'
# Using an approach from https://stackoverflow.com/a/30476450
epoch_time = (now - datetime(1970, 1, 1, tzinfo=timezone.utc)).total_seconds()
hashed_batch = HashedBatch(user_id_hash=user_id_hash, timestamp=epoch_time, items=batch.items)
data = gzip.compress(hashed_batch.json().encode('utf8'))
upload(data, filename)
global last_batch
last_batch = hashed_batch
batch_url = f'{PUBLIC_URL_PREFIX}{filename}'
batch_cache.store(hashed_batch, batch_url)
# Record the batch as being local so that we don't retrieve it again when the server restarts
infos = [BatchInfo(batch_url, user_id_hash, BatchStatus.LOCAL)]
with Database() as db:
index_db = IndexDatabase(db.connection)
index_db.record_batches(infos)
if len(batch.items) == 0:
return {
'status': 'ok',
'public_user_id': user_id_hash,
'url': batch_url,
}
user_id_hash = _get_user_id_hash(batch)
@router.post('/batches/new')
def request_new_batch(batch_request: NewBatchRequest):
user_id_hash = _get_user_id_hash(batch_request)
now = datetime.now(timezone.utc)
seconds = (now - datetime(now.year, now.month, now.day, tzinfo=timezone.utc)).seconds
with Database() as db:
url_db = URLDatabase(db.connection)
return url_db.get_new_batch_for_user(user_id_hash)
# How to pad a string with zeros: https://stackoverflow.com/a/39402910
# Maximum seconds in a day is 60*60*24 = 86400, so 5 digits is enough
padded_seconds = str(seconds).zfill(5)
@router.get('/batches/{date_str}/users/{public_user_id}')
def get_batches_for_date_and_user(date_str, public_user_id):
check_date_str(date_str)
check_public_user_id(public_user_id)
prefix = f'1/{VERSION}/{date_str}/1/{public_user_id}/'
return get_batch_ids_for_prefix(prefix)
# See discussion here: https://stackoverflow.com/a/13484764
uid = str(uuid4())[:8]
filename = f'1/{VERSION}/{now.date()}/1/{user_id_hash}/{padded_seconds}__{uid}.json.gz'
@router.get('/batches/{date_str}/users/{public_user_id}/batch/{batch_id}')
def get_batch_from_id(date_str, public_user_id, batch_id):
url = get_batch_url(batch_id, date_str, public_user_id)
data = json.loads(gzip.decompress(requests.get(url).content))
return {
'url': url,
'batch': data,
}
# Using an approach from https://stackoverflow.com/a/30476450
epoch_time = (now - datetime(1970, 1, 1, tzinfo=timezone.utc)).total_seconds()
hashed_batch = HashedBatch(user_id_hash=user_id_hash, timestamp=epoch_time, items=batch.items)
data = gzip.compress(hashed_batch.json().encode('utf8'))
upload(data, filename)
@router.get('/latest-batch', response_model=list[HashedBatch])
def get_latest_batch():
return [] if last_batch is None else [last_batch]
record_urls_in_database(batch, user_id_hash, now)
queue_batch(hashed_batch)
@router.get('/batches/{date_str}/users')
def get_user_id_hashes_for_date(date_str: str):
check_date_str(date_str)
prefix = f'1/{VERSION}/{date_str}/1/'
return get_subfolders(prefix)
global last_batch
last_batch = hashed_batch
@router.get('/')
def status():
return {
'status': 'ok'
}
# Record the batch as being local so that we don't retrieve it again when the server restarts
batch_url = f'{PUBLIC_URL_PREFIX}{filename}'
infos = [BatchInfo(batch_url, user_id_hash, BatchStatus.LOCAL)]
with Database() as db:
index_db = IndexDatabase(db.connection)
index_db.record_batches(infos)
return {
'status': 'ok',
'public_user_id': user_id_hash,
'url': batch_url,
}
return router
def _get_user_id_hash(batch: Union[Batch, NewBatchRequest]):
return hashlib.sha3_256(batch.user_id.encode('utf8')).hexdigest()
@router.post('/batches/new')
def request_new_batch(batch_request: NewBatchRequest):
user_id_hash = _get_user_id_hash(batch_request)
with Database() as db:
url_db = URLDatabase(db.connection)
return url_db.get_new_batch_for_user(user_id_hash)
@router.post('/batches/historical')
def create_historical_batch(batch: HashedBatch):
"""
Update the database state of URL crawling for old data
"""
user_id_hash = batch.user_id_hash
batch_datetime = get_datetime_from_timestamp(batch.timestamp)
record_urls_in_database(batch, user_id_hash, batch_datetime)
def get_datetime_from_timestamp(timestamp: int) -> datetime:
batch_datetime = datetime(1970, 1, 1, tzinfo=timezone.utc) + timedelta(seconds=timestamp)
return batch_datetime
def record_urls_in_database(batch: Union[Batch, HashedBatch], user_id_hash: str, timestamp: datetime):
with Database() as db:
url_db = URLDatabase(db.connection)
url_scores = defaultdict(float)
for item in batch.items:
if item.content is not None:
crawled_page_domain = urlparse(item.url).netloc
score_multiplier = 1 if crawled_page_domain in DOMAINS else UNKNOWN_DOMAIN_MULTIPLIER
for link in item.content.links:
parsed_link = urlparse(link)
score = SCORE_FOR_SAME_DOMAIN if parsed_link.netloc == crawled_page_domain else SCORE_FOR_DIFFERENT_DOMAIN
url_scores[link] += score * score_multiplier
domain = f'{parsed_link.scheme}://{parsed_link.netloc}/'
url_scores[domain] += SCORE_FOR_ROOT_PATH * score_multiplier
found_urls = [FoundURL(url, user_id_hash, score, URLStatus.NEW, timestamp) for url, score in url_scores.items()]
if len(found_urls) > 0:
url_db.update_found_urls(found_urls)
crawled_urls = [FoundURL(item.url, user_id_hash, 0.0, URLStatus.CRAWLED, timestamp)
for item in batch.items]
url_db.update_found_urls(crawled_urls)
def get_batches_for_date(date_str):
check_date_str(date_str)
prefix = f'1/{VERSION}/{date_str}/1/'
cache_filename = prefix + 'batches.json.gz'
cache_url = PUBLIC_URL_PREFIX + cache_filename
try:
cached_batches = json.loads(gzip.decompress(requests.get(cache_url).content))
print(f"Got cached batches for {date_str}")
return cached_batches
except gzip.BadGzipFile:
pass
batches = get_batches_for_prefix(prefix)
result = {'batch_urls': [f'{PUBLIC_URL_PREFIX}{batch}' for batch in sorted(batches)]}
if date_str != str(date.today()):
# Don't cache data from today since it may change
data = gzip.compress(json.dumps(result).encode('utf8'))
upload(data, cache_filename)
print(f"Cached batches for {date_str} in {PUBLIC_URL_PREFIX}{cache_filename}")
return result
def get_user_id_hash_from_url(url):
return url.split('/')[9]
@router.get('/batches/{date_str}/users/{public_user_id}')
def get_batches_for_date_and_user(date_str, public_user_id):
check_date_str(date_str)
check_public_user_id(public_user_id)
prefix = f'1/{VERSION}/{date_str}/1/{public_user_id}/'
return get_batch_ids_for_prefix(prefix)
def check_public_user_id(public_user_id):
if len(public_user_id) != PUBLIC_USER_ID_LENGTH:
raise HTTPException(400, f"Incorrect public user ID length, should be {PUBLIC_USER_ID_LENGTH}")
@router.get('/batches/{date_str}/users/{public_user_id}/batch/{batch_id}')
def get_batch_from_id(date_str, public_user_id, batch_id):
url = get_batch_url(batch_id, date_str, public_user_id)
data = json.loads(gzip.decompress(requests.get(url).content))
return {
'url': url,
'batch': data,
}
def get_batch_url(batch_id, date_str, public_user_id):
check_date_str(date_str)
check_public_user_id(public_user_id)
@@ -220,11 +162,6 @@ def get_batch_url(batch_id, date_str, public_user_id):
return url
@router.get('/latest-batch', response_model=list[HashedBatch])
def get_latest_batch():
return [] if last_batch is None else [last_batch]
def get_batch_id_from_file_name(file_name: str):
assert file_name.endswith(FILE_NAME_SUFFIX)
return file_name[:-len(FILE_NAME_SUFFIX)]
@@ -246,13 +183,6 @@ def get_batches_for_prefix(prefix):
return filenames
@router.get('/batches/{date_str}/users')
def get_user_id_hashes_for_date(date_str: str):
check_date_str(date_str)
prefix = f'1/{VERSION}/{date_str}/1/'
return get_subfolders(prefix)
def check_date_str(date_str):
if not DATE_REGEX.match(date_str):
raise HTTPException(400, f"Incorrect date format, should be YYYY-MM-DD")
@@ -268,17 +198,23 @@ def get_subfolders(prefix):
return item_keys
@router.get('/')
def status():
return {
'status': 'ok'
}
def get_batches_for_date(date_str):
check_date_str(date_str)
prefix = f'1/{VERSION}/{date_str}/1/'
cache_filename = prefix + 'batches.json.gz'
cache_url = PUBLIC_URL_PREFIX + cache_filename
try:
cached_batches = json.loads(gzip.decompress(requests.get(cache_url).content))
print(f"Got cached batches for {date_str}")
return cached_batches
except gzip.BadGzipFile:
pass
def queue_batch(batch: HashedBatch):
# TODO: get the score from the URLs database
documents = [Document(item.content.title, item.url, item.content.extract, 1)
for item in batch.items if item.content is not None]
with Database() as db:
index_db = IndexDatabase(db.connection)
index_db.queue_documents(documents)
batches = get_batches_for_prefix(prefix)
result = {'batch_urls': [f'{PUBLIC_URL_PREFIX}{batch}' for batch in sorted(batches)]}
if date_str != str(date.today()):
# Don't cache data from today since it may change
data = gzip.compress(json.dumps(result).encode('utf8'))
upload(data, cache_filename)
print(f"Cached batches for {date_str} in {PUBLIC_URL_PREFIX}{cache_filename}")
return result
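
The crawler endpoints above are now produced by get_router(batch_cache), and create_batch stores each uploaded batch in the local cache and records it with status LOCAL instead of scoring its URLs inline. A minimal client sketch for the POST endpoint, assuming a locally running server (the host, port and user_id value below are placeholders, and the full item schema lives in mwmbl.crawler.batch, not in this diff):

    import requests

    # Hypothetical client call; an empty items list is accepted by the handler.
    payload = {
        "user_id": "0" * 36,  # placeholder: must be exactly USER_ID_LENGTH characters or the API returns 400
        "items": [],
    }
    response = requests.post("http://localhost:8080/crawler/batches/", json=payload)
    print(response.json())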

View file

@@ -21,9 +21,13 @@ class URLStatus(Enum):
"""
URL state update is idempotent and can only progress forwards.
"""
NEW = 0 # One user has identified this URL
ASSIGNED = 2 # The crawler has given the URL to a user to crawl
CRAWLED = 3 # At least one user has crawled the URL
NEW = 0 # One user has identified this URL
ASSIGNED = 10 # The crawler has given the URL to a user to crawl
ERROR_TIMEOUT = 20 # Timeout while retrieving
ERROR_404 = 30 # 404 response
ERROR_OTHER = 40 # Some other error
ERROR_ROBOTS_DENIED = 50 # Robots disallow this page
CRAWLED = 100 # At least one user has crawled the URL
@dataclass
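
The status values are now spaced out (0, 10, 20, ... 100) so that further states can slot in between them without renumbering, while preserving the documented rule that a URL's state only moves forwards. A minimal sketch of how that rule maps onto the numeric values (the helper name here is hypothetical, not part of the commit):

    def is_forward_transition(current: URLStatus, proposed: URLStatus) -> bool:
        # Hypothetical helper: with the spaced values above, "can only progress
        # forwards" becomes a numeric comparison, e.g. ASSIGNED (10) -> CRAWLED (100)
        # is allowed, while CRAWLED (100) -> ERROR_404 (30) is not.
        return proposed.value > current.value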

View file

@@ -35,26 +35,24 @@ class BatchCache:
batches[url] = batch
return batches
def retrieve_batches(self, num_thousand_batches=10):
def retrieve_batches(self, num_batches=1000):
with Database() as db:
index_db = IndexDatabase(db.connection)
index_db.create_tables()
with Database() as db:
index_db = IndexDatabase(db.connection)
for i in range(num_thousand_batches):
batches = index_db.get_batches_by_status(BatchStatus.REMOTE, 100)
print(f"Found {len(batches)} remote batches")
if len(batches) == 0:
return
urls = [batch.url for batch in batches]
pool = ThreadPool(self.num_threads)
results = pool.imap_unordered(self.retrieve_batch, urls)
for result in results:
if result > 0:
print("Processed batch with items:", result)
index_db.update_batch_status(urls, BatchStatus.LOCAL)
batches = index_db.get_batches_by_status(BatchStatus.REMOTE, num_batches)
print(f"Found {len(batches)} remote batches")
if len(batches) == 0:
return
urls = [batch.url for batch in batches]
pool = ThreadPool(self.num_threads)
results = pool.imap_unordered(self.retrieve_batch, urls)
for result in results:
if result > 0:
print("Processed batch with items:", result)
index_db.update_batch_status(urls, BatchStatus.LOCAL)
def retrieve_batch(self, url):
data = json.loads(gzip.decompress(retry_requests.get(url).content))

View file

@@ -1,6 +1,6 @@
from datetime import date, timedelta
from mwmbl.crawler.app import get_batches_for_date, get_user_id_hash_from_url
from mwmbl.crawler.app import get_batches_for_date
from mwmbl.database import Database
from mwmbl.indexer.indexdb import BatchInfo, BatchStatus, IndexDatabase
@@ -20,5 +20,9 @@ def run():
index_db.record_batches(infos)
def get_user_id_hash_from_url(url):
return url.split('/')[9]
if __name__ == '__main__':
run()

View file

@@ -2,17 +2,22 @@
Index batches that are stored locally.
"""
from collections import defaultdict
from datetime import datetime, timezone, timedelta
from logging import getLogger
from typing import Iterable
from urllib.parse import urlparse
import spacy
from mwmbl.crawler.batch import HashedBatch
from mwmbl.crawler.urls import URLDatabase
from mwmbl.crawler.batch import HashedBatch, Item
from mwmbl.crawler.urls import URLDatabase, URLStatus, FoundURL
from mwmbl.database import Database
from mwmbl.hn_top_domains_filtered import DOMAINS
from mwmbl.indexer.batch_cache import BatchCache
from mwmbl.indexer.index import tokenize_document
from mwmbl.indexer.indexdb import BatchStatus, IndexDatabase
from mwmbl.settings import UNKNOWN_DOMAIN_MULTIPLIER, SCORE_FOR_SAME_DOMAIN, SCORE_FOR_DIFFERENT_DOMAIN, \
SCORE_FOR_ROOT_PATH
from mwmbl.tinysearchengine.indexer import Document, TinyIndex
logger = getLogger(__name__)
@@ -31,23 +36,31 @@ def run(batch_cache: BatchCache, index_path: str):
index_db = IndexDatabase(db.connection)
logger.info("Getting local batches")
batches = index_db.get_batches_by_status(BatchStatus.LOCAL)
batches = index_db.get_batches_by_status(BatchStatus.LOCAL, 1000)
logger.info(f"Got {len(batches)} batch urls")
if len(batches) == 0:
return
batch_data = batch_cache.get_cached([batch.url for batch in batches])
logger.info(f"Got {len(batch_data)} cached batches")
record_urls_in_database(batch_data.values())
document_tuples = list(get_documents_from_batches(batch_data.values()))
urls = [url for title, url, extract in document_tuples]
print(f"Got {len(urls)} document tuples")
logger.info(f"Got {len(urls)} document tuples")
url_db = URLDatabase(db.connection)
url_scores = url_db.get_url_scores(urls)
print(f"Got {len(url_scores)} scores")
logger.info(f"Got {len(url_scores)} scores")
documents = [Document(title, url, extract, url_scores.get(url, 1.0)) for title, url, extract in document_tuples]
page_documents = preprocess_documents(documents, index_path, nlp)
index_pages(index_path, page_documents)
logger.info("Indexed pages")
index_db.update_batch_status([batch.url for batch in batches], BatchStatus.INDEXED)
def index_pages(index_path, page_documents):
@@ -79,3 +92,59 @@ def preprocess_documents(documents, index_path, nlp):
page_documents[page].append(document)
print(f"Preprocessed for {len(page_documents)} pages")
return page_documents
def get_url_error_status(item: Item):
if item.status == 404:
return URLStatus.ERROR_404
if item.error is not None:
if item.error.name == 'AbortError':
return URLStatus.ERROR_TIMEOUT
elif item.error.name == 'RobotsDenied':
return URLStatus.ERROR_ROBOTS_DENIED
return URLStatus.ERROR_OTHER
def record_urls_in_database(batches: Iterable[HashedBatch]):
with Database() as db:
url_db = URLDatabase(db.connection)
url_scores = defaultdict(float)
url_users = {}
url_timestamps = {}
url_statuses = defaultdict(lambda: URLStatus.NEW)
for batch in batches:
for item in batch.items:
timestamp = get_datetime_from_timestamp(item.timestamp / 1000.0)
url_timestamps[item.url] = timestamp
url_users[item.url] = batch.user_id_hash
if item.content is None:
url_statuses[item.url] = get_url_error_status(item)
else:
url_statuses[item.url] = URLStatus.CRAWLED
crawled_page_domain = urlparse(item.url).netloc
score_multiplier = 1 if crawled_page_domain in DOMAINS else UNKNOWN_DOMAIN_MULTIPLIER
for link in item.content.links:
parsed_link = urlparse(link)
score = SCORE_FOR_SAME_DOMAIN if parsed_link.netloc == crawled_page_domain else SCORE_FOR_DIFFERENT_DOMAIN
url_scores[link] += score * score_multiplier
url_users[link] = batch.user_id_hash
url_timestamps[link] = timestamp
domain = f'{parsed_link.scheme}://{parsed_link.netloc}/'
url_scores[domain] += SCORE_FOR_ROOT_PATH * score_multiplier
url_users[domain] = batch.user_id_hash
url_timestamps[domain] = timestamp
found_urls = [FoundURL(url, url_users[url], url_scores[url], url_statuses[url], url_timestamps[url])
for url in url_scores.keys() | url_statuses.keys()]
url_db.update_found_urls(found_urls)
def get_datetime_from_timestamp(timestamp: float) -> datetime:
batch_datetime = datetime(1970, 1, 1, tzinfo=timezone.utc) + timedelta(seconds=timestamp)
return batch_datetime
# TODO: clean unicode at some point
def clean_unicode(s: str) -> str:
return s.encode('utf-8', 'ignore').decode('utf-8')
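
record_urls_in_database accumulates a score for every link found in a crawled page: a link on the same domain as the page scores more than an external link, each link also credits its domain root, and everything from a page outside the curated DOMAINS set is damped by UNKNOWN_DOMAIN_MULTIPLIER. A worked sketch of that accumulation with made-up constants (the real values live in mwmbl.settings and are not shown in this diff):

    from collections import defaultdict
    from urllib.parse import urlparse

    # Illustrative values only; the real ones come from mwmbl.settings.
    SCORE_FOR_SAME_DOMAIN, SCORE_FOR_DIFFERENT_DOMAIN, SCORE_FOR_ROOT_PATH = 1.0, 0.5, 0.1
    UNKNOWN_DOMAIN_MULTIPLIER = 0.001
    DOMAINS = {"news.ycombinator.com"}  # stand-in for mwmbl.hn_top_domains_filtered

    url_scores = defaultdict(float)
    crawled_url = "https://news.ycombinator.com/item?id=1"
    links = ["https://news.ycombinator.com/item?id=2", "https://example.com/page"]

    crawled_domain = urlparse(crawled_url).netloc
    multiplier = 1 if crawled_domain in DOMAINS else UNKNOWN_DOMAIN_MULTIPLIER
    for link in links:
        parsed = urlparse(link)
        same_domain = parsed.netloc == crawled_domain
        score = SCORE_FOR_SAME_DOMAIN if same_domain else SCORE_FOR_DIFFERENT_DOMAIN
        url_scores[link] += score * multiplier
        url_scores[f"{parsed.scheme}://{parsed.netloc}/"] += SCORE_FOR_ROOT_PATH * multiplier

    # Resulting scores with the values above:
    #   https://news.ycombinator.com/item?id=2 -> 1.0
    #   https://news.ycombinator.com/          -> 0.1
    #   https://example.com/page               -> 0.5
    #   https://example.com/                   -> 0.1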

View file

@@ -6,12 +6,11 @@ from enum import Enum
from psycopg2.extras import execute_values
from mwmbl.tinysearchengine.indexer import Document
class BatchStatus(Enum):
REMOTE = 0 # The batch only exists in long term storage
LOCAL = 1 # We have a copy of the batch locally in Postgresql
INDEXED = 2
class DocumentStatus(Enum):
@@ -95,89 +94,3 @@ class IndexDatabase:
with self.connection.cursor() as cursor:
cursor.execute(sql, {'status': status.value, 'urls': tuple(batch_urls)})
def queue_documents(self, documents: list[Document]):
sql = """
INSERT INTO documents (url, title, extract, score, status)
VALUES %s
ON CONFLICT (url) DO NOTHING
"""
sorted_documents = sorted(documents, key=lambda x: x.url)
data = [(document.url, clean_unicode(document.title), clean_unicode(document.extract),
document.score, DocumentStatus.NEW.value)
for document in sorted_documents]
print("Queueing documents", len(data))
with self.connection.cursor() as cursor:
execute_values(cursor, sql, data)
def get_documents_for_preprocessing(self):
sql = f"""
UPDATE documents SET status = {DocumentStatus.PREPROCESSING.value}
WHERE url IN (
SELECT url FROM documents
WHERE status = {DocumentStatus.NEW.value}
LIMIT 1000
FOR UPDATE SKIP LOCKED
)
RETURNING url, title, extract, score
"""
with self.connection.cursor() as cursor:
cursor.execute(sql)
results = cursor.fetchall()
return [Document(title, url, extract, score) for url, title, extract, score in results]
def clear_documents_for_preprocessing(self) -> int:
sql = f"""
DELETE FROM documents WHERE status = {DocumentStatus.PREPROCESSING.value}
"""
with self.connection.cursor() as cursor:
cursor.execute(sql)
return cursor.rowcount
def queue_documents_for_page(self, urls_and_page_indexes: list[tuple[str, int]]):
sql = """
INSERT INTO document_pages (url, page) values %s
"""
print(f"Queuing {len(urls_and_page_indexes)} urls and page indexes")
with self.connection.cursor() as cursor:
execute_values(cursor, sql, urls_and_page_indexes)
def get_queued_documents_for_page(self, page_index: int) -> list[Document]:
sql = """
SELECT d.url, title, extract, score
FROM document_pages p INNER JOIN documents d ON p.url = d.url
WHERE p.page = %(page_index)s
"""
with self.connection.cursor() as cursor:
cursor.execute(sql, {'page_index': page_index})
results = cursor.fetchall()
return [Document(title, url, extract, score) for url, title, extract, score in results]
def get_queued_pages(self) -> list[int]:
sql = """
SELECT DISTINCT page FROM document_pages ORDER BY page
"""
with self.connection.cursor() as cursor:
cursor.execute(sql)
results = cursor.fetchall()
return [x[0] for x in results]
def clear_queued_documents_for_page(self, page_index: int):
sql = """
DELETE FROM document_pages
WHERE page = %(page_index)s
"""
with self.connection.cursor() as cursor:
cursor.execute(sql, {'page_index': page_index})
def clean_unicode(s: str) -> str:
return s.encode('utf-8', 'ignore').decode('utf-8')
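
With the document queue removed here, documents are built and indexed in memory by index_batches, and a batch simply advances through the three BatchStatus states: REMOTE (only in long term storage), LOCAL (cached on disk), INDEXED (its documents written to the index). A condensed sketch of the resulting pipeline, paraphrasing background.py and index_batches.py above rather than reproducing either exactly:

    from time import sleep

    while True:
        # REMOTE -> LOCAL: download up to 1000 batch files into the local cache
        batch_cache.retrieve_batches(1000)
        # LOCAL -> INDEXED: score URLs, tokenize documents and write index pages
        index_batches.run(batch_cache, INDEX_PATH)
        sleep(10)  # assumed pause between passes; the full loop body is not shown here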

View file

@@ -1,48 +0,0 @@
"""
Preprocess local documents for indexing.
"""
import traceback
from logging import getLogger
from time import sleep
import spacy
from mwmbl.database import Database
from mwmbl.indexer.indexdb import IndexDatabase
from mwmbl.indexer.index import tokenize_document
from mwmbl.tinysearchengine.indexer import TinyIndex, Document
logger = getLogger(__name__)
def run(index_path):
while True:
try:
run_preprocessing(index_path)
except Exception as e:
print("Exception preprocessing")
traceback.print_exception(type(e), e, e.__traceback__)
sleep(10)
def run_preprocessing(index_path):
nlp = spacy.load("en_core_web_sm")
with Database() as db:
index_db = IndexDatabase(db.connection)
for i in range(100):
documents = index_db.get_documents_for_preprocessing()
print(f"Got {len(documents)} documents for preprocessing")
if len(documents) == 0:
break
with TinyIndex(Document, index_path, 'w') as indexer:
for document in documents:
tokenized = tokenize_document(document.url, document.title, document.extract, 1, nlp)
logger.debug(f"Tokenized: {tokenized}")
page_indexes = [indexer.get_key_page_index(token) for token in tokenized.tokens]
logger.debug(f"Page indexes: {page_indexes}")
index_db.queue_documents_for_page([(tokenized.url, i) for i in page_indexes])
if __name__ == '__main__':
run('data/index.tinysearch')

View file

@@ -1,54 +0,0 @@
"""
Iterate over each page in the index and update it based on what is in the index database.
"""
import traceback
from time import sleep
from mwmbl.database import Database
from mwmbl.indexer.indexdb import IndexDatabase
from mwmbl.tinysearchengine.indexer import TinyIndex, Document, PageError
def run_update(index_path):
with Database() as db:
index_db = IndexDatabase(db.connection)
index_db.create_tables()
with TinyIndex(Document, index_path, 'w') as indexer:
with Database() as db:
index_db = IndexDatabase(db.connection)
pages_to_process = index_db.get_queued_pages()
print(f"Got {len(pages_to_process)} pages to process")
for i in pages_to_process:
documents = index_db.get_queued_documents_for_page(i)
print(f"Documents queued for page {i}: {len(documents)}")
if len(documents) > 0:
for j in range(20):
try:
indexer.add_to_page(i, documents)
break
except PageError:
documents = documents[:len(documents)//2]
if len(documents) == 0:
print("No more space")
break
print(f"Not enough space, adding {len(documents)}")
index_db.clear_queued_documents_for_page(i)
# All preprocessed documents should now have been indexed
# Clear documents that have now been preprocessed and indexed
num_cleared = index_db.clear_documents_for_preprocessing()
print(f"Indexed {num_cleared} documents")
def run(index_path):
while True:
try:
run_update(index_path)
except Exception as e:
print("Exception updating pages in index")
traceback.print_exception(type(e), e, e.__traceback__)
sleep(10)
if __name__ == '__main__':
run_update('data/index.tinysearch')

View file

@@ -9,14 +9,15 @@ import uvicorn
from fastapi import FastAPI
from mwmbl import background
from mwmbl.crawler.app import router as crawler_router
from mwmbl.indexer.paths import INDEX_NAME
from mwmbl.crawler import app as crawler
from mwmbl.indexer.batch_cache import BatchCache
from mwmbl.indexer.paths import INDEX_NAME, BATCH_DIR_NAME
from mwmbl.tinysearchengine import search
from mwmbl.tinysearchengine.completer import Completer
from mwmbl.tinysearchengine.indexer import TinyIndex, Document, NUM_PAGES, PAGE_SIZE
from mwmbl.tinysearchengine.rank import HeuristicRanker
logging.basicConfig(stream=sys.stdout, level=logging.DEBUG)
logging.basicConfig(stream=sys.stdout, level=logging.INFO)
def setup_args():
@@ -56,6 +57,9 @@ def run():
search_router = search.create_router(ranker)
app.include_router(search_router)
batch_cache = BatchCache(Path(args.data) / BATCH_DIR_NAME)
crawler_router = crawler.get_router(batch_cache)
app.include_router(crawler_router)
# Initialize uvicorn server using global app instance and server config params

View file

@@ -1,13 +1,4 @@
from mwmbl.database import Database
from mwmbl.indexer.indexdb import IndexDatabase, clean_unicode
from mwmbl.tinysearchengine.indexer import Document
def test_bad_unicode_encoding():
bad_doc = Document('Good title', 'https://goodurl.com', 'Bad extract text \ud83c', 1.0)
with Database() as db:
index_db = IndexDatabase(db.connection)
index_db.queue_documents([bad_doc])
from mwmbl.indexer.index_batches import clean_unicode
def test_clean_unicode():
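
The removed test exercised queue_documents with a document whose extract contains the lone surrogate '\ud83c'; the replacement targets clean_unicode directly, although its body is cut off in this view. A plausible, purely hypothetical version based on that same string:

    def test_clean_unicode():
        # Hypothetical body: '\ud83c' is a lone surrogate that utf-8 encoding
        # with errors='ignore' drops, leaving the rest of the text intact.
        assert clean_unicode('Bad extract text \ud83c') == 'Bad extract text '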