diff --git a/mwmbl/indexer/batch.py b/mwmbl/indexer/batch.py
new file mode 100644
index 0000000..86887ac
--- /dev/null
+++ b/mwmbl/indexer/batch.py
@@ -0,0 +1,10 @@
+from itertools import islice
+from typing import Iterator
+
+
+def grouper(n: int, iterator: Iterator):
+    while True:
+        chunk = tuple(islice(iterator, n))
+        if not chunk:
+            return
+        yield chunk
\ No newline at end of file
diff --git a/mwmbl/indexer/dedupe.py b/mwmbl/indexer/dedupe.py
new file mode 100644
index 0000000..5a09f4d
--- /dev/null
+++ b/mwmbl/indexer/dedupe.py
@@ -0,0 +1,42 @@
+"""
+Dedupe pages that have been crawled more than once and prepare them for indexing
+"""
+import glob
+import gzip
+import json
+
+from mwmbl.indexer.batch import grouper
+from mwmbl.indexer.fsqueue import FSQueue, GzipJsonBlobSerializer
+from mwmbl.indexer.paths import CRAWL_GLOB, TINYSEARCH_DATA_DIR
+
+BATCH_SIZE = 100
+
+
+def get_deduped_pages():
+    seen_urls = set()
+    for path in sorted(glob.glob(CRAWL_GLOB), reverse=True):
+        data = json.load(gzip.open(path))
+        for item in data['items']:
+            url = item['url']
+            if url in seen_urls:
+                continue
+
+            seen_urls.add(url)
+            yield item
+
+
+def queue_deduped_items(deduped_pages):
+    output_queue = FSQueue(TINYSEARCH_DATA_DIR, 'mwmbl-search-items', GzipJsonBlobSerializer())
+
+    for batch in grouper(BATCH_SIZE, deduped_pages):
+        data = {'items': batch}
+        output_queue.put(data)
+
+
+def run():
+    deduped_pages = get_deduped_pages()
+    queue_deduped_items(deduped_pages)
+
+
+if __name__ == '__main__':
+    run()
diff --git a/mwmbl/indexer/fsqueue.py b/mwmbl/indexer/fsqueue.py
index f05b2d4..88787d9 100644
--- a/mwmbl/indexer/fsqueue.py
+++ b/mwmbl/indexer/fsqueue.py
@@ -7,7 +7,7 @@ import json
 import os
 from abc import ABC
 from enum import Enum
-from typing import Union
+from typing import Union, Any
 from uuid import uuid4
 
 from pathlib import Path
@@ -59,10 +59,10 @@ class GzipJsonRowSerializer(Serializer):
 
 
 class GzipJsonBlobSerializer(Serializer):
-    def serialize(self, items: list[object]) -> bytes:
-        raise NotImplementedError("Serializer not needed - blob is generated by browser extension")
+    def serialize(self, items: Any) -> bytes:
+        return gzip.compress(json.dumps(items).encode('utf8'))
 
-    def deserialize(self, serialized_items: bytes) -> list[object]:
+    def deserialize(self, serialized_items: bytes) -> Any:
         data = gzip.decompress(serialized_items).decode('utf8')
         return json.loads(data)
 
diff --git a/mwmbl/indexer/index.py b/mwmbl/indexer/index.py
index 5fb6304..c772c55 100644
--- a/mwmbl/indexer/index.py
+++ b/mwmbl/indexer/index.py
@@ -2,8 +2,7 @@
 Create a search index
 """
 from collections import Counter
-from itertools import islice
-from typing import Iterator, Iterable
+from typing import Iterable
 from urllib.parse import unquote
 
 import pandas as pd
@@ -59,14 +58,6 @@ def get_pages(nlp, titles_urls_and_extracts, link_counts) -> Iterable[TokenizedDoc
     print("Processed", i)
 
 
-def grouper(n: int, iterator: Iterator):
-    while True:
-        chunk = tuple(islice(iterator, n))
-        if not chunk:
-            return
-        yield chunk
-
-
 def index_titles_urls_and_extracts(indexer: TinyIndex, nlp, titles_urls_and_extracts, link_counts, terms_path):
     terms = Counter()
     pages = get_pages(nlp, titles_urls_and_extracts, link_counts)
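
Note on the `grouper` helper moved into `mwmbl/indexer/batch.py`: it expects a genuine iterator, since each `islice` call resumes where the previous chunk stopped; passing a plain list would restart from the front every time and loop forever. A minimal usage sketch (not part of the diff, values chosen for illustration):

```python
from mwmbl.indexer.batch import grouper

# Consume an iterator in fixed-size chunks; the final chunk may be shorter than n.
items = iter(range(7))
print(list(grouper(3, items)))  # [(0, 1, 2), (3, 4, 5), (6,)]
```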
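The reworked `GzipJsonBlobSerializer` now round-trips any JSON-serializable payload, which is what lets `dedupe.py` push batches onto the `FSQueue`. A quick round-trip sketch (illustrative only; the sample payload is made up, and note that JSON turns the tuples produced by `grouper` into lists on the way back):

```python
from mwmbl.indexer.fsqueue import GzipJsonBlobSerializer

serializer = GzipJsonBlobSerializer()
# Hypothetical batch in the same shape dedupe.py queues: {'items': [...]}
batch = {'items': [{'url': 'https://example.com/', 'title': 'Example'}]}
blob = serializer.serialize(batch)           # gzip-compressed JSON bytes
assert serializer.deserialize(blob) == batch  # dict survives the round trip
```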