271 lines
11 KiB
Python
271 lines
11 KiB
Python
import pickle
|
|
from io import BytesIO
|
|
from typing import TypeAlias
|
|
from unittest import mock
|
|
|
|
import cv2
|
|
import numpy as np
|
|
import onnxruntime as ort
|
|
import pytest
|
|
from fastapi.testclient import TestClient
|
|
from PIL import Image
|
|
from pytest_mock import MockerFixture
|
|
|
|
from .config import settings
|
|
from .models.base import PicklableSessionOptions
|
|
from .models.cache import ModelCache
|
|
from .models.clip import CLIPEncoder
|
|
from .models.facial_recognition import FaceRecognizer
|
|
from .models.image_classification import ImageClassifier
|
|
from .schemas import ModelType
|
|
|
|
# Type alias for a NumPy array whose dtype is float32.
ndarray: TypeAlias = np.ndarray[int, np.dtype[np.float32]]
|
|
|
|
|
|
class TestImageClassifier:
    """Unit tests for ``ImageClassifier`` construction and score filtering."""

    # Canned predictions handed back by the mocked model in ``test_min_score``.
    # The third label contains a comma; the test expects it to come back as
    # two separate labels.
    classifier_preds = [
        {"label": "that's an image alright", "score": 0.8},
        {"label": "well it ends with .jpg", "score": 0.1},
        {"label": "idk, im just seeing bytes", "score": 0.05},
        {"label": "not sure", "score": 0.04},
        {"label": "probably a virus", "score": 0.01},
    ]

    def test_eager_init(self, mocker: MockerFixture) -> None:
        """With ``eager=True`` the extra keyword arguments reach ``load``."""
        mocker.patch.object(ImageClassifier, "download")
        mock_load = mocker.patch.object(ImageClassifier, "load")
        classifier = ImageClassifier("test_model_name", cache_dir="test_cache", eager=True, test_arg="test_arg")

        assert classifier.model_name == "test_model_name"
        mock_load.assert_called_once_with(test_arg="test_arg")

    def test_lazy_init(self, mocker: MockerFixture) -> None:
        """With ``eager=False`` only ``download`` is called; ``load`` is not."""
        mock_download = mocker.patch.object(ImageClassifier, "download")
        mock_load = mocker.patch.object(ImageClassifier, "load")
        classifier = ImageClassifier("test_model_name", cache_dir="test_cache", eager=False, test_arg="test_arg")

        assert classifier.model_name == "test_model_name"
        mock_download.assert_called_once_with(test_arg="test_arg")
        mock_load.assert_not_called()

    def test_min_score(self, pil_image: Image.Image, mocker: MockerFixture) -> None:
        """``predict`` returns every label at ``min_score=0.0`` and only the 0.8 label at ``0.5``."""
        mocker.patch.object(ImageClassifier, "load")
        classifier = ImageClassifier("test_model_name", min_score=0.0)
        assert classifier.min_score == 0.0

        # Replace the underlying model with a mock that returns the canned predictions.
        classifier.model = mock.Mock()
        classifier.model.return_value = self.classifier_preds

        all_labels = classifier.predict(pil_image)
        # Raise the threshold on the same instance and predict again.
        classifier.min_score = 0.5
        filtered_labels = classifier.predict(pil_image)

        assert all_labels == [
            "that's an image alright",
            "well it ends with .jpg",
            "idk",
            "im just seeing bytes",
            "not sure",
            "probably a virus",
        ]
        assert filtered_labels == ["that's an image alright"]
