More threads for retrieving batches
This commit is contained in:
parent
e79f1ce10b
commit
3137068c77
3 changed files with 8 additions and 7 deletions
|
@ -18,7 +18,7 @@ def run(data_path: str):
|
|||
batch_cache = BatchCache(Path(data_path) / BATCH_DIR_NAME)
|
||||
while True:
|
||||
try:
|
||||
batch_cache.retrieve_batches(1000)
|
||||
batch_cache.retrieve_batches(num_batches=10000)
|
||||
except Exception:
|
||||
logger.exception("Error retrieving batches")
|
||||
try:
|
||||
|
|
|
@ -26,7 +26,7 @@ from mwmbl.settings import (
|
|||
PUBLIC_USER_ID_LENGTH,
|
||||
FILE_NAME_SUFFIX,
|
||||
DATE_REGEX)
|
||||
from mwmbl.tinysearchengine.indexer import Document
|
||||
|
||||
|
||||
def get_bucket(name):
|
||||
s3 = boto3.resource('s3', endpoint_url=ENDPOINT_URL, aws_access_key_id=KEY_ID,
|
||||
|
|
|
@ -20,7 +20,7 @@ from mwmbl.retry import retry_requests
|
|||
|
||||
|
||||
class BatchCache:
|
||||
num_threads = 8
|
||||
num_threads = 20
|
||||
|
||||
def __init__(self, repo_path):
|
||||
os.makedirs(repo_path, exist_ok=True)
|
||||
|
@ -35,7 +35,7 @@ class BatchCache:
|
|||
batches[url] = batch
|
||||
return batches
|
||||
|
||||
def retrieve_batches(self, num_batches=1000):
|
||||
def retrieve_batches(self, num_batches):
|
||||
with Database() as db:
|
||||
index_db = IndexDatabase(db.connection)
|
||||
index_db.create_tables()
|
||||
|
@ -49,9 +49,10 @@ class BatchCache:
|
|||
urls = [batch.url for batch in batches]
|
||||
pool = ThreadPool(self.num_threads)
|
||||
results = pool.imap_unordered(self.retrieve_batch, urls)
|
||||
total_processed = 0
|
||||
for result in results:
|
||||
if result > 0:
|
||||
print("Processed batch with items:", result)
|
||||
total_processed += result
|
||||
print("Processed batches with items:", total_processed)
|
||||
index_db.update_batch_status(urls, BatchStatus.LOCAL)
|
||||
|
||||
def retrieve_batch(self, url):
|
||||
|
@ -67,7 +68,7 @@ class BatchCache:
|
|||
|
||||
def store(self, batch, url):
|
||||
path = self.get_path_from_url(url)
|
||||
print("Path", path)
|
||||
print(f"Storing local batch at {path}")
|
||||
os.makedirs(path.parent, exist_ok=True)
|
||||
with open(path, 'wb') as output_file:
|
||||
data = gzip.compress(batch.json().encode('utf8'))
|
||||
|
|
Loading…
Reference in a new issue