Merge pull request #74 from mwmbl/evaluate-indexing

Evaluate indexing
Daoud Clarke 2022-08-27 09:37:22 +01:00 committed by GitHub
commit b6183e00ea
12 changed files with 267 additions and 120 deletions

analyse/index_local.py (new file, 58 additions)

@@ -0,0 +1,58 @@
"""
Index batches stored locally on the filesystem for the purpose of evaluation.
"""
import glob
import gzip
import json
import logging
import os
import sys
from pathlib import Path
from datetime import datetime
import spacy
from mwmbl.crawler.batch import HashedBatch
from mwmbl.crawler.urls import URLDatabase
from mwmbl.database import Database
from mwmbl.indexer.index_batches import index_batches
from mwmbl.tinysearchengine.indexer import TinyIndex, Document
LOCAL_BATCHES_PATH = f'{os.environ["HOME"]}/data/mwmbl/file/**/*.json.gz'
NUM_BATCHES = 10000
EVALUATE_INDEX_PATH = f'{os.environ["HOME"]}/data/mwmbl/evaluate-index.tinysearch'
NUM_PAGES = 1_024_000
PAGE_SIZE = 4096
logging.basicConfig(stream=sys.stdout, level=logging.DEBUG)
def get_batches():
for path in sorted(glob.glob(LOCAL_BATCHES_PATH, recursive=True))[:NUM_BATCHES]:
data = json.load(gzip.open(path))
yield HashedBatch.parse_obj(data)
def run():
try:
os.remove(EVALUATE_INDEX_PATH)
except FileNotFoundError:
pass
TinyIndex.create(item_factory=Document, index_path=EVALUATE_INDEX_PATH, num_pages=NUM_PAGES, page_size=PAGE_SIZE)
batches = get_batches()
start = datetime.now()
with Database() as db:
nlp = spacy.load("en_core_web_sm")
url_db = URLDatabase(db.connection)
index_batches(batches, EVALUATE_INDEX_PATH, nlp, url_db)
end = datetime.now()
total_time = (end - start).total_seconds()
print("total_seconds:", total_time)
if __name__ == '__main__':
run()
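
Once the script has run, the evaluation index can be spot-checked by opening it read-only and retrieving a term, using the same TinyIndex API the ranker uses below. A minimal sketch (the query term 'search' is an arbitrary example):

    from analyse.index_local import EVALUATE_INDEX_PATH
    from mwmbl.tinysearchengine.indexer import TinyIndex, Document

    with TinyIndex(Document, EVALUATE_INDEX_PATH) as tiny_index:
        # retrieve() returns None when the term is absent, so guard with "or []"
        for item in tiny_index.retrieve('search') or []:
            print(item.title, item.url)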


@@ -1,8 +1,10 @@
import logging
import sys
import numpy as np
import spacy
from analyse.index_local import EVALUATE_INDEX_PATH
from mwmbl.indexer.index import tokenize_document
from mwmbl.indexer.paths import INDEX_PATH
from mwmbl.tinysearchengine.indexer import TinyIndex, Document
@@ -35,16 +37,24 @@ def get_items():
print("Items", item)
def run():
with TinyIndex(Document, INDEX_PATH) as tiny_index:
for i in range(100000):
def run(index_path):
with TinyIndex(Document, index_path) as tiny_index:
sizes = {}
for i in range(tiny_index.num_pages):
page = tiny_index.get_page(i)
for item in page:
if ' search' in item.title:
print("Page", i, item)
if page:
sizes[i] = len(page)
if len(page) > 50:
print("Page", len(page), page)
# for item in page:
# if ' search' in item.title:
# print("Page", i, item)
print("Max", max(sizes.values()))
print("Top", sorted(sizes.values())[-100:])
print("Mean", np.mean(list(sizes.values())))
if __name__ == '__main__':
# store()
# run()
get_items()
run(EVALUATE_INDEX_PATH)
# get_items()
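
The new run() reports the maximum, top-100, and mean page fills. A percentile summary of the same sizes dict would give a fuller picture of index occupancy; a possible extension, sketched here but not part of the commit:

    import numpy as np

    def summarise(sizes: dict[int, int], num_pages: int):
        values = np.array(list(sizes.values()))
        print("Pages occupied:", len(values), "of", num_pages)
        print("p50/p90/p99:", np.percentile(values, [50, 90, 99]))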


@@ -6,7 +6,7 @@ from multiprocessing import Queue
from pathlib import Path
from time import sleep
from mwmbl.indexer import index_batches, historical
from mwmbl.indexer import index_batches, historical, update_urls
from mwmbl.indexer.batch_cache import BatchCache
from mwmbl.indexer.paths import BATCH_DIR_NAME, INDEX_NAME
from mwmbl.url_queue import update_url_queue, initialize_url_queue
@@ -28,6 +28,10 @@ def run(data_path: str, url_queue: Queue):
batch_cache.retrieve_batches(num_batches=10000)
except Exception:
logger.exception("Error retrieving batches")
try:
update_urls.run(batch_cache)
except Exception:
logger.exception("Error updating URLs")
try:
index_batches.run(batch_cache, index_path)
except Exception:


@@ -8,28 +8,34 @@ from urllib.parse import unquote
import pandas as pd
from mwmbl.tinysearchengine.indexer import Document, TokenizedDocument, TinyIndex
from mwmbl.tokenizer import tokenize, get_bigrams
DEFAULT_SCORE = 0
HTTP_START = 'http://'
HTTPS_START = 'https://'
BATCH_SIZE = 100
NUM_FIRST_TOKENS = 3
NUM_BIGRAMS = 5
def is_content_token(nlp, token):
lexeme = nlp.vocab[token.orth]
return (lexeme.is_alpha or lexeme.is_digit) and not token.is_stop
def tokenize(nlp, input_text):
cleaned_text = input_text.encode('utf8', 'replace').decode('utf8')
tokens = nlp.tokenizer(cleaned_text)
if input_text.endswith('…'):
# Discard the last two tokens since there will likely be a word cut in two
tokens = tokens[:-2]
content_tokens = [token for token in tokens if is_content_token(nlp, token)]
lowered = {nlp.vocab[token.orth].text.lower() for token in content_tokens}
return lowered
STOPWORDS = set("0,1,2,3,4,5,6,7,8,9,a,A,about,above,across,after,again,against,all,almost,alone,along,already,also," \
"although,always,am,among,an,and,another,any,anyone,anything,anywhere,are,aren't,around,as,at,b,B,back," \
"be,became,because,become,becomes,been,before,behind,being,below,between,both,but,by,c,C,can,cannot,can't," \
"could,couldn't,d,D,did,didn't,do,does,doesn't,doing,done,don't,down,during,e,E,each,either,enough,even," \
"ever,every,everyone,everything,everywhere,f,F,few,find,first,for,four,from,full,further,g,G,get,give,go," \
"h,H,had,hadn't,has,hasn't,have,haven't,having,he,he'd,he'll,her,here,here's,hers,herself,he's,him," \
"himself,his,how,however,how's,i,I,i'd,if,i'll,i'm,in,interest,into,is,isn't,it,it's,its,itself,i've," \
"j,J,k,K,keep,l,L,last,least,less,let's,m,M,made,many,may,me,might,more,most,mostly,much,must,mustn't," \
"my,myself,n,N,never,next,no,nobody,noone,nor,not,nothing,now,nowhere,o,O,of,off,often,on,once,one,only," \
"or,other,others,ought,our,ours,ourselves,out,over,own,p,P,part,per,perhaps,put,q,Q,r,R,rather,s,S,same," \
"see,seem,seemed,seeming,seems,several,shan't,she,she'd,she'll,she's,should,shouldn't,show,side,since,so," \
"some,someone,something,somewhere,still,such,t,T,take,than,that,that's,the,their,theirs,them,themselves," \
"then,there,therefore,there's,these,they,they'd,they'll,they're,they've,this,those,though,three,through," \
"thus,to,together,too,toward,two,u,U,under,until,up,upon,us,v,V,very,w,W,was,wasn't,we,we'd,we'll,well," \
"we're,were,weren't,we've,what,what's,when,when's,where,where's,whether,which,while,who,whole,whom,who's," \
"whose,why,why's,will,with,within,without,won't,would,wouldn't,x,X,y,Y,yet,you,you'd,you'll,your,you're," \
"yours,yourself,yourselves,you've,z,Z".split(','))
def prepare_url_for_tokenizing(url: str):
@@ -52,13 +58,23 @@ def get_pages(nlp, titles_urls_and_extracts, link_counts) -> Iterable[TokenizedDocument]:
print("Processed", i)
def get_index_tokens(tokens):
first_tokens = tokens[:NUM_FIRST_TOKENS]
bigrams = get_bigrams(NUM_BIGRAMS, tokens)
return set(first_tokens + bigrams)
def tokenize_document(url, title_cleaned, extract, score, nlp):
title_tokens = tokenize(nlp, title_cleaned)
title_tokens = tokenize(title_cleaned)
prepared_url = prepare_url_for_tokenizing(unquote(url))
url_tokens = tokenize(nlp, prepared_url)
extract_tokens = tokenize(nlp, extract)
url_tokens = tokenize(prepared_url)
extract_tokens = tokenize(extract)
# print("Extract tokens", extract_tokens)
tokens = title_tokens | url_tokens | extract_tokens
tokens = get_index_tokens(title_tokens) | get_index_tokens(url_tokens) | get_index_tokens(extract_tokens)
# doc = Document(title_cleaned, url, extract, score)
# token_scores = {token: score_result([token], doc, True) for token in tokens}
# high_scoring_tokens = [k for k, v in token_scores.items() if v > 0.5]
# print("High scoring", len(high_scoring_tokens), token_scores, doc)
document = TokenizedDocument(tokens=list(tokens), url=url, title=title_cleaned, extract=extract, score=score)
return document
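
The effect of get_index_tokens is easiest to see on a concrete input. A worked sketch with NUM_FIRST_TOKENS = 3 and NUM_BIGRAMS = 5, using the tokenizer added in mwmbl/tokenizer.py (the example string is arbitrary):

    from mwmbl.tokenizer import tokenize, get_bigrams

    tokens = tokenize("free open source web search")
    # ['free', 'open', 'source', 'web', 'search']
    first_tokens = tokens[:3]         # ['free', 'open', 'source']
    bigrams = get_bigrams(5, tokens)  # ['free open', 'open source', 'source web', 'web search']
    index_tokens = set(first_tokens + bigrams)  # the terms stored for this field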


@@ -2,31 +2,25 @@
Index batches that are stored locally.
"""
from collections import defaultdict
from datetime import datetime, timezone, timedelta
from logging import getLogger
from typing import Iterable
from urllib.parse import urlparse
from typing import Collection, Iterable
import spacy
from mwmbl.indexer import process_batch
from spacy import Language
from mwmbl.crawler.batch import HashedBatch, Item
from mwmbl.crawler.urls import URLDatabase, URLStatus, FoundURL
from mwmbl.crawler.urls import URLDatabase, URLStatus
from mwmbl.database import Database
from mwmbl.hn_top_domains_filtered import DOMAINS
from mwmbl.indexer.batch_cache import BatchCache
from mwmbl.indexer.index import tokenize_document
from mwmbl.indexer.indexdb import BatchStatus, IndexDatabase
from mwmbl.settings import UNKNOWN_DOMAIN_MULTIPLIER, SCORE_FOR_SAME_DOMAIN, SCORE_FOR_DIFFERENT_DOMAIN, \
SCORE_FOR_ROOT_PATH
from mwmbl.indexer.indexdb import BatchStatus
from mwmbl.tinysearchengine.indexer import Document, TinyIndex
logger = getLogger(__name__)
EXCLUDED_DOMAINS = {'web.archive.org'}
def get_documents_from_batches(batches: Iterable[HashedBatch]) -> Iterable[tuple[str, str, str]]:
def get_documents_from_batches(batches: Collection[HashedBatch]) -> Iterable[tuple[str, str, str]]:
for batch in batches:
for item in batch.items:
if item.content is not None:
@@ -34,36 +28,26 @@ def get_documents_from_batches(batches: Iterable[HashedBatch]) -> Iterable[tuple[str, str, str]]:
def run(batch_cache: BatchCache, index_path: str):
nlp = spacy.load("en_core_web_sm")
with Database() as db:
index_db = IndexDatabase(db.connection)
logger.info("Getting local batches")
batches = index_db.get_batches_by_status(BatchStatus.LOCAL, 10000)
logger.info(f"Got {len(batches)} batch urls")
if len(batches) == 0:
return
def process(batches: Collection[HashedBatch]):
with Database() as db:
nlp = spacy.load("en_core_web_sm")
url_db = URLDatabase(db.connection)
index_batches(batches, index_path, nlp, url_db)
logger.info("Indexed pages")
batch_data = batch_cache.get_cached([batch.url for batch in batches])
logger.info(f"Got {len(batch_data)} cached batches")
process_batch.run(batch_cache, BatchStatus.URLS_UPDATED, BatchStatus.INDEXED, process)
record_urls_in_database(batch_data.values())
document_tuples = list(get_documents_from_batches(batch_data.values()))
urls = [url for title, url, extract in document_tuples]
logger.info(f"Got {len(urls)} document tuples")
url_db = URLDatabase(db.connection)
url_scores = url_db.get_url_scores(urls)
logger.info(f"Got {len(url_scores)} scores")
documents = [Document(title, url, extract, url_scores.get(url, 1.0)) for title, url, extract in document_tuples]
page_documents = preprocess_documents(documents, index_path, nlp)
index_pages(index_path, page_documents)
logger.info("Indexed pages")
index_db.update_batch_status([batch.url for batch in batches], BatchStatus.INDEXED)
def index_batches(batch_data: Collection[HashedBatch], index_path: str, nlp: Language, url_db: URLDatabase):
document_tuples = list(get_documents_from_batches(batch_data))
urls = [url for title, url, extract in document_tuples]
logger.info(f"Got {len(urls)} document tuples")
url_scores = url_db.get_url_scores(urls)
logger.info(f"Got {len(url_scores)} scores")
documents = [Document(title, url, extract, url_scores.get(url, 1.0)) for title, url, extract in document_tuples]
page_documents = preprocess_documents(documents, index_path, nlp)
index_pages(index_path, page_documents)
def index_pages(index_path, page_documents):
@@ -107,49 +91,6 @@ def get_url_error_status(item: Item):
return URLStatus.ERROR_OTHER
def record_urls_in_database(batches: Iterable[HashedBatch]):
with Database() as db:
url_db = URLDatabase(db.connection)
url_scores = defaultdict(float)
url_users = {}
url_timestamps = {}
url_statuses = defaultdict(lambda: URLStatus.NEW)
for batch in batches:
for item in batch.items:
timestamp = get_datetime_from_timestamp(item.timestamp / 1000.0)
url_timestamps[item.url] = timestamp
url_users[item.url] = batch.user_id_hash
if item.content is None:
url_statuses[item.url] = get_url_error_status(item)
else:
url_statuses[item.url] = URLStatus.CRAWLED
crawled_page_domain = urlparse(item.url).netloc
score_multiplier = 1 if crawled_page_domain in DOMAINS else UNKNOWN_DOMAIN_MULTIPLIER
for link in item.content.links:
parsed_link = urlparse(link)
if parsed_link.netloc in EXCLUDED_DOMAINS:
    continue
score = SCORE_FOR_SAME_DOMAIN if parsed_link.netloc == crawled_page_domain else SCORE_FOR_DIFFERENT_DOMAIN
url_scores[link] += score * score_multiplier
url_users[link] = batch.user_id_hash
url_timestamps[link] = timestamp
domain = f'{parsed_link.scheme}://{parsed_link.netloc}/'
url_scores[domain] += SCORE_FOR_ROOT_PATH * score_multiplier
url_users[domain] = batch.user_id_hash
url_timestamps[domain] = timestamp
found_urls = [FoundURL(url, url_users[url], url_scores[url], url_statuses[url], url_timestamps[url])
for url in url_scores.keys() | url_statuses.keys()]
url_db.update_found_urls(found_urls)
def get_datetime_from_timestamp(timestamp: float) -> datetime:
batch_datetime = datetime(1970, 1, 1, tzinfo=timezone.utc) + timedelta(seconds=timestamp)
return batch_datetime
# TODO: clean unicode at some point
def clean_unicode(s: str) -> str:
return s.encode('utf-8', 'ignore').decode('utf-8')
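
The extracted index_batches() entry point is what lets analyse/index_local.py above index batches outside the background pipeline. Roughly the call shape, as used in both call sites in this commit:

    nlp = spacy.load("en_core_web_sm")
    with Database() as db:
        url_db = URLDatabase(db.connection)
        index_batches(batches, index_path, nlp, url_db)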


@@ -8,9 +8,10 @@ from psycopg2.extras import execute_values
class BatchStatus(Enum):
REMOTE = 0 # The batch only exists in long term storage
LOCAL = 1 # We have a copy of the batch locally in Postgresql
INDEXED = 2
REMOTE = 0 # The batch only exists in long term storage
LOCAL = 10 # We have a copy of the batch locally in Postgresql
URLS_UPDATED = 20 # We've updated URLs from the batch
INDEXED = 30 # The batch has been indexed
@dataclass
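
Renumbering the statuses as 10/20/30 rather than 0/1/2 leaves gaps for further intermediate stages. A batch now advances through the pipeline in two tracked steps, per the two process_batch.run call sites in this commit:

    # URL extraction (update_urls.py): LOCAL -> URLS_UPDATED
    process_batch.run(batch_cache, BatchStatus.LOCAL, BatchStatus.URLS_UPDATED, process=record_urls_in_database)

    # Indexing (index_batches.py): URLS_UPDATED -> INDEXED
    process_batch.run(batch_cache, BatchStatus.URLS_UPDATED, BatchStatus.INDEXED, process)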


@@ -0,0 +1,29 @@
from logging import getLogger
from typing import Callable, Collection
from mwmbl.crawler.batch import HashedBatch
from mwmbl.database import Database
from mwmbl.indexer.batch_cache import BatchCache
from mwmbl.indexer.indexdb import BatchStatus, IndexDatabase
logger = getLogger(__name__)
def run(batch_cache: BatchCache, start_status: BatchStatus, end_status: BatchStatus,
process: Callable[[Collection[HashedBatch]], None]):
with Database() as db:
index_db = IndexDatabase(db.connection)
logger.info(f"Getting batches with status {start_status}")
batches = index_db.get_batches_by_status(start_status, 10000)
logger.info(f"Got {len(batches)} batch urls")
if len(batches) == 0:
return
batch_data = batch_cache.get_cached([batch.url for batch in batches])
logger.info(f"Got {len(batch_data)} cached batches")
process(batch_data.values())
index_db.update_batch_status([batch.url for batch in batches], end_status)
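
process_batch.run factors the fetch/process/mark-done boilerplate out of the two pipeline stages, so a new stage only needs a callable over a collection of batches. A sketch of a hypothetical extra stage (count_items and the chosen statuses are illustrative, not part of the commit):

    from typing import Collection
    from mwmbl.crawler.batch import HashedBatch

    def count_items(batches: Collection[HashedBatch]):
        print("Items in this chunk:", sum(len(batch.items) for batch in batches))

    # would advance matching batches from URLS_UPDATED to INDEXED:
    # process_batch.run(batch_cache, BatchStatus.URLS_UPDATED, BatchStatus.INDEXED, process=count_items)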


@@ -0,0 +1,67 @@
from collections import defaultdict
from datetime import datetime, timezone, timedelta
from logging import getLogger
from typing import Iterable, Collection
from urllib.parse import urlparse
from mwmbl.crawler.batch import HashedBatch
from mwmbl.crawler.urls import URLDatabase, URLStatus, FoundURL
from mwmbl.database import Database
from mwmbl.hn_top_domains_filtered import DOMAINS
from mwmbl.indexer import process_batch
from mwmbl.indexer.batch_cache import BatchCache
from mwmbl.indexer.index_batches import get_url_error_status
from mwmbl.indexer.indexdb import BatchStatus
from mwmbl.settings import UNKNOWN_DOMAIN_MULTIPLIER, EXCLUDED_DOMAINS, SCORE_FOR_SAME_DOMAIN, \
SCORE_FOR_DIFFERENT_DOMAIN, SCORE_FOR_ROOT_PATH
logger = getLogger(__name__)
def run(batch_cache: BatchCache):
process_batch.run(batch_cache, BatchStatus.LOCAL, BatchStatus.URLS_UPDATED, process=record_urls_in_database)
def record_urls_in_database(batches: Collection[HashedBatch]):
logger.info(f"Recording URLs in database for {len(batches)} batches")
with Database() as db:
url_db = URLDatabase(db.connection)
url_scores = defaultdict(float)
url_users = {}
url_timestamps = {}
url_statuses = defaultdict(lambda: URLStatus.NEW)
for batch in batches:
for item in batch.items:
timestamp = get_datetime_from_timestamp(item.timestamp / 1000.0)
url_timestamps[item.url] = timestamp
url_users[item.url] = batch.user_id_hash
if item.content is None:
url_statuses[item.url] = get_url_error_status(item)
else:
url_statuses[item.url] = URLStatus.CRAWLED
crawled_page_domain = urlparse(item.url).netloc
score_multiplier = 1 if crawled_page_domain in DOMAINS else UNKNOWN_DOMAIN_MULTIPLIER
for link in item.content.links:
parsed_link = urlparse(link)
if parsed_link.netloc in EXCLUDED_DOMAINS:
continue
score = SCORE_FOR_SAME_DOMAIN if parsed_link.netloc == crawled_page_domain else SCORE_FOR_DIFFERENT_DOMAIN
url_scores[link] += score * score_multiplier
url_users[link] = batch.user_id_hash
url_timestamps[link] = timestamp
domain = f'{parsed_link.scheme}://{parsed_link.netloc}/'
url_scores[domain] += SCORE_FOR_ROOT_PATH * score_multiplier
url_users[domain] = batch.user_id_hash
url_timestamps[domain] = timestamp
found_urls = [FoundURL(url, url_users[url], url_scores[url], url_statuses[url], url_timestamps[url])
for url in url_scores.keys() | url_statuses.keys()]
url_db.update_found_urls(found_urls)
def get_datetime_from_timestamp(timestamp: float) -> datetime:
batch_datetime = datetime(1970, 1, 1, tzinfo=timezone.utc) + timedelta(seconds=timestamp)
return batch_datetime
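
With the settings below, the scoring arithmetic works out as follows for a crawled page on a domain outside DOMAINS (so score_multiplier = UNKNOWN_DOMAIN_MULTIPLIER = 0.001) that links to https://other.org/page (a hypothetical URL on a different domain):

    url_scores['https://other.org/page'] += 1.0 * 0.001  # SCORE_FOR_DIFFERENT_DOMAIN -> 0.001
    url_scores['https://other.org/'] += 0.1 * 0.001      # SCORE_FOR_ROOT_PATH bonus  -> 0.0001

A same-domain link from a page on a domain in DOMAINS would instead add 0.01 (SCORE_FOR_SAME_DOMAIN) with multiplier 1.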


@@ -27,3 +27,4 @@ SCORE_FOR_ROOT_PATH = 0.1
SCORE_FOR_DIFFERENT_DOMAIN = 1.0
SCORE_FOR_SAME_DOMAIN = 0.01
UNKNOWN_DOMAIN_MULTIPLIER = 0.001
EXCLUDED_DOMAINS = {'web.archive.org'}


@@ -1,6 +1,6 @@
import json
import os
from dataclasses import astuple, dataclass, asdict
from dataclasses import dataclass, asdict
from io import UnsupportedOperation, BytesIO
from logging import getLogger
from mmap import mmap, PROT_READ, PROT_WRITE
@@ -20,6 +20,10 @@ PAGE_SIZE = 4096
logger = getLogger(__name__)
def astuple(dc):
return tuple(dc.__dict__.values())
@dataclass
class Document:
title: str
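
Presumably the motivation for the local astuple: dataclasses.astuple recurses into fields and deep-copies their values, which is wasteful on the serialisation path for a flat record like Document. The replacement is shallow and, for flat dataclasses, equivalent; a sketch illustrating the behaviour (Point is an arbitrary example):

    from dataclasses import dataclass

    @dataclass
    class Point:
        x: int
        y: int

    def astuple(dc):
        return tuple(dc.__dict__.values())

    assert astuple(Point(1, 2)) == (1, 2)  # matches dataclasses.astuple for flat fields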


@@ -5,6 +5,7 @@ from logging import getLogger
from operator import itemgetter
from urllib.parse import urlparse
from mwmbl.tokenizer import tokenize, get_bigrams
from mwmbl.tinysearchengine.completer import Completer
from mwmbl.hn_top_domains_filtered import DOMAINS
from mwmbl.tinysearchengine.indexer import TinyIndex, Document
@@ -34,7 +35,7 @@ def _get_query_regex(terms, is_complete, is_url):
return pattern
def _score_result(terms: list[str], result: Document, is_complete: bool):
def score_result(terms: list[str], result: Document, is_complete: bool):
features = get_features(terms, result.title, result.url, result.extract, result.score, is_complete)
length_penalty = math.e ** (-LENGTH_PENALTY * len(result.url))
@@ -115,7 +116,7 @@ def order_results(terms: list[str], results: list[Document], is_complete: bool)
if len(results) == 0:
return []
results_and_scores = [(_score_result(terms, result, is_complete), result) for result in results]
results_and_scores = [(score_result(terms, result, is_complete), result) for result in results]
ordered_results = sorted(results_and_scores, key=itemgetter(0), reverse=True)
filtered_results = [result for score, result in ordered_results if score > SCORE_THRESHOLD]
return filtered_results
@@ -171,7 +172,7 @@ class Ranker:
return [q, urls + completed]
def get_results(self, q):
terms = [x.lower() for x in q.replace('.', ' ').split()]
terms = tokenize(q)
is_complete = q.endswith(' ')
if len(terms) > 0 and not is_complete:
completions = self.completer.complete(terms[-1])
@@ -180,16 +181,18 @@
completions = []
retrieval_terms = set(terms)
bigrams = set(get_bigrams(len(terms), terms))
pages = []
seen_items = set()
for term in retrieval_terms:
for term in retrieval_terms | bigrams:
items = self.tiny_index.retrieve(term)
if items is not None:
for item in items:
if term in item.title.lower() or term in item.extract.lower():
if item.title not in seen_items:
pages.append(item)
seen_items.add(item.title)
# if term in item.title.lower() or term in item.extract.lower():
if item.title not in seen_items:
pages.append(item)
seen_items.add(item.title)
ordered_results = self.order_results(terms, pages, is_complete)
return ordered_results, terms, completions

mwmbl/tokenizer.py (new file, 13 additions)

@@ -0,0 +1,13 @@
def tokenize(input_text):
cleaned_text = input_text.encode('utf8', 'replace').decode('utf8')
tokens = cleaned_text.lower().split()
if input_text.endswith('…'):
# Discard the last two tokens since there will likely be a word cut in two
tokens = tokens[:-2]
return tokens
def get_bigrams(num_bigrams, tokens):
num_bigrams = min(num_bigrams, len(tokens) - 1)
bigrams = [f'{tokens[i]} {tokens[i + 1]}' for i in range(num_bigrams)]
return bigrams
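
The endswith('…') branch guards against extracts that were cut off mid-word: the truncated token and the one before it are discarded. A quick worked example (arbitrary strings):

    tokenize("Free Software")       # ['free', 'software']
    tokenize("a truncated extra…")  # ['a'] - drops 'extra…' and 'truncated'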