tokens.py 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157
  1. from __future__ import annotations
  2. import ipaddress
  3. import secrets
  4. import uuid
  5. from datetime import timedelta
  6. import pgtrigger
  7. import rest_framework.authtoken.models
  8. from django.contrib.auth.hashers import make_password
  9. from django.contrib.postgres.fields import ArrayField
  10. from django.core import validators
  11. from django.core.exceptions import ValidationError
  12. from django.db import models, transaction
  13. from django.db.models import F, Q
  14. from django.utils import timezone
  15. from django_prometheus.models import ExportModelOperationsMixin
  16. from netfields import CidrAddressField, NetManager
  17. class Token(ExportModelOperationsMixin('Token'), rest_framework.authtoken.models.Token):
  18. @staticmethod
  19. def _allowed_subnets_default():
  20. return [ipaddress.IPv4Network('0.0.0.0/0'), ipaddress.IPv6Network('::/0')]
  21. _validators = [validators.MinValueValidator(timedelta(0)), validators.MaxValueValidator(timedelta(days=365*1000))]
  22. id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
  23. key = models.CharField("Key", max_length=128, db_index=True, unique=True)
  24. user = models.ForeignKey('User', on_delete=models.CASCADE)
  25. name = models.CharField('Name', blank=True, max_length=64)
  26. last_used = models.DateTimeField(null=True, blank=True)
  27. perm_manage_tokens = models.BooleanField(default=False)
  28. allowed_subnets = ArrayField(CidrAddressField(), default=_allowed_subnets_default.__func__)
  29. max_age = models.DurationField(null=True, default=None, validators=_validators)
  30. max_unused_period = models.DurationField(null=True, default=None, validators=_validators)
  31. domain_policies = models.ManyToManyField('Domain', through='TokenDomainPolicy')
  32. plain = None
  33. objects = NetManager()
  34. class Meta:
  35. constraints = [models.UniqueConstraint(fields=['id', 'user'], name='unique_id_user')]
  36. @property
  37. def is_valid(self):
  38. now = timezone.now()
  39. # Check max age
  40. try:
  41. if self.created + self.max_age < now:
  42. return False
  43. except TypeError:
  44. pass
  45. # Check regular usage requirement
  46. try:
  47. if (self.last_used or self.created) + self.max_unused_period < now:
  48. return False
  49. except TypeError:
  50. pass
  51. return True
  52. def generate_key(self):
  53. self.plain = secrets.token_urlsafe(21)
  54. self.key = Token.make_hash(self.plain)
  55. return self.key
  56. @staticmethod
  57. def make_hash(plain):
  58. return make_password(plain, salt='static', hasher='pbkdf2_sha256_iter1')
  59. def get_policy(self, *, domain=None):
  60. order_by = F('domain').asc(nulls_last=True) # default Postgres sorting, but: explicit is better than implicit
  61. return self.tokendomainpolicy_set.filter(Q(domain=domain) | Q(domain__isnull=True)).order_by(order_by).first()
  62. @transaction.atomic
  63. def delete(self):
  64. # This is needed because Model.delete() emulates cascade delete via django.db.models.deletion.Collector.delete()
  65. # which deletes related objects in pk order. However, the default policy has to be deleted last.
  66. # Perhaps this will change with https://code.djangoproject.com/ticket/21961
  67. self.tokendomainpolicy_set.filter(domain__isnull=False).delete()
  68. self.tokendomainpolicy_set.filter(domain__isnull=True).delete()
  69. return super().delete()
  70. @pgtrigger.register(
  71. # Ensure that token_user is consistent with token
  72. pgtrigger.Trigger(
  73. name='token_user',
  74. operation=pgtrigger.Update | pgtrigger.Insert,
  75. when=pgtrigger.Before,
  76. func='NEW.token_user_id = (SELECT user_id FROM desecapi_token WHERE id = NEW.token_id); RETURN NEW;',
  77. ),
  78. # Ensure that if there is *any* domain policy for a given token, there is always one with domain=None.
  79. pgtrigger.Trigger(
  80. name='default_policy_on_insert',
  81. operation=pgtrigger.Insert,
  82. when=pgtrigger.Before,
  83. # Trigger `condition` arguments (corresponding to WHEN clause) don't support subqueries, so we use `func`
  84. func="IF (NEW.domain_id IS NOT NULL and NOT EXISTS(SELECT * FROM desecapi_tokendomainpolicy WHERE domain_id IS NULL AND token_id = NEW.token_id)) THEN "
  85. " RAISE EXCEPTION 'Cannot insert non-default policy into % table when default policy is not present', TG_TABLE_NAME; "
  86. "END IF; RETURN NEW;",
  87. ),
  88. pgtrigger.Protect(
  89. name='default_policy_on_update',
  90. operation=pgtrigger.Update,
  91. when=pgtrigger.Before,
  92. condition=pgtrigger.Q(old__domain__isnull=True, new__domain__isnull=False),
  93. ),
  94. # Ideally, a deferred trigger (https://github.com/Opus10/django-pgtrigger/issues/14). Available in 3.4.0.
  95. pgtrigger.Trigger(
  96. name='default_policy_on_delete',
  97. operation=pgtrigger.Delete,
  98. when=pgtrigger.Before,
  99. # Trigger `condition` arguments (corresponding to WHEN clause) don't support subqueries, so we use `func`
  100. func="IF (OLD.domain_id IS NULL and EXISTS(SELECT * FROM desecapi_tokendomainpolicy WHERE domain_id IS NOT NULL AND token_id = OLD.token_id)) THEN "
  101. " RAISE EXCEPTION 'Cannot delete default policy from % table when non-default policy is present', TG_TABLE_NAME; "
  102. "END IF; RETURN OLD;",
  103. ),
  104. )
  105. class TokenDomainPolicy(ExportModelOperationsMixin('TokenDomainPolicy'), models.Model):
  106. token = models.ForeignKey(Token, on_delete=models.CASCADE)
  107. domain = models.ForeignKey('Domain', on_delete=models.CASCADE, null=True)
  108. perm_dyndns = models.BooleanField(default=False)
  109. perm_rrsets = models.BooleanField(default=False)
  110. # Token user, filled via trigger. Used by compound FK constraints to tie domain.owner to token.user (see migration).
  111. token_user = models.ForeignKey('User', on_delete=models.CASCADE, db_constraint=False, related_name='+')
  112. class Meta:
  113. constraints = [
  114. models.UniqueConstraint(fields=['token', 'domain'], name='unique_entry'),
  115. models.UniqueConstraint(fields=['token'], condition=Q(domain__isnull=True), name='unique_entry_null_domain')
  116. ]
  117. def clean(self):
  118. default_policy = self.token.get_policy(domain=None)
  119. if self.pk: # update
  120. # Can't change policy's default status ("domain NULLness") to maintain policy precedence
  121. if (self.domain is None) != (self.pk == default_policy.pk):
  122. raise ValidationError({'domain': 'Policy precedence: Cannot disable default policy when others exist.'})
  123. else: # create
  124. # Can't violate policy precedence (default policy has to be first)
  125. if (self.domain is not None) and (default_policy is None):
  126. raise ValidationError({'domain': 'Policy precedence: The first policy must be the default policy.'})
  127. def delete(self):
  128. # Can't delete default policy when others exist
  129. if (self.domain is None) and self.token.tokendomainpolicy_set.exclude(domain__isnull=True).exists():
  130. raise ValidationError({'domain': "Policy precedence: Can't delete default policy when there exist others."})
  131. return super().delete()
  132. def save(self, *args, **kwargs):
  133. self.clean()
  134. super().save(*args, **kwargs)