
Index queued items

Daoud Clarke, 4 years ago
commit 974f18647a
4 changed files with 77 additions and 37 deletions
  1. fsqueue.py (+8, -0)
  2. index.py (+1, -37)
  3. index_glob.py (+38, -0)
  4. index_queue.py (+30, -0)

+ 8 - 0
fsqueue.py

@@ -95,3 +95,11 @@ class FSQueue:
         """
 
         self._move(item_id, FSState.LOCKED, FSState.DONE)
+
+    def unlock_all(self):
+        paths = sorted(Path(self._get_dir(FSState.LOCKED)).iterdir(), key=os.path.getmtime)
+
+        for path in paths:
+            # Move the item back from LOCKED to READY so it can be claimed again
+            self._move(path.name, FSState.LOCKED, FSState.READY)
+
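
The new unlock_all method moves every item left in the LOCKED state back to READY, oldest first, so work abandoned by a crashed or interrupted consumer can be picked up again. A minimal sketch of calling it when a consumer starts up, mirroring the usage in index_queue.py below (the queue name and serializer come from that file, not from fsqueue.py itself):

    from fsqueue import FSQueue, ZstdJsonSerializer
    from paths import DATA_DIR, DOMAINS_TITLES_QUEUE_NAME

    # Recover items that a previous run locked but never marked done.
    queue = FSQueue(DATA_DIR, DOMAINS_TITLES_QUEUE_NAME, ZstdJsonSerializer())
    queue.unlock_all()            # LOCKED -> READY, ordered by mtime
    item_id, items = queue.get()  # items are available to consumers again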

+ 1 - 37
index.py

@@ -1,25 +1,18 @@
 """
 Create a search index
 """
-import gzip
 import json
 import os
-import sqlite3
 from dataclasses import dataclass
-from glob import glob
-from itertools import chain, count, islice
+from itertools import islice
 from mmap import mmap, PROT_READ
 from typing import List, Iterator
 from urllib.parse import unquote
 
-import bs4
 import justext
 import mmh3
-from spacy.lang.en import English
 from zstandard import ZstdCompressor, ZstdDecompressor, ZstdError
 
-from paths import CRAWL_GLOB, INDEX_PATH
-
 NUM_PAGES = 8192
 PAGE_SIZE = 512
 
@@ -160,32 +153,6 @@ class TinyIndexer(TinyIndexBase):
         raise NotImplementedError()
 
 
-def run():
-    indexer = TinyIndexer(INDEX_PATH, NUM_PAGES, PAGE_SIZE)
-    indexer.create_if_not_exists()
-    nlp = English()
-    for path in glob(CRAWL_GLOB):
-        print("Path", path)
-        with gzip.open(path, 'rt') as html_file:
-            url = html_file.readline().strip()
-            content = html_file.read()
-
-        if indexer.document_indexed(url):
-            print("Page exists, skipping", url)
-            continue
-
-        cleaned_text = clean(content)
-        try:
-            title = bs4.BeautifulSoup(content, features="lxml").find('title').string
-        except AttributeError:
-            title = cleaned_text[:80]
-        tokens = tokenize(nlp, cleaned_text)
-        print("URL", url)
-        print("Tokens", tokens)
-        print("Title", title)
-        indexer.index(tokens, url, title)
-
-
 def prepare_url_for_tokenizing(url: str):
     if url.startswith(HTTP_START):
         url = url[len(HTTP_START):]
@@ -224,6 +191,3 @@ def index_titles_and_urls(indexer: TinyIndexer, nlp, titles_and_urls):
     for chunk in grouper(BATCH_SIZE, pages):
         indexer.index(list(chunk))
 
-
-if __name__ == '__main__':
-    run()
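
index_titles_and_urls above batches the tokenized pages with grouper(BATCH_SIZE, pages) before handing each chunk to indexer.index. grouper itself is not part of this diff; a sketch of the itertools-style helper it implies, assuming it simply yields fixed-size chunks (name and behaviour inferred, not taken from this commit):

    from itertools import islice

    def grouper(n, iterable):
        # Yield successive chunks of at most n items from iterable.
        it = iter(iterable)
        while True:
            chunk = list(islice(it, n))
            if not chunk:
                return
            yield chunk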

+ 38 - 0
index_glob.py

@@ -0,0 +1,38 @@
+import gzip
+from glob import glob
+
+import bs4
+from spacy.lang.en import English
+
+from index import TinyIndexer, NUM_PAGES, PAGE_SIZE, clean, tokenize
+from paths import INDEX_PATH, CRAWL_GLOB
+
+
+def run():
+    indexer = TinyIndexer(INDEX_PATH, NUM_PAGES, PAGE_SIZE)
+    indexer.create_if_not_exists()
+    nlp = English()
+    for path in glob(CRAWL_GLOB):
+        print("Path", path)
+        with gzip.open(path, 'rt') as html_file:
+            url = html_file.readline().strip()
+            content = html_file.read()
+
+        if indexer.document_indexed(url):
+            print("Page exists, skipping", url)
+            continue
+
+        cleaned_text = clean(content)
+        try:
+            title = bs4.BeautifulSoup(content, features="lxml").find('title').string
+        except AttributeError:
+            title = cleaned_text[:80]
+        tokens = tokenize(nlp, cleaned_text)
+        print("URL", url)
+        print("Tokens", tokens)
+        print("Title", title)
+        indexer.index(tokens, url, title)
+
+
+if __name__ == '__main__':
+    run()
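
run() assumes every file matched by CRAWL_GLOB is a gzip-compressed text file whose first line is the page URL and whose remaining content is the raw HTML. A small sketch of writing a fixture in that layout for testing (the file name and contents are illustrative only):

    import gzip

    # First line: URL; rest of the file: the page HTML that run() will clean,
    # tokenize and index.
    with gzip.open('example-crawl.html.gz', 'wt') as f:
        f.write('https://example.com/\n')
        f.write('<html><head><title>Example</title></head><body>Hello</body></html>')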

+ 30 - 0
index_queue.py

@@ -0,0 +1,30 @@
+"""
+Index items in the file-system queue
+"""
+from spacy.lang.en import English
+
+from fsqueue import FSQueue, ZstdJsonSerializer
+from index import TinyIndexer, NUM_PAGES, PAGE_SIZE, index_titles_and_urls
+from paths import DATA_DIR, DOMAINS_TITLES_QUEUE_NAME, INDEX_PATH
+
+
+def get_queue_items():
+    titles_queue = FSQueue(DATA_DIR, DOMAINS_TITLES_QUEUE_NAME, ZstdJsonSerializer())
+    titles_queue.unlock_all()
+    while True:
+        items_id, items = titles_queue.get()
+        for item in items:
+            if item['title'] is None:
+                continue
+            yield item['title'], item['url']
+
+
+def index_queue_items():
+    nlp = English()
+    with TinyIndexer(INDEX_PATH, NUM_PAGES, PAGE_SIZE) as indexer:
+        titles_and_urls = get_queue_items()
+        index_titles_and_urls(indexer, nlp, titles_and_urls)
+
+
+if __name__ == '__main__':
+    index_queue_items()
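
get_queue_items assumes each payload returned by titles_queue.get() deserializes to a list of dicts carrying at least 'title' and 'url' keys, and it skips entries whose title is None. An illustrative sketch of that payload shape and the (title, url) pairs the generator yields from it (the values are made up for the example):

    # Hypothetical payload as produced by the crawler side of the queue.
    items = [
        {'title': 'Example Domain', 'url': 'https://example.com/'},
        {'title': None, 'url': 'https://example.org/untitled'},  # skipped: no title
    ]
    titles_and_urls = [(i['title'], i['url']) for i in items if i['title'] is not None]
    # -> [('Example Domain', 'https://example.com/')]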