Merge branch 'master' into user-registration

Commit 23688bd3ad by Daoud Clarke, 2023-03-18 22:37:45 +00:00
28 changed files with 1720 additions and 1106 deletions

.github/workflows/ci.yml (new file, 57 lines)

@ -0,0 +1,57 @@
name: CI

on:
  push:
    branches: [main]
  pull_request:

jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      #----------------------------------------------
      # check-out repo and set-up python
      #----------------------------------------------
      - name: Check out repository
        uses: actions/checkout@v3
      - name: Set up python
        id: setup-python
        uses: actions/setup-python@v4
        with:
          python-version: '3.10'
      #----------------------------------------------
      # ----- install & configure poetry -----
      #----------------------------------------------
      - name: Install Poetry
        uses: snok/install-poetry@v1.3.3
        with:
          virtualenvs-create: true
          virtualenvs-in-project: true
          installer-parallel: true
      #----------------------------------------------
      # load cached venv if cache exists
      #----------------------------------------------
      - name: Load cached venv
        id: cached-poetry-dependencies
        uses: actions/cache@v3
        with:
          path: .venv
          key: venv-${{ runner.os }}-${{ steps.setup-python.outputs.python-version }}-${{ hashFiles('**/poetry.lock') }}
      #----------------------------------------------
      # install dependencies if cache does not exist
      #----------------------------------------------
      - name: Install dependencies
        if: steps.cached-poetry-dependencies.outputs.cache-hit != 'true'
        run: poetry install --no-interaction --no-root
      #----------------------------------------------
      # install your root project, if required
      #----------------------------------------------
      - name: Install project
        run: poetry install --no-interaction
      #----------------------------------------------
      # run test suite
      #----------------------------------------------
      - name: Run tests
        run: |
          poetry run pytest

.vscode/launch.json (new file, 15 lines)

@ -0,0 +1,15 @@
{
    "version": "0.2.0",
    "configurations": [
        {
            "name": "mwmbl",
            "type": "python",
            "request": "launch",
            "module": "mwmbl.main",
            "python": "${workspaceFolder}/.venv/bin/python",
            "stopOnEntry": false,
            "console": "integratedTerminal",
            "justMyCode": true
        }
    ]
}

CODE_OF_CONDUCT.md (new file, 128 lines)

@ -0,0 +1,128 @@
# Contributor Covenant Code of Conduct
## Our Pledge
We as members, contributors, and leaders pledge to make participation in our
community a harassment-free experience for everyone, regardless of age, body
size, visible or invisible disability, ethnicity, sex characteristics, gender
identity and expression, level of experience, education, socio-economic status,
nationality, personal appearance, race, religion, or sexual identity
and orientation.
We pledge to act and interact in ways that contribute to an open, welcoming,
diverse, inclusive, and healthy community.
## Our Standards
Examples of behavior that contributes to a positive environment for our
community include:
* Demonstrating empathy and kindness toward other people
* Being respectful of differing opinions, viewpoints, and experiences
* Giving and gracefully accepting constructive feedback
* Accepting responsibility and apologizing to those affected by our mistakes,
and learning from the experience
* Focusing on what is best not just for us as individuals, but for the
overall community
Examples of unacceptable behavior include:
* The use of sexualized language or imagery, and sexual attention or
advances of any kind
* Trolling, insulting or derogatory comments, and personal or political attacks
* Public or private harassment
* Publishing others' private information, such as a physical or email
address, without their explicit permission
* Other conduct which could reasonably be considered inappropriate in a
professional setting
## Enforcement Responsibilities
Community leaders are responsible for clarifying and enforcing our standards of
acceptable behavior and will take appropriate and fair corrective action in
response to any behavior that they deem inappropriate, threatening, offensive,
or harmful.
Community leaders have the right and responsibility to remove, edit, or reject
comments, commits, code, wiki edits, issues, and other contributions that are
not aligned to this Code of Conduct, and will communicate reasons for moderation
decisions when appropriate.
## Scope
This Code of Conduct applies within all community spaces, and also applies when
an individual is officially representing the community in public spaces.
Examples of representing our community include using an official e-mail address,
posting via an official social media account, or acting as an appointed
representative at an online or offline event.
## Enforcement
Instances of abusive, harassing, or otherwise unacceptable behavior may be
reported to the community leaders responsible for enforcement at
https://matrix.to/#/#mwmbl:matrix.org.
All complaints will be reviewed and investigated promptly and fairly.
All community leaders are obligated to respect the privacy and security of the
reporter of any incident.
## Enforcement Guidelines
Community leaders will follow these Community Impact Guidelines in determining
the consequences for any action they deem in violation of this Code of Conduct:
### 1. Correction
**Community Impact**: Use of inappropriate language or other behavior deemed
unprofessional or unwelcome in the community.
**Consequence**: A private, written warning from community leaders, providing
clarity around the nature of the violation and an explanation of why the
behavior was inappropriate. A public apology may be requested.
### 2. Warning
**Community Impact**: A violation through a single incident or series
of actions.
**Consequence**: A warning with consequences for continued behavior. No
interaction with the people involved, including unsolicited interaction with
those enforcing the Code of Conduct, for a specified period of time. This
includes avoiding interactions in community spaces as well as external channels
like social media. Violating these terms may lead to a temporary or
permanent ban.
### 3. Temporary Ban
**Community Impact**: A serious violation of community standards, including
sustained inappropriate behavior.
**Consequence**: A temporary ban from any sort of interaction or public
communication with the community for a specified period of time. No public or
private interaction with the people involved, including unsolicited interaction
with those enforcing the Code of Conduct, is allowed during this period.
Violating these terms may lead to a permanent ban.
### 4. Permanent Ban
**Community Impact**: Demonstrating a pattern of violation of community
standards, including sustained inappropriate behavior, harassment of an
individual, or aggression toward or disparagement of classes of individuals.
**Consequence**: A permanent ban from any sort of public interaction within
the community.
## Attribution
This Code of Conduct is adapted from the [Contributor Covenant][homepage],
version 2.0, available at
https://www.contributor-covenant.org/version/2/0/code_of_conduct.html.
Community Impact Guidelines were inspired by [Mozilla's code of conduct
enforcement ladder](https://github.com/mozilla/diversity).
[homepage]: https://www.contributor-covenant.org
For answers to common questions about this code of conduct, see the FAQ at
https://www.contributor-covenant.org/faq. Translations are available at
https://www.contributor-covenant.org/translations.

CONTRIBUTING.md (new file, 5 lines)

@ -0,0 +1,5 @@
Contributions are very welcome!
Please join the discussion at https://matrix.to/#/#mwmbl:matrix.org and let us know what you're planning to do.
See https://book.mwmbl.org/page/developers/ for a guide to development.

Dockerfile

@ -13,6 +13,7 @@ ENV PIP_DEFAULT_TIMEOUT=100 \
PIP_NO_CACHE_DIR=1 \
POETRY_VERSION=1.1.12
# Create a /venv directory & environment.
# This directory will be copied into the final stage of docker build.
RUN python -m venv /venv
@ -25,11 +26,16 @@ COPY mwmbl /app/mwmbl
# Use pip to install the mwmbl python package
# PEP 518, PEP 517 and others have allowed for a standardized python packaging API, which allows
# pip to be able to install poetry packages.
RUN /venv/bin/pip install pip --upgrade && \
/venv/bin/pip install .
# en-core-web-sm requires a compatible version of spacy
RUN /venv/bin/pip install pip wheel --upgrade && \
/venv/bin/pip install . && \
/venv/bin/python -m spacy download en_core_web_sm-3.2.0 --direct && \
/venv/bin/python -m spacy validate
FROM base as final
RUN apt-get update && apt-get install -y postgresql-client
# Copy only the required /venv directory from the builder image that contains mwmbl and its dependencies
COPY --from=builder /venv /venv
@ -41,4 +47,4 @@ VOLUME ["/data"]
EXPOSE 5000
# Using the mwmbl-tinysearchengine binary/entrypoint which comes packaged with mwmbl
CMD ["/venv/bin/mwmbl-tinysearchengine", "--num-pages", "10240000", "--background"]
CMD ["/venv/bin/mwmbl-tinysearchengine", "--num-pages", "10240000", "--background", "--data", "/app/storage"]

README.md

@ -8,12 +8,13 @@ Mwmbl is a non-profit, ad-free, free-libre and free-lunch search
engine with a focus on useability and speed. At the moment it is
little more than an idea together with a [proof of concept
implementation](https://mwmbl.org/) of
the web front-end and search technology on a very small index. A
crawler is still to be implemented.
the web front-end and search technology on a small index.
Our vision is a community working to provide top quality search
particularly for hackers, funded purely by donations.
![mwmbl](https://user-images.githubusercontent.com/1283077/218265959-be4220b4-dcf0-47ab-acd3-f06df0883b52.gif)
Crawling
========

analyse/update_urls.py (new file, 26 lines)

@ -0,0 +1,26 @@
import os
import pickle
from datetime import datetime
from pathlib import Path
from queue import Queue

from mwmbl.indexer.update_urls import record_urls_in_database


def run_update_urls_on_fixed_batches():
    with open(Path(os.environ["HOME"]) / "data" / "mwmbl" / "hashed-batches.pickle", "rb") as file:
        batches = pickle.load(file)

    # print("Batches", batches[:3])

    queue = Queue()

    start = datetime.now()
    record_urls_in_database(batches, queue)
    total_time = (datetime.now() - start).total_seconds()

    print("Total time:", total_time)


if __name__ == '__main__':
    run_update_urls_on_fixed_batches()

analyse/url_queue.py (new file, 35 lines)

@ -0,0 +1,35 @@
import logging
import os
import pickle
import sys
from datetime import datetime
from pathlib import Path
from queue import Queue

from mwmbl.url_queue import URLQueue

FORMAT = '%(levelname)s %(name)s %(asctime)s %(message)s'
logging.basicConfig(stream=sys.stdout, level=logging.DEBUG, format=FORMAT)


def run_url_queue():
    data = pickle.load(open(Path(os.environ["HOME"]) / "data" / "mwmbl" / "found-urls.pickle", "rb"))
    print("First URLs", [x.url for x in data[:1000]])

    new_item_queue = Queue()
    queued_batches = Queue()
    queue = URLQueue(new_item_queue, queued_batches)

    new_item_queue.put(data)

    start = datetime.now()
    queue.update()
    total_time = (datetime.now() - start).total_seconds()

    print(f"Total time: {total_time}")


if __name__ == '__main__':
    run_url_queue()

Binary file not shown.

mwmbl/background.py

@ -2,36 +2,34 @@
Script that updates data in a background process.
"""
from logging import getLogger
from multiprocessing import Queue
from pathlib import Path
from time import sleep
from mwmbl.indexer import index_batches, historical, update_urls
from mwmbl.crawler.urls import URLDatabase
from mwmbl.database import Database
from mwmbl.indexer import index_batches, historical
from mwmbl.indexer.batch_cache import BatchCache
from mwmbl.indexer.paths import BATCH_DIR_NAME, INDEX_NAME
from mwmbl.url_queue import update_url_queue, initialize_url_queue
logger = getLogger(__name__)
def run(data_path: str, url_queue: Queue):
initialize_url_queue(url_queue)
def run(data_path: str):
logger.info("Started background process")
with Database() as db:
url_db = URLDatabase(db.connection)
url_db.create_tables()
historical.run()
index_path = Path(data_path) / INDEX_NAME
batch_cache = BatchCache(Path(data_path) / BATCH_DIR_NAME)
while True:
try:
update_url_queue(url_queue)
except Exception:
logger.exception("Error updating URL queue")
try:
batch_cache.retrieve_batches(num_batches=10000)
except Exception:
logger.exception("Error retrieving batches")
try:
update_urls.run(batch_cache)
except Exception:
logger.exception("Error updating URLs")
try:
index_batches.run(batch_cache, index_path)
except Exception:

mwmbl/crawler/app.py

@ -2,8 +2,7 @@ import gzip
import hashlib
import json
from datetime import datetime, timezone, date
from multiprocessing import Queue
from queue import Empty
from queue import Queue, Empty
from typing import Union
from uuid import uuid4
@ -34,6 +33,7 @@ from mwmbl.settings import (
FILE_NAME_SUFFIX,
DATE_REGEX, NUM_EXTRACT_CHARS, NUM_TITLE_CHARS)
from mwmbl.tinysearchengine.indexer import Document
from mwmbl.url_queue import URLQueue
def get_bucket(name):
@ -77,7 +77,7 @@ def justext_with_dom(html_text, stoplist, length_low=LENGTH_LOW_DEFAULT,
return paragraphs, title
def get_router(batch_cache: BatchCache, url_queue: Queue):
def get_router(batch_cache: BatchCache, queued_batches: Queue):
router = APIRouter(prefix="/crawler", tags=["crawler"])
@router.on_event("startup")
@ -100,7 +100,7 @@ def get_router(batch_cache: BatchCache, url_queue: Queue):
return format_result(result, query)
@router.post('/batches/')
def create_batch(batch: Batch):
def post_batch(batch: Batch):
if len(batch.items) > MAX_BATCH_SIZE:
raise HTTPException(400, f"Batch size too large (maximum {MAX_BATCH_SIZE}), got {len(batch.items)}")
@ -154,7 +154,7 @@ def get_router(batch_cache: BatchCache, url_queue: Queue):
def request_new_batch(batch_request: NewBatchRequest) -> list[str]:
user_id_hash = _get_user_id_hash(batch_request)
try:
urls = url_queue.get(block=False)
urls = queued_batches.get(block=False)
except Empty:
return []

mwmbl/crawler/batch.py

@ -7,6 +7,7 @@ class ItemContent(BaseModel):
title: str
extract: str
links: list[str]
extra_links: Optional[list[str]]
class ItemError(BaseModel):
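A note on the extra_links field added above: because it is Optional, batches from older crawler clients that omit it still validate. A minimal sketch of that behaviour against an abbreviated copy of the model (the class name and field values here are invented for illustration):

from typing import Optional
from pydantic import BaseModel

class ItemContentSketch(BaseModel):
    # Abbreviated copy of ItemContent, for illustration only.
    title: str
    extract: str
    links: list[str]
    extra_links: Optional[list[str]]

print(ItemContentSketch(title="t", extract="e", links=[]).extra_links)                     # None
print(ItemContentSketch(title="t", extract="e", links=[], extra_links=["u"]).extra_links)  # ['u']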

mwmbl/crawler/urls.py

@ -1,18 +1,27 @@
"""
Database storing info on URLs
"""
import random
from dataclasses import dataclass
from datetime import datetime, timedelta
from enum import Enum
from logging import getLogger
from psycopg2.extras import execute_values
from mwmbl.hn_top_domains_filtered import DOMAINS
from mwmbl.settings import CORE_DOMAINS
# Client has one hour to crawl a URL that has been assigned to them, or it will be reassigned
from mwmbl.utils import batch
REASSIGN_MIN_HOURS = 5
BATCH_SIZE = 100
MAX_URLS_PER_TOP_DOMAIN = 100
MAX_TOP_DOMAINS = 500
MAX_OTHER_DOMAINS = 50000
logger = getLogger(__name__)
class URLStatus(Enum):
@ -43,6 +52,8 @@ class URLDatabase:
self.connection = connection
def create_tables(self):
logger.info("Creating URL tables")
sql = """
CREATE TABLE IF NOT EXISTS urls (
url VARCHAR PRIMARY KEY,
@ -55,10 +66,12 @@ class URLDatabase:
with self.connection.cursor() as cursor:
cursor.execute(sql)
# cursor.execute(index_sql)
# cursor.execute(view_sql)
def update_found_urls(self, found_urls: list[FoundURL]):
def update_found_urls(self, found_urls: list[FoundURL]) -> list[FoundURL]:
if len(found_urls) == 0:
return
return []
get_urls_sql = """
SELECT url FROM urls
@ -82,6 +95,7 @@ class URLDatabase:
updated = CASE
WHEN urls.status > excluded.status THEN urls.updated ELSE excluded.updated
END
RETURNING url, user_id_hash, score, status, updated
"""
input_urls = [x.url for x in found_urls]
@ -89,6 +103,7 @@ class URLDatabase:
with self.connection as connection:
with connection.cursor() as cursor:
logger.info(f"Input URLs: {len(input_urls)}")
cursor.execute(get_urls_sql, {'urls': tuple(input_urls)})
existing_urls = {x[0] for x in cursor.fetchall()}
new_urls = set(input_urls) - existing_urls
@ -97,6 +112,7 @@ class URLDatabase:
locked_urls = {x[0] for x in cursor.fetchall()}
urls_to_insert = new_urls | locked_urls
logger.info(f"URLs to insert: {len(urls_to_insert)}")
if len(urls_to_insert) != len(input_urls):
print(f"Only got {len(urls_to_insert)} instead of {len(input_urls)} - {len(new_urls)} new")
@ -106,34 +122,15 @@ class URLDatabase:
(found_url.url, found_url.status.value, found_url.user_id_hash, found_url.score, found_url.timestamp)
for found_url in sorted_urls if found_url.url in urls_to_insert]
execute_values(cursor, insert_sql, data)
logger.info(f"Data: {len(data)}")
results = execute_values(cursor, insert_sql, data, fetch=True)
logger.info(f"Results: {len(results)}")
updated = [FoundURL(*result) for result in results]
return updated
def get_urls_for_crawling(self, num_urls: int):
def get_urls(self, status: URLStatus, num_urls: int) -> list[FoundURL]:
sql = f"""
UPDATE urls SET status = {URLStatus.QUEUED.value}, updated = %(now)s
WHERE url IN (
SELECT url FROM urls
WHERE status IN ({URLStatus.NEW.value}) OR (
status = {URLStatus.ASSIGNED.value} AND updated < %(min_updated_date)s
)
ORDER BY score DESC
LIMIT %(num_urls)s
FOR UPDATE SKIP LOCKED
)
RETURNING url
"""
now = datetime.utcnow()
min_updated_date = now - timedelta(hours=REASSIGN_MIN_HOURS)
with self.connection.cursor() as cursor:
cursor.execute(sql, {'min_updated_date': min_updated_date, 'now': now, 'num_urls': num_urls})
results = cursor.fetchall()
return [result[0] for result in results]
def get_urls(self, status: URLStatus, num_urls: int):
sql = f"""
SELECT url FROM urls
SELECT url, status, user_id_hash, score, updated FROM urls
WHERE status = %(status)s
ORDER BY score DESC
LIMIT %(num_urls)s
@ -143,7 +140,7 @@ class URLDatabase:
cursor.execute(sql, {'status': status.value, 'num_urls': num_urls})
results = cursor.fetchall()
return [result[0] for result in results]
return [FoundURL(url, user_id_hash, score, status, updated) for url, status, user_id_hash, score, updated in results]
def get_url_scores(self, urls: list[str]) -> dict[str, float]:
sql = f"""
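For readers unfamiliar with the psycopg2 pattern used in update_found_urls above: execute_values collects the rows produced by a RETURNING clause when called with fetch=True, which is what lets the method hand the upserted FoundURLs back to the caller. A hedged, stand-alone sketch of the same pattern (the table and columns are invented, not the real schema):

from psycopg2.extras import execute_values

def upsert_scores(connection, rows):
    # rows is a list of (url, score) tuples; the RETURNING clause plus
    # fetch=True makes execute_values return the affected rows.
    sql = """
        INSERT INTO example_urls (url, score) VALUES %s
        ON CONFLICT (url) DO UPDATE SET score = example_urls.score + excluded.score
        RETURNING url, score
    """
    with connection.cursor() as cursor:
        return execute_values(cursor, sql, rows, fetch=True)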

mwmbl/hn_top_domains_filtered.py

@ -103,7 +103,6 @@ DOMAINS = {'blog.samaltman.com': 0.9906157038365982,
'lists.gnu.org': 0.9719999849041815,
'www.ccc.de': 0.9719484596362211,
'googleprojectzero.blogspot.com': 0.9719076672640107,
'plus.google.com': 0.9718907817464862,
'blog.cloudflare.com': 0.9718848285343656,
'jeffhuang.com': 0.9718720207664465,
'duckduckgo.com': 0.9718309347264379,
@ -3379,7 +3378,6 @@ DOMAINS = {'blog.samaltman.com': 0.9906157038365982,
'mathpix.com': 0.8899039505478837,
'www.vulture.com': 0.8899034479557729,
'bair.berkeley.edu': 0.8898667877223271,
'picolisp.com': 0.8898372822592416,
'www.goldsborough.me': 0.8897894354492999,
'arkadiyt.com': 0.8897865060368211,
'flowingdata.com': 0.8897859193800971,

mwmbl/indexer/batch_cache.py

@ -51,7 +51,7 @@ class BatchCache:
with Database() as db:
index_db = IndexDatabase(db.connection)
batches = index_db.get_batches_by_status(BatchStatus.REMOTE, num_batches)
print(f"Found {len(batches)} remote batches")
logger.info(f"Found {len(batches)} remote batches")
if len(batches) == 0:
return
urls = [batch.url for batch in batches]
@ -60,7 +60,7 @@ class BatchCache:
total_processed = 0
for result in results:
total_processed += result
print("Processed batches with items:", total_processed)
logger.info(f"Processed batches with {total_processed} items")
index_db.update_batch_status(urls, BatchStatus.LOCAL)
def retrieve_batch(self, url):
@ -68,7 +68,7 @@ class BatchCache:
try:
batch = HashedBatch.parse_obj(data)
except ValidationError:
print("Failed to validate batch", data)
logger.info(f"Failed to validate batch {data}")
return 0
if len(batch.items) > 0:
self.store(batch, url)
@ -76,7 +76,7 @@ class BatchCache:
def store(self, batch, url):
path = self.get_path_from_url(url)
print(f"Storing local batch at {path}")
logger.debug(f"Storing local batch at {path}")
os.makedirs(path.parent, exist_ok=True)
with open(path, 'wb') as output_file:
data = gzip.compress(batch.json().encode('utf8'))

mwmbl/indexer/indexdb.py

@ -59,6 +59,9 @@ class IndexDatabase:
return [BatchInfo(url, user_id_hash, status) for url, user_id_hash, status in results]
def update_batch_status(self, batch_urls: list[str], status: BatchStatus):
if not batch_urls:
return
sql = """
UPDATE batches SET status = %(status)s
WHERE url IN %(urls)s

mwmbl/indexer/process_batch.py

@ -10,7 +10,7 @@ logger = getLogger(__name__)
def run(batch_cache: BatchCache, start_status: BatchStatus, end_status: BatchStatus,
process: Callable[[Collection[HashedBatch]], None]):
process: Callable[[Collection[HashedBatch], ...], None], *args):
with Database() as db:
index_db = IndexDatabase(db.connection)
@ -24,6 +24,10 @@ def run(batch_cache: BatchCache, start_status: BatchStatus, end_status: BatchSta
batch_data = batch_cache.get_cached([batch.url for batch in batches])
logger.info(f"Got {len(batch_data)} cached batches")
process(batch_data.values())
missing_batches = {batch.url for batch in batches} - batch_data.keys()
logger.info(f"Got {len(missing_batches)} missing batches")
index_db.update_batch_status(list(missing_batches), BatchStatus.REMOTE)
process(batch_data.values(), *args)
index_db.update_batch_status(list(batch_data.keys()), end_status)

mwmbl/indexer/update_urls.py

@ -1,6 +1,11 @@
import os
import pickle
from collections import defaultdict
from datetime import datetime, timezone, timedelta
from logging import getLogger
from multiprocessing import Queue
from pathlib import Path
from time import sleep
from typing import Iterable, Collection
from urllib.parse import urlparse
@ -12,18 +17,29 @@ from mwmbl.indexer import process_batch
from mwmbl.indexer.batch_cache import BatchCache
from mwmbl.indexer.index_batches import get_url_error_status
from mwmbl.indexer.indexdb import BatchStatus
from mwmbl.indexer.paths import BATCH_DIR_NAME
from mwmbl.settings import UNKNOWN_DOMAIN_MULTIPLIER, EXCLUDED_DOMAINS, SCORE_FOR_SAME_DOMAIN, \
SCORE_FOR_DIFFERENT_DOMAIN, SCORE_FOR_ROOT_PATH
SCORE_FOR_DIFFERENT_DOMAIN, SCORE_FOR_ROOT_PATH, EXTRA_LINK_MULTIPLIER
from mwmbl.utils import get_domain
logger = getLogger(__name__)
def run(batch_cache: BatchCache):
process_batch.run(batch_cache, BatchStatus.LOCAL, BatchStatus.URLS_UPDATED, process=record_urls_in_database)
def update_urls_continuously(data_path: str, new_item_queue: Queue):
batch_cache = BatchCache(Path(data_path) / BATCH_DIR_NAME)
while True:
try:
run(batch_cache, new_item_queue)
except Exception:
logger.exception("Error updating URLs")
sleep(10)
def record_urls_in_database(batches: Collection[HashedBatch]):
def run(batch_cache: BatchCache, new_item_queue: Queue):
process_batch.run(batch_cache, BatchStatus.LOCAL, BatchStatus.URLS_UPDATED, record_urls_in_database, new_item_queue)
def record_urls_in_database(batches: Collection[HashedBatch], new_item_queue: Queue):
logger.info(f"Recording URLs in database for {len(batches)} batches")
with Database() as db:
url_db = URLDatabase(db.connection)
@ -40,26 +56,44 @@ def record_urls_in_database(batches: Collection[HashedBatch]):
url_statuses[item.url] = get_url_error_status(item)
else:
url_statuses[item.url] = URLStatus.CRAWLED
crawled_page_domain = urlparse(item.url).netloc
try:
crawled_page_domain = get_domain(item.url)
except ValueError:
logger.info(f"Couldn't parse URL {item.url}")
continue
score_multiplier = 1 if crawled_page_domain in DOMAINS else UNKNOWN_DOMAIN_MULTIPLIER
for link in item.content.links:
parsed_link = urlparse(link)
if parsed_link.netloc in EXCLUDED_DOMAINS:
continue
process_link(batch, crawled_page_domain, link, score_multiplier, timestamp, url_scores,
url_timestamps, url_users, False)
score = SCORE_FOR_SAME_DOMAIN if parsed_link.netloc == crawled_page_domain else SCORE_FOR_DIFFERENT_DOMAIN
url_scores[link] += score * score_multiplier
url_users[link] = batch.user_id_hash
url_timestamps[link] = timestamp
domain = f'{parsed_link.scheme}://{parsed_link.netloc}/'
url_scores[domain] += SCORE_FOR_ROOT_PATH * score_multiplier
url_users[domain] = batch.user_id_hash
url_timestamps[domain] = timestamp
if item.content.extra_links:
for link in item.content.extra_links:
process_link(batch, crawled_page_domain, link, score_multiplier, timestamp, url_scores,
url_timestamps, url_users, True)
found_urls = [FoundURL(url, url_users[url], url_scores[url], url_statuses[url], url_timestamps[url])
for url in url_scores.keys() | url_statuses.keys()]
url_db.update_found_urls(found_urls)
logger.info(f"Found URLs, {len(found_urls)}")
urls = url_db.update_found_urls(found_urls)
new_item_queue.put(urls)
logger.info(f"Put {len(urls)} new items in the URL queue")
def process_link(batch, crawled_page_domain, link, unknown_domain_multiplier, timestamp, url_scores, url_timestamps, url_users, is_extra: bool):
parsed_link = urlparse(link)
if parsed_link.netloc in EXCLUDED_DOMAINS:
return
extra_multiplier = EXTRA_LINK_MULTIPLIER if is_extra else 1.0
score = SCORE_FOR_SAME_DOMAIN if parsed_link.netloc == crawled_page_domain else SCORE_FOR_DIFFERENT_DOMAIN
url_scores[link] += score * unknown_domain_multiplier * extra_multiplier
url_users[link] = batch.user_id_hash
url_timestamps[link] = timestamp
domain = f'{parsed_link.scheme}://{parsed_link.netloc}/'
url_scores[domain] += SCORE_FOR_ROOT_PATH * unknown_domain_multiplier
url_users[domain] = batch.user_id_hash
url_timestamps[domain] = timestamp
def get_datetime_from_timestamp(timestamp: float) -> datetime:
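To make the link scoring above concrete, here is a small stand-alone sketch of the arithmetic in process_link (link_score is not a function in the repo; the constants are the ones set in mwmbl/settings.py further down):

SCORE_FOR_SAME_DOMAIN = 0.01
SCORE_FOR_DIFFERENT_DOMAIN = 1.0
EXTRA_LINK_MULTIPLIER = 0.001
UNKNOWN_DOMAIN_MULTIPLIER = 0.001

def link_score(same_domain: bool, known_domain: bool, is_extra: bool) -> float:
    # Base score, then the multiplier for links found on unknown domains,
    # then the multiplier for links that arrived via extra_links.
    base = SCORE_FOR_SAME_DOMAIN if same_domain else SCORE_FOR_DIFFERENT_DOMAIN
    domain_multiplier = 1.0 if known_domain else UNKNOWN_DOMAIN_MULTIPLIER
    extra_multiplier = EXTRA_LINK_MULTIPLIER if is_extra else 1.0
    return base * domain_multiplier * extra_multiplier

print(link_score(same_domain=False, known_domain=True, is_extra=False))   # 1.0
print(link_score(same_domain=False, known_domain=True, is_extra=True))    # 0.001
print(link_score(same_domain=True, known_domain=False, is_extra=False))   # ~1e-05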

mwmbl/main.py

@ -1,6 +1,5 @@
import argparse
import logging
import os
import sys
from multiprocessing import Process, Queue
from pathlib import Path
@ -14,12 +13,15 @@ from mwmbl.crawler import app as crawler
from mwmbl.indexer.batch_cache import BatchCache
from mwmbl.indexer.paths import INDEX_NAME, BATCH_DIR_NAME
from mwmbl.platform import user
from mwmbl.indexer.update_urls import update_urls_continuously
from mwmbl.tinysearchengine import search
from mwmbl.tinysearchengine.completer import Completer
from mwmbl.tinysearchengine.indexer import TinyIndex, Document, PAGE_SIZE
from mwmbl.tinysearchengine.rank import HeuristicRanker
from mwmbl.url_queue import update_queue_continuously
logging.basicConfig(stream=sys.stdout, level=logging.INFO)
FORMAT = '%(levelname)s %(name)s %(asctime)s %(message)s'
logging.basicConfig(stream=sys.stdout, level=logging.INFO, format=FORMAT)
MODEL_PATH = Path(__file__).parent / 'resources' / 'model.pickle'
@ -27,8 +29,9 @@ MODEL_PATH = Path(__file__).parent / 'resources' / 'model.pickle'
def setup_args():
parser = argparse.ArgumentParser(description="Mwmbl API server and background task processor")
parser.add_argument("--num-pages", help="Number of pages of memory (4096 bytes) to use for the index", default=2560)
parser.add_argument("--num-pages", type=int, help="Number of pages of memory (4096 bytes) to use for the index", default=2560)
parser.add_argument("--data", help="Path to the data folder for storing index and cached batches", default="./devdata")
parser.add_argument("--port", type=int, help="Port for the server to listen at", default=5000)
parser.add_argument("--background", help="Enable running the background tasks to process batches",
action='store_true')
args = parser.parse_args()
@ -42,21 +45,19 @@ def run():
try:
existing_index = TinyIndex(item_factory=Document, index_path=index_path)
if existing_index.page_size != PAGE_SIZE or existing_index.num_pages != args.num_pages:
print(f"Existing index page sizes ({existing_index.page_size}) and number of pages "
f"({existing_index.num_pages}) does not match - removing.")
os.remove(index_path)
existing_index = None
raise ValueError(f"Existing index page sizes ({existing_index.page_size}) or number of pages "
f"({existing_index.num_pages}) do not match")
except FileNotFoundError:
existing_index = None
if existing_index is None:
print("Creating a new index")
TinyIndex.create(item_factory=Document, index_path=index_path, num_pages=args.num_pages, page_size=PAGE_SIZE)
url_queue = Queue()
new_item_queue = Queue()
queued_batches = Queue()
if args.background:
Process(target=background.run, args=(args.data, url_queue)).start()
Process(target=background.run, args=(args.data,)).start()
Process(target=update_queue_continuously, args=(new_item_queue, queued_batches,)).start()
Process(target=update_urls_continuously, args=(args.data, new_item_queue)).start()
completer = Completer()
@ -80,14 +81,14 @@ def run():
app.include_router(search_router)
batch_cache = BatchCache(Path(args.data) / BATCH_DIR_NAME)
crawler_router = crawler.get_router(batch_cache, url_queue)
crawler_router = crawler.get_router(batch_cache, queued_batches)
app.include_router(crawler_router)
user_router = user.create_router()
app.include_router(user_router)
# Initialize uvicorn server using global app instance and server config params
uvicorn.run(app, host="0.0.0.0", port=5000)
uvicorn.run(app, host="0.0.0.0", port=args.port)
if __name__ == "__main__":

mwmbl/settings.py

@ -29,5 +29,16 @@ NUM_EXTRACT_CHARS = 155
SCORE_FOR_ROOT_PATH = 0.1
SCORE_FOR_DIFFERENT_DOMAIN = 1.0
SCORE_FOR_SAME_DOMAIN = 0.01
EXTRA_LINK_MULTIPLIER = 0.001
UNKNOWN_DOMAIN_MULTIPLIER = 0.001
EXCLUDED_DOMAINS = {'web.archive.org', 'forums.giantitp.com'}
EXCLUDED_DOMAINS = {'web.archive.org', 'forums.giantitp.com', 'www.crutchfield.com', 'plus.google.com'}
CORE_DOMAINS = {
'github.com',
'en.wikipedia.org',
'stackoverflow.com',
'docs.google.com',
'programmers.stackexchange.com',
'developer.mozilla.org',
'arxiv.org',
'www.python.org',
}

mwmbl/tinysearchengine/completer.py

@ -10,13 +10,16 @@ TERMS_PATH = Path(__file__).parent.parent / 'resources' / 'mwmbl-crawl-terms.csv
class Completer:
def __init__(self, num_matches: int = 3):
# Load term data
terms = pd.read_csv(TERMS_PATH)
terms = self.get_terms()
terms_dict = terms.sort_values('term').set_index('term')['count'].to_dict()
self.terms = list(terms_dict.keys())
self.counts = list(terms_dict.values())
self.num_matches = num_matches
print("Terms", self.terms[:100], self.counts[:100])
def get_terms(self):
return pd.read_csv(TERMS_PATH)
def complete(self, term) -> list[str]:
term_length = len(term)

mwmbl/tinysearchengine/indexer.py

@ -122,7 +122,7 @@ class TinyIndex(Generic[T]):
def __enter__(self):
self.index_file = open(self.index_path, 'r+b')
prot = PROT_READ if self.mode == 'r' else PROT_READ | PROT_WRITE
self.mmap = mmap(self.index_file.fileno(), 0, offset=METADATA_SIZE, prot=prot)
self.mmap = mmap(self.index_file.fileno(), 0, prot=prot)
return self
def __exit__(self, exc_type, exc_val, exc_tb):
@ -146,7 +146,7 @@ class TinyIndex(Generic[T]):
return [self.item_factory(*item) for item in results]
def _get_page_tuples(self, i):
page_data = self.mmap[i * self.page_size:(i + 1) * self.page_size]
page_data = self.mmap[i * self.page_size + METADATA_SIZE:(i + 1) * self.page_size + METADATA_SIZE]
try:
decompressed_data = self.decompressor.decompress(page_data)
except ZstdError:
@ -186,7 +186,7 @@ class TinyIndex(Generic[T]):
page_data = _get_page_data(self.compressor, self.page_size, data)
logger.debug(f"Got page data of length {len(page_data)}")
self.mmap[i * self.page_size:(i+1) * self.page_size] = page_data
self.mmap[i * self.page_size + METADATA_SIZE:(i+1) * self.page_size + METADATA_SIZE] = page_data
@staticmethod
def create(item_factory: Callable[..., T], index_path: str, num_pages: int, page_size: int):
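The two hunks above keep page addressing unchanged after dropping the mmap offset: the whole index file is now mapped, and METADATA_SIZE is added when slicing, so page i still starts METADATA_SIZE bytes plus i pages into the file. A quick sketch of that invariant (the constant values below are illustrative, not taken from the repo):

METADATA_SIZE = 4096   # illustrative value
PAGE_SIZE = 4096       # illustrative value

def page_bounds(i: int, page_size: int = PAGE_SIZE) -> tuple[int, int]:
    # With no mmap offset, page i sits just after the metadata block,
    # exactly where the old offset-based mapping placed it.
    start = METADATA_SIZE + i * page_size
    return start, start + page_size

print(page_bounds(0))   # (4096, 8192)
print(page_bounds(2))   # (12288, 16384)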

mwmbl/url_queue.py

@ -1,39 +1,153 @@
import time
from collections import defaultdict
from dataclasses import dataclass
from datetime import datetime, timedelta
from logging import getLogger
from multiprocessing import Queue
from queue import Empty
from random import Random
from typing import KeysView, Union
from mwmbl.crawler.urls import BATCH_SIZE, URLDatabase, URLStatus
from mwmbl.crawler.urls import BATCH_SIZE, URLDatabase, URLStatus, FoundURL, REASSIGN_MIN_HOURS
from mwmbl.database import Database
from mwmbl.utils import batch
from mwmbl.hn_top_domains_filtered import DOMAINS as TOP_DOMAINS
from mwmbl.settings import CORE_DOMAINS
from mwmbl.utils import batch, get_domain
logger = getLogger(__name__)
MAX_QUEUE_SIZE = 5000
def update_url_queue(url_queue: Queue):
current_size = url_queue.qsize()
if current_size >= MAX_QUEUE_SIZE:
logger.info(f"Skipping queue update, current size {current_size}")
return
num_urls_to_fetch = (MAX_QUEUE_SIZE - current_size) * BATCH_SIZE
with Database() as db:
url_db = URLDatabase(db.connection)
urls = url_db.get_urls_for_crawling(num_urls_to_fetch)
queue_batches(url_queue, urls)
logger.info(f"Queued {len(urls)} urls, current queue size: {url_queue.qsize()}")
MAX_TOP_URLS = 100000
MAX_OTHER_URLS = 1000
MAX_URLS_PER_CORE_DOMAIN = 1000
MAX_URLS_PER_TOP_DOMAIN = 100
MAX_URLS_PER_OTHER_DOMAIN = 5
MAX_OTHER_DOMAINS = 10000
INITIALIZE_URLS = 10000
def initialize_url_queue(url_queue: Queue):
with Database() as db:
url_db = URLDatabase(db.connection)
urls = url_db.get_urls(URLStatus.QUEUED, MAX_QUEUE_SIZE * BATCH_SIZE)
queue_batches(url_queue, urls)
logger.info(f"Initialized URL queue with {len(urls)} urls, current queue size: {url_queue.qsize()}")
random = Random(1)
class URLQueue:
def __init__(self, new_item_queue: Queue, queued_batches: Queue, min_top_domains: int = 5):
"""
new_item_queue: each item in the queue is a list of FoundURLs
queued_batches: each item in the queue is a list of URLs (strings)
"""
self._new_item_queue = new_item_queue
self._queued_batches = queued_batches
self._other_urls = defaultdict(dict)
self._top_urls = defaultdict(dict)
self._min_top_domains = min_top_domains
assert min_top_domains > 0, "Need a minimum greater than 0 to prevent a never-ending loop"
def initialize(self):
logger.info(f"Initializing URL queue")
with Database() as db:
url_db = URLDatabase(db.connection)
found_urls = url_db.get_urls(URLStatus.NEW, INITIALIZE_URLS)
self._process_found_urls(found_urls)
logger.info(f"Initialized URL queue with {len(found_urls)} urls, current queue size: {self.num_queued_batches}")
def update(self):
num_processed = 0
while True:
try:
new_batch = self._new_item_queue.get_nowait()
num_processed += 1
except Empty:
break
self._process_found_urls(new_batch)
return num_processed
def _process_found_urls(self, found_urls: list[FoundURL]):
min_updated_date = datetime.utcnow() - timedelta(hours=REASSIGN_MIN_HOURS)
logger.info(f"Found URLS: {len(found_urls)}")
valid_urls = [found_url for found_url in found_urls if found_url.status == URLStatus.NEW.value or (
found_url.status == URLStatus.ASSIGNED.value and found_url.timestamp < min_updated_date)]
logger.info(f"Valid URLs: {len(valid_urls)}")
self._sort_urls(valid_urls)
logger.info(f"Queue size: {self.num_queued_batches}")
while self.num_queued_batches < MAX_QUEUE_SIZE and len(self._top_urls) >= self._min_top_domains:
total_top_urls = sum(len(urls) for urls in self._top_urls.values())
logger.info(f"Total top URLs stored: {total_top_urls}")
total_other_urls = sum(len(urls) for urls in self._other_urls.values())
logger.info(f"Total other URLs stored: {total_other_urls}")
self._batch_urls()
logger.info(f"Queue size after batching: {self.num_queued_batches}")
def _sort_urls(self, valid_urls: list[FoundURL]):
for found_url in valid_urls:
try:
domain = get_domain(found_url.url)
except ValueError:
continue
url_store = self._top_urls if domain in TOP_DOMAINS else self._other_urls
url_store[domain][found_url.url] = found_url.score
logger.info(f"URL store updated: {len(self._top_urls)} top domains, {len(self._other_urls)} other domains")
_sort_and_limit_urls(self._top_urls, MAX_TOP_URLS)
_sort_and_limit_urls(self._other_urls, MAX_OTHER_URLS)
# Keep only the top "other" domains, ranked by the top item for that domain
top_other_urls = sorted(self._other_urls.items(), key=lambda x: next(iter(x[1].values())), reverse=True)[:MAX_OTHER_DOMAINS]
self._other_urls = defaultdict(dict, dict(top_other_urls))
def _batch_urls(self):
urls = []
logger.info("Adding core domains")
_add_urls(CORE_DOMAINS, self._top_urls, urls, MAX_URLS_PER_CORE_DOMAIN)
logger.info("Adding top domains")
_add_urls(TOP_DOMAINS.keys() - CORE_DOMAINS, self._top_urls, urls, MAX_URLS_PER_TOP_DOMAIN)
logger.info("Adding other domains")
_add_urls(self._other_urls.keys(), self._other_urls, urls, MAX_URLS_PER_OTHER_DOMAIN)
self._queue_urls(urls)
def _queue_urls(self, valid_urls: list[str]):
random.shuffle(valid_urls)
for url_batch in batch(valid_urls, BATCH_SIZE):
self._queued_batches.put(url_batch, block=False)
@property
def num_queued_batches(self) -> int:
return self._queued_batches.qsize()
@property
def num_top_domains(self) -> int:
return len(self._top_urls)
def _sort_and_limit_urls(domain_urls: dict[str, dict[str, float]], max_urls: int):
for domain, urls in domain_urls.items():
domain_urls[domain] = dict(sorted(urls.items(), key=lambda url_score: url_score[1], reverse=True)[:max_urls])
def _add_urls(domains: Union[set[str], KeysView], domain_urls: dict[str, dict[str, float]], urls: list[str], max_urls: int):
for domain in list(domains & domain_urls.keys()):
urls += list(domain_urls[domain].keys())[:max_urls]
new_domain_urls = list(domain_urls[domain].items())[max_urls:]
if len(new_domain_urls) > 0:
domain_urls[domain] = dict(new_domain_urls)
else:
del domain_urls[domain]
def update_queue_continuously(new_item_queue: Queue, queued_batches: Queue):
queue = URLQueue(new_item_queue, queued_batches)
queue.initialize()
while True:
num_processed = queue.update()
logger.info(f"Queue update, num processed: {num_processed}, queue size: {queue.num_queued_batches}, num top "
f"domains: {queue.num_top_domains}")
if num_processed == 0:
time.sleep(5)
def queue_batches(url_queue, urls):
for url_batch in batch(urls, BATCH_SIZE):
url_queue.put(url_batch, block=False)
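The status filter in _process_found_urls above reproduces, in Python, the condition that the removed get_urls_for_crawling SQL used to apply: a URL is eligible if it is NEW, or ASSIGNED but not updated within REASSIGN_MIN_HOURS. A hedged restatement of that predicate (is_crawlable is not a function in the repo):

from datetime import datetime, timedelta

from mwmbl.crawler.urls import FoundURL, URLStatus, REASSIGN_MIN_HOURS

def is_crawlable(found_url: FoundURL, now: datetime) -> bool:
    # NEW URLs are always eligible; ASSIGNED URLs become eligible again once
    # they have gone longer than REASSIGN_MIN_HOURS without an update.
    min_updated_date = now - timedelta(hours=REASSIGN_MIN_HOURS)
    return found_url.status == URLStatus.NEW.value or (
        found_url.status == URLStatus.ASSIGNED.value and found_url.timestamp < min_updated_date)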

mwmbl/utils.py

@ -1,3 +1,8 @@
import re
DOMAIN_REGEX = re.compile(r".*://([^/]*)")
def batch(items: list, batch_size):
"""
Adapted from https://stackoverflow.com/a/8290508
@ -5,3 +10,10 @@ def batch(items: list, batch_size):
length = len(items)
for ndx in range(0, length, batch_size):
yield items[ndx:min(ndx + batch_size, length)]
def get_domain(url):
results = DOMAIN_REGEX.match(url)
if results is None or len(results.groups()) == 0:
raise ValueError(f"Unable to parse domain from URL {url}")
return results.group(1)
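A quick illustration of the two helpers in this file (the calls are real; the example inputs are made up):

from mwmbl.utils import batch, get_domain

print(get_domain("https://en.wikipedia.org/wiki/Search_engine"))   # en.wikipedia.org
try:
    get_domain("not-a-url")
except ValueError as error:
    print(error)   # Unable to parse domain from URL not-a-url

print(list(batch([1, 2, 3, 4, 5], 2)))   # [[1, 2], [3, 4], [5]]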

poetry.lock (generated, 2008 lines): diff suppressed because it is too large

pyproject.toml

@ -18,7 +18,8 @@ boto3 = "^1.20.37"
requests = "^2.27.1"
psycopg2-binary = "^2.9.3"
spacy = "==3.2.1"
jusText = "==3.0.0"
pytest = "^7.2.1"
pytest-mock = "^3.10.0"
# Optional dependencies do not get installed by default. Look under tool.poetry.extras section
# to see which extras to use.
@ -27,13 +28,11 @@ warcio = {version= "==1.7.4", optional = true}
idna = {version= "==3.3", optional = true}
beautifulsoup4 = {version= "==4.10.0", optional = true}
lxml = {version= "==4.6.4", optional = true}
jusText = {version= "==3.0.0", optional = true}
langdetect = {version= "==1.0.9", optional = true}
pyarrow = {version= "==6.0.0", optional = true}
pyspark = {version= "==3.2.0", optional = true}
Levenshtein = {version= "==0.16.0", optional = true}
# en-core-web-sm requires a compatible version of spacy
en-core-web-sm = {url = "https://github.com/explosion/spacy-models/releases/download/en_core_web_sm-3.2.0/en_core_web_sm-3.2.0.tar.gz"}
[tool.poetry.extras]
indexer = [
@ -47,7 +46,6 @@ indexer = [
"pyarrow",
"pyspark",
"Levenshtein",
# en-core-web-sm requires a compatible version of spacy
]
[tool.poetry.dev-dependencies]

test/test_completer.py (new file, 78 lines)

@ -0,0 +1,78 @@
import mwmbl.tinysearchengine.completer
import pytest
import pandas as pd


def mockCompleterData(mocker, data):
    testDataFrame = pd.DataFrame(data, columns=['', 'term', 'count'])
    mocker.patch('mwmbl.tinysearchengine.completer.Completer.get_terms',
                 return_value=testDataFrame)


def test_correctCompletions(mocker):
    # Mock completer with custom data
    testdata = [
        [0, 'build', 4],
        [1, 'builder', 3],
        [2, 'announce', 2],
        [3, 'buildings', 1]]
    mockCompleterData(mocker, testdata)

    completer = mwmbl.tinysearchengine.completer.Completer()
    completion = completer.complete('build')
    assert ['build', 'builder', 'buildings'] == completion


def test_correctSortOrder(mocker):
    # Mock completer with custom data
    testdata = [
        [0, 'build', 4],
        [1, 'builder', 1],
        [2, 'announce', 2],
        [3, 'buildings', 3]]
    mockCompleterData(mocker, testdata)

    completer = mwmbl.tinysearchengine.completer.Completer()
    completion = completer.complete('build')
    assert ['build', 'buildings', 'builder'] == completion


def test_noCompletions(mocker):
    # Mock completer with custom data
    testdata = [
        [0, 'build', 4],
        [1, 'builder', 3],
        [2, 'announce', 2],
        [3, 'buildings', 1]]
    mockCompleterData(mocker, testdata)

    completer = mwmbl.tinysearchengine.completer.Completer()
    completion = completer.complete('test')
    assert [] == completion


def test_singleCompletions(mocker):
    # Mock completer with custom data
    testdata = [
        [0, 'build', 4],
        [1, 'builder', 3],
        [2, 'announce', 2],
        [3, 'buildings', 1]]
    mockCompleterData(mocker, testdata)

    completer = mwmbl.tinysearchengine.completer.Completer()
    completion = completer.complete('announce')
    assert ['announce'] == completion


def test_idempotencyWithSameScoreCompletions(mocker):
    # Mock completer with custom data
    testdata = [
        [0, 'build', 1],
        [1, 'builder', 1],
        [2, 'announce', 1],
        [3, 'buildings', 1]]
    mockCompleterData(mocker, testdata)

    completer = mwmbl.tinysearchengine.completer.Completer()
    for i in range(3):
        print(f"iteration: {i}")
        completion = completer.complete('build')
        # Results expected in reverse order
        expected = ['buildings', 'builder', 'build']
        assert expected == completion

test/test_url_queue.py (new file, 37 lines)

@ -0,0 +1,37 @@
from datetime import datetime
from queue import Queue

from mwmbl.crawler.urls import FoundURL, URLStatus
from mwmbl.url_queue import URLQueue


def test_url_queue_empties():
    new_item_queue = Queue()
    queued_batches = Queue()

    url_queue = URLQueue(new_item_queue, queued_batches, min_top_domains=1)
    new_item_queue.put([FoundURL("https://google.com", "123", 10.0, URLStatus.NEW.value, datetime(2023, 1, 19))])

    url_queue.update()

    items = queued_batches.get(block=False)

    assert items == ["https://google.com"]


def test_url_queue_multiple_puts():
    new_item_queue = Queue()
    queued_batches = Queue()

    url_queue = URLQueue(new_item_queue, queued_batches, min_top_domains=1)
    new_item_queue.put([FoundURL("https://google.com", "123", 10.0, URLStatus.NEW.value, datetime(2023, 1, 19))])
    url_queue.update()

    new_item_queue.put([FoundURL("https://www.supermemo.com", "124", 10.0, URLStatus.NEW.value, datetime(2023, 1, 20))])
    url_queue.update()

    items = queued_batches.get(block=False)
    assert items == ["https://google.com"]

    items_2 = queued_batches.get(block=False)
    assert items_2 == ["https://www.supermemo.com"]