Queue new batches for indexing

parent b8c495bda8
commit 2b52b50569

3 changed files with 15 additions and 14 deletions

@@ -17,6 +17,8 @@ from mwmbl.crawler.batch import Batch, NewBatchRequest, HashedBatch
 from mwmbl.crawler.urls import URLDatabase, FoundURL, URLStatus
 from mwmbl.database import Database
 from mwmbl.hn_top_domains_filtered import DOMAINS
+from mwmbl.indexer.indexdb import IndexDatabase
+from mwmbl.tinysearchengine.indexer import Document
 
 APPLICATION_KEY = os.environ['MWMBL_APPLICATION_KEY']
 KEY_ID = os.environ['MWMBL_KEY_ID']

@@ -93,6 +95,7 @@ def create_batch(batch: Batch):
     upload(data, filename)
 
     record_urls_in_database(batch, user_id_hash, now)
+    queue_batch(batch)
 
     global last_batch
     last_batch = hashed_batch

@@ -271,3 +274,13 @@ def status():
     return {
         'status': 'ok'
     }
+
+
+def queue_batch(batch: HashedBatch):
+    # TODO: get the score from the URLs database
+    # TODO: also queue documents for batches sent through the API
+    documents = [Document(item.content.title, item.url, item.content.extract, 1)
+                 for item in batch.items if item.content is not None]
+    with Database() as db:
+        index_db = IndexDatabase(db.connection)
+        index_db.queue_documents(documents)

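The new queue_batch helper only stages documents; the IndexDatabase.queue_documents method it calls is defined elsewhere and is not shown in this diff. A minimal sketch of what such a method could look like, assuming a Postgres documents table keyed by url with title/extract/score/status columns (the table layout, status value, and SQL here are assumptions for illustration, not code from this commit):

    from psycopg2.extras import execute_values


    class IndexDatabase:
        def __init__(self, connection):
            self.connection = connection

        def queue_documents(self, documents):
            # Hypothetical sketch: stage each document with a "new" status (0 here)
            # so a separate indexing process can pick it up later; duplicates are
            # skipped, assuming url carries a unique constraint.
            sql = """
                INSERT INTO documents (url, title, extract, score, status)
                VALUES %s
                ON CONFLICT (url) DO NOTHING
            """
            data = [(d.url, d.title, d.extract, d.score, 0) for d in documents]
            with self.connection.cursor() as cursor:
                execute_values(cursor, sql, data)
            self.connection.commit()
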
@@ -96,7 +96,7 @@ class URLDatabase:
         urls_to_insert = new_urls | locked_urls
 
         if len(urls_to_insert) != len(input_urls):
-            print(f"Only got {len(locked_urls)} instead of {len(input_urls)}")
+            print(f"Only got {len(locked_urls)} instead of {len(input_urls)} - {len(new_urls)} new")
 
         sorted_urls = sorted(found_urls, key=lambda x: x.url)
         data = [

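The amended message adds how many of the inserted URLs were genuinely new, which makes a shortfall easier to diagnose. A self-contained illustration of the two messages (the sets here are made up for the example):

    input_urls = {"a", "b", "c", "d"}
    new_urls = {"a", "b"}    # URLs not seen before
    locked_urls = {"c"}      # URLs this process holds the lock for; "d" is locked elsewhere
    urls_to_insert = new_urls | locked_urls

    if len(urls_to_insert) != len(input_urls):
        # Old message: can't tell whether the shortfall was in new or locked URLs
        print(f"Only got {len(locked_urls)} instead of {len(input_urls)}")
        # New message: the trailing count shows how many were genuinely new
        print(f"Only got {len(locked_urls)} instead of {len(input_urls)} - {len(new_urls)} new")
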
@@ -7,15 +7,13 @@ import traceback
 from multiprocessing.pool import ThreadPool
 from time import sleep
 
 import requests
-from pydantic import ValidationError
 
-from mwmbl.crawler.app import create_historical_batch
+from mwmbl.crawler.app import create_historical_batch, queue_batch
 from mwmbl.crawler.batch import HashedBatch
 from mwmbl.database import Database
 from mwmbl.indexer.indexdb import IndexDatabase, BatchStatus
 from mwmbl.retry import retry_requests
-from mwmbl.tinysearchengine.indexer import Document
 
 NUM_THREADS = 5
 

@@ -56,16 +54,6 @@ def retrieve_batch(url):
     return len(batch.items)
 
 
-def queue_batch(batch: HashedBatch):
-    # TODO: get the score from the URLs database
-    # TODO: also queue documents for batches sent through the API
-    documents = [Document(item.content.title, item.url, item.content.extract, 1)
-                 for item in batch.items if item.content is not None]
-    with Database() as db:
-        index_db = IndexDatabase(db.connection)
-        index_db.queue_documents(documents)
-
-
 def run():
     while True:
         try:

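With queue_batch moved into mwmbl.crawler.app, both ingestion paths share one queueing code path: batches uploaded by crawlers are queued directly from create_batch, and remotely retrieved batches go through the same helper. The retrieval-side caller would look something like this (a hypothetical reconstruction of retrieve_batch based only on the names visible in this diff; the exact body is not part of the commit):

    import gzip
    import json

    from mwmbl.crawler.app import create_historical_batch, queue_batch
    from mwmbl.crawler.batch import HashedBatch
    from mwmbl.retry import retry_requests


    def retrieve_batch(url):
        # Fetch a gzipped, JSON-encoded batch, record it, and queue its
        # documents for indexing via the shared queue_batch helper.
        data = json.loads(gzip.decompress(retry_requests.get(url).content))
        batch = HashedBatch.parse_obj(data)
        create_historical_batch(batch)
        queue_batch(batch)
        return len(batch.items)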