|
|
|
|
|
|
class TestCLIP:
    """Unit tests for ``CLIPEncoder`` construction and image/text encoding."""

    # Stand-in 512-element float32 embedding returned by the mocked inference session.
    embedding = np.random.rand(512).astype(np.float32)

    def test_eager_init(self, mocker: MockerFixture) -> None:
        """With ``eager=True`` the extra keyword arguments reach ``load``."""
        mocker.patch.object(CLIPEncoder, "download")
        mock_load = mocker.patch.object(CLIPEncoder, "load")
        clip_model = CLIPEncoder("ViT-B-32::openai", cache_dir="test_cache", eager=True, test_arg="test_arg")

        assert clip_model.model_name == "ViT-B-32::openai"
        mock_load.assert_called_once_with(test_arg="test_arg")

    def test_lazy_init(self, mocker: MockerFixture) -> None:
        """With ``eager=False`` only ``download`` is called; ``load`` is not."""
        mock_download = mocker.patch.object(CLIPEncoder, "download")
        mock_load = mocker.patch.object(CLIPEncoder, "load")
        clip_model = CLIPEncoder("ViT-B-32::openai", cache_dir="test_cache", eager=False, test_arg="test_arg")

        assert clip_model.model_name == "ViT-B-32::openai"
        mock_download.assert_called_once_with(test_arg="test_arg")
        mock_load.assert_not_called()

    def test_basic_image(self, pil_image: Image.Image, mocker: MockerFixture) -> None:
        """In ``"vision"`` mode, ``predict`` on an image returns a 512-float list."""
        mocker.patch.object(CLIPEncoder, "download")
        # Patch the ONNX session class so no real model is loaded or run.
        mocked = mocker.patch("app.models.clip.ort.InferenceSession", autospec=True)
        mocked.return_value.run.return_value = [[self.embedding]]
        clip_encoder = CLIPEncoder("ViT-B-32::openai", cache_dir="test_cache", mode="vision")
        assert clip_encoder.mode == "vision"
        embedding = clip_encoder.predict(pil_image)

        assert isinstance(embedding, list)
        assert len(embedding) == 512
        # NumPy float32 scalars are not ``float`` instances, so this fails if
        # raw float32 values leak through ``predict``.
        assert all(isinstance(num, float) for num in embedding)
        clip_encoder.vision_model.run.assert_called_once()

    def test_basic_text(self, mocker: MockerFixture) -> None:
        """In ``"text"`` mode, ``predict`` on a string returns a 512-float list."""
        mocker.patch.object(CLIPEncoder, "download")
        # Patch the ONNX session class so no real model is loaded or run.
        mocked = mocker.patch("app.models.clip.ort.InferenceSession", autospec=True)
        mocked.return_value.run.return_value = [[self.embedding]]
        clip_encoder = CLIPEncoder("ViT-B-32::openai", cache_dir="test_cache", mode="text")
        assert clip_encoder.mode == "text"
        embedding = clip_encoder.predict("test search query")

        assert isinstance(embedding, list)
        assert len(embedding) == 512
        # NumPy float32 scalars are not ``float`` instances, so this fails if
        # raw float32 values leak through ``predict``.
        assert all(isinstance(num, float) for num in embedding)
        clip_encoder.text_model.run.assert_called_once()
