facial_recognition.py 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104
  1. import zipfile
  2. from pathlib import Path
  3. from typing import Any
  4. import cv2
  5. import numpy as np
  6. import onnxruntime as ort
  7. from insightface.model_zoo import ArcFaceONNX, RetinaFace
  8. from insightface.utils.face_align import norm_crop
  9. from insightface.utils.storage import BASE_REPO_URL, download_file
  10. from ..config import settings
  11. from ..schemas import ModelType
  12. from .base import InferenceModel
  13. class FaceRecognizer(InferenceModel):
  14. _model_type = ModelType.FACIAL_RECOGNITION
  15. def __init__(
  16. self,
  17. model_name: str,
  18. min_score: float = settings.min_face_score,
  19. cache_dir: Path | str | None = None,
  20. **model_kwargs: Any,
  21. ) -> None:
  22. self.min_score = min_score
  23. super().__init__(model_name, cache_dir, **model_kwargs)
  24. def _download(self, **model_kwargs: Any) -> None:
  25. zip_file = self.cache_dir / f"{self.model_name}.zip"
  26. download_file(f"{BASE_REPO_URL}/{self.model_name}.zip", zip_file)
  27. with zipfile.ZipFile(zip_file, "r") as zip:
  28. members = zip.namelist()
  29. det_file = next(model for model in members if model.startswith("det_"))
  30. rec_file = next(model for model in members if model.startswith("w600k_"))
  31. zip.extractall(self.cache_dir, members=[det_file, rec_file])
  32. zip_file.unlink()
  33. def _load(self, **model_kwargs: Any) -> None:
  34. try:
  35. det_file = next(self.cache_dir.glob("det_*.onnx"))
  36. rec_file = next(self.cache_dir.glob("w600k_*.onnx"))
  37. except StopIteration:
  38. raise FileNotFoundError("Facial recognition models not found in cache directory")
  39. self.det_model = RetinaFace(
  40. session=ort.InferenceSession(
  41. det_file.as_posix(),
  42. sess_options=self.sess_options,
  43. providers=self.providers,
  44. provider_options=self.provider_options,
  45. ),
  46. )
  47. self.rec_model = ArcFaceONNX(
  48. rec_file.as_posix(),
  49. session=ort.InferenceSession(
  50. rec_file.as_posix(),
  51. sess_options=self.sess_options,
  52. providers=self.providers,
  53. provider_options=self.provider_options,
  54. ),
  55. )
  56. self.det_model.prepare(
  57. ctx_id=0,
  58. det_thresh=self.min_score,
  59. input_size=(640, 640),
  60. )
  61. self.rec_model.prepare(ctx_id=0)
  62. def _predict(self, image: cv2.Mat) -> list[dict[str, Any]]:
  63. bboxes, kpss = self.det_model.detect(image)
  64. if bboxes.size == 0:
  65. return []
  66. assert isinstance(kpss, np.ndarray)
  67. scores = bboxes[:, 4].tolist()
  68. bboxes = bboxes[:, :4].round().tolist()
  69. results = []
  70. height, width, _ = image.shape
  71. for (x1, y1, x2, y2), score, kps in zip(bboxes, scores, kpss):
  72. cropped_img = norm_crop(image, kps)
  73. embedding = self.rec_model.get_feat(cropped_img)[0].tolist()
  74. results.append(
  75. {
  76. "imageWidth": width,
  77. "imageHeight": height,
  78. "boundingBox": {
  79. "x1": x1,
  80. "y1": y1,
  81. "x2": x2,
  82. "y2": y2,
  83. },
  84. "score": score,
  85. "embedding": embedding,
  86. }
  87. )
  88. return results
  89. @property
  90. def cached(self) -> bool:
  91. return self.cache_dir.is_dir() and any(self.cache_dir.glob("*.onnx"))