Dedupe before indexing

Daoud Clarke, 3 years ago
commit e1e9e404a3
4 changed files with 57 additions and 14 deletions
  1. mwmbl/indexer/batch.py    +10 -0
  2. mwmbl/indexer/dedupe.py   +42 -0
  3. mwmbl/indexer/fsqueue.py  +4  -4
  4. mwmbl/indexer/index.py    +1  -10

mwmbl/indexer/batch.py (+10 -0)

@@ -0,0 +1,10 @@
+from itertools import islice
+from typing import Iterator
+
+
+def grouper(n: int, iterator: Iterator):
+    while True:
+        chunk = tuple(islice(iterator, n))
+        if not chunk:
+            return
+        yield chunk
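
A minimal sketch (not part of the commit) of how grouper behaves: it pulls fixed-size chunks off an iterator and yields them as tuples, with a shorter final tuple for the remainder. Note that the argument must be an actual iterator; a plain list would be re-sliced from the start on every pass.

# Illustrative usage only; the values are made up.
from mwmbl.indexer.batch import grouper

items = iter(range(7))
for chunk in grouper(3, items):
    print(chunk)   # (0, 1, 2) then (3, 4, 5) then (6,)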

mwmbl/indexer/dedupe.py (+42 -0)

@@ -0,0 +1,42 @@
+"""
+Dedupe pages that have been crawled more than once and prepare them for indexing
+"""
+import glob
+import gzip
+import json
+
+from mwmbl.indexer.batch import grouper
+from mwmbl.indexer.fsqueue import FSQueue, GzipJsonBlobSerializer
+from mwmbl.indexer.paths import CRAWL_GLOB, TINYSEARCH_DATA_DIR
+
+BATCH_SIZE = 100
+
+
+def get_deduped_pages():
+    seen_urls = set()
+    for path in sorted(glob.glob(CRAWL_GLOB), reverse=True):
+        data = json.load(gzip.open(path))
+        for item in data['items']:
+            url = item['url']
+            if url in seen_urls:
+                continue
+
+            seen_urls.add(url)
+            yield item
+
+
+def queue_deduped_items(deduped_pages):
+    output_queue = FSQueue(TINYSEARCH_DATA_DIR, 'mwmbl-search-items', GzipJsonBlobSerializer())
+
+    for batch in grouper(BATCH_SIZE, deduped_pages):
+        data = {'items': batch}
+        output_queue.put(data)
+
+
+def run():
+    deduped_pages = get_deduped_pages()
+    queue_deduped_items(deduped_pages)
+
+
+if __name__ == '__main__':
+    run()
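
A sketch of the dedupe logic on made-up in-memory data (the URLs and item fields here are purely illustrative): crawl files are read newest-first, and only the first occurrence of each URL is kept, so the most recent crawl of a page is the one that gets queued.

# Illustrative only: the same first-seen-wins pattern as get_deduped_pages.
seen_urls = set()
crawls = [  # stand-ins for the gzipped JSON files matched by CRAWL_GLOB, newest first
    {'items': [{'url': 'https://example.com/a', 'title': 'A (newer crawl)'}]},
    {'items': [{'url': 'https://example.com/a', 'title': 'A (older crawl)'},
               {'url': 'https://example.com/b', 'title': 'B'}]},
]
deduped = []
for data in crawls:
    for item in data['items']:
        if item['url'] in seen_urls:
            continue
        seen_urls.add(item['url'])
        deduped.append(item)
# deduped now holds 'A (newer crawl)' and 'B'; the older crawl of /a is dropped.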

mwmbl/indexer/fsqueue.py (+4 -4)

@@ -7,7 +7,7 @@ import json
 import os
 from abc import ABC
 from enum import Enum
-from typing import Union
+from typing import Union, Any
 from uuid import uuid4
 from pathlib import Path
 
@@ -59,10 +59,10 @@ class GzipJsonRowSerializer(Serializer):
 
 
 class GzipJsonBlobSerializer(Serializer):
-    def serialize(self, items: list[object]) -> bytes:
-        raise NotImplementedError("Serializer not needed - blob is generated by browser extension")
+    def serialize(self, items: Any) -> bytes:
+        return gzip.compress(json.dumps(items).encode('utf8'))
 
-    def deserialize(self, serialized_items: bytes) -> list[object]:
+    def deserialize(self, serialized_items: bytes) -> Any:
         data = gzip.decompress(serialized_items).decode('utf8')
         return json.loads(data)
 
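A quick round-trip sketch (not part of the commit) of the newly implemented serializer; the payload shape is assumed for illustration. serialize gzips the JSON encoding of whatever it is given, and deserialize reverses it.

# Illustrative only; the payload is made up.
from mwmbl.indexer.fsqueue import GzipJsonBlobSerializer

serializer = GzipJsonBlobSerializer()
payload = {'items': [{'url': 'https://example.com/'}]}
blob = serializer.serialize(payload)           # gzip-compressed JSON bytes
assert serializer.deserialize(blob) == payload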

mwmbl/indexer/index.py (+1 -10)

@@ -2,8 +2,7 @@
 Create a search index
 """
 from collections import Counter
-from itertools import islice
-from typing import Iterator, Iterable
+from typing import Iterable
 from urllib.parse import unquote
 
 import pandas as pd
@@ -59,14 +58,6 @@ def get_pages(nlp, titles_urls_and_extracts, link_counts) -> Iterable[TokenizedD
             print("Processed", i)
 
 
-def grouper(n: int, iterator: Iterator):
-    while True:
-        chunk = tuple(islice(iterator, n))
-        if not chunk:
-            return
-        yield chunk
-
-
 def index_titles_urls_and_extracts(indexer: TinyIndex, nlp, titles_urls_and_extracts, link_counts, terms_path):
     terms = Counter()
     pages = get_pages(nlp, titles_urls_and_extracts, link_counts)
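
Taken together, the changes set up a dedupe pass that runs ahead of indexing: get_deduped_pages streams unique pages, queue_deduped_items groups them into batches of 100 with grouper, and each batch is gzipped onto the 'mwmbl-search-items' FSQueue. A rough sketch of invoking it follows; how the indexer later drains that queue is not shown in this diff and is assumed from the commit title.

# Rough sketch; assumes dedupe runs as a standalone step before indexing.
from mwmbl.indexer import dedupe

dedupe.run()   # dedupe crawled pages and queue them in batches of 100
               # on the 'mwmbl-search-items' FSQueue for later indexing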