|
|
|
|
|
|
class TestFaceRecognition:
    """Unit tests for ``FaceRecognizer`` construction and face prediction."""

    def test_eager_init(self, mocker: MockerFixture) -> None:
        """With ``eager=True`` the extra keyword arguments reach ``load``."""
        mocker.patch.object(FaceRecognizer, "download")
        mock_load = mocker.patch.object(FaceRecognizer, "load")
        face_model = FaceRecognizer("test_model_name", cache_dir="test_cache", eager=True, test_arg="test_arg")

        assert face_model.model_name == "test_model_name"
        mock_load.assert_called_once_with(test_arg="test_arg")

    def test_lazy_init(self, mocker: MockerFixture) -> None:
        """With ``eager=False`` only ``download`` is called; ``load`` is not."""
        mock_download = mocker.patch.object(FaceRecognizer, "download")
        mock_load = mocker.patch.object(FaceRecognizer, "load")
        face_model = FaceRecognizer("test_model_name", cache_dir="test_cache", eager=False, test_arg="test_arg")

        assert face_model.model_name == "test_model_name"
        mock_download.assert_called_once_with(test_arg="test_arg")
        mock_load.assert_not_called()

    def test_set_min_score(self, mocker: MockerFixture) -> None:
        """The ``min_score`` constructor argument is exposed as an attribute."""
        mocker.patch.object(FaceRecognizer, "load")
        face_recognizer = FaceRecognizer("test_model_name", cache_dir="test_cache", min_score=0.5)

        assert face_recognizer.min_score == 0.5

    def test_basic(self, cv_image: cv2.Mat, mocker: MockerFixture) -> None:
        """``predict`` returns one result per detected face, each with a 512-float embedding."""
        mocker.patch.object(FaceRecognizer, "load")
        face_recognizer = FaceRecognizer("test_model_name", min_score=0.0, cache_dir="test_cache")

        # Mocked detector: ``detect`` returns a (num_faces, 5) array made of four
        # box values plus a score column, together with (num_faces, 5, 2) keypoints.
        det_model = mock.Mock()
        num_faces = 2
        bbox = np.random.rand(num_faces, 4).astype(np.float32)
        score = np.array([[0.67]] * num_faces).astype(np.float32)
        kpss = np.random.rand(num_faces, 5, 2).astype(np.float32)
        det_model.detect.return_value = (np.concatenate([bbox, score], axis=-1), kpss)
        face_recognizer.det_model = det_model

        # Mocked recognizer: every ``get_feat`` call returns the same float32 array.
        rec_model = mock.Mock()
        embedding = np.random.rand(num_faces, 512).astype(np.float32)
        rec_model.get_feat.return_value = embedding
        face_recognizer.rec_model = rec_model

        faces = face_recognizer.predict(cv_image)

        assert len(faces) == num_faces
        for face in faces:
            # NOTE(review): 800x600 presumably matches the ``cv_image`` fixture,
            # which is defined outside this file -- confirm there.
            assert face["imageHeight"] == 800
            assert face["imageWidth"] == 600
            assert isinstance(face["embedding"], list)
            assert len(face["embedding"]) == 512
            # NumPy float32 scalars are not ``float`` instances, so this fails
            # if raw float32 values leak through ``predict``.
            assert all(isinstance(num, float) for num in face["embedding"])

        det_model.detect.assert_called_once()
        assert rec_model.get_feat.call_count == num_faces
|
|
|
|
|
|
@pytest.mark.asyncio
class TestCache:
    """Async tests for ``ModelCache`` lookups, keyword forwarding and TTL handling."""

    async def test_caches(self, mock_get_model: mock.Mock) -> None:
        """Two lookups of the same model leave one cache entry and build the model once."""
        models = ModelCache()
        for _ in range(2):
            await models.get("test_model_name", ModelType.IMAGE_CLASSIFICATION)

        assert len(models.cache._cache) == 1
        mock_get_model.assert_called_once()

    async def test_kwargs_used(self, mock_get_model: mock.Mock) -> None:
        """Extra keyword arguments given to ``get`` are passed on to the model factory."""
        models = ModelCache()
        await models.get("test_model_name", ModelType.IMAGE_CLASSIFICATION, cache_dir="test_cache")

        mock_get_model.assert_called_once_with(
            ModelType.IMAGE_CLASSIFICATION,
            "test_model_name",
            cache_dir="test_cache",
        )

    async def test_different_clip(self, mock_get_model: mock.Mock) -> None:
        """Two CLIP models with different names get separate factory calls and cache entries."""
        models = ModelCache()
        model_names = ("test_image_model_name", "test_text_model_name")
        for model_name in model_names:
            await models.get(model_name, ModelType.CLIP)

        expected_calls = [mock.call(ModelType.CLIP, model_name) for model_name in model_names]
        mock_get_model.assert_has_calls(expected_calls)
        assert len(models.cache._cache) == len(model_names)

    @mock.patch("app.models.cache.OptimisticLock", autospec=True)
    async def test_model_ttl(self, mock_lock_cls: mock.Mock, mock_get_model: mock.Mock) -> None:
        """The configured TTL is passed to the lock's ``cas`` call when a model is stored."""
        models = ModelCache(ttl=100)
        await models.get("test_model_name", ModelType.IMAGE_CLASSIFICATION)

        # The object yielded by ``async with OptimisticLock(...)`` is the one whose ``cas`` is checked.
        entered_lock = mock_lock_cls.return_value.__aenter__.return_value
        entered_lock.cas.assert_called_with(mock.ANY, ttl=100)

    @mock.patch("app.models.cache.SimpleMemoryCache.expire")
    async def test_revalidate(self, mock_cache_expire: mock.Mock, mock_get_model: mock.Mock) -> None:
        """With ``revalidate=True``, two lookups produce exactly one ``expire`` call carrying the TTL."""
        models = ModelCache(ttl=100, revalidate=True)
        for _ in range(2):
            await models.get("test_model_name", ModelType.IMAGE_CLASSIFICATION)

        mock_cache_expire.assert_called_once_with(mock.ANY, 100)
