facial_recognition.py

from pathlib import Path
from typing import Any

import cv2
import numpy as np
import onnxruntime as ort
from insightface.model_zoo import ArcFaceONNX, RetinaFace
from insightface.utils.face_align import norm_crop

from app.config import clean_name
from app.schemas import BoundingBox, Face, ModelType, ndarray_f32

from .base import InferenceModel


class FaceRecognizer(InferenceModel):
    _model_type = ModelType.FACIAL_RECOGNITION

    def __init__(
        self,
        model_name: str,
        min_score: float = 0.7,
        cache_dir: Path | str | None = None,
        **model_kwargs: Any,
    ) -> None:
        # The caller may override the detection threshold via "minScore";
        # otherwise the constructor default is used.
        self.min_score = model_kwargs.pop("minScore", min_score)
        super().__init__(clean_name(model_name), cache_dir, **model_kwargs)

    def _load(self) -> None:
        # RetinaFace handles face detection and ArcFace produces the embeddings;
        # both run as ONNX Runtime sessions with the same session options and providers.
        self.det_model = RetinaFace(
            session=ort.InferenceSession(
                self.det_file.as_posix(),
                sess_options=self.sess_options,
                providers=self.providers,
                provider_options=self.provider_options,
            ),
        )
        self.rec_model = ArcFaceONNX(
            self.rec_file.as_posix(),
            session=ort.InferenceSession(
                self.rec_file.as_posix(),
                sess_options=self.sess_options,
                providers=self.providers,
                provider_options=self.provider_options,
            ),
        )

        self.det_model.prepare(
            ctx_id=0,
            det_thresh=self.min_score,
            input_size=(640, 640),
        )
        self.rec_model.prepare(ctx_id=0)

    def _predict(self, image: ndarray_f32 | bytes) -> list[Face]:
        if isinstance(image, bytes):
            # Decode raw image bytes into a BGR ndarray.
            image = cv2.imdecode(np.frombuffer(image, np.uint8), cv2.IMREAD_COLOR)

        bboxes, kpss = self.det_model.detect(image)
        if bboxes.size == 0:
            return []
        assert isinstance(image, np.ndarray) and isinstance(kpss, np.ndarray)

        # Column 4 holds the detection confidence; columns 0-3 are the box corners.
        scores = bboxes[:, 4].tolist()
        bboxes = bboxes[:, :4].round().tolist()

        results = []
        height, width, _ = image.shape
        for (x1, y1, x2, y2), score, kps in zip(bboxes, scores, kpss):
            # Align the face crop using the detected landmarks, then embed it with ArcFace.
            cropped_img = norm_crop(image, kps)
            embedding: ndarray_f32 = self.rec_model.get_feat(cropped_img)[0]
            face: Face = {
                "imageWidth": width,
                "imageHeight": height,
                "boundingBox": {
                    "x1": x1,
                    "y1": y1,
                    "x2": x2,
                    "y2": y2,
                },
                "score": score,
                "embedding": embedding,
            }
            results.append(face)
        return results

    @property
    def cached(self) -> bool:
        return self.det_file.is_file() and self.rec_file.is_file()

    @property
    def det_file(self) -> Path:
        return self.cache_dir / "detection" / "model.onnx"

    @property
    def rec_file(self) -> Path:
        return self.cache_dir / "recognition" / "model.onnx"

    def configure(self, **model_kwargs: Any) -> None:
        self.det_model.det_thresh = model_kwargs.pop("minScore", self.det_model.det_thresh)
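
Usage sketch: a minimal illustration of how this class might be driven, assuming the InferenceModel base class exposes download()/load() and a public predict() wrapper around _predict(). The model name "buffalo_l", the cache path, and the image path are placeholders, not taken from this file.

    # Illustrative sketch only; names marked below are assumptions, not part of the file above.
    from app.models.facial_recognition import FaceRecognizer  # assumed module path

    recognizer = FaceRecognizer("buffalo_l", min_score=0.7, cache_dir="/cache/facial-recognition")
    recognizer.load()  # assumed to download/load the ONNX files into cache_dir if missing

    with open("photo.jpg", "rb") as f:
        faces = recognizer.predict(f.read())  # raw bytes are decoded inside _predict()

    for face in faces:
        box = face["boundingBox"]
        print(face["score"], (box["x1"], box["y1"], box["x2"], box["y2"]), face["embedding"].shape)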
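
The embeddings returned by _predict() are what downstream matching consumes; how that matching is done is outside this file. A common approach with ArcFace embeddings is cosine similarity between two vectors, sketched below with an illustrative (not authoritative) threshold.

    import numpy as np

    def cosine_similarity(a: np.ndarray, b: np.ndarray) -> float:
        # ArcFace embeddings are commonly compared by cosine similarity:
        # the closer to 1.0, the more likely the two crops show the same person.
        return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))

    # Hypothetical: face_a and face_b are entries from two FaceRecognizer results.
    # The 0.5 cutoff is illustrative only; real deployments tune this threshold.
    # same_person = cosine_similarity(face_a["embedding"], face_b["embedding"]) > 0.5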