156 lines
4.8 KiB
Python
156 lines
4.8 KiB
Python
from __future__ import annotations
|
|
|
|
import logging
|
|
from io import BytesIO
|
|
from threading import Thread
|
|
from uuid import UUID
|
|
|
|
from django.conf import settings
|
|
|
|
from hc.lib.statsd import statsd
|
|
|
|
try:
|
|
from minio import InvalidResponseError, Minio, S3Error
|
|
from minio.deleteobjects import DeleteObject
|
|
from urllib3 import PoolManager
|
|
from urllib3.exceptions import HTTPError, ReadTimeoutError
|
|
from urllib3.util import Retry
|
|
except ImportError:
|
|
# Enforce
|
|
settings.S3_BUCKET = None
|
|
|
|
_client = None
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def client() -> Minio:
|
|
if not settings.S3_BUCKET:
|
|
raise Exception("Object storage is not configured")
|
|
|
|
global _client
|
|
if _client is None:
|
|
assert settings.S3_ENDPOINT
|
|
_client = Minio(
|
|
settings.S3_ENDPOINT,
|
|
settings.S3_ACCESS_KEY,
|
|
settings.S3_SECRET_KEY,
|
|
region=settings.S3_REGION,
|
|
secure=settings.S3_SECURE,
|
|
http_client=PoolManager(
|
|
timeout=settings.S3_TIMEOUT, retries=Retry(total=1)
|
|
),
|
|
)
|
|
|
|
return _client
|
|
|
|
|
|
ASCII_J = ord("j")
|
|
ASCII_Z = ord("z")
|
|
|
|
|
|
def enc(n: int) -> str:
|
|
"""Generate an object key in the "<sorting prefix>-<n>" form.
|
|
|
|
>>> [enc(i) for i in range(0, 5)]
|
|
['zj-0', 'zi-1', 'zh-2', 'zg-3', 'zf-4']
|
|
|
|
The purpose of the sorting prefix is to sort keys with smaller n values
|
|
last:
|
|
|
|
>>> sorted([enc(i) for i in range(0, 5)])
|
|
['zf-4', 'zg-3', 'zh-2', 'zi-1', 'zj-0']
|
|
|
|
This allows efficient lookup of objects with n
|
|
values below a specific threshold. For example, the following
|
|
retrieves all keys at bucket's root directory with n < 123:
|
|
|
|
>>> client.list_objects(bucket_name, start_after=enc(123))
|
|
"""
|
|
s = str(n)
|
|
len_inverted = chr(ASCII_Z - len(s) + 1)
|
|
inverted = "".join(chr(ASCII_J - int(c)) for c in s)
|
|
return len_inverted + inverted + "-" + s
|
|
|
|
|
|
def get_object(code: str, n: int) -> bytes | None:
|
|
if not settings.S3_BUCKET:
|
|
return None
|
|
|
|
with statsd.timer("hc.lib.s3.getObjectTime"):
|
|
key = "%s/%s" % (code, enc(n))
|
|
response = None
|
|
try:
|
|
response = client().get_object(settings.S3_BUCKET, key)
|
|
return response.read()
|
|
except S3Error as e:
|
|
if e.code == "NoSuchKey":
|
|
# It's not an error condition if an object does not exist.
|
|
# Return None, don't log exception, don't increase error counter.
|
|
return None
|
|
|
|
logger.exception("S3Error in hc.lib.s3.get_object")
|
|
statsd.incr("hc.lib.s3.getObjectErrors")
|
|
return None
|
|
except InvalidResponseError:
|
|
logger.exception("InvalidResponseError in hc.lib.s3.get_object")
|
|
statsd.incr("hc.lib.s3.getObjectErrors")
|
|
return None
|
|
except HTTPError:
|
|
logger.exception("HTTPError in hc.lib.s3.get_object")
|
|
statsd.incr("hc.lib.s3.getObjectErrors")
|
|
return None
|
|
finally:
|
|
if response:
|
|
response.close()
|
|
response.release_conn()
|
|
|
|
|
|
def put_object(code: UUID, n: int, data: bytes) -> None:
|
|
assert settings.S3_BUCKET
|
|
key = "%s/%s" % (code, enc(n))
|
|
retries = 10
|
|
while True:
|
|
try:
|
|
client().put_object(settings.S3_BUCKET, key, BytesIO(data), len(data))
|
|
break
|
|
except S3Error as e:
|
|
if e.code == "InternalError" and retries > 0:
|
|
retries -= 1
|
|
print("InternalError, retrying (retries=%d)..." % retries)
|
|
continue
|
|
|
|
raise e
|
|
|
|
|
|
def _remove_objects(code: UUID, upto_n: int) -> None:
|
|
assert settings.S3_BUCKET
|
|
if upto_n <= 0:
|
|
return
|
|
|
|
prefix = "%s/" % code
|
|
start_after = prefix + enc(upto_n + 1)
|
|
q = client().list_objects(settings.S3_BUCKET, prefix, start_after=start_after)
|
|
delete_objs = [DeleteObject(obj.object_name) for obj in q]
|
|
if delete_objs:
|
|
num_objs = len(delete_objs)
|
|
try:
|
|
with statsd.timer("hc.lib.s3.removeObjectsTime"):
|
|
errors = client().remove_objects(settings.S3_BUCKET, delete_objs)
|
|
for e in errors:
|
|
statsd.incr("hc.lib.s3.removeObjectsErrors")
|
|
logger.error("remove_objects error: [%s] %s", e.code, e.message)
|
|
except ReadTimeoutError:
|
|
logger.exception("ReadTimeoutError while removing %d objects", num_objs)
|
|
statsd.incr("hc.lib.s3.removeObjectsErrors")
|
|
|
|
|
|
def remove_objects(check_code: str, upto_n: int, wait: bool = False) -> None:
|
|
"""Remove keys with n values below or equal to `upto_n`.
|
|
|
|
The S3 API calls can take seconds to complete,
|
|
therefore run the removal code on thread.
|
|
"""
|
|
t = Thread(target=_remove_objects, args=(check_code, upto_n))
|
|
t.start()
|
|
if wait:
|
|
t.join()
|