123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256 |
- from io import BytesIO
- from typing import TypeAlias
- from unittest import mock
- import cv2
- import numpy as np
- import pytest
- from fastapi.testclient import TestClient
- from PIL import Image
- from pytest_mock import MockerFixture
- from .config import settings
- from .models.cache import ModelCache
- from .models.clip import CLIPSTEncoder
- from .models.facial_recognition import FaceRecognizer
- from .models.image_classification import ImageClassifier
- from .schemas import ModelType
- ndarray: TypeAlias = np.ndarray[int, np.dtype[np.float32]]
- class TestImageClassifier:
- classifier_preds = [
- {"label": "that's an image alright", "score": 0.8},
- {"label": "well it ends with .jpg", "score": 0.1},
- {"label": "idk, im just seeing bytes", "score": 0.05},
- {"label": "not sure", "score": 0.04},
- {"label": "probably a virus", "score": 0.01},
- ]
- def test_eager_init(self, mocker: MockerFixture) -> None:
- mocker.patch.object(ImageClassifier, "download")
- mock_load = mocker.patch.object(ImageClassifier, "load")
- classifier = ImageClassifier("test_model_name", cache_dir="test_cache", eager=True, test_arg="test_arg")
- assert classifier.model_name == "test_model_name"
- mock_load.assert_called_once_with(test_arg="test_arg")
- def test_lazy_init(self, mocker: MockerFixture) -> None:
- mock_download = mocker.patch.object(ImageClassifier, "download")
- mock_load = mocker.patch.object(ImageClassifier, "load")
- face_model = ImageClassifier("test_model_name", cache_dir="test_cache", eager=False, test_arg="test_arg")
- assert face_model.model_name == "test_model_name"
- mock_download.assert_called_once_with(test_arg="test_arg")
- mock_load.assert_not_called()
- def test_min_score(self, pil_image: Image.Image, mocker: MockerFixture) -> None:
- mocker.patch.object(ImageClassifier, "load")
- classifier = ImageClassifier("test_model_name", min_score=0.0)
- assert classifier.min_score == 0.0
- classifier.model = mock.Mock()
- classifier.model.return_value = self.classifier_preds
- all_labels = classifier.predict(pil_image)
- classifier.min_score = 0.5
- filtered_labels = classifier.predict(pil_image)
- assert all_labels == [
- "that's an image alright",
- "well it ends with .jpg",
- "idk",
- "im just seeing bytes",
- "not sure",
- "probably a virus",
- ]
- assert filtered_labels == ["that's an image alright"]
- class TestCLIP:
- embedding = np.random.rand(512).astype(np.float32)
- def test_eager_init(self, mocker: MockerFixture) -> None:
- mocker.patch.object(CLIPSTEncoder, "download")
- mock_load = mocker.patch.object(CLIPSTEncoder, "load")
- clip_model = CLIPSTEncoder("test_model_name", cache_dir="test_cache", eager=True, test_arg="test_arg")
- assert clip_model.model_name == "test_model_name"
- mock_load.assert_called_once_with(test_arg="test_arg")
- def test_lazy_init(self, mocker: MockerFixture) -> None:
- mock_download = mocker.patch.object(CLIPSTEncoder, "download")
- mock_load = mocker.patch.object(CLIPSTEncoder, "load")
- clip_model = CLIPSTEncoder("test_model_name", cache_dir="test_cache", eager=False, test_arg="test_arg")
- assert clip_model.model_name == "test_model_name"
- mock_download.assert_called_once_with(test_arg="test_arg")
- mock_load.assert_not_called()
- def test_basic_image(self, pil_image: Image.Image, mocker: MockerFixture) -> None:
- mocker.patch.object(CLIPSTEncoder, "load")
- clip_encoder = CLIPSTEncoder("test_model_name", cache_dir="test_cache")
- clip_encoder.model = mock.Mock()
- clip_encoder.model.encode.return_value = self.embedding
- embedding = clip_encoder.predict(pil_image)
- assert isinstance(embedding, list)
- assert len(embedding) == 512
- assert all([isinstance(num, float) for num in embedding])
- clip_encoder.model.encode.assert_called_once()
- def test_basic_text(self, mocker: MockerFixture) -> None:
- mocker.patch.object(CLIPSTEncoder, "load")
- clip_encoder = CLIPSTEncoder("test_model_name", cache_dir="test_cache")
- clip_encoder.model = mock.Mock()
- clip_encoder.model.encode.return_value = self.embedding
- embedding = clip_encoder.predict("test search query")
- assert isinstance(embedding, list)
- assert len(embedding) == 512
- assert all([isinstance(num, float) for num in embedding])
- clip_encoder.model.encode.assert_called_once()
- class TestFaceRecognition:
- def test_eager_init(self, mocker: MockerFixture) -> None:
- mocker.patch.object(FaceRecognizer, "download")
- mock_load = mocker.patch.object(FaceRecognizer, "load")
- face_model = FaceRecognizer("test_model_name", cache_dir="test_cache", eager=True, test_arg="test_arg")
- assert face_model.model_name == "test_model_name"
- mock_load.assert_called_once_with(test_arg="test_arg")
- def test_lazy_init(self, mocker: MockerFixture) -> None:
- mock_download = mocker.patch.object(FaceRecognizer, "download")
- mock_load = mocker.patch.object(FaceRecognizer, "load")
- face_model = FaceRecognizer("test_model_name", cache_dir="test_cache", eager=False, test_arg="test_arg")
- assert face_model.model_name == "test_model_name"
- mock_download.assert_called_once_with(test_arg="test_arg")
- mock_load.assert_not_called()
- def test_set_min_score(self, mocker: MockerFixture) -> None:
- mocker.patch.object(FaceRecognizer, "load")
- face_recognizer = FaceRecognizer("test_model_name", cache_dir="test_cache", min_score=0.5)
- assert face_recognizer.min_score == 0.5
- def test_basic(self, cv_image: cv2.Mat, mocker: MockerFixture) -> None:
- mocker.patch.object(FaceRecognizer, "load")
- face_recognizer = FaceRecognizer("test_model_name", min_score=0.0, cache_dir="test_cache")
- det_model = mock.Mock()
- num_faces = 2
- bbox = np.random.rand(num_faces, 4).astype(np.float32)
- score = np.array([[0.67]] * num_faces).astype(np.float32)
- kpss = np.random.rand(num_faces, 5, 2).astype(np.float32)
- det_model.detect.return_value = (np.concatenate([bbox, score], axis=-1), kpss)
- face_recognizer.det_model = det_model
- rec_model = mock.Mock()
- embedding = np.random.rand(num_faces, 512).astype(np.float32)
- rec_model.get_feat.return_value = embedding
- face_recognizer.rec_model = rec_model
- faces = face_recognizer.predict(cv_image)
- assert len(faces) == num_faces
- for face in faces:
- assert face["imageHeight"] == 800
- assert face["imageWidth"] == 600
- assert isinstance(face["embedding"], list)
- assert len(face["embedding"]) == 512
- assert all([isinstance(num, float) for num in face["embedding"]])
- det_model.detect.assert_called_once()
- assert rec_model.get_feat.call_count == num_faces
- @pytest.mark.asyncio
- class TestCache:
- async def test_caches(self, mock_get_model: mock.Mock) -> None:
- model_cache = ModelCache()
- await model_cache.get("test_model_name", ModelType.IMAGE_CLASSIFICATION)
- await model_cache.get("test_model_name", ModelType.IMAGE_CLASSIFICATION)
- assert len(model_cache.cache._cache) == 1
- mock_get_model.assert_called_once()
- async def test_kwargs_used(self, mock_get_model: mock.Mock) -> None:
- model_cache = ModelCache()
- await model_cache.get("test_model_name", ModelType.IMAGE_CLASSIFICATION, cache_dir="test_cache")
- mock_get_model.assert_called_once_with(
- ModelType.IMAGE_CLASSIFICATION, "test_model_name", cache_dir="test_cache"
- )
- async def test_different_clip(self, mock_get_model: mock.Mock) -> None:
- model_cache = ModelCache()
- await model_cache.get("test_image_model_name", ModelType.CLIP)
- await model_cache.get("test_text_model_name", ModelType.CLIP)
- mock_get_model.assert_has_calls(
- [
- mock.call(ModelType.CLIP, "test_image_model_name"),
- mock.call(ModelType.CLIP, "test_text_model_name"),
- ]
- )
- assert len(model_cache.cache._cache) == 2
- @mock.patch("app.models.cache.OptimisticLock", autospec=True)
- async def test_model_ttl(self, mock_lock_cls: mock.Mock, mock_get_model: mock.Mock) -> None:
- model_cache = ModelCache(ttl=100)
- await model_cache.get("test_model_name", ModelType.IMAGE_CLASSIFICATION)
- mock_lock_cls.return_value.__aenter__.return_value.cas.assert_called_with(mock.ANY, ttl=100)
- @mock.patch("app.models.cache.SimpleMemoryCache.expire")
- async def test_revalidate(self, mock_cache_expire: mock.Mock, mock_get_model: mock.Mock) -> None:
- model_cache = ModelCache(ttl=100, revalidate=True)
- await model_cache.get("test_model_name", ModelType.IMAGE_CLASSIFICATION)
- await model_cache.get("test_model_name", ModelType.IMAGE_CLASSIFICATION)
- mock_cache_expire.assert_called_once_with(mock.ANY, 100)
- @pytest.mark.skipif(
- not settings.test_full,
- reason="More time-consuming since it deploys the app and loads models.",
- )
- class TestEndpoints:
- def test_tagging_endpoint(self, pil_image: Image.Image, deployed_app: TestClient) -> None:
- byte_image = BytesIO()
- pil_image.save(byte_image, format="jpeg")
- headers = {"Content-Type": "image/jpg"}
- response = deployed_app.post(
- "http://localhost:3003/image-classifier/tag-image",
- content=byte_image.getvalue(),
- headers=headers,
- )
- assert response.status_code == 200
- def test_clip_image_endpoint(self, pil_image: Image.Image, deployed_app: TestClient) -> None:
- byte_image = BytesIO()
- pil_image.save(byte_image, format="jpeg")
- headers = {"Content-Type": "image/jpg"}
- response = deployed_app.post(
- "http://localhost:3003/sentence-transformer/encode-image",
- content=byte_image.getvalue(),
- headers=headers,
- )
- assert response.status_code == 200
- def test_clip_text_endpoint(self, deployed_app: TestClient) -> None:
- response = deployed_app.post(
- "http://localhost:3003/sentence-transformer/encode-text",
- json={"text": "test search query"},
- )
- assert response.status_code == 200
- def test_face_endpoint(self, pil_image: Image.Image, deployed_app: TestClient) -> None:
- byte_image = BytesIO()
- pil_image.save(byte_image, format="jpeg")
- headers = {"Content-Type": "image/jpg"}
- response = deployed_app.post(
- "http://localhost:3003/facial-recognition/detect-faces",
- content=byte_image.getvalue(),
- headers=headers,
- )
- assert response.status_code == 200
|