|
|
|
|
|
|
@pytest.mark.skipif(
    not settings.test_full,
    reason="More time-consuming since it deploys the app and loads models.",
)
class TestEndpoints:
    """Smoke tests that POST to each endpoint of the deployed app and expect HTTP 200."""

    @staticmethod
    def _jpeg_bytes(pil_image: Image.Image) -> bytes:
        """Return ``pil_image`` encoded as JPEG bytes."""
        buffer = BytesIO()
        pil_image.save(buffer, format="jpeg")
        return buffer.getvalue()

    def test_tagging_endpoint(self, pil_image: Image.Image, deployed_app: TestClient) -> None:
        """POSTing a JPEG to the tag-image endpoint succeeds."""
        payload = self._jpeg_bytes(pil_image)
        response = deployed_app.post(
            "http://localhost:3003/image-classifier/tag-image",
            content=payload,
            headers={"Content-Type": "image/jpg"},
        )
        assert response.status_code == 200

    def test_clip_image_endpoint(self, pil_image: Image.Image, deployed_app: TestClient) -> None:
        """POSTing a JPEG to the encode-image endpoint succeeds."""
        payload = self._jpeg_bytes(pil_image)
        response = deployed_app.post(
            "http://localhost:3003/sentence-transformer/encode-image",
            content=payload,
            headers={"Content-Type": "image/jpg"},
        )
        assert response.status_code == 200

    def test_clip_text_endpoint(self, deployed_app: TestClient) -> None:
        """POSTing a JSON text query to the encode-text endpoint succeeds."""
        query = {"text": "test search query"}
        response = deployed_app.post(
            "http://localhost:3003/sentence-transformer/encode-text",
            json=query,
        )
        assert response.status_code == 200

    def test_face_endpoint(self, pil_image: Image.Image, deployed_app: TestClient) -> None:
        """POSTing a JPEG to the detect-faces endpoint succeeds."""
        payload = self._jpeg_bytes(pil_image)
        response = deployed_app.post(
            "http://localhost:3003/facial-recognition/detect-faces",
            content=payload,
            headers={"Content-Type": "image/jpg"},
        )
        assert response.status_code == 200
|
|
|
|
|
|
def test_sess_options() -> None:
    """Check that both thread-count settings survive a pickle round trip."""
    original = PicklableSessionOptions()
    original.intra_op_num_threads = 1
    original.inter_op_num_threads = 1

    # Serialize and immediately deserialize; the copy must keep both values.
    restored = pickle.loads(pickle.dumps(original))

    assert restored.intra_op_num_threads == 1
    assert restored.inter_op_num_threads == 1
|