Correctly insert new URLs

Daoud Clarke 2022-06-29 22:39:21 +01:00
parent 955d650cf4
commit b8c495bda8
9 changed files with 93 additions and 52 deletions

View file

@@ -2,9 +2,14 @@
 Script that updates data in a background process.
 """
 from mwmbl.indexer import historical
+from mwmbl.indexer.preprocess import run_preprocessing
 from mwmbl.indexer.retrieve import retrieve_batches
+from mwmbl.indexer.update_pages import run_update
 
 
-def run():
-    # historical.run()
-    retrieve_batches()
+def run(index_path: str):
+    historical.run()
+    while True:
+        retrieve_batches()
+        run_preprocessing(index_path)
+        run_update(index_path)

View file

@@ -4,7 +4,7 @@ import json
 import os
 import re
 from collections import defaultdict
-from datetime import datetime, timezone, timedelta
+from datetime import datetime, timezone, timedelta, date
 from typing import Union
 from urllib.parse import urlparse
 from uuid import uuid4
@@ -159,9 +159,7 @@ def record_urls_in_database(batch: Union[Batch, HashedBatch], user_id_hash: str,
     url_db.update_found_urls(crawled_urls)
 
 
 # TODO:
-# - test this code
-# - delete existing crawl data for change from INT to FLOAT
 # - load some historical data as a starting point
 def get_batches_for_date(date_str):
@@ -178,8 +176,10 @@ def get_batches_for_date(date_str):
     batches = get_batches_for_prefix(prefix)
     result = {'batch_urls': [f'{PUBLIC_URL_PREFIX}{batch}' for batch in sorted(batches)]}
-    data = gzip.compress(json.dumps(result).encode('utf8'))
-    upload(data, cache_filename)
+    if date_str != str(date.today()):
+        # Don't cache data from today since it may change
+        data = gzip.compress(json.dumps(result).encode('utf8'))
+        upload(data, cache_filename)
     print(f"Cached batches for {date_str} in {PUBLIC_URL_PREFIX}{cache_filename}")
     return result
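A note on the guard added above: listings for past days are immutable, so they can be compressed, uploaded, and served from the cache indefinitely, while today's listing can still change as crawlers upload new batches. A minimal sketch of the check, assuming date_str is an ISO yyyy-mm-dd string (the format str(date.today()) produces):

from datetime import date

def should_cache(date_str: str) -> bool:
    """Past days are complete and safe to cache; today's listing may still grow."""
    return date_str != str(date.today())

print(should_cache("2022-06-28"))       # True on any later day
print(should_cache(str(date.today())))  # always False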

View file

@@ -60,6 +60,11 @@ class URLDatabase:
         get_urls_sql = """
         SELECT url FROM urls
         WHERE url in %(urls)s
         """
 
+        lock_urls_sql = """
+        SELECT url FROM urls
+        WHERE url in %(urls)s
+        FOR UPDATE SKIP LOCKED
+        """
@@ -76,19 +81,27 @@
           END
         """
 
-        urls_to_insert = [x.url for x in found_urls]
-        assert len(urls_to_insert) == len(set(urls_to_insert))
+        input_urls = [x.url for x in found_urls]
+        assert len(input_urls) == len(set(input_urls))
 
         with self.connection as connection:
             with connection.cursor() as cursor:
-                cursor.execute(get_urls_sql, {'urls': tuple(urls_to_insert)})
-                locked_urls = {x[0] for x in cursor.fetchall()}
-                if len(locked_urls) != len(urls_to_insert):
-                    print(f"Only got {len(locked_urls)} instead of {len(urls_to_insert)}")
+                cursor.execute(get_urls_sql, {'urls': tuple(input_urls)})
+                existing_urls = {x[0] for x in cursor.fetchall()}
+                new_urls = set(input_urls) - existing_urls
+
+                cursor.execute(lock_urls_sql, {'urls': tuple(input_urls)})
+                locked_urls = {x[0] for x in cursor.fetchall()}
+                urls_to_insert = new_urls | locked_urls
+
+                if len(urls_to_insert) != len(input_urls):
+                    print(f"Only got {len(locked_urls)} instead of {len(input_urls)}")
+
+                sorted_urls = sorted(found_urls, key=lambda x: x.url)
 
                 data = [
                     (found_url.url, found_url.status.value, found_url.user_id_hash, found_url.score, found_url.timestamp)
-                    for found_url in found_urls if found_url.url in locked_urls]
+                    for found_url in sorted_urls if found_url.url in urls_to_insert]
 
                 execute_values(cursor, insert_sql, data)
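This hunk is the one the commit title refers to. The old code ran a plain SELECT on the input URLs, called the result locked_urls, and only inserted rows whose URL was in that set, so genuinely new URLs were silently dropped. The new code splits the work: a plain SELECT finds which input URLs already exist (their complement is new_urls), a second SELECT ... FOR UPDATE SKIP LOCKED takes row locks on the existing rows that no other worker currently holds, and the upsert covers new_urls | locked_urls. A minimal sketch of that pattern against a generic PostgreSQL table using psycopg2; the example_urls table and its columns are illustrative, not the mwmbl schema, and only the select-then-lock-then-insert shape mirrors the diff:

import os

import psycopg2
from psycopg2.extras import execute_values

# Illustrative schema: a table of URLs with a score that concurrent workers upsert.
UPSERT_SQL = """
    INSERT INTO example_urls (url, score) VALUES %s
    ON CONFLICT (url) DO UPDATE SET score = example_urls.score + excluded.score
"""

def upsert_urls(connection, url_scores: dict[str, float]):
    urls = tuple(url_scores)
    with connection:
        with connection.cursor() as cursor:
            # 1. Which of the input URLs already have a row?
            cursor.execute("SELECT url FROM example_urls WHERE url IN %(urls)s", {'urls': urls})
            existing = {row[0] for row in cursor.fetchall()}
            new = set(urls) - existing

            # 2. Lock the existing rows we can get without waiting; rows held by
            #    another transaction are skipped and left for that worker.
            cursor.execute(
                "SELECT url FROM example_urls WHERE url IN %(urls)s FOR UPDATE SKIP LOCKED",
                {'urls': urls})
            locked = {row[0] for row in cursor.fetchall()}

            # 3. Write brand-new rows plus the existing rows we managed to lock.
            to_write = new | locked
            data = [(url, url_scores[url]) for url in sorted(to_write)]
            if data:
                execute_values(cursor, UPSERT_SQL, data)

if __name__ == '__main__':
    # Assumes a reachable Postgres with the example_urls table.
    conn = psycopg2.connect(os.environ["DATABASE_URL"])
    upsert_urls(conn, {"https://example.com/": 1.0, "https://example.org/": 0.5})

The ON CONFLICT upsert still matters in this shape: a URL that looked new in step 1 can be inserted by another worker before step 3 runs, and rows locked by other transactions are simply left for those workers to finish.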

View file

@@ -9,6 +9,7 @@ class Database:
     def __enter__(self):
         self.connection = connect(os.environ["DATABASE_URL"])
         self.connection.set_session(autocommit=True)
+        return self
 
     def __exit__(self, exc_type, exc_val, exc_tb):

View file

@@ -117,7 +117,7 @@ class IndexDatabase:
             WHERE url IN (
                 SELECT url FROM documents
                 WHERE status = {DocumentStatus.NEW.value}
-                LIMIT 100
+                LIMIT 1000
                 FOR UPDATE SKIP LOCKED
             )
             RETURNING url, title, extract, score
@@ -149,6 +149,16 @@ class IndexDatabase:
             results = cursor.fetchall()
             return [Document(title, url, extract, score) for url, title, extract, score in results]
 
+    def get_queued_pages(self) -> list[int]:
+        sql = """
+        SELECT DISTINCT page FROM document_pages ORDER BY page
+        """
+
+        with self.connection.cursor() as cursor:
+            cursor.execute(sql)
+            results = cursor.fetchall()
+            return [x[0] for x in results]
+
     def clear_queued_documents_for_page(self, page_index: int):
         sql = """
         DELETE FROM document_pages

View file

@@ -26,15 +26,16 @@ def run_preprocessing(index_path):
     nlp = spacy.load("en_core_web_sm")
     with Database() as db:
         index_db = IndexDatabase(db.connection)
-        documents = index_db.get_documents_for_preprocessing()
-        print(f"Got {len(documents)} documents for preprocessing")
-        if len(documents) == 0:
-            sleep(10)
-        with TinyIndex(Document, index_path, 'w') as indexer:
-            for document in documents:
-                tokenized = tokenize_document(document.url, document.title, document.extract, 1, nlp)
-                page_indexes = [indexer.get_key_page_index(token) for token in tokenized.tokens]
-                index_db.queue_documents_for_page([(tokenized.url, i) for i in page_indexes])
+        for i in range(100):
+            documents = index_db.get_documents_for_preprocessing()
+            print(f"Got {len(documents)} documents for preprocessing")
+            if len(documents) == 0:
+                break
+            with TinyIndex(Document, index_path, 'w') as indexer:
+                for document in documents:
+                    tokenized = tokenize_document(document.url, document.title, document.extract, 1, nlp)
+                    page_indexes = [indexer.get_key_page_index(token) for token in tokenized.tokens]
+                    index_db.queue_documents_for_page([(tokenized.url, i) for i in page_indexes])
 
 
 if __name__ == '__main__':

View file

@@ -8,6 +8,7 @@ from multiprocessing.pool import ThreadPool
 from time import sleep
 
 import requests
+from pydantic import ValidationError
 
 from mwmbl.crawler.app import create_historical_batch
 from mwmbl.crawler.batch import HashedBatch
@@ -26,22 +27,32 @@ def retrieve_batches():
     with Database() as db:
         index_db = IndexDatabase(db.connection)
-        batches = index_db.get_batches_by_status(BatchStatus.REMOTE)
-        print(f"Found {len(batches)} remote batches")
-        urls = [batch.url for batch in batches]
-        pool = ThreadPool(NUM_THREADS)
-        results = pool.imap_unordered(retrieve_batch, urls)
-        for result in results:
-            print("Processed batch with items:", result)
-        index_db.update_batch_status(urls, BatchStatus.LOCAL)
+        for i in range(100):
+            batches = index_db.get_batches_by_status(BatchStatus.REMOTE)
+            print(f"Found {len(batches)} remote batches")
+            if len(batches) == 0:
+                return
+            urls = [batch.url for batch in batches]
+            pool = ThreadPool(NUM_THREADS)
+            results = pool.imap_unordered(retrieve_batch, urls)
+            for result in results:
+                if result > 0:
+                    print("Processed batch with items:", result)
+            index_db.update_batch_status(urls, BatchStatus.LOCAL)
 
 
 def retrieve_batch(url):
     data = json.loads(gzip.decompress(retry_requests.get(url).content))
-    batch = HashedBatch.parse_obj(data)
-    print(f"Retrieved batch with {len(batch.items)} items")
-    create_historical_batch(batch)
-    queue_batch(batch)
+    try:
+        batch = HashedBatch.parse_obj(data)
+    except ValidationError:
+        print("Failed to validate batch", data)
+        raise
+    if len(batch.items) > 0:
+        print(f"Retrieved batch with {len(batch.items)} items")
+        create_historical_batch(batch)
+        queue_batch(batch)
+    return len(batch.items)
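The retrieval code above now validates each downloaded batch with pydantic before queueing it, logs the offending payload, and re-raises so the failure is visible rather than silently skipped. A small sketch of that parse-log-reraise shape using the pydantic v1-style parse_obj call seen in the diff; the Batch and Item models here are stand-ins, not the mwmbl HashedBatch schema:

from pydantic import BaseModel, ValidationError

class Item(BaseModel):
    url: str

class Batch(BaseModel):
    user_id_hash: str
    items: list[Item]

def parse_batch(data: dict) -> Batch:
    try:
        batch = Batch.parse_obj(data)
    except ValidationError:
        # Log the bad payload so it can be inspected, but still fail loudly.
        print("Failed to validate batch", data)
        raise
    return batch

good = parse_batch({"user_id_hash": "abc", "items": [{"url": "https://example.com/"}]})
print(len(good.items))  # 1

try:
    parse_batch({"user_id_hash": "abc", "items": "not-a-list"})
except ValidationError:
    print("rejected malformed batch")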

View file

@@ -15,23 +15,23 @@ def run_update(index_path):
         index_db.create_tables()
 
     with TinyIndex(Document, index_path, 'w') as indexer:
-        for i in range(indexer.num_pages):
-            with Database() as db:
-                index_db = IndexDatabase(db.connection)
+        with Database() as db:
+            index_db = IndexDatabase(db.connection)
+            pages_to_process = index_db.get_queued_pages()
+            print(f"Got {len(pages_to_process)} pages to process")
+            for i in pages_to_process:
                 documents = index_db.get_queued_documents_for_page(i)
                 print(f"Documents queued for page {i}: {len(documents)}")
-                if len(documents) == 0:
-                    continue
-                for j in range(3):
-                    try:
-                        indexer.add_to_page(i, documents)
-                        break
-                    except ValueError:
-                        documents = documents[:len(documents)//2]
-                        if len(documents) == 0:
-                            break
-                        print(f"Not enough space, adding {len(documents)}")
+                if len(documents) > 0:
+                    for j in range(3):
+                        try:
+                            indexer.add_to_page(i, documents)
+                            break
+                        except ValueError:
+                            documents = documents[:len(documents)//2]
+                            if len(documents) == 0:
+                                break
+                            print(f"Not enough space, adding {len(documents)}")
                 index_db.clear_queued_documents_for_page(i)
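The loop above keeps its existing overflow strategy: add_to_page raises ValueError when the documents do not fit in the fixed-size page, and the caller retries with half as many documents, up to three times. A standalone sketch of that back-off; the fixed-capacity page here is a stand-in for a TinyIndex page, not the real implementation:

PAGE_CAPACITY = 4  # illustrative fixed page size

def add_to_page(page: list[str], documents: list[str]) -> None:
    """Stand-in for TinyIndex.add_to_page: refuses to overfill the page."""
    if len(page) + len(documents) > PAGE_CAPACITY:
        raise ValueError("Too many documents for page")
    page.extend(documents)

def add_with_halving(page: list[str], documents: list[str]) -> None:
    # Mirror of the retry loop: halve the batch on overflow, give up after
    # three attempts or when nothing is left to add.
    for _ in range(3):
        try:
            add_to_page(page, documents)
            break
        except ValueError:
            documents = documents[:len(documents) // 2]
            if len(documents) == 0:
                break
            print(f"Not enough space, adding {len(documents)}")

page: list[str] = []
add_with_halving(page, [f"doc{i}" for i in range(10)])
print(page)  # only the first few documents fit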

View file

@@ -41,7 +41,7 @@ def run():
         print("Creating a new index")
         TinyIndex.create(item_factory=Document, index_path=args.index, num_pages=NUM_PAGES, page_size=PAGE_SIZE)
 
-    Process(target=background.run).start()
+    Process(target=background.run, args=(args.index,)).start()
     # Process(target=historical.run).start()
     # Process(target=retrieve.run).start()
     # Process(target=preprocess.run, args=(args.index,)).start()
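Finally, the entry point now passes the index path into the background process. With multiprocessing, positional arguments travel through the args tuple, so a single argument needs a trailing comma, as in args=(args.index,) above. A tiny sketch; the worker function and the path are stand-ins, not the mwmbl background.run:

from multiprocessing import Process

def run(index_path: str):
    """Stand-in worker: just shows the argument arriving in the child process."""
    print(f"background process running against {index_path}")

if __name__ == '__main__':
    # args must be a tuple; the trailing comma is what makes a one-element tuple.
    p = Process(target=run, args=("/tmp/index.tinysearch",))
    p.start()
    p.join()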