Merge pull request #68 from mwmbl/fix-missing-query

Fix missing query
Daoud Clarke 2022-07-19 20:17:20 +01:00 committed by GitHub
commit 27a4784d08
7 changed files with 99 additions and 20 deletions


@@ -1,20 +1,50 @@
-from mwmbl.tinysearchengine.indexer import TinyIndex, NUM_PAGES, PAGE_SIZE, Document
+import logging
+import sys
+
+import spacy
+
+from mwmbl.indexer.index import tokenize_document
 from mwmbl.indexer.paths import INDEX_PATH
+from mwmbl.tinysearchengine.indexer import TinyIndex, Document
+
+logging.basicConfig(stream=sys.stdout, level=logging.DEBUG)
+
+nlp = spacy.load("en_core_web_sm")
+
+
+def store():
+    document = Document(
+        title='A nation in search of the new black | Theatre | The Guardian',
+        url='https://www.theguardian.com/stage/2007/nov/18/theatre',
+        extract="Topic-stuffed and talk-filled, Kwame Kwei-Armah's new play proves that issue-driven drama is (despite reports of its death) still being written and staged…",
+        score=1.0
+    )
+    with TinyIndex(Document, INDEX_PATH, 'w') as tiny_index:
+        tokenized = tokenize_document(document.url, document.title, document.extract, 1, nlp)
+        print("Tokenized", tokenized)
+        # for token in tokenized.tokens:
+        #
+        #     tiny_index.index(token, document)
 
 
 def get_items():
-    tiny_index = TinyIndex(Document, INDEX_PATH, NUM_PAGES, PAGE_SIZE)
-    items = tiny_index.retrieve('soup')
-    if items:
-        for item in items:
-            print("Items", item)
+    with TinyIndex(Document, INDEX_PATH) as tiny_index:
+        items = tiny_index.retrieve('search')
+        if items:
+            for item in items:
+                print("Items", item)
 
 
 def run():
-    tiny_index = TinyIndex(Document, INDEX_PATH, NUM_PAGES, PAGE_SIZE)
-    for i in range(100):
-        tiny_index.get_page(i)
+    with TinyIndex(Document, INDEX_PATH) as tiny_index:
+        for i in range(100000):
+            page = tiny_index.get_page(i)
+            for item in page:
+                if ' search' in item.title:
+                    print("Page", i, item)
 
 
 if __name__ == '__main__':
-    run()
+    # store()
+    # run()
+    get_items()
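The script above exercises both read paths of TinyIndex: retrieve() looks up a single term, while get_page() scans raw pages. As a further illustration of the same API (a hypothetical helper, not part of this change), a quick fill-rate check over the first few thousand pages could look like this:

from mwmbl.indexer.paths import INDEX_PATH
from mwmbl.tinysearchengine.indexer import TinyIndex, Document


def summarise(num_pages_to_scan=1000):
    # Count non-empty pages and stored documents using the same read API as above.
    with TinyIndex(Document, INDEX_PATH) as tiny_index:
        non_empty = 0
        total_documents = 0
        for i in range(num_pages_to_scan):
            page = tiny_index.get_page(i)
            if page:
                non_empty += 1
                total_documents += len(page)
        print(f"{non_empty}/{num_pages_to_scan} pages non-empty, {total_documents} documents")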

analyse/send_batch.py (new file)

@@ -0,0 +1,27 @@
+"""
+Send a batch to a running instance.
+"""
+import requests
+
+from mwmbl.crawler.batch import Batch, Item, ItemContent
+
+URL = 'http://localhost:5000/crawler/batches/'
+
+
+def run():
+    batch = Batch(user_id='test_user_id111111111111111111111111', items=[Item(
+        url='https://www.theguardian.com/stage/2007/nov/18/theatre',
+        content=ItemContent(
+            title='A nation in search of the new black | Theatre | The Guardian',
+            extract="Topic-stuffed and talk-filled, Kwame Kwei-Armah's new play proves that issue-driven drama is (despite reports of its death) still being written and staged…",
+            links=[]),
+        timestamp=123456,
+        status=200,
+    )])
+    result = requests.post(URL, data=batch.json())
+    print("Result", result.content)
+
+
+if __name__ == '__main__':
+    run()
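The script depends on the Batch, Item and ItemContent models from mwmbl.crawler.batch. Their real definitions are not part of this diff; inferred purely from the fields used above, they are pydantic models roughly of this shape (field types are guesses, not the actual mwmbl code):

from typing import Optional

from pydantic import BaseModel


class ItemContent(BaseModel):
    title: str
    extract: str
    links: list[str]


class Item(BaseModel):
    url: str
    content: Optional[ItemContent]
    timestamp: int
    status: Optional[int]


class Batch(BaseModel):
    user_id: str
    items: list[Item]

batch.json() serialises the whole structure to a JSON string, which requests.post(URL, data=batch.json()) sends as the request body to the local /crawler/batches/ endpoint.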


@@ -277,7 +277,6 @@ def status():
 def queue_batch(batch: HashedBatch):
     # TODO: get the score from the URLs database
     # TODO: also queue documents for batches sent through the API
     documents = [Document(item.content.title, item.url, item.content.extract, 1)
                  for item in batch.items if item.content is not None]
     with Database() as db:


@@ -2,6 +2,7 @@
 Preprocess local documents for indexing.
 """
 import traceback
+from logging import getLogger
 from time import sleep
 
 import spacy

@@ -12,6 +13,9 @@ from mwmbl.indexer.index import tokenize_document
 from mwmbl.tinysearchengine.indexer import TinyIndex, Document
 
+logger = getLogger(__name__)
+
+
 def run(index_path):
     while True:
         try:

@@ -34,7 +38,9 @@ def run_preprocessing(index_path):
     with TinyIndex(Document, index_path, 'w') as indexer:
         for document in documents:
             tokenized = tokenize_document(document.url, document.title, document.extract, 1, nlp)
+            logger.debug(f"Tokenized: {tokenized}")
             page_indexes = [indexer.get_key_page_index(token) for token in tokenized.tokens]
+            logger.debug(f"Page indexes: {page_indexes}")
             index_db.queue_documents_for_page([(tokenized.url, i) for i in page_indexes])
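The preprocessing loop fans each document out to the pages its tokens hash to and queues one (url, page_index) row per token; the update step in the next file later drains those queues and writes the pages. Purely as an illustration of the queued shape (values are made up, and page indexes really come from indexer.get_key_page_index):

url = "https://example.com/article"          # stands in for tokenized.url
page_indexes = [4051, 117, 3362]             # one index per token in tokenized.tokens

queued = [(url, i) for i in page_indexes]
# [('https://example.com/article', 4051), ('https://example.com/article', 117),
#  ('https://example.com/article', 3362)] -- passed to index_db.queue_documents_for_page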


@@ -6,7 +6,7 @@ from time import sleep
 from mwmbl.database import Database
 from mwmbl.indexer.indexdb import IndexDatabase
-from mwmbl.tinysearchengine.indexer import TinyIndex, Document
+from mwmbl.tinysearchengine.indexer import TinyIndex, Document, PageError
 
 
 def run_update(index_path):

@@ -23,13 +23,14 @@ def run_update(index_path):
            documents = index_db.get_queued_documents_for_page(i)
            print(f"Documents queued for page {i}: {len(documents)}")
            if len(documents) > 0:
-               for j in range(3):
+               for j in range(20):
                    try:
                        indexer.add_to_page(i, documents)
                        break
-                   except ValueError:
+                   except PageError:
                        documents = documents[:len(documents)//2]
+                       if len(documents) == 0:
+                           print("No more space")
+                           break
                        print(f"Not enough space, adding {len(documents)}")
            index_db.clear_queued_documents_for_page(i)
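This retry loop is the core of the fix: add_to_page raises the new PageError when the queued documents do not fit into a page, the batch is halved and retried (now up to 20 times rather than 3), and an explicit break stops the loop once the batch has shrunk to nothing. In isolation the pattern looks like this (a sketch with a stubbed page writer, not the real indexer):

class PageError(Exception):
    pass


def add_to_page_stub(documents, capacity=5):
    # Stand-in for indexer.add_to_page: rejects batches that would overflow a page.
    if len(documents) > capacity:
        raise PageError("Data is too big for page size")


def write_with_retries(documents, max_attempts=20):
    for _ in range(max_attempts):
        try:
            add_to_page_stub(documents)
            return len(documents)            # wrote this many documents
        except PageError:
            documents = documents[:len(documents) // 2]
            if len(documents) == 0:
                return 0                     # "No more space"
    return 0


print(write_with_retries(list(range(100))))  # 100 -> 50 -> 25 -> 12 -> 6 -> 3, prints 3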


@@ -1,6 +1,7 @@
 import argparse
 import logging
 import os
+import sys
 from multiprocessing import Process
 
 import uvicorn

@@ -14,7 +15,7 @@ from mwmbl.tinysearchengine.completer import Completer
 from mwmbl.tinysearchengine.indexer import TinyIndex, Document, NUM_PAGES, PAGE_SIZE
 from mwmbl.tinysearchengine.rank import HeuristicRanker
 
-logging.basicConfig()
+logging.basicConfig(stream=sys.stdout, level=logging.INFO)
 
 
 def setup_args():


@@ -2,11 +2,12 @@ import json
 import os
 from dataclasses import astuple, dataclass, asdict
 from io import UnsupportedOperation
+from logging import getLogger
 from mmap import mmap, PROT_READ, PROT_WRITE
 from typing import TypeVar, Generic, Callable, List
 
 import mmh3
-from zstandard import ZstdDecompressor, ZstdCompressor
+from zstandard import ZstdDecompressor, ZstdCompressor, ZstdError
 
 VERSION = 1
 METADATA_CONSTANT = b'mwmbl-tiny-search'

@@ -16,6 +17,9 @@ NUM_PAGES = 5_120_000
 PAGE_SIZE = 4096
 
+logger = getLogger(__name__)
+
+
 @dataclass
 class Document:
     title: str

@@ -32,6 +36,10 @@ class TokenizedDocument(Document):
 T = TypeVar('T')
 
 
+class PageError(Exception):
+    pass
+
+
 @dataclass
 class TinyIndexMetadata:
     version: int

@@ -64,7 +72,7 @@ def _get_page_data(compressor, page_size, data):
 def _pad_to_page_size(data: bytes, page_size: int):
     page_length = len(data)
     if page_length > page_size:
-        raise ValueError(f"Data is too big ({page_length}) for page size ({page_size})")
+        raise PageError(f"Data is too big ({page_length}) for page size ({page_size})")
     padding = b'\x00' * (page_size - page_length)
     page_data = data + padding
     return page_data
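Switching _pad_to_page_size from ValueError to the new PageError lets callers distinguish page overflow from unrelated errors. A small usage sketch (importing the private helper purely for illustration):

from mwmbl.tinysearchengine.indexer import PAGE_SIZE, PageError, _pad_to_page_size

padded = _pad_to_page_size(b'compressed page bytes', PAGE_SIZE)
assert len(padded) == PAGE_SIZE               # short data is zero-padded to a full page

try:
    _pad_to_page_size(b'x' * (PAGE_SIZE + 1), PAGE_SIZE)
except PageError:
    print("overflow is now reported as PageError rather than ValueError")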
@@ -92,6 +100,7 @@ class TinyIndex(Generic[T]):
         self.page_size = metadata.page_size
         self.compressor = ZstdCompressor()
         self.decompressor = ZstdDecompressor()
+        logger.info(f"Loaded index with {self.num_pages} pages and {self.page_size} page size")
         self.index_file = None
         self.mmap = None

@@ -107,13 +116,14 @@
     def retrieve(self, key: str) -> List[T]:
         index = self.get_key_page_index(key)
+        logger.debug(f"Retrieving index {index}")
         return self.get_page(index)
 
     def get_key_page_index(self, key) -> int:
         key_hash = mmh3.hash(key, signed=False)
         return key_hash % self.num_pages
 
-    def get_page(self, i):
+    def get_page(self, i) -> list[T]:
         """
         Get the page at index i, decompress and deserialise it using JSON
         """

@@ -122,7 +132,12 @@ class TinyIndex(Generic[T]):
     def _get_page_tuples(self, i):
         page_data = self.mmap[i * self.page_size:(i + 1) * self.page_size]
-        decompressed_data = self.decompressor.decompress(page_data)
+        try:
+            decompressed_data = self.decompressor.decompress(page_data)
+        except ZstdError:
+            logger.exception(f"Error decompressing page data, content: {page_data}")
+            return []
+        # logger.debug(f"Decompressed data: {decompressed_data}")
         return json.loads(decompressed_data.decode('utf8'))
 
     def index(self, key: str, value: T):

@@ -131,7 +146,7 @@ class TinyIndex(Generic[T]):
         page_index = self.get_key_page_index(key)
         try:
             self.add_to_page(page_index, [value])
-        except ValueError:
+        except PageError:
             pass
 
     def add_to_page(self, page_index: int, values: list[T]):
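For reference, the page lookup shared by retrieve() and index() is just a murmur3 hash reduced modulo the number of pages, so distinct keys can land on, and share, the same page. A worked example using the constants from this file:

import mmh3

NUM_PAGES = 5_120_000          # value defined in this module

for key in ['search', 'soup', 'theatre']:
    key_hash = mmh3.hash(key, signed=False)
    print(key, '->', key_hash % NUM_PAGES)   # page index that get_key_page_index would return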