From 974f18647a7479c5f2535f3aadef67e68ed58918 Mon Sep 17 00:00:00 2001
From: Daoud Clarke
Date: Wed, 19 May 2021 21:48:03 +0100
Subject: [PATCH] Index queued items

---
 fsqueue.py     |  8 ++++++++
 index.py       | 38 +-------------------------------------
 index_glob.py  | 38 ++++++++++++++++++++++++++++++++++++++
 index_queue.py | 30 ++++++++++++++++++++++++++++++
 4 files changed, 77 insertions(+), 37 deletions(-)
 create mode 100644 index_glob.py
 create mode 100644 index_queue.py

diff --git a/fsqueue.py b/fsqueue.py
index 2482feb..59e5c05 100644
--- a/fsqueue.py
+++ b/fsqueue.py
@@ -95,3 +95,11 @@ class FSQueue:
         """
         self._move(item_id, FSState.LOCKED, FSState.DONE)
 
+
+    def unlock_all(self):
+        paths = sorted(Path(self._get_dir(FSState.LOCKED)).iterdir(), key=os.path.getmtime)
+
+        for path in paths:
+            # Unlock the item by moving it back to the ready state
+            self._move(path.name, FSState.LOCKED, FSState.READY)
+
diff --git a/index.py b/index.py
index 830ec09..32f4ea6 100644
--- a/index.py
+++ b/index.py
@@ -1,25 +1,18 @@
 """
 Create a search index
 """
-import gzip
 import json
 import os
-import sqlite3
 from dataclasses import dataclass
-from glob import glob
-from itertools import chain, count, islice
+from itertools import islice
 from mmap import mmap, PROT_READ
 from typing import List, Iterator
 from urllib.parse import unquote
 
-import bs4
 import justext
 import mmh3
-from spacy.lang.en import English
 from zstandard import ZstdCompressor, ZstdDecompressor, ZstdError
 
-from paths import CRAWL_GLOB, INDEX_PATH
-
 NUM_PAGES = 8192
 PAGE_SIZE = 512
 
@@ -160,32 +153,6 @@ class TinyIndexer(TinyIndexBase):
         raise NotImplementedError()
 
 
-def run():
-    indexer = TinyIndexer(INDEX_PATH, NUM_PAGES, PAGE_SIZE)
-    indexer.create_if_not_exists()
-    nlp = English()
-    for path in glob(CRAWL_GLOB):
-        print("Path", path)
-        with gzip.open(path, 'rt') as html_file:
-            url = html_file.readline().strip()
-            content = html_file.read()
-
-        if indexer.document_indexed(url):
-            print("Page exists, skipping", url)
-            continue
-
-        cleaned_text = clean(content)
-        try:
-            title = bs4.BeautifulSoup(content, features="lxml").find('title').string
-        except AttributeError:
-            title = cleaned_text[:80]
-        tokens = tokenize(nlp, cleaned_text)
-        print("URL", url)
-        print("Tokens", tokens)
-        print("Title", title)
-        indexer.index(tokens, url, title)
-
-
 def prepare_url_for_tokenizing(url: str):
     if url.startswith(HTTP_START):
         url = url[len(HTTP_START):]
@@ -224,6 +191,3 @@ def index_titles_and_urls(indexer: TinyIndexer, nlp, titles_and_urls):
 
     for chunk in grouper(BATCH_SIZE, pages):
         indexer.index(list(chunk))
-
-if __name__ == '__main__':
-    run()
diff --git a/index_glob.py b/index_glob.py
new file mode 100644
index 0000000..76a2b0c
--- /dev/null
+++ b/index_glob.py
@@ -0,0 +1,38 @@
+import gzip
+from glob import glob
+
+import bs4
+from spacy.lang.en import English
+
+from index import TinyIndexer, NUM_PAGES, PAGE_SIZE, clean, tokenize
+from paths import INDEX_PATH, CRAWL_GLOB
+
+
+def run():
+    indexer = TinyIndexer(INDEX_PATH, NUM_PAGES, PAGE_SIZE)
+    indexer.create_if_not_exists()
+    nlp = English()
+    for path in glob(CRAWL_GLOB):
+        print("Path", path)
+        with gzip.open(path, 'rt') as html_file:
+            url = html_file.readline().strip()
+            content = html_file.read()
+
+        if indexer.document_indexed(url):
+            print("Page exists, skipping", url)
+            continue
+
+        cleaned_text = clean(content)
+        try:
+            title = bs4.BeautifulSoup(content, features="lxml").find('title').string
+        except AttributeError:
+            title = cleaned_text[:80]
+        tokens = tokenize(nlp, cleaned_text)
+        print("URL", url)
+        print("Tokens", tokens)
+ print("Title", title) + indexer.index(tokens, url, title) + + +if __name__ == '__main__': + run() diff --git a/index_queue.py b/index_queue.py new file mode 100644 index 0000000..2f303e2 --- /dev/null +++ b/index_queue.py @@ -0,0 +1,30 @@ +""" +Index items in the file-system queue +""" +from spacy.lang.en import English + +from fsqueue import FSQueue, ZstdJsonSerializer +from index import TinyIndexer, NUM_PAGES, PAGE_SIZE, index_titles_and_urls +from paths import DATA_DIR, DOMAINS_TITLES_QUEUE_NAME, INDEX_PATH + + +def get_queue_items(): + titles_queue = FSQueue(DATA_DIR, DOMAINS_TITLES_QUEUE_NAME, ZstdJsonSerializer()) + titles_queue.unlock_all() + while True: + items_id, items = titles_queue.get() + for item in items: + if item['title'] is None: + continue + yield item['title'], item['url'] + + +def index_queue_items(): + nlp = English() + with TinyIndexer(INDEX_PATH, NUM_PAGES, PAGE_SIZE) as indexer: + titles_and_urls = get_queue_items() + index_titles_and_urls(indexer, nlp, titles_and_urls) + + +if __name__ == '__main__': + index_queue_items()