From e1e9e404a3f2c1f41947c61ea7a2ca37435bafa8 Mon Sep 17 00:00:00 2001
From: Daoud Clarke <daoud.clarke@gmail.com>
Date: Thu, 24 Feb 2022 22:01:42 +0000
Subject: [PATCH] Dedupe before indexing

---
 mwmbl/indexer/batch.py   | 10 ++++++++++
 mwmbl/indexer/dedupe.py  | 42 ++++++++++++++++++++++++++++++++++++++++
 mwmbl/indexer/fsqueue.py |  8 ++++----
 mwmbl/indexer/index.py   | 11 +----------
 4 files changed, 57 insertions(+), 14 deletions(-)
 create mode 100644 mwmbl/indexer/batch.py
 create mode 100644 mwmbl/indexer/dedupe.py

diff --git a/mwmbl/indexer/batch.py b/mwmbl/indexer/batch.py
new file mode 100644
index 0000000..86887ac
--- /dev/null
+++ b/mwmbl/indexer/batch.py
@@ -0,0 +1,10 @@
+from itertools import islice
+from typing import Iterator
+
+
+def grouper(n: int, iterator: Iterator):
+    while True:
+        chunk = tuple(islice(iterator, n))
+        if not chunk:
+            return
+        yield chunk
\ No newline at end of file
diff --git a/mwmbl/indexer/dedupe.py b/mwmbl/indexer/dedupe.py
new file mode 100644
index 0000000..5a09f4d
--- /dev/null
+++ b/mwmbl/indexer/dedupe.py
@@ -0,0 +1,42 @@
+"""
+Dedupe pages that have been crawled more than once and prepare them for indexing
+"""
+import glob
+import gzip
+import json
+
+from mwmbl.indexer.batch import grouper
+from mwmbl.indexer.fsqueue import FSQueue, GzipJsonBlobSerializer
+from mwmbl.indexer.paths import CRAWL_GLOB, TINYSEARCH_DATA_DIR
+
+BATCH_SIZE = 100
+
+
+def get_deduped_pages():
+    seen_urls = set()
+    for path in sorted(glob.glob(CRAWL_GLOB), reverse=True):
+        data = json.load(gzip.open(path))
+        for item in data['items']:
+            url = item['url']
+            if url in seen_urls:
+                continue
+
+            seen_urls.add(url)
+            yield item
+
+
+def queue_deduped_items(deduped_pages):
+    output_queue = FSQueue(TINYSEARCH_DATA_DIR, 'mwmbl-search-items', GzipJsonBlobSerializer())
+
+    for batch in grouper(BATCH_SIZE, deduped_pages):
+        data = {'items': batch}
+        output_queue.put(data)
+
+
+def run():
+    deduped_pages = get_deduped_pages()
+    queue_deduped_items(deduped_pages)
+
+
+if __name__ == '__main__':
+    run()
diff --git a/mwmbl/indexer/fsqueue.py b/mwmbl/indexer/fsqueue.py
index f05b2d4..88787d9 100644
--- a/mwmbl/indexer/fsqueue.py
+++ b/mwmbl/indexer/fsqueue.py
@@ -7,7 +7,7 @@ import json
 import os
 from abc import ABC
 from enum import Enum
-from typing import Union
+from typing import Union, Any
 from uuid import uuid4
 from pathlib import Path
 
@@ -59,10 +59,10 @@ class GzipJsonRowSerializer(Serializer):
 
 
 class GzipJsonBlobSerializer(Serializer):
-    def serialize(self, items: list[object]) -> bytes:
-        raise NotImplementedError("Serializer not needed - blob is generated by browser extension")
+    def serialize(self, items: Any) -> bytes:
+        return gzip.compress(json.dumps(items).encode('utf8'))
 
-    def deserialize(self, serialized_items: bytes) -> list[object]:
+    def deserialize(self, serialized_items: bytes) -> Any:
         data = gzip.decompress(serialized_items).decode('utf8')
         return json.loads(data)
 
diff --git a/mwmbl/indexer/index.py b/mwmbl/indexer/index.py
index 5fb6304..c772c55 100644
--- a/mwmbl/indexer/index.py
+++ b/mwmbl/indexer/index.py
@@ -2,8 +2,7 @@
 Create a search index
 """
 from collections import Counter
-from itertools import islice
-from typing import Iterator, Iterable
+from typing import Iterable
 from urllib.parse import unquote
 
 import pandas as pd
@@ -59,14 +58,6 @@ def get_pages(nlp, titles_urls_and_extracts, link_counts) -> Iterable[TokenizedD
             print("Processed", i)
 
 
-def grouper(n: int, iterator: Iterator):
-    while True:
-        chunk = tuple(islice(iterator, n))
-        if not chunk:
-            return
-        yield chunk
-
-
 def index_titles_urls_and_extracts(indexer: TinyIndex, nlp, titles_urls_and_extracts, link_counts, terms_path):
     terms = Counter()
     pages = get_pages(nlp, titles_urls_and_extracts, link_counts)