we have dynamic batching at home

mertalev 2023-07-13 14:15:55 -04:00
parent 4b15607fd7
commit ab4cb43ef7
9 changed files with 340 additions and 157 deletions


@@ -14,7 +14,6 @@ from PIL import Image
from app.models.cache import ModelCache
from .config import settings
from .models.base import InferenceModel
from .schemas import (
    EmbeddingResponse,
    FaceResponse,
@@ -69,7 +68,7 @@ async def image_classification(
    image: Image.Image = Depends(dep_pil_image),
) -> list[str]:
    model = await app.state.model_cache.get(settings.classification_model, ModelType.IMAGE_CLASSIFICATION)
    labels = await run_in_thread(model, image)
    labels = await model.predict(image)
    return labels
@@ -82,7 +81,7 @@ async def clip_encode_image(
    image: Image.Image = Depends(dep_pil_image),
) -> list[float]:
    model = await app.state.model_cache.get(settings.clip_image_model, ModelType.CLIP)
    embedding = await run_in_thread(model, image)
    embedding = await model.predict(image)
    return embedding
@@ -93,7 +92,7 @@ async def clip_encode_image(
)
async def clip_encode_text(payload: TextModelRequest) -> list[float]:
    model = await app.state.model_cache.get(settings.clip_text_model, ModelType.CLIP)
    embedding = await run_in_thread(model, payload.text)
    embedding = await model.predict(payload.text)
    return embedding
@@ -106,14 +106,14 @@ async def facial_recognition(
    image: cv2.Mat = Depends(dep_cv_image),
) -> list[dict[str, Any]]:
    model = await app.state.model_cache.get(settings.facial_recognition_model, ModelType.FACIAL_RECOGNITION)
    faces = await run_in_thread(model, image)
    faces = await model.predict(image)
    return faces
async def run_in_thread(model: InferenceModel, inputs) -> Any:
    outputs = await asyncio.get_running_loop().run_in_executor(app.state.thread_pool, lambda: model.predict(inputs))
    app.state.last_called = time.time()
    return outputs
async def schedule_idle_shutdown() -> None:


@@ -1,15 +1,119 @@
from __future__ import annotations
import asyncio
import inspect
import logging
import time
from abc import ABC, abstractmethod
from functools import wraps
from pathlib import Path
from shutil import rmtree
from typing import Any
from typing import Any, Awaitable, Callable, TypeVar
from onnxruntime.capi.onnxruntime_pybind11_state import InvalidProtobuf # type: ignore
from ..config import get_cache_dir
from ..schemas import ModelType
F = TypeVar("F")
P = TypeVar("P")
R = TypeVar("R")
def batched(
    max_size: int = 16, timeout_s: float = 0.005
) -> Callable[[Callable[..., Awaitable[list[R]]]], Callable[..., Awaitable[R]]]:
    """
    Batches async calls into lists until the batch reaches length
    `max_size` or `timeout_s` seconds pass, whichever comes first.
    Callers should pass a single element as their only argument.
    The decorated callable should take a list as its only argument and return a list of the same length.
    Inspired by Ray's @serve.batch decorator.
    """
    def decorator_factory(func: Callable[..., Awaitable[list[R]]]) -> Callable[..., Awaitable[R]]:
        func_args = inspect.getfullargspec(func).args
        is_method = func_args[0] == "self"
        if is_method and len(func_args) != 2:
            raise TypeError(f"Methods must take exactly two arguments (including `self`). Got {func_args}")
        elif not is_method and len(func_args) != 1:
            raise TypeError(f"Functions must take exactly one argument. Got {func_args}")
        del func_args
        queue: asyncio.Queue[int] = asyncio.Queue(maxsize=max_size)
        lock = asyncio.Lock()
        element_id = 0
        processing: dict[int, Any] = {}
        processed: dict[int, Any] = {}
        async def process(self: Any = None) -> None:
            start = time.monotonic()
            batch: list[Any] = []
            batch_ids: list[int] = []
            while len(batch) < max_size and time.monotonic() - start < timeout_s:
                try:
                    cur = queue.get_nowait()
                    batch_ids.append(cur)
                    batch.append(processing.pop(cur))
                except asyncio.QueueEmpty:
                    await asyncio.sleep(0)
            logging.getLogger("uvicorn.access").info(f"Batch size: {len(batch)}")
            if self is not None:
                outputs = await func(self, batch)
            else:
                outputs = await func(batch)
            for i, id in enumerate(batch_ids):
                processed[id] = outputs[i]
        @wraps(func)
        async def decorator(*args: Any) -> R:
            nonlocal element_id
            cur_idx = element_id
            processing[cur_idx] = args[-1]
            element_id += 1
            await queue.put(cur_idx)
            while cur_idx not in processed:
                async with lock:
                    if is_method:
                        await process(args[0])
                    else:
                        await process()
            return processed.pop(cur_idx)
        return decorator
    return decorator_factory
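
A minimal usage sketch for context (hypothetical, not part of this commit; `EchoModel` is invented here, and `asyncio`/`Any` are assumed imported as above). Concurrent calls to `predict` on one instance are queued and serviced together by a single `_predict_batch` call of up to `max_size` elements. Note the queue and lock are created once per decorated function, so all instances of a class share them.

class EchoModel:
    @batched(max_size=4, timeout_s=0.005)
    async def predict(self, inputs: list[Any]) -> list[Any]:
        # receives the whole batch collected by the decorator
        return self._predict_batch(inputs)

    def _predict_batch(self, inputs: list[Any]) -> list[Any]:
        # must return exactly one output per input, in order
        return [f"echo: {x}" for x in inputs]

async def main() -> None:
    model = EchoModel()
    # four concurrent callers are coalesced into one batch
    results = await asyncio.gather(*[model.predict(i) for i in range(4)])
    assert results == [f"echo: {i}" for i in range(4)]
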
class InferenceModel(ABC):
    _model_type: ModelType
@@ -29,7 +133,7 @@ class InferenceModel(ABC):
        ...
    @abstractmethod
    def predict(self, inputs: Any) -> Any:
    def _predict_batch(self, inputs: list[Any]) -> list[Any]:
        ...
    @property


@@ -1,6 +1,3 @@
import asyncio
import time
from functools import wraps
from typing import Any
from aiocache.backends.memory import SimpleMemoryCache
@@ -98,97 +95,3 @@ class RevalidationPlugin(BasePlugin):
        key = client.build_key(key, namespace)
        if val is not None and key in client._handlers:
            await client.expire(key, client.ttl)
def batched(max_size: int = 16, timeout_s: float = 0.01):
    """
    Batches async calls into lists until the list reaches length `max_size` or `timeout_s` seconds pass, whichever comes first.
    Calls should pass an element as their only argument.
    Callables should take a list as their only argument and return a list of the same length.
    Inspired by Ray's @serve.batch decorator.
    """
    def decorator_factory(func):
        queue = asyncio.Queue(maxsize=max_size)
        lock = asyncio.Lock()
        output = None
        element_id = 0
        processing = {}
        processed = {}
        batch = []
        @wraps(func)
        async def decorator(element):
            nonlocal element_id
            nonlocal batch
            nonlocal output
            cur_idx = element_id
            processing[cur_idx] = element
            element_id += 1
            await queue.put(cur_idx)
            while cur_idx not in processed:
                start = time.monotonic()
                async with lock:
                    batch_ids = []
                    while len(batch) < max_size and time.monotonic() - start < timeout_s:
                        try:
                            cur = queue.get_nowait()
                            batch_ids.append(cur)
                            batch.append(processing.pop(cur))
                        except asyncio.QueueEmpty:
                            await asyncio.sleep(0)
                    output = await func(batch)
                    batch = []
                    for i, id in enumerate(batch_ids):
                        processed[id] = output[i]
            return processed.pop(cur_idx)
        return decorator
    return decorator_factory
def batched_method(max_size: int = 16, timeout_s: float = 0.001):
    """
    Batches async calls into lists until the list reaches length `max_size` or `timeout_s` seconds pass, whichever comes first.
    Calls should pass an element as their only argument.
    Callables should take a list as their only argument and return a list of the same length.
    Inspired by Ray's @serve.batch decorator.
    """
    def decorator_factory(func):
        queue = asyncio.Queue(maxsize=max_size)
        lock = asyncio.Lock()
        output = None
        element_id = 0
        processing = {}
        processed = {}
        batch = []
        @wraps(func)
        async def decorator(self, element):
            nonlocal element_id
            nonlocal batch
            nonlocal output
            cur_idx = element_id
            processing[cur_idx] = element
            element_id += 1
            await queue.put(cur_idx)
            while cur_idx not in processed:
                start = time.monotonic()
                async with lock:
                    batch_ids = []
                    while len(batch) < max_size and time.monotonic() - start < timeout_s:
                        try:
                            cur = queue.get_nowait()
                            batch_ids.append(cur)
                            batch.append(processing.pop(cur))
                        except asyncio.QueueEmpty:
                            await asyncio.sleep(0)
                    output = await func(self, batch)
                    batch = []
                    for i, id in enumerate(batch_ids):
                        processed[id] = output[i]
            return processed.pop(cur_idx)
        return decorator
    return decorator_factory


@@ -1,11 +1,10 @@
from pathlib import Path
from typing import Any
from PIL.Image import Image
from sentence_transformers import SentenceTransformer
from ..schemas import ModelType
from .base import InferenceModel
from .base import InferenceModel, batched
class CLIPSTEncoder(InferenceModel):
@@ -18,5 +17,12 @@ class CLIPSTEncoder(InferenceModel):
            **model_kwargs,
        )
    def predict(self, image_or_text: Image | str) -> list[float]:
        return self.model.encode(image_or_text).tolist()
    @batched()
    async def predict(self, inputs: list[Any]) -> list[Any]:
        return self._predict_batch(inputs)
    def _predict_batch(
        self,
        images_or_texts: list[Image] | list[str],
    ) -> list[list[float]]:
        return self.model.encode(images_or_texts).tolist()
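
As a hypothetical usage sketch (names invented, not part of this commit): concurrent encode requests on one encoder instance are coalesced by the decorator, so SentenceTransformer's encode sees a single list per batch.

# given an `encoder: CLIPSTEncoder` and PIL images `imgs`, these concurrent
# calls are merged into one self.model.encode(...) call of up to 16 inputs:
embeddings = await asyncio.gather(*[encoder.predict(img) for img in imgs])
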


@@ -2,11 +2,13 @@ from pathlib import Path
from typing import Any
import cv2
import numpy as np
from insightface.app import FaceAnalysis
from insightface.utils.face_align import norm_crop # type: ignore
from ..config import settings
from ..schemas import ModelType
from .base import InferenceModel
from ..schemas import ModelType, ndarray
from .base import InferenceModel, batched
class FaceRecognizer(InferenceModel):
@@ -35,26 +37,84 @@ class FaceRecognizer(InferenceModel):
            det_size=(640, 640),
        )
    def predict(self, image: cv2.Mat) -> list[dict[str, Any]]:
        height, width, _ = image.shape
        results = []
        faces = self.model.get(image)
    @batched()
    async def predict(self, inputs: list[Any]) -> list[Any]:
        return self._predict_batch(inputs)
    def _predict_batch(self, images: list[cv2.Mat]) -> list[list[dict[str, Any]]]:
        batch_det, batch_kpss = self._detect(images)
        batch_cropped_images, batch_offsets = self._preprocess(images, batch_kpss)
        if batch_cropped_images:
            batch_embeddings = self._recognize(batch_cropped_images)
            results = self._postprocess(images, batch_det, batch_embeddings, batch_offsets)
        else:
            results = self._postprocess(images, batch_det)
        return results
        for face in faces:
            x1, y1, x2, y2 = face.bbox
    def _detect(self, images: list[cv2.Mat]) -> tuple[list[ndarray], ...]:
        batch_det: list[ndarray] = []
        batch_kpss: list[ndarray] = []
        for image in images:
            # the detection model doesn't support batching, but the recognition model does
            bboxes, kpss = self.model.det_model.detect(image)
            batch_det.append(bboxes)
            batch_kpss.append(kpss)
        return batch_det, batch_kpss
            results.append(
                {
    def _preprocess(self, images: list[cv2.Mat], batch_kpss: list[ndarray]) -> tuple[list[cv2.Mat], list[int]]:
        batch_cropped_images = []
        batch_offsets = []
        total_faces = 0
        for i, image in enumerate(images):
            kpss = batch_kpss[i]
            total_faces += kpss.shape[0]
            batch_offsets.append(total_faces)
            for kps in kpss:
                batch_cropped_images.append(norm_crop(image, kps))
        return batch_cropped_images, batch_offsets
    def _recognize(self, images: list[cv2.Mat]) -> ndarray:
        embeddings = self.model.models["recognition"].get_feat(images)
        return embeddings
    def _postprocess(
        self,
        images: list[cv2.Mat],
        batch_det: list[ndarray],
        batch_embeddings: ndarray | None = None,
        batch_offsets: list[int] | None = None,
    ) -> list[list[dict[str, Any]]]:
        if batch_embeddings is not None and batch_offsets is not None:
            image_embeddings = np.array_split(batch_embeddings, batch_offsets)
        else:
            image_embeddings = None
        batch_faces: list[list[dict[str, Any]]] = []
        for i, image in enumerate(images):
            faces: list[dict[str, Any]] = []
            batch_faces.append(faces)
            if image_embeddings is None or image_embeddings[i].shape[0] == 0:
                continue
            height, width, _ = image.shape
            embeddings = image_embeddings[i].tolist()
            bboxes = batch_det[i][:, :4].round().tolist()
            det_scores = batch_det[i][:, 4].tolist()
            for embedding, bbox, det_score in zip(embeddings, bboxes, det_scores):
                x1, y1, x2, y2 = bbox
                face = {
                    "imageWidth": width,
                    "imageHeight": height,
                    "boundingBox": {
                        "x1": round(x1),
                        "y1": round(y1),
                        "x2": round(x2),
                        "y2": round(y2),
                        "x1": x1,
                        "y1": y1,
                        "x2": x2,
                        "y2": y2,
                    },
                    "score": face.det_score.item(),
                    "embedding": face.normed_embedding.tolist(),
                    "score": det_score,
                    "embedding": embedding,
                }
            )
        return results
                faces.append(face)
        return batch_faces
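
Since `batch_offsets` holds cumulative face counts, `np.array_split` slices the flat embedding matrix back into per-image groups. A small worked sketch (shapes illustrative only):

import numpy as np

# three images with 2, 0, and 1 detected faces -> offsets [2, 2, 3]
batch_embeddings = np.zeros((3, 512), dtype=np.float32)  # one row per face
batch_offsets = [2, 2, 3]
per_image = np.array_split(batch_embeddings, batch_offsets)
# shapes: (2, 512), (0, 512), (1, 512), plus a trailing empty split
assert [a.shape[0] for a in per_image[:3]] == [2, 0, 1]
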


@@ -6,7 +6,7 @@ from transformers.pipelines import pipeline
from ..config import settings
from ..schemas import ModelType
from .base import InferenceModel
from .base import InferenceModel, batched
class ImageClassifier(InferenceModel):
@@ -29,8 +29,15 @@ class ImageClassifier(InferenceModel):
            model_kwargs={"cache_dir": self.cache_dir, **model_kwargs},
        )
    def predict(self, image: Image) -> list[str]:
        predictions: list[dict[str, Any]] = self.model(image)  # type: ignore
        tags = [tag for pred in predictions for tag in pred["label"].split(", ") if pred["score"] >= self.min_score]
    @batched()
    async def predict(self, inputs: list[Any]) -> list[Any]:
        return self._predict_batch(inputs)
        return tags
    def _predict_batch(self, images: list[Image]) -> list[list[str]]:
        batch_predictions: list[list[dict[str, Any]]] = self.model(images)  # type: ignore
        results = [self._postprocess(predictions) for predictions in batch_predictions]
        return results
    def _postprocess(self, predictions: list[dict[str, Any]]) -> list[str]:
        return [tag for pred in predictions for tag in pred["label"].split(", ") if pred["score"] >= self.min_score]
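
To make `_postprocess` concrete, a small worked example (values hypothetical): the pipeline returns comma-separated labels, which are split into tags and filtered by `min_score`.

predictions = [
    {"label": "tabby, tabby cat", "score": 0.92},
    {"label": "lynx", "score": 0.10},
]
# with min_score = 0.5, "lynx" is dropped and the rest is split on ", ":
# _postprocess(predictions) -> ["tabby", "tabby cat"]
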


@@ -1,5 +1,7 @@
from enum import Enum
from typing import TypeAlias
import numpy as np
from pydantic import BaseModel
@@ -59,3 +61,6 @@ class ModelType(Enum):
    IMAGE_CLASSIFICATION = "image-classification"
    CLIP = "clip"
    FACIAL_RECOGNITION = "facial-recognition"
ndarray: TypeAlias = np.ndarray[int, np.dtype[np.float32]]


@@ -15,6 +15,7 @@ from .models.image_classification import ImageClassifier
from .schemas import ModelType
@pytest.mark.asyncio
class TestImageClassifier:
    def test_init(self, mock_classifier_pipeline: mock.Mock) -> None:
        cache_dir = Path("test_cache")
@@ -27,12 +28,12 @@ class TestImageClassifier:
            model_kwargs={"cache_dir": cache_dir},
        )
    def test_min_score(self, pil_image: Image.Image, mock_classifier_pipeline: mock.Mock) -> None:
    async def test_min_score(self, pil_image: Image.Image, mock_classifier_pipeline: mock.Mock) -> None:
        classifier = ImageClassifier("test_model_name", min_score=0.0)
        classifier.min_score = 0.0
        all_labels = classifier.predict(pil_image)
        all_labels = await classifier.predict(pil_image)
        classifier.min_score = 0.5
        filtered_labels = classifier.predict(pil_image)
        filtered_labels = await classifier.predict(pil_image)
        assert all_labels == [
            "that's an image alright",
@@ -45,24 +46,25 @@
        assert filtered_labels == ["that's an image alright"]
@pytest.mark.asyncio
class TestCLIP:
    def test_init(self, mock_st: mock.Mock) -> None:
        CLIPSTEncoder("test_model_name", cache_dir="test_cache")
        mock_st.assert_called_once_with("test_model_name", cache_folder="test_cache")
    def test_basic_image(self, pil_image: Image.Image, mock_st: mock.Mock) -> None:
    async def test_basic_image(self, pil_image: Image.Image, mock_st: mock.Mock) -> None:
        clip_encoder = CLIPSTEncoder("test_model_name", cache_dir="test_cache")
        embedding = clip_encoder.predict(pil_image)
        embedding = await clip_encoder.predict(pil_image)
        assert isinstance(embedding, list)
        assert len(embedding) == 512
        assert all([isinstance(num, float) for num in embedding])
        mock_st.assert_called_once()
    def test_basic_text(self, mock_st: mock.Mock) -> None:
    async def test_basic_text(self, mock_st: mock.Mock) -> None:
        clip_encoder = CLIPSTEncoder("test_model_name", cache_dir="test_cache")
        embedding = clip_encoder.predict("test search query")
        embedding = await clip_encoder.predict("test search query")
        assert isinstance(embedding, list)
        assert len(embedding) == 512
@@ -70,6 +72,7 @@ class TestCLIP:
        mock_st.assert_called_once()
@pytest.mark.asyncio
class TestFaceRecognition:
    def test_init(self, mock_faceanalysis: mock.Mock) -> None:
        FaceRecognizer("test_model_name", cache_dir="test_cache")
@@ -80,9 +83,9 @@
            allowed_modules=["detection", "recognition"],
        )
    def test_basic(self, cv_image: cv2.Mat, mock_faceanalysis: mock.Mock) -> None:
    async def test_basic(self, cv_image: cv2.Mat, mock_faceanalysis: mock.Mock) -> None:
        face_recognizer = FaceRecognizer("test_model_name", min_score=0.0, cache_dir="test_cache")
        faces = face_recognizer.predict(cv_image)
        faces = await face_recognizer.predict(cv_image)
        assert len(faces) == 2
        for face in faces:


@@ -2693,6 +2693,61 @@
        ]
      }
    },
    "/person/{id}/merge": {
      "post": {
        "operationId": "mergePerson",
        "parameters": [
          {
            "name": "id",
            "required": true,
            "in": "path",
            "schema": {
              "format": "uuid",
              "type": "string"
            }
          }
        ],
        "requestBody": {
          "required": true,
          "content": {
            "application/json": {
              "schema": {
                "$ref": "#/components/schemas/MergePersonDto"
              }
            }
          }
        },
        "responses": {
          "201": {
            "description": "",
            "content": {
              "application/json": {
                "schema": {
                  "type": "array",
                  "items": {
                    "$ref": "#/components/schemas/BulkIdResponseDto"
                  }
                }
              }
            }
          }
        },
        "tags": [
          "Person"
        ],
        "security": [
          {
            "bearer": []
          },
          {
            "cookie": []
          },
          {
            "api_key": []
          }
        ]
      }
    },
    "/person/{id}/thumbnail": {
      "get": {
        "operationId": "getPersonThumbnail",
@@ -4963,6 +5018,30 @@
          "deviceOS"
        ]
      },
      "BulkIdResponseDto": {
        "type": "object",
        "properties": {
          "id": {
            "type": "string"
          },
          "success": {
            "type": "boolean"
          },
          "error": {
            "type": "string",
            "enum": [
              "duplicate",
              "no_permission",
              "not_found",
              "unknown"
            ]
          }
        },
        "required": [
          "id",
          "success"
        ]
      },
      "ChangePasswordDto": {
        "type": "object",
        "properties": {
@@ -5069,9 +5148,6 @@
      "CreateAssetDto": {
        "type": "object",
        "properties": {
          "assetType": {
            "$ref": "#/components/schemas/AssetTypeEnum"
          },
          "assetData": {
            "type": "string",
            "format": "binary"
@@ -5088,9 +5164,6 @@
            "type": "boolean",
            "default": false
          },
          "fileExtension": {
            "type": "string"
          },
          "deviceAssetId": {
            "type": "string"
          },
@@ -5119,9 +5192,7 @@
          }
        },
        "required": [
          "assetType",
          "assetData",
          "fileExtension",
          "deviceAssetId",
          "deviceId",
          "fileCreatedAt",
@@ -5492,9 +5563,6 @@
      "ImportAssetDto": {
        "type": "object",
        "properties": {
          "assetType": {
            "$ref": "#/components/schemas/AssetTypeEnum"
          },
          "isReadOnly": {
            "type": "boolean",
            "default": true
@@ -5533,7 +5601,6 @@
          }
        },
        "required": [
          "assetType",
          "assetPath",
          "deviceAssetId",
          "deviceId",
@@ -5756,6 +5823,21 @@
          "assets"
        ]
      },
      "MergePersonDto": {
        "type": "object",
        "properties": {
          "ids": {
            "type": "array",
            "items": {
              "type": "string",
              "format": "uuid"
            }
          }
        },
        "required": [
          "ids"
        ]
      },
      "OAuthCallbackDto": {
        "type": "object",
        "properties": {
@@ -6359,6 +6441,9 @@
          "transcode": {
            "$ref": "#/components/schemas/TranscodePolicy"
          },
          "accel": {
            "$ref": "#/components/schemas/TranscodeHWAccel"
          },
          "preset": {
            "type": "string"
          },
@@ -6378,6 +6463,7 @@
          "targetVideoCodec",
          "targetAudioCodec",
          "transcode",
          "accel",
          "preset",
          "targetResolution",
          "maxBitrate",
@@ -6601,6 +6687,15 @@
          "month"
        ]
      },
      "TranscodeHWAccel": {
        "type": "string",
        "enum": [
          "nvenc",
          "qsv",
          "vaapi",
          "disabled"
        ]
      },
      "TranscodePolicy": {
        "type": "string",
        "enum": [