
Make more robust

Daoud Clarke 3 years ago
parent
commit
1d9b5cb3ca

+ 0 - 54
mwmbl/historical.py

@@ -1,54 +0,0 @@
-from datetime import date, datetime
-
-import spacy
-
-from mwmbl.crawler.app import get_user_id_hashes_for_date, get_batches_for_date_and_user, get_batch_from_id, \
-    create_historical_batch, HashedBatch, get_batch_url
-from mwmbl.database import Database
-from mwmbl.indexdb import BatchInfo, BatchStatus, IndexDatabase
-from mwmbl.indexer.index import tokenize_document
-from mwmbl.indexer.paths import INDEX_PATH
-from mwmbl.tinysearchengine.indexer import TinyIndex, Document
-
-
-def run():
-    date_str = str(date.today())
-    users = get_user_id_hashes_for_date(date_str)
-    print("Users", users)
-    with Database() as db:
-        index_db = IndexDatabase(db.connection)
-        index_db.create_tables()
-        for user in users:
-            batches = get_batches_for_date_and_user(date_str, user)
-            print("Batches", batches)
-            batch_urls = [get_batch_url(batch_id, date_str, user) for batch_id in batches["batch_ids"]]
-            infos = [BatchInfo(url, user, BatchStatus.REMOTE) for url in batch_urls]
-            index_db.record_batches(infos)
-
-
-def index_batches(index_path: str):
-    nlp = spacy.load("en_core_web_sm")
-    with TinyIndex(Document, index_path, 'w') as indexer:
-        for batch_id in batch_ids["batch_ids"]:
-            start = datetime.now()
-            batch_dict = get_batch_from_id(date_str, user, batch_id)
-            get_batch_time = datetime.now()
-            print("Get batch time", get_batch_time - start)
-            batch = HashedBatch.parse_obj(batch_dict['batch'])
-            create_historical_batch(batch)
-            create_historical_time = datetime.now()
-            print("Create historical time", create_historical_time - get_batch_time)
-
-            for item in batch.items:
-                if item.content is None:
-                    continue
-
-                page = tokenize_document(item.url, item.content.title, item.content.extract, 1, nlp)
-                for token in page.tokens:
-                    indexer.index(token, Document(url=page.url, title=page.title, extract=page.extract, score=page.score))
-            tokenize_time = datetime.now()
-            print("Tokenize time", tokenize_time - create_historical_time)
-
-
-if __name__ == '__main__':
-    run()

+ 33 - 0
mwmbl/indexer/historical.py

@@ -0,0 +1,33 @@
+from datetime import date, datetime, timedelta
+
+import spacy
+
+from mwmbl.crawler.app import get_user_id_hashes_for_date, get_batches_for_date_and_user, get_batch_from_id, \
+    create_historical_batch, HashedBatch, get_batch_url
+from mwmbl.database import Database
+from mwmbl.indexer.indexdb import BatchInfo, BatchStatus, IndexDatabase
+from mwmbl.indexer.index import tokenize_document
+from mwmbl.tinysearchengine.indexer import TinyIndex, Document
+
+
+DAYS = 10
+
+
+def run():
+    for day in range(DAYS):
+        date_str = str(date.today() - timedelta(days=day))
+        users = get_user_id_hashes_for_date(date_str)
+        print("Users", users)
+        with Database() as db:
+            index_db = IndexDatabase(db.connection)
+            index_db.create_tables()
+            for user in users:
+                batches = get_batches_for_date_and_user(date_str, user)
+                print("Batches", batches)
+                batch_urls = [get_batch_url(batch_id, date_str, user) for batch_id in batches["batch_ids"]]
+                infos = [BatchInfo(url, user, BatchStatus.REMOTE) for url in batch_urls]
+                index_db.record_batches(infos)
+
+
+if __name__ == '__main__':
+    run()

+ 0 - 0
mwmbl/indexdb.py → mwmbl/indexer/indexdb.py


+ 41 - 0
mwmbl/indexer/preprocess.py

@@ -0,0 +1,41 @@
+"""
+Preprocess local documents for indexing.
+"""
+import traceback
+from time import sleep
+
+import spacy
+
+from mwmbl.database import Database
+from mwmbl.indexer.indexdb import IndexDatabase
+from mwmbl.indexer.index import tokenize_document
+from mwmbl.tinysearchengine.indexer import TinyIndex, Document
+
+
+def run(index_path):
+    while True:
+        try:
+            run_preprocessing(index_path)
+        except Exception as e:
+            print("Exception preprocessing")
+            traceback.print_exception(type(e), e, e.__traceback__)
+            sleep(10)
+
+
+def run_preprocessing(index_path):
+    nlp = spacy.load("en_core_web_sm")
+    with Database() as db:
+        index_db = IndexDatabase(db.connection)
+        documents = index_db.get_documents_for_preprocessing()
+        print(f"Got {len(documents)} documents")
+        if len(documents) == 0:
+            sleep(10)
+        with TinyIndex(Document, index_path, 'w') as indexer:
+            for document in documents:
+                tokenized = tokenize_document(document.url, document.title, document.extract, 1, nlp)
+                page_indexes = [indexer.get_key_page_index(token) for token in tokenized.tokens]
+                index_db.queue_documents_for_page([(tokenized.url, i) for i in page_indexes])
+
+
+if __name__ == '__main__':
+    run('data/index.tinysearch')

+ 13 - 7
mwmbl/retrieve.py → mwmbl/indexer/retrieve.py

@@ -3,6 +3,7 @@ Retrieve remote batches and store them in Postgres locally
 """
 import gzip
 import json
+import traceback
 from multiprocessing.pool import ThreadPool
 from time import sleep
 
@@ -10,10 +11,11 @@ import requests
 
 from mwmbl.crawler.app import HashedBatch
 from mwmbl.database import Database
-from mwmbl.indexdb import IndexDatabase, BatchStatus
+from mwmbl.indexer.indexdb import IndexDatabase, BatchStatus
+from mwmbl.retry import retry_requests
 from mwmbl.tinysearchengine.indexer import Document
 
-NUM_THREADS = 10
+NUM_THREADS = 5
 
 
 def retrieve_batches():
@@ -25,7 +27,7 @@ def retrieve_batches():
         index_db = IndexDatabase(db.connection)
         batches = index_db.get_batches_by_status(BatchStatus.REMOTE)
         print("Batches", batches)
-        urls = [batch.url for batch in batches][:10]
+        urls = [batch.url for batch in batches]
         pool = ThreadPool(NUM_THREADS)
         results = pool.imap_unordered(retrieve_batch, urls)
         for result in results:
@@ -37,13 +39,13 @@ def retrieve_batches():
 
 
 def retrieve_batch(url):
-    data = json.loads(gzip.decompress(requests.get(url).content))
+    data = json.loads(gzip.decompress(retry_requests.get(url).content))
     batch = HashedBatch.parse_obj(data)
-    queue_batch(url, batch)
+    queue_batch(batch)
     return len(batch.items)
 
 
-def queue_batch(batch_url: str, batch: HashedBatch):
+def queue_batch(batch: HashedBatch):
     # TODO get the score from the URLs database
     documents = [Document(item.content.title, item.url, item.content.extract, 1)
                  for item in batch.items if item.content is not None]
@@ -54,7 +56,11 @@ def queue_batch(batch_url: str, batch: HashedBatch):
 
 def run():
     while True:
-        retrieve_batches()
+        try:
+            retrieve_batches()
+        except Exception as e:
+            print("Exception retrieving batch")
+            traceback.print_exception(type(e), e, e.__traceback__)
         sleep(10)
 
 

+ 16 - 3
mwmbl/update_pages.py → mwmbl/indexer/update_pages.py

@@ -1,12 +1,15 @@
 """
 Iterate over each page in the index and update it based on what is in the index database.
 """
+import traceback
+from time import sleep
+
 from mwmbl.database import Database
-from mwmbl.indexdb import IndexDatabase
+from mwmbl.indexer.indexdb import IndexDatabase
 from mwmbl.tinysearchengine.indexer import TinyIndex, Document
 
 
-def run(index_path):
+def run_update(index_path):
     with Database() as db:
         index_db = IndexDatabase(db.connection)
         index_db.create_tables()
@@ -33,5 +36,15 @@ def run(index_path):
                 index_db.clear_queued_documents_for_page(i)
 
 
+def run(index_path):
+    while True:
+        try:
+            run_update(index_path)
+        except Exception as e:
+            print("Exception updating pages in index")
+            traceback.print_exception(type(e), e, e.__traceback__)
+            sleep(10)
+
+
 if __name__ == '__main__':
-    run('data/index.tinysearch')
+    run_update('data/index.tinysearch')

+ 0 - 31
mwmbl/local.py

@@ -1,31 +0,0 @@
-"""
-Preprocess local documents for indexing.
-"""
-from time import sleep
-
-import spacy
-
-from mwmbl.database import Database
-from mwmbl.indexdb import IndexDatabase
-from mwmbl.indexer.index import tokenize_document
-from mwmbl.tinysearchengine.indexer import TinyIndex, Document
-
-
-def run(index_path):
-    nlp = spacy.load("en_core_web_sm")
-    while True:
-        with Database() as db:
-            index_db = IndexDatabase(db.connection)
-            documents = index_db.get_documents_for_preprocessing()
-            print(f"Got {len(documents)} documents")
-            if len(documents) == 0:
-                sleep(10)
-            with TinyIndex(Document, index_path, 'w') as indexer:
-                for document in documents:
-                    tokenized = tokenize_document(document.url, document.title, document.extract, 1, nlp)
-                    page_indexes = [indexer.get_key_page_index(token) for token in tokenized.tokens]
-                    index_db.queue_documents_for_page([(tokenized.url, i) for i in page_indexes])
-
-
-if __name__ == '__main__':
-    run('data/index.tinysearch')

+ 16 - 13
mwmbl/main.py

@@ -1,11 +1,12 @@
 import argparse
 import logging
+import os
 from multiprocessing import Process
 
 import uvicorn
 from fastapi import FastAPI
 
-from mwmbl import historical
+from mwmbl.indexer import historical, retrieve, preprocess
 from mwmbl.crawler.app import router as crawler_router
 from mwmbl.tinysearchengine import search
 from mwmbl.tinysearchengine.completer import Completer
@@ -23,23 +24,25 @@ def setup_args():
 
 
 def run():
-    """Main entrypoint for tinysearchengine.
-
-    * Parses CLI args
-    * Parses and validates config
-    * Initializes TinyIndex
-    * Initialize a FastAPI app instance
-    * Starts uvicorn server using app instance
-    """
     args = setup_args()
 
     try:
+        existing_index = TinyIndex(item_factory=Document, index_path=args.index)
+        if existing_index.page_size != PAGE_SIZE or existing_index.num_pages != NUM_PAGES:
+            print(f"Existing index page sizes ({existing_index.page_size}) and number of pages "
+                  f"({existing_index.num_pages}) does not match - removing.")
+            os.remove(args.index)
+            existing_index = None
+    except FileNotFoundError:
+        existing_index = None
+
+    if existing_index is None:
+        print("Creating a new index")
         TinyIndex.create(item_factory=Document, index_path=args.index, num_pages=NUM_PAGES, page_size=PAGE_SIZE)
-    except FileExistsError:
-        print("Index already exists")
 
-    historical_batch_process = Process(target=historical.run, args=(args.index,))
-    historical_batch_process.start()
+    Process(target=historical.run).start()
+    Process(target=retrieve.run).start()
+    Process(target=preprocess.run, args=(args.index,)).start()
 
     completer = Completer()
 

+ 17 - 0
mwmbl/retry.py

@@ -0,0 +1,17 @@
+import requests
+from requests.adapters import HTTPAdapter
+from urllib3.util.retry import Retry
+
+
+MAX_RETRY = 2
+MAX_RETRY_FOR_SESSION = 2
+BACK_OFF_FACTOR = 0.3
+TIME_BETWEEN_RETRIES = 1000
+ERROR_CODES = (500, 502, 504)
+
+
+retry_requests = requests.Session()
+retry = Retry(total=5, backoff_factor=1)
+adapter = HTTPAdapter(max_retries=retry)
+retry_requests.mount('http://', adapter)
+retry_requests.mount('https://', adapter)
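The new mwmbl/retry.py exposes a requests.Session with urllib3's Retry mounted on both HTTP and HTTPS, so transient connection failures are retried with exponential backoff before an exception is raised. A minimal usage sketch (the URL below is a hypothetical placeholder; retrieve.py passes real batch URLs taken from the index database):

from mwmbl.retry import retry_requests

# The session behaves like the plain requests API, but failed requests are
# retried up to 5 times with exponential backoff (backoff_factor=1).
response = retry_requests.get("https://example.com/batch.json.gz")  # hypothetical URL
response.raise_for_status()
print(len(response.content), "bytes downloaded")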

+ 1 - 1
mwmbl/tinysearchengine/indexer.py

@@ -12,7 +12,7 @@ VERSION = 1
 METADATA_CONSTANT = b'mwmbl-tiny-search'
 METADATA_SIZE = 4096
 
-NUM_PAGES = 128000
+NUM_PAGES = 512000
 PAGE_SIZE = 4096
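Raising NUM_PAGES from 128000 to 512000 quadruples the index capacity. A rough back-of-the-envelope check, assuming the on-disk index is approximately num_pages × page_size plus the fixed metadata block:

NUM_PAGES = 512000
PAGE_SIZE = 4096
METADATA_SIZE = 4096

# Approximate file size: all pages plus the metadata header (assumed layout).
size_bytes = NUM_PAGES * PAGE_SIZE + METADATA_SIZE
print(size_bytes / 10**9)  # ~2.1 GB, up from ~0.52 GB at 128000 pages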