Merge pull request #127 from mwmbl/add-term-info-to-index

Add term info to index
Daoud Clarke, 2023-11-18 18:56:53 +00:00, committed by GitHub
commit 36df016445
12 changed files with 125 additions and 141 deletions

analyse/add_term_info.py (new file, 51 additions)

@@ -0,0 +1,51 @@
"""
Investigate adding term information to the database.
How much extra space will it take?
"""
import os
from pathlib import Path
from random import Random

import numpy as np
from scipy.stats import sem

from mwmbl.tinysearchengine.indexer import TinyIndex, Document, _trim_items_to_page, astuple
from zstandard import ZstdCompressor

from mwmbl.utils import add_term_info

random = Random(1)

INDEX_PATH = Path(__file__).parent.parent / "devdata" / "index-v2.tinysearch"


def run():
    compressor = ZstdCompressor()
    with TinyIndex(Document, INDEX_PATH) as index:
        # Get some random integers between 0 and index.num_pages:
        pages = random.sample(range(index.num_pages), 10000)

        old_sizes = []
        new_sizes = []

        for i in pages:
            page = index.get_page(i)
            term_documents = []
            for document in page:
                term_document = add_term_info(document, index, i)
                term_documents.append(term_document)

            value_tuples = [astuple(value) for value in term_documents]
            num_fitting, compressed = _trim_items_to_page(compressor, index.page_size, value_tuples)

            new_sizes.append(num_fitting)
            old_sizes.append(len(page))

        print("Old sizes mean", np.mean(old_sizes), sem(old_sizes))
        print("New sizes mean", np.mean(new_sizes), sem(new_sizes))


if __name__ == '__main__':
    run()
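For context, the extra space measured above comes from appending the matching term to every stored document. The exact Document definition is not part of this file, so the tuples below are purely illustrative of the before and after shape:

# Before this change a page stores (title, url, extract, score) entries;
# afterwards each entry also carries the term that routed it to that page.
old_entry = ("Example title", "https://example.com/", "An example extract", 1.0)
new_entry = ("Example title", "https://example.com/", "An example extract", 1.0, "example")
# Pages are zstd-compressed into a fixed page_size, so the longer entries mean
# fewer of them fit per page; that is what old_sizes and new_sizes compare.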


@@ -1,57 +0,0 @@
"""
Index batches stored locally on the filesystem for the purpose of evaluation.
"""
import glob
import gzip
import json
import logging
import os
import sys
from datetime import datetime

import spacy

from mwmbl.crawler import HashedBatch
from mwmbl.crawler.urls import URLDatabase
from mwmbl.database import Database
from mwmbl.indexer import index_batches
from mwmbl.tinysearchengine import TinyIndex, Document

LOCAL_BATCHES_PATH = f'{os.environ["HOME"]}/data/mwmbl/file/**/*.json.gz'
NUM_BATCHES = 10000
EVALUATE_INDEX_PATH = f'{os.environ["HOME"]}/data/mwmbl/evaluate-index.tinysearch'
NUM_PAGES = 1_024_000
PAGE_SIZE = 4096

logging.basicConfig(stream=sys.stdout, level=logging.DEBUG)


def get_batches():
    for path in sorted(glob.glob(LOCAL_BATCHES_PATH, recursive=True))[:NUM_BATCHES]:
        data = json.load(gzip.open(path))
        yield HashedBatch.parse_obj(data)


def run():
    try:
        os.remove(EVALUATE_INDEX_PATH)
    except FileNotFoundError:
        pass

    TinyIndex.create(item_factory=Document, index_path=EVALUATE_INDEX_PATH, num_pages=NUM_PAGES, page_size=PAGE_SIZE)

    batches = get_batches()

    start = datetime.now()
    with Database() as db:
        nlp = spacy.load("en_core_web_sm")
        url_db = URLDatabase(db.connection)
        index_batches(batches, EVALUATE_INDEX_PATH, nlp, url_db)
    end = datetime.now()

    total_time = (end - start).total_seconds()
    print("total_seconds:", total_time)


if __name__ == '__main__':
    run()


@@ -1,60 +0,0 @@
import logging
import sys

import numpy as np
import spacy

from analyse.index_local import EVALUATE_INDEX_PATH
from mwmbl.indexer import tokenize_document
from mwmbl.indexer import INDEX_PATH
from mwmbl.tinysearchengine import TinyIndex, Document

logging.basicConfig(stream=sys.stdout, level=logging.DEBUG)

nlp = spacy.load("en_core_web_sm")


def store():
    document = Document(
        title='A nation in search of the new black | Theatre | The Guardian',
        url='https://www.theguardian.com/stage/2007/nov/18/theatre',
        extract="Topic-stuffed and talk-filled, Kwame Kwei-Armah's new play proves that issue-driven drama is (despite reports of its death) still being written and staged…",
        score=1.0
    )
    with TinyIndex(Document, INDEX_PATH, 'w') as tiny_index:
        tokenized = tokenize_document(document.url, document.title, document.extract, 1, nlp)
        print("Tokenized", tokenized)
        # for token in tokenized.tokens:
        #
        #     tiny_index.index(token, document)


def get_items():
    with TinyIndex(Document, INDEX_PATH) as tiny_index:
        items = tiny_index.retrieve('wikipedia')
        if items:
            for item in items:
                print("Items", item)


def run(index_path):
    with TinyIndex(Document, index_path) as tiny_index:
        sizes = {}
        for i in range(tiny_index.num_pages):
            page = tiny_index.get_page(i)
            if page:
                sizes[i] = len(page)
            if len(page) > 50:
                print("Page", len(page), page)
            # for item in page:
            #     if ' search' in item.title:
            #         print("Page", i, item)

        print("Max", max(sizes.values()))
        print("Top", sorted(sizes.values())[-100:])
        print("Mean", np.mean(list(sizes.values())))


if __name__ == '__main__':
    # store()
    run(EVALUATE_INDEX_PATH)
    # get_items()

Binary file not shown.


@@ -6,6 +6,10 @@ from pathlib import Path
from django.apps import AppConfig
from django.conf import settings
+from mwmbl.crawler.urls import URLDatabase
+from mwmbl.database import Database
+from mwmbl.indexer.indexdb import IndexDatabase


class MwmblConfig(AppConfig):
    name = "mwmbl"
@@ -31,6 +35,12 @@ class MwmblConfig(AppConfig):
        TinyIndex.create(item_factory=Document, index_path=index_path, num_pages=settings.NUM_PAGES,
                         page_size=PAGE_SIZE)
+        with Database() as db:
+            url_db = URLDatabase(db.connection)
+            url_db.create_tables()
+            index_db = IndexDatabase(db.connection)
+            index_db.create_tables()

        if settings.RUN_BACKGROUND_PROCESSES:
            new_item_queue = Queue()
            Process(target=background.run, args=(settings.DATA_PATH,)).start()


@@ -1,7 +1,9 @@
"""
Script that updates data in a background process.
"""
-from logging import getLogger
+import logging
+import sys
+from logging import getLogger, basicConfig
from pathlib import Path
from time import sleep
@@ -11,6 +13,8 @@ from mwmbl.indexer import index_batches, historical
from mwmbl.indexer.batch_cache import BatchCache
from mwmbl.indexer.paths import BATCH_DIR_NAME, INDEX_NAME

+basicConfig(stream=sys.stdout, level=logging.INFO)

logger = getLogger(__name__)


@@ -49,7 +49,7 @@ def prepare_url_for_tokenizing(url: str):
def get_pages(nlp, titles_urls_and_extracts, link_counts) -> Iterable[TokenizedDocument]:
    for i, (title_cleaned, url, extract) in enumerate(titles_urls_and_extracts):
        score = link_counts.get(url, DEFAULT_SCORE)
-        yield tokenize_document(url, title_cleaned, extract, score, nlp)
+        yield tokenize_document(url, title_cleaned, extract, score)
        if i % 1000 == 0:
            print("Processed", i)
@@ -61,7 +61,7 @@ def get_index_tokens(tokens):
    return set(first_tokens + bigrams)


-def tokenize_document(url, title_cleaned, extract, score, nlp):
+def tokenize_document(url, title_cleaned, extract, score):
    title_tokens = tokenize(title_cleaned)
    prepared_url = prepare_url_for_tokenizing(unquote(url))
    url_tokens = tokenize(prepared_url)


@@ -16,6 +16,7 @@ from mwmbl.indexer.batch_cache import BatchCache
from mwmbl.indexer.index import tokenize_document
from mwmbl.indexer.indexdb import BatchStatus
from mwmbl.tinysearchengine.indexer import Document, TinyIndex
+from mwmbl.utils import add_term_info, add_term_infos

logger = getLogger(__name__)
@@ -31,22 +32,20 @@ def run(batch_cache: BatchCache, index_path: str):
    def process(batches: Collection[HashedBatch]):
        with Database() as db:
-            nlp = spacy.load("en_core_web_sm")
            url_db = URLDatabase(db.connection)
-            index_batches(batches, index_path, nlp, url_db)
+            index_batches(batches, index_path, url_db)
            logger.info("Indexed pages")

    process_batch.run(batch_cache, BatchStatus.URLS_UPDATED, BatchStatus.INDEXED, process)


-def index_batches(batch_data: Collection[HashedBatch], index_path: str, nlp: Language, url_db: URLDatabase):
+def index_batches(batch_data: Collection[HashedBatch], index_path: str, url_db: URLDatabase):
    document_tuples = list(get_documents_from_batches(batch_data))
    urls = [url for title, url, extract in document_tuples]
-    logger.info(f"Got {len(urls)} document tuples")
    url_scores = url_db.get_url_scores(urls)
-    logger.info(f"Got {len(url_scores)} scores")
+    logger.info(f"Indexing {len(urls)} document tuples and {len(url_scores)} URL scores")
    documents = [Document(title, url, extract, url_scores.get(url, 1.0)) for title, url, extract in document_tuples]
-    page_documents = preprocess_documents(documents, index_path, nlp)
+    page_documents = preprocess_documents(documents, index_path)
    index_pages(index_path, page_documents)
@@ -58,24 +57,27 @@ def index_pages(index_path, page_documents):
            seen_urls = set()
            seen_titles = set()
            sorted_documents = sorted(documents + existing_documents, key=lambda x: x.score, reverse=True)
-            for document in sorted_documents:
+            # TODO: for now we add the term here, until all the documents in the index have terms
+            sorted_documents_with_terms = add_term_infos(sorted_documents, indexer, page)
+            for document in sorted_documents_with_terms:
                if document.title in seen_titles or document.url in seen_urls:
                    continue
                new_documents.append(document)
                seen_urls.add(document.url)
                seen_titles.add(document.title)
+            logger.info(f"Storing {len(new_documents)} documents for page {page}, originally {len(existing_documents)}")
            indexer.store_in_page(page, new_documents)


-def preprocess_documents(documents, index_path, nlp):
+def preprocess_documents(documents, index_path):
    page_documents = defaultdict(list)
    with TinyIndex(Document, index_path, 'w') as indexer:
        for document in documents:
-            tokenized = tokenize_document(document.url, document.title, document.extract, document.score, nlp)
-            # logger.debug(f"Tokenized: {tokenized}")
-            page_indexes = [indexer.get_key_page_index(token) for token in tokenized.tokens]
-            for page in page_indexes:
-                page_documents[page].append(document)
+            tokenized = tokenize_document(document.url, document.title, document.extract, document.score)
+            for token in tokenized.tokens:
+                page = indexer.get_key_page_index(token)
+                term_document = Document(document.title, document.url, document.extract, document.score, token)
+                page_documents[page].append(term_document)

    print(f"Preprocessed for {len(page_documents)} pages")
    return page_documents


@@ -86,7 +86,7 @@ def record_urls_in_database(batches: Collection[HashedBatch], new_item_queue: Qu
def process_link(user_id_hash, crawled_page_domain, link, unknown_domain_multiplier, timestamp, url_scores, url_timestamps, url_users, is_extra: bool, blacklist_domains):
    parsed_link = urlparse(link)
    if is_domain_blacklisted(parsed_link.netloc, blacklist_domains):
-        logger.info(f"Excluding link for blacklisted domain: {parsed_link}")
+        logger.debug(f"Excluding link for blacklisted domain: {parsed_link}")
        return

    extra_multiplier = EXTRA_LINK_MULTIPLIER if is_extra else 1.0


@@ -1,3 +1,4 @@
+from logging import getLogger
from typing import Any
from urllib.parse import parse_qs
@@ -9,11 +10,15 @@ from mwmbl.platform.data import CurateBegin, CurateMove, CurateDelete, CurateAdd
    make_curation_type
from mwmbl.tinysearchengine.indexer import TinyIndex, Document
from mwmbl.tokenizer import tokenize
+from mwmbl.utils import add_term_info, add_term_infos

RESULT_URL = "https://mwmbl.org/?q="
MAX_CURATED_SCORE = 1_111_111.0

+logger = getLogger(__name__)


def create_router(index_path: str) -> Router:
    router = Router(tags=["user"])
@@ -58,20 +63,24 @@ def create_router(index_path: str) -> Router:
            raise ValueError(f"Should be one query value in the URL: {curation.url}")

        query = queries[0]
-        print("Query", query)
        tokens = tokenize(query)
-        print("Tokens", tokens)
        term = " ".join(tokens)
-        print("Key", term)

        documents = [
            Document(result.title, result.url, result.extract, MAX_CURATED_SCORE - i, term, result.curated)
            for i, result in enumerate(curation.results)
        ]

        page_index = indexer.get_key_page_index(term)
-        print("Page index", page_index)
-        print("Storing documents", documents)
-        indexer.store_in_page(page_index, documents)
+        existing_documents_no_terms = indexer.get_page(page_index)
+        existing_documents = add_term_infos(existing_documents_no_terms, indexer, page_index)
+        other_documents = [doc for doc in existing_documents if doc.term != term]
+        logger.info(f"Found {len(other_documents)} other documents for term {term} at page {page_index} "
+                    f"with terms { {doc.term for doc in other_documents} }")
+        all_documents = documents + other_documents
+        logger.info(f"Storing {len(all_documents)} documents at page {page_index}")
+        indexer.store_in_page(page_index, all_documents)

        return {"curation": "ok"}


@@ -79,6 +79,7 @@ class TinyIndexMetadata:
        values = json.loads(data[constant_length:].decode('utf8'))
        return TinyIndexMetadata(**values)


# Find the optimal amount of data that fits onto a page
# We do this by leveraging binary search to quickly find the index where:
# - index+1 cannot fit onto a page
@@ -106,10 +107,12 @@ def _binary_search_fitting_size(compressor: ZstdCompressor, page_size: int, item
        # No better match, use our index
        return mid, compressed_data


def _trim_items_to_page(compressor: ZstdCompressor, page_size: int, items:list[T]):
    # Find max number of items that fit on a page
    return _binary_search_fitting_size(compressor, page_size, items, 0, len(items))


def _get_page_data(compressor: ZstdCompressor, page_size: int, items: list[T]):
    num_fitting, serialised_data = _trim_items_to_page(compressor, page_size, items)
@@ -186,7 +189,6 @@ class TinyIndex(Generic[T]):
        except ZstdError:
            logger.exception(f"Error decompressing page data, content: {page_data}")
            return []
-        # logger.debug(f"Decompressed data: {decompressed_data}")
        return json.loads(decompressed_data.decode('utf8'))

    def store_in_page(self, page_index: int, values: list[T]):
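The comment block in the hunk above summarises how _trim_items_to_page works: a binary search for the largest number of items whose compressed form still fits within a page. A minimal sketch of that idea, assuming items are JSON-serialised before zstd compression as elsewhere in this file (max_items_fitting is an illustrative name, not the mwmbl function, which also returns the compressed payload):

import json
from zstandard import ZstdCompressor

def max_items_fitting(compressor: ZstdCompressor, page_size: int, items: list) -> int:
    # Largest n such that compress(json(items[:n])) fits within page_size bytes.
    lo, hi, best = 0, len(items), 0
    while lo <= hi:
        mid = (lo + hi) // 2
        compressed = compressor.compress(json.dumps(items[:mid]).encode("utf8"))
        if len(compressed) <= page_size:
            best, lo = mid, mid + 1   # mid items fit; try to fit more
        else:
            hi = mid - 1              # too big; try fewer items
    return best

The real helper returns both the count and the compressed data, so _get_page_data does not have to compress twice.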


@@ -1,5 +1,8 @@
import re

+from mwmbl.indexer.index import tokenize_document
+from mwmbl.tinysearchengine.indexer import Document, TinyIndex

DOMAIN_REGEX = re.compile(r".*://([^/]*)")
@@ -17,3 +20,23 @@ def get_domain(url):
    if results is None or len(results.groups()) == 0:
        raise ValueError(f"Unable to parse domain from URL {url}")
    return results.group(1)


+def add_term_info(document: Document, index: TinyIndex, page_index: int):
+    tokenized = tokenize_document(document.url, document.title, document.extract, document.score)
+    for token in tokenized.tokens:
+        token_page_index = index.get_key_page_index(token)
+        if token_page_index == page_index:
+            return Document(document.title, document.url, document.extract, document.score, token)
+    raise ValueError("Could not find token in page index")


+def add_term_infos(documents: list[Document], index: TinyIndex, page_index: int):
+    for document in documents:
+        if document.term is not None:
+            yield document
+            continue
+        try:
+            yield add_term_info(document, index, page_index)
+        except ValueError:
+            continue
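Finally, a hypothetical usage sketch of the new helpers, mirroring how the curation endpoint above back-fills terms for documents stored before this change (the index path and query term are placeholders):

from mwmbl.tinysearchengine.indexer import TinyIndex, Document
from mwmbl.utils import add_term_infos

index_path = "devdata/index-v2.tinysearch"  # placeholder: any existing index file
with TinyIndex(Document, index_path, 'w') as indexer:
    page_index = indexer.get_key_page_index("wikipedia")
    # Documents written before this change carry no term; add_term_infos re-tokenizes
    # them and keeps only those whose tokens actually map to this page.
    existing = indexer.get_page(page_index)
    with_terms = list(add_term_infos(existing, indexer, page_index))
    indexer.store_in_page(page_index, with_terms)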