Tokenize documents and store pages to be added to the index

Daoud Clarke 2022-06-20 22:54:35 +01:00
parent 9594915de1
commit 4330551e0f
4 changed files with 104 additions and 8 deletions

mwmbl/indexdb.py

@@ -15,6 +15,11 @@ class BatchStatus(Enum):
INDEXED = 2 # The batch has been indexed and the local data has been deleted
class DocumentStatus(Enum):
NEW = 0
PREPROCESSING = 1
@dataclass
class BatchInfo:
url: str
@@ -40,13 +45,23 @@ class IndexDatabase:
url VARCHAR PRIMARY KEY,
title VARCHAR NOT NULL,
extract VARCHAR NOT NULL,
score FLOAT NOT NULL
score FLOAT NOT NULL,
status INT NOT NULL
)
"""
document_pages_sql = """
CREATE TABLE IF NOT EXISTS document_pages (
url VARCHAR NOT NULL,
page INT NOT NULL
)
"""
with self.connection.cursor() as cursor:
cursor.execute(batches_sql)
print("Creating documents table")
cursor.execute(documents_sql)
cursor.execute(document_pages_sql)
def record_batches(self, batch_infos: list[BatchInfo]):
sql = """
@@ -69,15 +84,52 @@ class IndexDatabase:
results = cursor.fetchall()
return [BatchInfo(url, user_id_hash, status) for url, user_id_hash, status in results]
def update_batch_status(self, batch_urls: list[str], status: BatchStatus):
sql = """
UPDATE batches SET status = %(status)s
WHERE url IN %(urls)s
"""
with self.connection.cursor() as cursor:
cursor.execute(sql, {'status': status.value, 'urls': tuple(batch_urls)})
def queue_documents(self, documents: list[Document]):
sql = """
INSERT INTO documents (url, title, extract, score)
INSERT INTO documents (url, title, extract, score, status)
VALUES %s
ON CONFLICT (url) DO NOTHING
"""
sorted_documents = sorted(documents, key=lambda x: x.url)
data = [(document.url, document.title, document.extract, document.score) for document in sorted_documents]
data = [(document.url, document.title, document.extract, document.score, DocumentStatus.NEW.value)
for document in sorted_documents]
with self.connection.cursor() as cursor:
execute_values(cursor, sql, data)
def get_documents_for_preprocessing(self):
sql = f"""
UPDATE documents SET status = {DocumentStatus.PREPROCESSING.value}
WHERE url IN (
SELECT url FROM documents
WHERE status = {DocumentStatus.NEW.value}
LIMIT 100
FOR UPDATE SKIP LOCKED
)
RETURNING url, title, extract, score
"""
with self.connection.cursor() as cursor:
cursor.execute(sql)
results = cursor.fetchall()
print("Results", results)
return [Document(title, url, extract, score) for url, title, extract, score in results]
def queue_documents_for_page(self, urls_and_page_indexes: list[tuple[str, int]]):
sql = """
INSERT INTO document_pages (url, page) values %s
"""
print("Queuing", urls_and_page_indexes)
with self.connection.cursor() as cursor:
execute_values(cursor, sql, urls_and_page_indexes)
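
The get_documents_for_preprocessing query combines UPDATE ... RETURNING with a FOR UPDATE SKIP LOCKED subquery, so several preprocessing workers can each claim a disjoint batch of up to 100 NEW documents without blocking one another. A minimal sketch of two workers claiming batches concurrently, assuming a reachable PostgreSQL database and the Database and IndexDatabase classes from this commit; the claim_batch helper itself is hypothetical:

from multiprocessing import Process

from mwmbl.database import Database
from mwmbl.indexdb import IndexDatabase


def claim_batch(worker_id: int):
    # Each call flips up to 100 NEW rows to PREPROCESSING and returns them;
    # SKIP LOCKED means a concurrent worker simply skips rows already claimed.
    with Database() as db:
        index_db = IndexDatabase(db.connection)
        documents = index_db.get_documents_for_preprocessing()
        print(f"Worker {worker_id} claimed {len(documents)} documents")


if __name__ == '__main__':
    workers = [Process(target=claim_batch, args=(i,)) for i in range(2)]
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()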

mwmbl/local.py Normal file

@@ -0,0 +1,31 @@
"""
Preprocess local documents for indexing.
"""
from time import sleep
import spacy
from mwmbl.database import Database
from mwmbl.indexdb import IndexDatabase
from mwmbl.indexer.index import tokenize_document
from mwmbl.tinysearchengine.indexer import TinyIndex, Document
def run(index_path):
nlp = spacy.load("en_core_web_sm")
while True:
with Database() as db:
index_db = IndexDatabase(db.connection)
documents = index_db.get_documents_for_preprocessing()
print(f"Got {len(documents)} documents")
if len(documents) == 0:
sleep(10)
with TinyIndex(Document, index_path, 'w') as indexer:
for document in documents:
tokenized = tokenize_document(document.url, document.title, document.extract, 1, nlp)
page_indexes = [indexer.get_key_page_index(token) for token in tokenized.tokens]
index_db.queue_documents_for_page([(tokenized.url, i) for i in page_indexes])
if __name__ == '__main__':
run('data/index.tinysearch')
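
The loop above claims documents, tokenizes them, and records one (url, page) row for each index page a token hashes to. The token-to-page mapping is the unsigned mmh3 hash modulo the page count used by TinyIndex.get_key_page_index; a rough standalone illustration, with the token list and page count invented for the example:

import mmh3

NUM_PAGES = 76800  # hypothetical page count; the real value comes from the index metadata
tokens = ["search", "engine", "tokenize"]  # made-up stand-in for tokenize_document output

# Mirror of TinyIndex.get_key_page_index: unsigned murmur hash modulo the page count.
page_indexes = [mmh3.hash(token, signed=False) % NUM_PAGES for token in tokens]
print(page_indexes)
# queue_documents_for_page then stores one (url, page) row per resulting page index,
# so a later indexing step knows exactly which pages each document belongs on.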


@@ -20,23 +20,33 @@ def retrieve_batches():
with Database() as db:
index_db = IndexDatabase(db.connection)
index_db.create_tables()
with Database() as db:
index_db = IndexDatabase(db.connection)
batches = index_db.get_batches_by_status(BatchStatus.REMOTE)
print("Batches", batches)
urls = [batch.url for batch in batches][:10]
pool = ThreadPool(NUM_THREADS)
results = pool.imap_unordered(retrieve_batch, urls)
for result in results:
print("Result", result)
print("Processed batch with items:", result)
with Database() as db:
index_db = IndexDatabase(db.connection)
index_db.update_batch_status(urls, BatchStatus.LOCAL)
def retrieve_batch(url):
data = json.loads(gzip.decompress(requests.get(url).content))
batch = HashedBatch.parse_obj(data)
queue_batch(url, batch)
return len(batch.items)
def queue_batch(batch_url: str, batch: HashedBatch):
# TODO get the score from the URLs database
documents = [Document(item.content.title, item.url, item.content.extract, 1)
for item in batch.items if item.content is not None]
with Database() as db:
index_db = IndexDatabase(db.connection)
index_db.queue_documents(documents)
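
queue_batch keeps only items whose content survived crawling and stores them with a placeholder score of 1 (the TODO notes that the real score should come from the URLs database). A sketch of the same filtering on hand-built data, using plain dicts in place of the real HashedBatch model; the field names mirror what the code above reads:

from mwmbl.tinysearchengine.indexer import Document

items = [
    {"url": "https://example.com/a", "content": {"title": "Page A", "extract": "Some text about A"}},
    {"url": "https://example.com/b", "content": None},  # crawl produced no content: dropped
]

# Same shape as the comprehension in queue_batch, with a fixed score of 1 for now.
documents = [Document(item["content"]["title"], item["url"], item["content"]["extract"], 1)
             for item in items if item["content"] is not None]
assert len(documents) == 1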

mwmbl/tinysearchengine/indexer.py

@@ -106,10 +106,10 @@ class TinyIndex(Generic[T]):
self.index_file.close()
def retrieve(self, key: str) -> List[T]:
index = self._get_key_page_index(key)
index = self.get_key_page_index(key)
return self.get_page(index)
def _get_key_page_index(self, key):
def get_key_page_index(self, key) -> int:
key_hash = mmh3.hash(key, signed=False)
return key_hash % self.num_pages
@@ -128,7 +128,10 @@ class TinyIndex(Generic[T]):
def index(self, key: str, value: T):
assert type(value) == self.item_factory, f"Can only index the specified type" \
f" ({self.item_factory.__name__})"
page_index = self._get_key_page_index(key)
page_index = self.get_key_page_index(key)
self.add_to_page(page_index, value)
def add_to_page(self, page_index: int, value: T):
current_page = self._get_page_tuples(page_index)
if current_page is None:
current_page = []
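
Making get_key_page_index public and factoring add_to_page out of index lets a caller (such as local.py above) compute the target page for a key first and write to it separately. A rough usage sketch, assuming an index file already exists at the placeholder path:

from mwmbl.tinysearchengine.indexer import TinyIndex, Document

with TinyIndex(Document, 'data/index.tinysearch', 'w') as indexer:
    # The page for a key is deterministic: mmh3.hash(key) % num_pages.
    page = indexer.get_key_page_index("example")
    indexer.add_to_page(page, Document("Example", "https://example.com", "An example page", 1.0))
    # retrieve() hashes the key the same way, so it will look in the same page.
    assert indexer.get_key_page_index("example") == page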