# mfa.py (2.4 KB)
  1. from __future__ import annotations
  2. import base64
  3. from functools import cached_property
  4. import secrets
  5. import uuid
  6. from django.conf import settings
  7. from django.db import models, transaction
  8. from django.utils import timezone
  9. from pyotp import TOTP, utils as pyotp_utils
  class BaseFactor(models.Model):
      """Base model for a multi-factor authentication factor belonging to a user.

      Each factor is keyed by a random UUID, references its owning ``User`` and
      carries an optional display ``name`` that must be unique per user.
      ``save`` and ``delete`` are overridden so that the owning user is re-saved
      with ``credentials_changed=True`` in the situations described on each
      method.
      """

      id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
      user = models.ForeignKey("User", on_delete=models.CASCADE)
      created = models.DateTimeField(auto_now_add=True)
      # NULL until something sets it; TOTPFactor.verify() stamps it on a
      # successful verification.
      last_used = models.DateTimeField(null=True, blank=True)
      name = models.CharField(blank=True, default="", max_length=64)

      class Meta:
          constraints = [
              # A user cannot have two factors with the same name.
              models.UniqueConstraint(fields=["user", "name"], name="unique_user_name"),
          ]

      @transaction.atomic()
      def delete(self, *args, **kwargs):
          """Delete this factor, flagging a credentials change if it was ever used.

          Positional and keyword arguments (``using``, ``keep_parents``) are
          forwarded to ``Model.delete`` so this override accepts everything the
          method it replaces accepts.

          Returns:
              Whatever ``Model.delete`` returns.
          """
          if self.last_used is not None:
              # NOTE(review): the effect of credentials_changed=True lives in
              # User.save, which is not visible here -- confirm there.
              self.user.save(credentials_changed=True)
          return super().delete(*args, **kwargs)

      @transaction.atomic()
      def save(self, *args, **kwargs):
          """Save this factor, flagging a credentials change while MFA is not yet enabled.

          All arguments are forwarded to ``Model.save``.
          """
          if not self.user.mfa_enabled:  # enabling MFA
              self.user.save(credentials_changed=True)
          return super().save(*args, **kwargs)
  class TOTPFactor(BaseFactor):
      """Time-based one-time-password factor backed by a random 32-byte secret."""

      @staticmethod
      def _secret_default():
          """Return 32 fresh random bytes for use as a new factor's secret."""
          return secrets.token_bytes(32)

      # ``__func__`` passes the plain function wrapped by the staticmethod object
      # as the field default.
      secret = models.BinaryField(max_length=32, default=_secret_default.__func__)
      # Highest timestep for which a code has been accepted; verify() only
      # accepts codes for strictly later timesteps.
      last_verified_timestep = models.PositiveIntegerField(default=0)

      @cached_property
      def _totp(self):
          """pyotp ``TOTP`` object (6 digits) for this factor's secret, cached per instance."""
          # TODO switch to self.secret once https://github.com/pyauth/pyotp/pull/138 is released
          generator = TOTP(self.base32_secret, digits=6)
          return generator

      @property
      def base32_secret(self):
          """The secret as a base32 ASCII string with the ``=`` padding stripped."""
          padded = base64.b32encode(self.secret)
          unpadded = padded.rstrip(b"=")
          return unpadded.decode("ascii")

      @property
      def uri(self):
          """Provisioning URI for this factor, labelled with its name and the deSEC issuer."""
          issuer = f"desec.{settings.DESECSTACK_DOMAIN}"
          return self._totp.provisioning_uri(name=self.name, issuer_name=issuer)

      @transaction.atomic
      def verify(self, code):
          """Check ``code`` against the previous, current and next timestep.

          A timestep is only considered if it is strictly greater than
          ``last_verified_timestep``. On the first match, ``last_used`` and
          ``last_verified_timestep`` are updated and the factor is saved.

          Returns:
              ``True`` if the code matched an eligible timestep, else ``False``.
          """
          checked_at = timezone.now()
          current_step = self._totp.timecode(checked_at)

          for candidate in (current_step - 1, current_step, current_step + 1):
              # Skip timesteps at or before the last accepted one.
              if candidate <= self.last_verified_timestep:
                  continue
              expected = self._totp.generate_otp(candidate)
              if not pyotp_utils.strings_equal(str(code), expected):
                  continue
              self.last_used = checked_at
              self.last_verified_timestep = candidate
              self.save()
              return True
          return False