Merge pull request #55 from mwmbl/index-continuously

Index continuously
Daoud Clarke 2022-07-01 20:55:24 +01:00 committed by GitHub
commit 4967830ae1
34 changed files with 181129 additions and 442 deletions


@ -33,11 +33,12 @@ FROM base as final
# Copy only the required /venv directory from the builder image that contains mwmbl and its dependencies
COPY --from=builder /venv /venv
# Working directory is /app
# Copying data and config into /app so that relative (default) paths in the config work
COPY data /app/data
COPY config /app/config
ADD nginx.conf.sigil /app
# Set up a volume where the data will live
VOLUME ["/data"]
EXPOSE 5000
# Using the mwmbl-tinysearchengine binary/entrypoint which comes packaged with mwmbl
# TODO: fix the arguments for the recent changes
CMD ["/venv/bin/mwmbl-tinysearchengine", "--config", "config/tinysearchengine.yaml"]
CMD ["/venv/bin/mwmbl-tinysearchengine"]


@ -7,15 +7,23 @@ import json
from collections import defaultdict, Counter
from urllib.parse import urlparse
from mwmbl.indexer.paths import CRAWL_GLOB
from mwmbl.crawler.batch import HashedBatch
from mwmbl.indexer.paths import CRAWL_GLOB, MWMBL_DATA_DIR
# TODO: remove this line - temporary override
CRAWL_GLOB = str(MWMBL_DATA_DIR / "b2") + "/*/*/2022-06-23/*/*/*.json.gz"
def get_urls():
for path in glob.glob(CRAWL_GLOB):
data = json.load(gzip.open(path))
user = data['user_id_hash']
for item in data['items']:
yield user, item['url']
batch = HashedBatch.parse_obj(data)
user = batch.user_id_hash
for item in batch.items:
if item.content is not None:
for url in item.content.links:
yield user, url
def analyse_urls(urls):


@ -1,7 +1,7 @@
import json
from mwmbl.indexer.paths import TOP_DOMAINS_JSON_PATH
from mwmbl.tinysearchengine.hn_top_domains_filtered import DOMAINS
from mwmbl.hn_top_domains_filtered import DOMAINS
def export_top_domains_to_json():


@ -4,7 +4,7 @@ Export the list of unique URLs to a SQLite file for analysis/evaluation.
import sqlite3
from mwmbl.indexer.paths import URLS_PATH
from mwmbl.tinysearchengine.app import get_config_and_index
from mwmbl.app import get_config_and_index
def create_database():


@ -0,0 +1,19 @@
"""
Count unique URLs in the index.
"""
from mwmbl.tinysearchengine.indexer import TinyIndex, Document
def run():
urls = set()
with TinyIndex(Document, 'data/index.tinysearch') as index:
for i in range(index.num_pages):
print("Page", i)
page = index.get_page(i)
new_urls = {doc.url for doc in page}
urls |= new_urls
print("URLs", len(urls))
if __name__ == '__main__':
run()


@ -0,0 +1,4 @@
"""
Analyse recent batches looking for duplicates.
"""

28
mwmbl/background.py Normal file

@ -0,0 +1,28 @@
"""
Script that updates data in a background process.
"""
from logging import getLogger
from mwmbl.indexer import historical
from mwmbl.indexer.preprocess import run_preprocessing
from mwmbl.indexer.retrieve import retrieve_batches
from mwmbl.indexer.update_pages import run_update
logger = getLogger(__name__)
def run(index_path: str):
historical.run()
while True:
try:
retrieve_batches()
except Exception:
logger.exception("Error retrieving batches")
try:
run_preprocessing(index_path)
except Exception:
logger.exception("Error preprocessing")
try:
run_update(index_path)
except Exception:
logger.exception("Error running index update")


285
mwmbl/crawler/app.py Normal file

@ -0,0 +1,285 @@
import gzip
import hashlib
import json
import os
import re
from collections import defaultdict
from datetime import datetime, timezone, timedelta, date
from typing import Union
from urllib.parse import urlparse
from uuid import uuid4
import boto3
import requests
from fastapi import HTTPException, APIRouter
from mwmbl.crawler.batch import Batch, NewBatchRequest, HashedBatch
from mwmbl.crawler.urls import URLDatabase, FoundURL, URLStatus
from mwmbl.database import Database
from mwmbl.hn_top_domains_filtered import DOMAINS
from mwmbl.indexer.indexdb import IndexDatabase
from mwmbl.tinysearchengine.indexer import Document
APPLICATION_KEY = os.environ['MWMBL_APPLICATION_KEY']
KEY_ID = os.environ['MWMBL_KEY_ID']
ENDPOINT_URL = 'https://s3.us-west-004.backblazeb2.com'
BUCKET_NAME = 'mwmbl-crawl'
MAX_BATCH_SIZE = 100
USER_ID_LENGTH = 36
PUBLIC_USER_ID_LENGTH = 64
VERSION = 'v1'
DATE_REGEX = re.compile(r'\d{4}-\d{2}-\d{2}')
PUBLIC_URL_PREFIX = f'https://f004.backblazeb2.com/file/{BUCKET_NAME}/'
FILE_NAME_SUFFIX = '.json.gz'
SCORE_FOR_ROOT_PATH = 0.1
SCORE_FOR_DIFFERENT_DOMAIN = 1.0
SCORE_FOR_SAME_DOMAIN = 0.01
router = APIRouter(prefix="/crawler", tags=["crawler"])
def get_bucket(name):
s3 = boto3.resource('s3', endpoint_url=ENDPOINT_URL, aws_access_key_id=KEY_ID,
aws_secret_access_key=APPLICATION_KEY)
return s3.Object(BUCKET_NAME, name)
def upload(data: bytes, name: str):
bucket = get_bucket(name)
result = bucket.put(Body=data)
return result
last_batch = None
@router.on_event("startup")
async def on_startup():
with Database() as db:
url_db = URLDatabase(db.connection)
return url_db.create_tables()
@router.post('/batches/')
def create_batch(batch: Batch):
if len(batch.items) > MAX_BATCH_SIZE:
raise HTTPException(400, f"Batch size too large (maximum {MAX_BATCH_SIZE}), got {len(batch.items)}")
if len(batch.user_id) != USER_ID_LENGTH:
raise HTTPException(400, f"User ID length is incorrect, should be {USER_ID_LENGTH} characters")
if len(batch.items) == 0:
return {
'status': 'ok',
}
user_id_hash = _get_user_id_hash(batch)
now = datetime.now(timezone.utc)
seconds = (now - datetime(now.year, now.month, now.day, tzinfo=timezone.utc)).seconds
# How to pad a string with zeros: https://stackoverflow.com/a/39402910
# Maximum seconds in a day is 60*60*24 = 86400, so 5 digits is enough
padded_seconds = str(seconds).zfill(5)
# See discussion here: https://stackoverflow.com/a/13484764
uid = str(uuid4())[:8]
filename = f'1/{VERSION}/{now.date()}/1/{user_id_hash}/{padded_seconds}__{uid}.json.gz'
# Using an approach from https://stackoverflow.com/a/30476450
epoch_time = (now - datetime(1970, 1, 1, tzinfo=timezone.utc)).total_seconds()
hashed_batch = HashedBatch(user_id_hash=user_id_hash, timestamp=epoch_time, items=batch.items)
data = gzip.compress(hashed_batch.json().encode('utf8'))
upload(data, filename)
record_urls_in_database(batch, user_id_hash, now)
queue_batch(hashed_batch)
global last_batch
last_batch = hashed_batch
return {
'status': 'ok',
'public_user_id': user_id_hash,
'url': f'{PUBLIC_URL_PREFIX}{filename}',
}
def _get_user_id_hash(batch: Union[Batch, NewBatchRequest]):
return hashlib.sha3_256(batch.user_id.encode('utf8')).hexdigest()
@router.post('/batches/new')
def request_new_batch(batch_request: NewBatchRequest):
user_id_hash = _get_user_id_hash(batch_request)
with Database() as db:
url_db = URLDatabase(db.connection)
return url_db.get_new_batch_for_user(user_id_hash)
@router.post('/batches/historical')
def create_historical_batch(batch: HashedBatch):
"""
Update the database state of URL crawling for old data
"""
user_id_hash = batch.user_id_hash
batch_datetime = get_datetime_from_timestamp(batch.timestamp)
record_urls_in_database(batch, user_id_hash, batch_datetime)
def get_datetime_from_timestamp(timestamp: int) -> datetime:
batch_datetime = datetime(1970, 1, 1, tzinfo=timezone.utc) + timedelta(seconds=timestamp)
return batch_datetime
def record_urls_in_database(batch: Union[Batch, HashedBatch], user_id_hash: str, timestamp: datetime):
with Database() as db:
url_db = URLDatabase(db.connection)
url_scores = defaultdict(float)
for item in batch.items:
if item.content is not None:
crawled_page_domain = urlparse(item.url).netloc
if crawled_page_domain not in DOMAINS:
continue
for link in item.content.links:
parsed_link = urlparse(link)
score = SCORE_FOR_SAME_DOMAIN if parsed_link.netloc == crawled_page_domain else SCORE_FOR_DIFFERENT_DOMAIN
url_scores[link] += score
domain = f'{parsed_link.scheme}://{parsed_link.netloc}/'
url_scores[domain] += SCORE_FOR_ROOT_PATH
found_urls = [FoundURL(url, user_id_hash, score, URLStatus.NEW, timestamp) for url, score in url_scores.items()]
if len(found_urls) > 0:
url_db.update_found_urls(found_urls)
crawled_urls = [FoundURL(item.url, user_id_hash, 0.0, URLStatus.CRAWLED, timestamp)
for item in batch.items]
url_db.update_found_urls(crawled_urls)
# TODO:
# - delete existing crawl data for change from INT to FLOAT
def get_batches_for_date(date_str):
check_date_str(date_str)
prefix = f'1/{VERSION}/{date_str}/1/'
cache_filename = prefix + 'batches.json.gz'
cache_url = PUBLIC_URL_PREFIX + cache_filename
try:
cached_batches = json.loads(gzip.decompress(requests.get(cache_url).content))
print(f"Got cached batches for {date_str}")
return cached_batches
except gzip.BadGzipFile:
pass
batches = get_batches_for_prefix(prefix)
result = {'batch_urls': [f'{PUBLIC_URL_PREFIX}{batch}' for batch in sorted(batches)]}
if date_str != str(date.today()):
# Don't cache data from today since it may change
data = gzip.compress(json.dumps(result).encode('utf8'))
upload(data, cache_filename)
print(f"Cached batches for {date_str} in {PUBLIC_URL_PREFIX}{cache_filename}")
return result
def get_user_id_hash_from_url(url):
return url.split('/')[9]
@router.get('/batches/{date_str}/users/{public_user_id}')
def get_batches_for_date_and_user(date_str, public_user_id):
check_date_str(date_str)
check_public_user_id(public_user_id)
prefix = f'1/{VERSION}/{date_str}/1/{public_user_id}/'
return get_batch_ids_for_prefix(prefix)
def check_public_user_id(public_user_id):
if len(public_user_id) != PUBLIC_USER_ID_LENGTH:
raise HTTPException(400, f"Incorrect public user ID length, should be {PUBLIC_USER_ID_LENGTH}")
@router.get('/batches/{date_str}/users/{public_user_id}/batch/{batch_id}')
def get_batch_from_id(date_str, public_user_id, batch_id):
url = get_batch_url(batch_id, date_str, public_user_id)
data = json.loads(gzip.decompress(requests.get(url).content))
return {
'url': url,
'batch': data,
}
def get_batch_url(batch_id, date_str, public_user_id):
check_date_str(date_str)
check_public_user_id(public_user_id)
url = f'{PUBLIC_URL_PREFIX}1/{VERSION}/{date_str}/1/{public_user_id}/{batch_id}{FILE_NAME_SUFFIX}'
return url
@router.get('/latest-batch', response_model=list[HashedBatch])
def get_latest_batch():
return [] if last_batch is None else [last_batch]
def get_batch_id_from_file_name(file_name: str):
assert file_name.endswith(FILE_NAME_SUFFIX)
return file_name[:-len(FILE_NAME_SUFFIX)]
def get_batch_ids_for_prefix(prefix):
filenames = get_batches_for_prefix(prefix)
filename_endings = sorted(filename.rsplit('/', 1)[1] for filename in filenames)
results = {'batch_ids': [get_batch_id_from_file_name(name) for name in filename_endings]}
return results
def get_batches_for_prefix(prefix):
s3 = boto3.resource('s3', endpoint_url=ENDPOINT_URL, aws_access_key_id=KEY_ID,
aws_secret_access_key=APPLICATION_KEY)
bucket = s3.Bucket(BUCKET_NAME)
items = bucket.objects.filter(Prefix=prefix)
filenames = [item.key for item in items]
return filenames
@router.get('/batches/{date_str}/users')
def get_user_id_hashes_for_date(date_str: str):
check_date_str(date_str)
prefix = f'1/{VERSION}/{date_str}/1/'
return get_subfolders(prefix)
def check_date_str(date_str):
if not DATE_REGEX.match(date_str):
raise HTTPException(400, f"Incorrect date format, should be YYYY-MM-DD")
def get_subfolders(prefix):
client = boto3.client('s3', endpoint_url=ENDPOINT_URL, aws_access_key_id=KEY_ID,
aws_secret_access_key=APPLICATION_KEY)
items = client.list_objects(Bucket=BUCKET_NAME,
Prefix=prefix,
Delimiter='/')
item_keys = [item['Prefix'][len(prefix):].strip('/') for item in items['CommonPrefixes']]
return item_keys
@router.get('/')
def status():
return {
'status': 'ok'
}
def queue_batch(batch: HashedBatch):
# TODO: get the score from the URLs database
# TODO: also queue documents for batches sent through the API
documents = [Document(item.content.title, item.url, item.content.extract, 1)
for item in batch.items if item.content is not None]
with Database() as db:
index_db = IndexDatabase(db.connection)
index_db.queue_documents(documents)
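
A hypothetical client call against the new batch endpoint might look like the sketch below, assuming the service is running locally on port 5000 (as in main.py and the Dockerfile) and using a made-up 36-character user ID:

import requests

batch = {
    "user_id": "0123456789abcdef0123456789abcdef0123",  # any 36-character ID
    "items": [
        {
            "url": "https://example.com/",
            "status": 200,
            "timestamp": 1656712800,
            "content": {
                "title": "Example Domain",
                "extract": "This domain is for use in illustrative examples.",
                "links": ["https://www.iana.org/domains/example"],
            },
            "error": None,
        }
    ],
}

response = requests.post("http://localhost:5000/crawler/batches/", json=batch)
print(response.json())  # {'status': 'ok', 'public_user_id': ..., 'url': ...}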

37
mwmbl/crawler/batch.py Normal file

@ -0,0 +1,37 @@
from typing import Optional
from pydantic import BaseModel
class ItemContent(BaseModel):
title: str
extract: str
links: list[str]
class ItemError(BaseModel):
name: str
message: Optional[str]
class Item(BaseModel):
url: str
status: Optional[int]
timestamp: int
content: Optional[ItemContent]
error: Optional[ItemError]
class Batch(BaseModel):
user_id: str
items: list[Item]
class NewBatchRequest(BaseModel):
user_id: str
class HashedBatch(BaseModel):
user_id_hash: str
timestamp: int
items: list[Item]
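
These models describe both the upload format and the gzipped JSON written to long-term storage; a small sketch of loading a downloaded batch file back into a HashedBatch (the file path here is illustrative):

import gzip
import json

from mwmbl.crawler.batch import HashedBatch

with gzip.open('batch.json.gz') as batch_file:
    data = json.load(batch_file)
batch = HashedBatch.parse_obj(data)
print(batch.user_id_hash, len(batch.items))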

141
mwmbl/crawler/urls.py Normal file

@ -0,0 +1,141 @@
"""
Database storing info on URLs
"""
import os
from dataclasses import dataclass
from datetime import datetime, timedelta
from enum import Enum
from psycopg2 import connect
from psycopg2.extras import execute_values
# Client has one hour to crawl a URL that has been assigned to them, or it will be reassigned
from mwmbl.database import Database
REASSIGN_MIN_HOURS = 1
BATCH_SIZE = 100
class URLStatus(Enum):
"""
URL state update is idempotent and can only progress forwards.
"""
NEW = 0 # One user has identified this URL
ASSIGNED = 2 # The crawler has given the URL to a user to crawl
CRAWLED = 3 # At least one user has crawled the URL
@dataclass
class FoundURL:
url: str
user_id_hash: str
score: float
status: URLStatus
timestamp: datetime
class URLDatabase:
def __init__(self, connection):
self.connection = connection
def create_tables(self):
sql = """
CREATE TABLE IF NOT EXISTS urls (
url VARCHAR PRIMARY KEY,
status INT NOT NULL DEFAULT 0,
user_id_hash VARCHAR NOT NULL,
score FLOAT NOT NULL DEFAULT 1,
updated TIMESTAMP NOT NULL DEFAULT NOW()
)
"""
with self.connection.cursor() as cursor:
cursor.execute(sql)
def update_found_urls(self, found_urls: list[FoundURL]):
if len(found_urls) == 0:
return
get_urls_sql = """
SELECT url FROM urls
WHERE url in %(urls)s
"""
lock_urls_sql = """
SELECT url FROM urls
WHERE url in %(urls)s
FOR UPDATE SKIP LOCKED
"""
insert_sql = f"""
INSERT INTO urls (url, status, user_id_hash, score, updated) values %s
ON CONFLICT (url) DO UPDATE SET
status = GREATEST(urls.status, excluded.status),
user_id_hash = CASE
WHEN urls.status > excluded.status THEN urls.user_id_hash ELSE excluded.user_id_hash
END,
score = urls.score + excluded.score,
updated = CASE
WHEN urls.status > excluded.status THEN urls.updated ELSE excluded.updated
END
"""
input_urls = [x.url for x in found_urls]
assert len(input_urls) == len(set(input_urls))
with self.connection as connection:
with connection.cursor() as cursor:
cursor.execute(get_urls_sql, {'urls': tuple(input_urls)})
existing_urls = {x[0] for x in cursor.fetchall()}
new_urls = set(input_urls) - existing_urls
cursor.execute(lock_urls_sql, {'urls': tuple(input_urls)})
locked_urls = {x[0] for x in cursor.fetchall()}
urls_to_insert = new_urls | locked_urls
if len(urls_to_insert) != len(input_urls):
print(f"Only got {len(urls_to_insert)} instead of {len(input_urls)} - {len(new_urls)} new")
sorted_urls = sorted(found_urls, key=lambda x: x.url)
data = [
(found_url.url, found_url.status.value, found_url.user_id_hash, found_url.score, found_url.timestamp)
for found_url in sorted_urls if found_url.url in urls_to_insert]
execute_values(cursor, insert_sql, data)
def get_new_batch_for_user(self, user_id_hash: str):
sql = f"""
UPDATE urls SET status = {URLStatus.ASSIGNED.value}, user_id_hash = %(user_id_hash)s, updated = %(now)s
WHERE url IN (
SELECT url FROM urls
WHERE status = {URLStatus.NEW.value} OR (
status = {URLStatus.ASSIGNED.value} AND updated < %(min_updated_date)s
)
ORDER BY score DESC
LIMIT {BATCH_SIZE}
FOR UPDATE SKIP LOCKED
)
RETURNING url
"""
now = datetime.utcnow()
min_updated_date = now - timedelta(hours=REASSIGN_MIN_HOURS)
with self.connection.cursor() as cursor:
cursor.execute(sql, {'user_id_hash': user_id_hash, 'min_updated_date': min_updated_date, 'now': now})
results = cursor.fetchall()
return [result[0] for result in results]
if __name__ == "__main__":
with Database() as db:
url_db = URLDatabase(db.connection)
url_db.create_tables()
# update_url_status(conn, [URLStatus("https://mwmbl.org", URLState.NEW, "test-user", datetime.now())])
# url_db.user_found_urls("Test user", ["a", "b", "c"], datetime.utcnow())
# url_db.user_found_urls("Another user", ["b", "c", "d"], datetime.utcnow())
# url_db.user_crawled_urls("Test user", ["c"], datetime.utcnow())
batch = url_db.get_new_batch_for_user('test user 4')
print("Batch", len(batch), batch)

16
mwmbl/database.py Normal file

@ -0,0 +1,16 @@
import os
from psycopg2 import connect
class Database:
def __init__(self):
self.connection = None
def __enter__(self):
self.connection = connect(os.environ["DATABASE_URL"])
self.connection.set_session(autocommit=True)
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.connection.close()


@ -1,10 +0,0 @@
from itertools import islice
from typing import Iterator
def grouper(n: int, iterator: Iterator):
while True:
chunk = tuple(islice(iterator, n))
if not chunk:
return
yield chunk


@ -4,8 +4,9 @@ Dedupe pages that have been crawled more than once and prepare them for indexing
import glob
import gzip
import json
from itertools import islice
from typing import Iterator
from mwmbl.indexer.batch import grouper
from mwmbl.indexer.fsqueue import FSQueue, GzipJsonBlobSerializer
from mwmbl.indexer.paths import CRAWL_GLOB, TINYSEARCH_DATA_DIR
@ -40,3 +41,11 @@ def run():
if __name__ == '__main__':
run()
def grouper(n: int, iterator: Iterator):
while True:
chunk = tuple(islice(iterator, n))
if not chunk:
return
yield chunk


@ -11,7 +11,7 @@ import pandas as pd
DATA_DIR = Path(os.environ['HOME']) / 'data' / 'tinysearch'
ALL_DOMAINS_PATH = DATA_DIR / 'hn-top-domains.csv'
TOP_DOMAINS_PATH = '../tinysearchengine/hn_top_domains_filtered.py'
TOP_DOMAINS_PATH = '../hn_top_domains_filtered.py'
MIN_COUNT = 10
PROBABILITY_THRESHOLD = 0.8


@ -0,0 +1,25 @@
from datetime import date, timedelta
from mwmbl.crawler.app import get_user_id_hashes_for_date, get_batches_for_date_and_user, get_batch_url, \
get_batches_for_date, get_user_id_hash_from_url
from mwmbl.database import Database
from mwmbl.indexer.indexdb import BatchInfo, BatchStatus, IndexDatabase
DAYS = 10
def run():
for day in range(DAYS):
date_str = str(date.today() - timedelta(days=day))
with Database() as db:
index_db = IndexDatabase(db.connection)
index_db.create_tables()
batches = get_batches_for_date(date_str)
batch_urls = batches['batch_urls']
print("Historical batches for date", date_str, len(batch_urls))
infos = [BatchInfo(url, get_user_id_hash_from_url(url), BatchStatus.REMOTE) for url in batch_urls]
index_db.record_batches(infos)
if __name__ == '__main__':
run()


@ -45,19 +45,24 @@ def prepare_url_for_tokenizing(url: str):
def get_pages(nlp, titles_urls_and_extracts, link_counts) -> Iterable[TokenizedDocument]:
for i, (title_cleaned, url, extract) in enumerate(titles_urls_and_extracts):
title_tokens = tokenize(nlp, title_cleaned)
prepared_url = prepare_url_for_tokenizing(unquote(url))
url_tokens = tokenize(nlp, prepared_url)
extract_tokens = tokenize(nlp, extract)
print("Extract tokens", extract_tokens)
tokens = title_tokens | url_tokens | extract_tokens
score = link_counts.get(url, DEFAULT_SCORE)
yield TokenizedDocument(tokens=list(tokens), url=url, title=title_cleaned, extract=extract, score=score)
yield tokenize_document(url, title_cleaned, extract, score, nlp)
if i % 1000 == 0:
print("Processed", i)
def tokenize_document(url, title_cleaned, extract, score, nlp):
title_tokens = tokenize(nlp, title_cleaned)
prepared_url = prepare_url_for_tokenizing(unquote(url))
url_tokens = tokenize(nlp, prepared_url)
extract_tokens = tokenize(nlp, extract)
# print("Extract tokens", extract_tokens)
tokens = title_tokens | url_tokens | extract_tokens
document = TokenizedDocument(tokens=list(tokens), url=url, title=title_cleaned, extract=extract, score=score)
return document
def index_titles_urls_and_extracts(indexer: TinyIndex, nlp, titles_urls_and_extracts, link_counts, terms_path):
terms = Counter()
pages = get_pages(nlp, titles_urls_and_extracts, link_counts)

169
mwmbl/indexer/indexdb.py Normal file

@ -0,0 +1,169 @@
"""
Database interface for batches of crawled data.
"""
from dataclasses import dataclass
from enum import Enum
from psycopg2.extras import execute_values
from mwmbl.tinysearchengine.indexer import Document
class BatchStatus(Enum):
REMOTE = 0 # The batch only exists in long term storage
LOCAL = 1 # We have a copy of the batch locally in Postgresql
class DocumentStatus(Enum):
NEW = 0
PREPROCESSING = 1
@dataclass
class BatchInfo:
url: str
user_id_hash: str
status: BatchStatus
class IndexDatabase:
def __init__(self, connection):
self.connection = connection
def create_tables(self):
batches_sql = """
CREATE TABLE IF NOT EXISTS batches (
url VARCHAR PRIMARY KEY,
user_id_hash VARCHAR NOT NULL,
status INT NOT NULL
)
"""
documents_sql = """
CREATE TABLE IF NOT EXISTS documents (
url VARCHAR PRIMARY KEY,
title VARCHAR NOT NULL,
extract VARCHAR NOT NULL,
score FLOAT NOT NULL,
status INT NOT NULL
)
"""
document_pages_sql = """
CREATE TABLE IF NOT EXISTS document_pages (
url VARCHAR NOT NULL,
page INT NOT NULL
)
"""
document_pages_index_sql = """
CREATE INDEX IF NOT EXISTS document_pages_page_index ON document_pages (page)
"""
with self.connection.cursor() as cursor:
cursor.execute(batches_sql)
cursor.execute(documents_sql)
cursor.execute(document_pages_sql)
cursor.execute(document_pages_index_sql)
def record_batches(self, batch_infos: list[BatchInfo]):
sql = """
INSERT INTO batches (url, user_id_hash, status) values %s
ON CONFLICT (url) DO NOTHING
"""
data = [(info.url, info.user_id_hash, info.status.value) for info in batch_infos]
with self.connection.cursor() as cursor:
execute_values(cursor, sql, data)
def get_batches_by_status(self, status: BatchStatus) -> list[BatchInfo]:
sql = """
SELECT * FROM batches WHERE status = %(status)s LIMIT 1000
"""
with self.connection.cursor() as cursor:
cursor.execute(sql, {'status': status.value})
results = cursor.fetchall()
return [BatchInfo(url, user_id_hash, status) for url, user_id_hash, status in results]
def update_batch_status(self, batch_urls: list[str], status: BatchStatus):
sql = """
UPDATE batches SET status = %(status)s
WHERE url IN %(urls)s
"""
with self.connection.cursor() as cursor:
cursor.execute(sql, {'status': status.value, 'urls': tuple(batch_urls)})
def queue_documents(self, documents: list[Document]):
sql = """
INSERT INTO documents (url, title, extract, score, status)
VALUES %s
ON CONFLICT (url) DO NOTHING
"""
sorted_documents = sorted(documents, key=lambda x: x.url)
data = [(document.url, document.title, document.extract, document.score, DocumentStatus.NEW.value)
for document in sorted_documents]
print("Queueing documents", len(data))
with self.connection.cursor() as cursor:
execute_values(cursor, sql, data)
def get_documents_for_preprocessing(self):
sql = f"""
UPDATE documents SET status = {DocumentStatus.PREPROCESSING.value}
WHERE url IN (
SELECT url FROM documents
WHERE status = {DocumentStatus.NEW.value}
LIMIT 1000
FOR UPDATE SKIP LOCKED
)
RETURNING url, title, extract, score
"""
with self.connection.cursor() as cursor:
cursor.execute(sql)
results = cursor.fetchall()
return [Document(title, url, extract, score) for url, title, extract, score in results]
def queue_documents_for_page(self, urls_and_page_indexes: list[tuple[str, int]]):
sql = """
INSERT INTO document_pages (url, page) values %s
"""
print(f"Queuing {len(urls_and_page_indexes)} urls and page indexes")
with self.connection.cursor() as cursor:
execute_values(cursor, sql, urls_and_page_indexes)
def get_queued_documents_for_page(self, page_index: int) -> list[Document]:
sql = """
SELECT d.url, title, extract, score
FROM document_pages p INNER JOIN documents d ON p.url = d.url
WHERE p.page = %(page_index)s
"""
with self.connection.cursor() as cursor:
cursor.execute(sql, {'page_index': page_index})
results = cursor.fetchall()
return [Document(title, url, extract, score) for url, title, extract, score in results]
def get_queued_pages(self) -> list[int]:
sql = """
SELECT DISTINCT page FROM document_pages ORDER BY page
"""
with self.connection.cursor() as cursor:
cursor.execute(sql)
results = cursor.fetchall()
return [x[0] for x in results]
def clear_queued_documents_for_page(self, page_index: int):
sql = """
DELETE FROM document_pages
WHERE page = %(page_index)s
"""
with self.connection.cursor() as cursor:
cursor.execute(sql, {'page_index': page_index})
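
A rough sketch of the intended flow through these tables, assuming DATABASE_URL points at a reachable Postgres instance; the queued document is invented for illustration:

from mwmbl.database import Database
from mwmbl.indexer.indexdb import IndexDatabase
from mwmbl.tinysearchengine.indexer import Document

with Database() as db:
    index_db = IndexDatabase(db.connection)
    index_db.create_tables()
    # Crawled documents are queued with status NEW...
    index_db.queue_documents([Document("Example Domain", "https://example.com/", "An example page", 1.0)])
    # ...and later claimed in chunks, which flips their status to PREPROCESSING
    documents = index_db.get_documents_for_preprocessing()
    print(f"Claimed {len(documents)} documents")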


@ -0,0 +1,42 @@
"""
Preprocess local documents for indexing.
"""
import traceback
from time import sleep
import spacy
from mwmbl.database import Database
from mwmbl.indexer.indexdb import IndexDatabase
from mwmbl.indexer.index import tokenize_document
from mwmbl.tinysearchengine.indexer import TinyIndex, Document
def run(index_path):
while True:
try:
run_preprocessing(index_path)
except Exception as e:
print("Exception preprocessing")
traceback.print_exception(type(e), e, e.__traceback__)
sleep(10)
def run_preprocessing(index_path):
nlp = spacy.load("en_core_web_sm")
with Database() as db:
index_db = IndexDatabase(db.connection)
for i in range(100):
documents = index_db.get_documents_for_preprocessing()
print(f"Got {len(documents)} documents for preprocessing")
if len(documents) == 0:
break
with TinyIndex(Document, index_path, 'w') as indexer:
for document in documents:
tokenized = tokenize_document(document.url, document.title, document.extract, 1, nlp)
page_indexes = [indexer.get_key_page_index(token) for token in tokenized.tokens]
index_db.queue_documents_for_page([(tokenized.url, i) for i in page_indexes])
if __name__ == '__main__':
run('data/index.tinysearch')

68
mwmbl/indexer/retrieve.py Normal file

@ -0,0 +1,68 @@
"""
Retrieve remote batches and store them in Postgres locally
"""
import gzip
import json
import traceback
from multiprocessing.pool import ThreadPool
from time import sleep
from pydantic import ValidationError
from mwmbl.crawler.app import create_historical_batch, queue_batch
from mwmbl.crawler.batch import HashedBatch
from mwmbl.database import Database
from mwmbl.indexer.indexdb import IndexDatabase, BatchStatus
from mwmbl.retry import retry_requests
NUM_THREADS = 5
def retrieve_batches():
with Database() as db:
index_db = IndexDatabase(db.connection)
index_db.create_tables()
with Database() as db:
index_db = IndexDatabase(db.connection)
for i in range(100):
batches = index_db.get_batches_by_status(BatchStatus.REMOTE)
print(f"Found {len(batches)} remote batches")
if len(batches) == 0:
return
urls = [batch.url for batch in batches]
pool = ThreadPool(NUM_THREADS)
results = pool.imap_unordered(retrieve_batch, urls)
for result in results:
if result > 0:
print("Processed batch with items:", result)
index_db.update_batch_status(urls, BatchStatus.LOCAL)
def retrieve_batch(url):
data = json.loads(gzip.decompress(retry_requests.get(url).content))
try:
batch = HashedBatch.parse_obj(data)
except ValidationError:
print("Failed to validate batch", data)
return 0
if len(batch.items) > 0:
print(f"Retrieved batch with {len(batch.items)} items")
create_historical_batch(batch)
queue_batch(batch)
return len(batch.items)
def run():
while True:
try:
retrieve_batches()
except Exception as e:
print("Exception retrieving batch")
traceback.print_exception(type(e), e, e.__traceback__)
sleep(10)
if __name__ == '__main__':
retrieve_batches()


@ -0,0 +1,49 @@
"""
Iterate over each page in the index and update it based on what is in the index database.
"""
import traceback
from time import sleep
from mwmbl.database import Database
from mwmbl.indexer.indexdb import IndexDatabase
from mwmbl.tinysearchengine.indexer import TinyIndex, Document
def run_update(index_path):
with Database() as db:
index_db = IndexDatabase(db.connection)
index_db.create_tables()
with TinyIndex(Document, index_path, 'w') as indexer:
with Database() as db:
index_db = IndexDatabase(db.connection)
pages_to_process = index_db.get_queued_pages()
print(f"Got {len(pages_to_process)} pages to process")
for i in pages_to_process:
documents = index_db.get_queued_documents_for_page(i)
print(f"Documents queued for page {i}: {len(documents)}")
if len(documents) > 0:
for j in range(3):
try:
indexer.add_to_page(i, documents)
break
except ValueError:
documents = documents[:len(documents)//2]
if len(documents) == 0:
break
print(f"Not enough space, adding {len(documents)}")
index_db.clear_queued_documents_for_page(i)
def run(index_path):
while True:
try:
run_update(index_path)
except Exception as e:
print("Exception updating pages in index")
traceback.print_exception(type(e), e, e.__traceback__)
sleep(10)
if __name__ == '__main__':
run_update('data/index.tinysearch')

67
mwmbl/main.py Normal file

@ -0,0 +1,67 @@
import argparse
import logging
import os
from multiprocessing import Process
import uvicorn
from fastapi import FastAPI
from mwmbl import background
from mwmbl.indexer import historical, retrieve, preprocess, update_pages
from mwmbl.crawler.app import router as crawler_router
from mwmbl.tinysearchengine import search
from mwmbl.tinysearchengine.completer import Completer
from mwmbl.tinysearchengine.indexer import TinyIndex, Document, NUM_PAGES, PAGE_SIZE
from mwmbl.tinysearchengine.rank import HeuristicRanker
logging.basicConfig()
def setup_args():
parser = argparse.ArgumentParser(description="mwmbl-tinysearchengine")
parser.add_argument("--index", help="Path to the tinysearchengine index file", default="/app/storage/index.tinysearch")
args = parser.parse_args()
return args
def run():
args = setup_args()
try:
existing_index = TinyIndex(item_factory=Document, index_path=args.index)
if existing_index.page_size != PAGE_SIZE or existing_index.num_pages != NUM_PAGES:
print(f"Existing index page sizes ({existing_index.page_size}) and number of pages "
f"({existing_index.num_pages}) does not match - removing.")
os.remove(args.index)
existing_index = None
except FileNotFoundError:
existing_index = None
if existing_index is None:
print("Creating a new index")
TinyIndex.create(item_factory=Document, index_path=args.index, num_pages=NUM_PAGES, page_size=PAGE_SIZE)
Process(target=background.run, args=(args.index,)).start()
# Process(target=historical.run).start()
# Process(target=retrieve.run).start()
# Process(target=preprocess.run, args=(args.index,)).start()
# Process(target=update_pages.run, args=(args.index,)).start()
completer = Completer()
with TinyIndex(item_factory=Document, index_path=args.index) as tiny_index:
ranker = HeuristicRanker(tiny_index, completer)
# Initialize FastApi instance
app = FastAPI()
search_router = search.create_router(ranker)
app.include_router(search_router)
app.include_router(crawler_router)
# Initialize uvicorn server using global app instance and server config params
uvicorn.run(app, host="0.0.0.0", port=5000)
if __name__ == "__main__":
run()

File diff suppressed because it is too large

17
mwmbl/retry.py Normal file

@ -0,0 +1,17 @@
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
MAX_RETRY = 2
MAX_RETRY_FOR_SESSION = 2
BACK_OFF_FACTOR = 0.3
TIME_BETWEEN_RETRIES = 1000
ERROR_CODES = (500, 502, 504)
retry_requests = requests.Session()
retry = Retry(total=5, backoff_factor=1)
adapter = HTTPAdapter(max_retries=retry)
retry_requests.mount('http://', adapter)
retry_requests.mount('https://', adapter)


@ -1,50 +0,0 @@
import argparse
import logging
import pandas as pd
import uvicorn
from mwmbl.tinysearchengine import create_app
from mwmbl.tinysearchengine.completer import Completer
from mwmbl.tinysearchengine.indexer import TinyIndex, Document
from mwmbl.tinysearchengine.rank import HeuristicRanker
logging.basicConfig()
def setup_args():
"""Read all the args."""
parser = argparse.ArgumentParser(description="mwmbl-tinysearchengine")
parser.add_argument("--index", help="Path to the tinysearchengine index file", required=True)
parser.add_argument("--terms", help="Path to the tinysearchengine terms CSV file", required=True)
args = parser.parse_args()
return args
def main():
"""Main entrypoint for tinysearchengine.
* Parses CLI args
* Parses and validates config
* Initializes TinyIndex
* Initialize a FastAPI app instance
* Starts uvicorn server using app instance
"""
args = setup_args()
# Load term data
terms = pd.read_csv(args.terms)
completer = Completer(terms)
with TinyIndex(item_factory=Document, index_path=args.index) as tiny_index:
ranker = HeuristicRanker(tiny_index, completer)
# Initialize FastApi instance
app = create_app.create(ranker)
# Initialize uvicorn server using global app instance and server config params
uvicorn.run(app, host="0.0.0.0", port=8080)
if __name__ == "__main__":
main()


@ -1,12 +1,17 @@
from bisect import bisect_left, bisect_right
from datetime import datetime
from pathlib import Path
import pandas as pd
from pandas import DataFrame
TERMS_PATH = Path(__file__).parent.parent / 'resources' / 'mwmbl-crawl-terms.csv'
class Completer:
def __init__(self, terms: DataFrame, num_matches: int = 3):
def __init__(self, num_matches: int = 3):
# Load term data
terms = pd.read_csv(TERMS_PATH)
terms_dict = terms.sort_values('term').set_index('term')['count'].to_dict()
self.terms = list(terms_dict.keys())
self.counts = list(terms_dict.values())
@ -26,12 +31,3 @@ class Completer:
counts, terms = zip(*top_terms)
return list(terms)
if __name__ == '__main__':
data = pd.read_csv('data/mwmbl-crawl-terms.csv')
completer = Completer(data)
start = datetime.now()
completer.complete('fa')
end = datetime.now()
print("Time", end - start)


@ -1,39 +0,0 @@
import re
from logging import getLogger
from operator import itemgetter
from pathlib import Path
from urllib.parse import urlparse
from fastapi import FastAPI
from starlette.middleware.cors import CORSMiddleware
from mwmbl.tinysearchengine.completer import Completer
from mwmbl.tinysearchengine.hn_top_domains_filtered import DOMAINS
from mwmbl.tinysearchengine.indexer import TinyIndex, Document
from mwmbl.tinysearchengine.rank import HeuristicRanker
logger = getLogger(__name__)
SCORE_THRESHOLD = 0.25
def create(ranker: HeuristicRanker):
app = FastAPI()
# Allow CORS requests from any site
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_headers=["*"],
)
@app.get("/search")
def search(s: str):
return ranker.search(s)
@app.get("/complete")
def complete(q: str):
return ranker.complete(q)
return app


@ -12,7 +12,7 @@ VERSION = 1
METADATA_CONSTANT = b'mwmbl-tiny-search'
METADATA_SIZE = 4096
NUM_PAGES = 128000
NUM_PAGES = 512000
PAGE_SIZE = 4096
@ -106,10 +106,10 @@ class TinyIndex(Generic[T]):
self.index_file.close()
def retrieve(self, key: str) -> List[T]:
index = self._get_key_page_index(key)
index = self.get_key_page_index(key)
return self.get_page(index)
def _get_key_page_index(self, key):
def get_key_page_index(self, key) -> int:
key_hash = mmh3.hash(key, signed=False)
return key_hash % self.num_pages
@ -128,16 +128,19 @@ class TinyIndex(Generic[T]):
def index(self, key: str, value: T):
assert type(value) == self.item_factory, f"Can only index the specified type" \
f" ({self.item_factory.__name__})"
page_index = self._get_key_page_index(key)
page_index = self.get_key_page_index(key)
try:
self.add_to_page(page_index, [value])
except ValueError:
pass
def add_to_page(self, page_index: int, values: list[T]):
current_page = self._get_page_tuples(page_index)
if current_page is None:
current_page = []
value_tuple = astuple(value)
current_page.append(value_tuple)
try:
self._write_page(current_page, page_index)
except ValueError:
pass
value_tuples = [astuple(value) for value in values]
current_page += value_tuples
self._write_page(current_page, page_index)
def _write_page(self, data, i):
"""


@ -2,14 +2,10 @@ import re
from abc import abstractmethod
from logging import getLogger
from operator import itemgetter
from pathlib import Path
from urllib.parse import urlparse
from fastapi import FastAPI
from starlette.middleware.cors import CORSMiddleware
from mwmbl.tinysearchengine.completer import Completer
from mwmbl.tinysearchengine.hn_top_domains_filtered import DOMAINS
from mwmbl.hn_top_domains_filtered import DOMAINS
from mwmbl.tinysearchengine.indexer import TinyIndex, Document
logger = getLogger(__name__)


@ -0,0 +1,24 @@
from logging import getLogger
from fastapi import APIRouter
from mwmbl.tinysearchengine.rank import HeuristicRanker
logger = getLogger(__name__)
SCORE_THRESHOLD = 0.25
def create_router(ranker: HeuristicRanker) -> APIRouter:
router = APIRouter(prefix="/search", tags=["search"])
@router.get("")
def search(s: str):
return ranker.search(s)
@router.get("/complete")
def complete(q: str):
return ranker.complete(q)
return router
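
The router is mounted by mwmbl.main alongside the crawler router; a sketch of querying it with requests, assuming the server is running locally on port 5000:

import requests

results = requests.get("http://localhost:5000/search", params={"s": "mwmbl"}).json()
completions = requests.get("http://localhost:5000/search/complete", params={"q": "mwm"}).json()
print(results, completions)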

220
nginx.conf.sigil Normal file

@ -0,0 +1,220 @@
{{ range $port_map := .PROXY_PORT_MAP | split " " }}
{{ $port_map_list := $port_map | split ":" }}
{{ $scheme := index $port_map_list 0 }}
{{ $listen_port := index $port_map_list 1 }}
{{ $upstream_port := index $port_map_list 2 }}
{{ if eq $scheme "http" }}
server {
listen [{{ $.NGINX_BIND_ADDRESS_IP6 }}]:{{ $listen_port }};
listen {{ if $.NGINX_BIND_ADDRESS_IP4 }}{{ $.NGINX_BIND_ADDRESS_IP4 }}:{{end}}{{ $listen_port }};
{{ if $.NOSSL_SERVER_NAME }}server_name {{ $.NOSSL_SERVER_NAME }}; {{ end }}
access_log {{ $.NGINX_ACCESS_LOG_PATH }}{{ if and ($.NGINX_ACCESS_LOG_FORMAT) (ne $.NGINX_ACCESS_LOG_PATH "off") }} {{ $.NGINX_ACCESS_LOG_FORMAT }}{{ end }};
error_log {{ $.NGINX_ERROR_LOG_PATH }};
{{ if (and (eq $listen_port "80") ($.SSL_INUSE)) }}
include {{ $.DOKKU_ROOT }}/{{ $.APP }}/nginx.conf.d/*.conf;
location / {
return 301 https://$host:{{ $.PROXY_SSL_PORT }}$request_uri;
}
{{ else }}
location / {
gzip on;
gzip_min_length 1100;
gzip_buffers 4 32k;
gzip_types text/css text/javascript text/xml text/plain text/x-component application/javascript application/x-javascript application/json application/xml application/rss+xml font/truetype application/x-font-ttf font/opentype application/vnd.ms-fontobject image/svg+xml;
gzip_vary on;
gzip_comp_level 6;
proxy_pass http://{{ $.APP }}-{{ $upstream_port }};
proxy_http_version 1.1;
proxy_read_timeout {{ $.PROXY_READ_TIMEOUT }};
proxy_buffer_size {{ $.PROXY_BUFFER_SIZE }};
proxy_buffering {{ $.PROXY_BUFFERING }};
proxy_buffers {{ $.PROXY_BUFFERS }};
proxy_busy_buffers_size {{ $.PROXY_BUSY_BUFFERS_SIZE }};
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection $http_connection;
proxy_set_header Host $http_host;
proxy_set_header X-Forwarded-For {{ $.PROXY_X_FORWARDED_FOR }};
proxy_set_header X-Forwarded-Port {{ $.PROXY_X_FORWARDED_PORT }};
proxy_set_header X-Forwarded-Proto {{ $.PROXY_X_FORWARDED_PROTO }};
proxy_set_header X-Request-Start $msec;
{{ if $.PROXY_X_FORWARDED_SSL }}proxy_set_header X-Forwarded-Ssl {{ $.PROXY_X_FORWARDED_SSL }};{{ end }}
}
{{ if $.CLIENT_MAX_BODY_SIZE }}client_max_body_size {{ $.CLIENT_MAX_BODY_SIZE }};{{ end }}
include {{ $.DOKKU_ROOT }}/{{ $.APP }}/nginx.conf.d/*.conf;
error_page 400 401 402 403 405 406 407 408 409 410 411 412 413 414 415 416 417 418 420 422 423 424 426 428 429 431 444 449 450 451 /400-error.html;
location /400-error.html {
root {{ $.DOKKU_LIB_ROOT }}/data/nginx-vhosts/dokku-errors;
internal;
}
error_page 404 /404-error.html;
location /404-error.html {
root {{ $.DOKKU_LIB_ROOT }}/data/nginx-vhosts/dokku-errors;
internal;
}
error_page 500 501 502 503 504 505 506 507 508 509 510 511 /500-error.html;
location /500-error.html {
root {{ $.DOKKU_LIB_ROOT }}/data/nginx-vhosts/dokku-errors;
internal;
}
{{ end }}
}
{{ else if eq $scheme "https"}}
server {
listen [{{ $.NGINX_BIND_ADDRESS_IP6 }}]:{{ $listen_port }} ssl {{ if eq $.HTTP2_SUPPORTED "true" }}http2{{ else if eq $.SPDY_SUPPORTED "true" }}spdy{{ end }};
listen {{ if $.NGINX_BIND_ADDRESS_IP4 }}{{ $.NGINX_BIND_ADDRESS_IP4 }}:{{end}}{{ $listen_port }} ssl {{ if eq $.HTTP2_SUPPORTED "true" }}http2{{ else if eq $.SPDY_SUPPORTED "true" }}spdy{{ end }};
{{ if $.SSL_SERVER_NAME }}server_name {{ $.SSL_SERVER_NAME }}; {{ end }}
{{ if $.NOSSL_SERVER_NAME }}server_name {{ $.NOSSL_SERVER_NAME }}; {{ end }}
access_log {{ $.NGINX_ACCESS_LOG_PATH }}{{ if and ($.NGINX_ACCESS_LOG_FORMAT) (ne $.NGINX_ACCESS_LOG_PATH "off") }} {{ $.NGINX_ACCESS_LOG_FORMAT }}{{ end }};
error_log {{ $.NGINX_ERROR_LOG_PATH }};
ssl_certificate {{ $.APP_SSL_PATH }}/server.crt;
ssl_certificate_key {{ $.APP_SSL_PATH }}/server.key;
ssl_protocols TLSv1.2 {{ if eq $.TLS13_SUPPORTED "true" }}TLSv1.3{{ end }};
ssl_prefer_server_ciphers off;
keepalive_timeout 70;
{{ if and (eq $.SPDY_SUPPORTED "true") (ne $.HTTP2_SUPPORTED "true") }}add_header Alternate-Protocol {{ $.PROXY_SSL_PORT }}:npn-spdy/2;{{ end }}
location / {
## Start CORS here. See http://enable-cors.org/server_nginx.html for comments
if ($request_method = 'OPTIONS') {
add_header 'Access-Control-Allow-Origin' '*';
add_header 'Access-Control-Allow-Methods' 'GET, POST, OPTIONS';
#
# Custom headers and headers various browsers *should* be OK with but aren't
#
add_header 'Access-Control-Allow-Headers' 'DNT,User-Agent,X-Requested-With,If-Modified-Since,Cache-Control,Content-Type,Range';
#
# Tell client that this pre-flight info is valid for 20 days
#
add_header 'Access-Control-Max-Age' 1728000;
add_header 'Content-Type' 'text/plain; charset=utf-8';
add_header 'Content-Length' 0;
return 204;
}
if ($request_method = 'POST') {
add_header 'Access-Control-Allow-Origin' '*' always;
add_header 'Access-Control-Allow-Methods' 'GET, POST, OPTIONS' always;
add_header 'Access-Control-Allow-Headers' 'DNT,User-Agent,X-Requested-With,If-Modified-Since,Cache-Control,Content-Type,Range' always;
add_header 'Access-Control-Expose-Headers' 'Content-Length,Content-Range' always;
}
if ($request_method = 'GET') {
add_header 'Access-Control-Allow-Origin' '*' always;
add_header 'Access-Control-Allow-Methods' 'GET, POST, OPTIONS' always;
add_header 'Access-Control-Allow-Headers' 'DNT,User-Agent,X-Requested-With,If-Modified-Since,Cache-Control,Content-Type,Range' always;
add_header 'Access-Control-Expose-Headers' 'Content-Length,Content-Range' always;
}
### End CORS
gzip on;
gzip_min_length 1100;
gzip_buffers 4 32k;
gzip_types text/css text/javascript text/xml text/plain text/x-component application/javascript application/x-javascript application/json application/xml application/rss+xml font/truetype application/x-font-ttf font/opentype application/vnd.ms-fontobject image/svg+xml;
gzip_vary on;
gzip_comp_level 6;
proxy_pass http://{{ $.APP }}-{{ $upstream_port }};
{{ if eq $.HTTP2_PUSH_SUPPORTED "true" }}http2_push_preload on; {{ end }}
proxy_http_version 1.1;
proxy_read_timeout {{ $.PROXY_READ_TIMEOUT }};
proxy_buffer_size {{ $.PROXY_BUFFER_SIZE }};
proxy_buffering {{ $.PROXY_BUFFERING }};
proxy_buffers {{ $.PROXY_BUFFERS }};
proxy_busy_buffers_size {{ $.PROXY_BUSY_BUFFERS_SIZE }};
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection $http_connection;
proxy_set_header Host $http_host;
proxy_set_header X-Forwarded-For {{ $.PROXY_X_FORWARDED_FOR }};
proxy_set_header X-Forwarded-Port {{ $.PROXY_X_FORWARDED_PORT }};
proxy_set_header X-Forwarded-Proto {{ $.PROXY_X_FORWARDED_PROTO }};
proxy_set_header X-Request-Start $msec;
{{ if $.PROXY_X_FORWARDED_SSL }}proxy_set_header X-Forwarded-Ssl {{ $.PROXY_X_FORWARDED_SSL }};{{ end }}
}
{{ if $.CLIENT_MAX_BODY_SIZE }}client_max_body_size {{ $.CLIENT_MAX_BODY_SIZE }};{{ end }}
include {{ $.DOKKU_ROOT }}/{{ $.APP }}/nginx.conf.d/*.conf;
error_page 400 401 402 403 405 406 407 408 409 410 411 412 413 414 415 416 417 418 420 422 423 424 426 428 429 431 444 449 450 451 /400-error.html;
location /400-error.html {
root {{ $.DOKKU_LIB_ROOT }}/data/nginx-vhosts/dokku-errors;
internal;
}
error_page 404 /404-error.html;
location /404-error.html {
root {{ $.DOKKU_LIB_ROOT }}/data/nginx-vhosts/dokku-errors;
internal;
}
error_page 500 501 503 504 505 506 507 508 509 510 511 /500-error.html;
location /500-error.html {
root {{ $.DOKKU_LIB_ROOT }}/data/nginx-vhosts/dokku-errors;
internal;
}
error_page 502 /502-error.html;
location /502-error.html {
root {{ $.DOKKU_LIB_ROOT }}/data/nginx-vhosts/dokku-errors;
internal;
}
}
{{ else if eq $scheme "grpc"}}
{{ if eq $.GRPC_SUPPORTED "true"}}{{ if eq $.HTTP2_SUPPORTED "true"}}
server {
listen [{{ $.NGINX_BIND_ADDRESS_IP6 }}]:{{ $listen_port }} http2;
listen {{ if $.NGINX_BIND_ADDRESS_IP4 }}{{ $.NGINX_BIND_ADDRESS_IP4 }}:{{end}}{{ $listen_port }} http2;
{{ if $.NOSSL_SERVER_NAME }}server_name {{ $.NOSSL_SERVER_NAME }}; {{ end }}
access_log {{ $.NGINX_ACCESS_LOG_PATH }}{{ if and ($.NGINX_ACCESS_LOG_FORMAT) (ne $.NGINX_ACCESS_LOG_PATH "off") }} {{ $.NGINX_ACCESS_LOG_FORMAT }}{{ end }};
error_log {{ $.NGINX_ERROR_LOG_PATH }};
location / {
grpc_pass grpc://{{ $.APP }}-{{ $upstream_port }};
}
{{ if $.CLIENT_MAX_BODY_SIZE }}client_max_body_size {{ $.CLIENT_MAX_BODY_SIZE }};{{ end }}
include {{ $.DOKKU_ROOT }}/{{ $.APP }}/nginx.conf.d/*.conf;
}
{{ end }}{{ end }}
{{ else if eq $scheme "grpcs"}}
{{ if eq $.GRPC_SUPPORTED "true"}}{{ if eq $.HTTP2_SUPPORTED "true"}}
server {
listen [{{ $.NGINX_BIND_ADDRESS_IP6 }}]:{{ $listen_port }} ssl http2;
listen {{ if $.NGINX_BIND_ADDRESS_IP4 }}{{ $.NGINX_BIND_ADDRESS_IP4 }}:{{end}}{{ $listen_port }} ssl http2;
{{ if $.NOSSL_SERVER_NAME }}server_name {{ $.NOSSL_SERVER_NAME }}; {{ end }}
access_log {{ $.NGINX_ACCESS_LOG_PATH }}{{ if and ($.NGINX_ACCESS_LOG_FORMAT) (ne $.NGINX_ACCESS_LOG_PATH "off") }} {{ $.NGINX_ACCESS_LOG_FORMAT }}{{ end }};
error_log {{ $.NGINX_ERROR_LOG_PATH }};
ssl_certificate {{ $.APP_SSL_PATH }}/server.crt;
ssl_certificate_key {{ $.APP_SSL_PATH }}/server.key;
ssl_protocols TLSv1.2 {{ if eq $.TLS13_SUPPORTED "true" }}TLSv1.3{{ end }};
ssl_prefer_server_ciphers off;
location / {
grpc_pass grpc://{{ $.APP }}-{{ $upstream_port }};
}
{{ if $.CLIENT_MAX_BODY_SIZE }}client_max_body_size {{ $.CLIENT_MAX_BODY_SIZE }};{{ end }}
include {{ $.DOKKU_ROOT }}/{{ $.APP }}/nginx.conf.d/*.conf;
}
{{ end }}{{ end }}
{{ end }}
{{ end }}
{{ if $.DOKKU_APP_WEB_LISTENERS }}
{{ range $upstream_port := $.PROXY_UPSTREAM_PORTS | split " " }}
upstream {{ $.APP }}-{{ $upstream_port }} {
{{ range $listeners := $.DOKKU_APP_WEB_LISTENERS | split " " }}
{{ $listener_list := $listeners | split ":" }}
{{ $listener_ip := index $listener_list 0 }}
server {{ $listener_ip }}:{{ $upstream_port }};{{ end }}
}
{{ end }}{{ end }}

636
poetry.lock generated

File diff suppressed because it is too large


@ -14,11 +14,13 @@ mmh3 = "^3.0.0"
fastapi = "^0.70.1"
uvicorn = "^0.16.0"
pyyaml = "==6.0"
boto3 = "^1.20.37"
requests = "^2.27.1"
psycopg2-binary = "^2.9.3"
spacy = "==3.2.1"
# Optional dependencies do not get installed by default. Look under tool.poetry.extras section
# to see which extras to use.
botocore = {version= "==1.23.20", optional = true}
boto3 = {version= "==1.20.20", optional = true}
ujson = {version= "==4.3.0", optional = true}
warcio = {version= "==1.7.4", optional = true}
idna = {version= "==3.3", optional = true}
@ -30,15 +32,11 @@ pyarrow = {version= "==6.0.0", optional = true}
pyspark = {version= "==3.2.0", optional = true}
Levenshtein = {version= "==0.16.0", optional = true}
# en-core-web-sm requires a compatible version of spacy
spacy = {version= "==3.2.1", optional = true}
en-core-web-sm = {url = "https://github.com/explosion/spacy-models/releases/download/en_core_web_sm-3.2.0/en_core_web_sm-3.2.0.tar.gz", optional = true}
requests = "^2.27.1"
en-core-web-sm = {url = "https://github.com/explosion/spacy-models/releases/download/en_core_web_sm-3.2.0/en_core_web_sm-3.2.0.tar.gz"}
[tool.poetry.extras]
indexer = [
"botocore",
"boto3",
"ujson",
"warcio",
"idna",
@ -50,8 +48,6 @@ indexer = [
"pyspark",
"Levenshtein",
# en-core-web-sm requires a compatible version of spacy
"spacy",
"en-core-web-sm",
]
[tool.poetry.dev-dependencies]
@ -74,4 +70,4 @@ requires = ["poetry-core>=1.0.0"]
build-backend = "poetry.core.masonry.api"
[tool.poetry.scripts]
mwmbl-tinysearchengine = "mwmbl.tinysearchengine.app:main"
mwmbl-tinysearchengine = "mwmbl.main:run"