123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191 |
- from __future__ import annotations
- import ipaddress
- import secrets
- import uuid
- from datetime import timedelta
- import pgtrigger
- import rest_framework.authtoken.models
- from django.contrib.auth.hashers import make_password
- from django.contrib.postgres.fields import ArrayField
- from django.core import validators
- from django.core.exceptions import ValidationError
- from django.db import models, transaction
- from django.db.models import F, Q
- from django.utils import timezone
- from django_prometheus.models import ExportModelOperationsMixin
- from netfields import CidrAddressField, NetManager
- class Token(ExportModelOperationsMixin("Token"), rest_framework.authtoken.models.Token):
- @staticmethod
- def _allowed_subnets_default():
- return [ipaddress.IPv4Network("0.0.0.0/0"), ipaddress.IPv6Network("::/0")]
- _validators = [
- validators.MinValueValidator(timedelta(0)),
- validators.MaxValueValidator(timedelta(days=365 * 1000)),
- ]
- id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
- key = models.CharField("Key", max_length=128, db_index=True, unique=True)
- user = models.ForeignKey("User", on_delete=models.CASCADE)
- name = models.CharField("Name", blank=True, max_length=64)
- last_used = models.DateTimeField(null=True, blank=True)
- perm_manage_tokens = models.BooleanField(default=False)
- allowed_subnets = ArrayField(
- CidrAddressField(), default=_allowed_subnets_default.__func__
- )
- max_age = models.DurationField(null=True, default=None, validators=_validators)
- max_unused_period = models.DurationField(
- null=True, default=None, validators=_validators
- )
- domain_policies = models.ManyToManyField("Domain", through="TokenDomainPolicy")
- plain = None
- objects = NetManager()
- class Meta:
- constraints = [
- models.UniqueConstraint(fields=["id", "user"], name="unique_id_user")
- ]
- @property
- def is_valid(self):
- now = timezone.now()
- # Check max age
- try:
- if self.created + self.max_age < now:
- return False
- except TypeError:
- pass
- # Check regular usage requirement
- try:
- if (self.last_used or self.created) + self.max_unused_period < now:
- return False
- except TypeError:
- pass
- return True
- def generate_key(self):
- self.plain = secrets.token_urlsafe(21)
- self.key = Token.make_hash(self.plain)
- return self.key
- @staticmethod
- def make_hash(plain):
- return make_password(plain, salt="static", hasher="pbkdf2_sha256_iter1")
- def get_policy(self, *, domain=None):
- order_by = F("domain").asc(
- nulls_last=True
- ) # default Postgres sorting, but: explicit is better than implicit
- return (
- self.tokendomainpolicy_set.filter(Q(domain=domain) | Q(domain__isnull=True))
- .order_by(order_by)
- .first()
- )
- @transaction.atomic
- def delete(self):
- # This is needed because Model.delete() emulates cascade delete via django.db.models.deletion.Collector.delete()
- # which deletes related objects in pk order. However, the default policy has to be deleted last.
- # Perhaps this will change with https://code.djangoproject.com/ticket/21961
- self.tokendomainpolicy_set.filter(domain__isnull=False).delete()
- self.tokendomainpolicy_set.filter(domain__isnull=True).delete()
- return super().delete()
- @pgtrigger.register(
- # Ensure that token_user is consistent with token
- pgtrigger.Trigger(
- name="token_user",
- operation=pgtrigger.Update | pgtrigger.Insert,
- when=pgtrigger.Before,
- func="NEW.token_user_id = (SELECT user_id FROM desecapi_token WHERE id = NEW.token_id); RETURN NEW;",
- ),
- # Ensure that if there is *any* domain policy for a given token, there is always one with domain=None.
- pgtrigger.Trigger(
- name="default_policy_on_insert",
- operation=pgtrigger.Insert,
- when=pgtrigger.Before,
- # Trigger `condition` arguments (corresponding to WHEN clause) don't support subqueries, so we use `func`
- func="IF (NEW.domain_id IS NOT NULL and NOT EXISTS(SELECT * FROM desecapi_tokendomainpolicy WHERE domain_id IS NULL AND token_id = NEW.token_id)) THEN "
- " RAISE EXCEPTION 'Cannot insert non-default policy into % table when default policy is not present', TG_TABLE_NAME; "
- "END IF; RETURN NEW;",
- ),
- pgtrigger.Protect(
- name="default_policy_on_update",
- operation=pgtrigger.Update,
- when=pgtrigger.Before,
- condition=pgtrigger.Q(old__domain__isnull=True, new__domain__isnull=False),
- ),
- # Ideally, a deferred trigger (https://github.com/Opus10/django-pgtrigger/issues/14). Available in 3.4.0.
- pgtrigger.Trigger(
- name="default_policy_on_delete",
- operation=pgtrigger.Delete,
- when=pgtrigger.Before,
- # Trigger `condition` arguments (corresponding to WHEN clause) don't support subqueries, so we use `func`
- func="IF (OLD.domain_id IS NULL and EXISTS(SELECT * FROM desecapi_tokendomainpolicy WHERE domain_id IS NOT NULL AND token_id = OLD.token_id)) THEN "
- " RAISE EXCEPTION 'Cannot delete default policy from % table when non-default policy is present', TG_TABLE_NAME; "
- "END IF; RETURN OLD;",
- ),
- )
- class TokenDomainPolicy(ExportModelOperationsMixin("TokenDomainPolicy"), models.Model):
- token = models.ForeignKey(Token, on_delete=models.CASCADE)
- domain = models.ForeignKey("Domain", on_delete=models.CASCADE, null=True)
- perm_dyndns = models.BooleanField(default=False)
- perm_rrsets = models.BooleanField(default=False)
- # Token user, filled via trigger. Used by compound FK constraints to tie domain.owner to token.user (see migration).
- token_user = models.ForeignKey(
- "User", on_delete=models.CASCADE, db_constraint=False, related_name="+"
- )
- class Meta:
- constraints = [
- models.UniqueConstraint(fields=["token", "domain"], name="unique_entry"),
- models.UniqueConstraint(
- fields=["token"],
- condition=Q(domain__isnull=True),
- name="unique_entry_null_domain",
- ),
- ]
- def clean(self):
- default_policy = self.token.get_policy(domain=None)
- if self.pk: # update
- # Can't change policy's default status ("domain NULLness") to maintain policy precedence
- if (self.domain is None) != (self.pk == default_policy.pk):
- raise ValidationError(
- {
- "domain": "Policy precedence: Cannot disable default policy when others exist."
- }
- )
- else: # create
- # Can't violate policy precedence (default policy has to be first)
- if (self.domain is not None) and (default_policy is None):
- raise ValidationError(
- {
- "domain": "Policy precedence: The first policy must be the default policy."
- }
- )
- def delete(self):
- # Can't delete default policy when others exist
- if (self.domain is None) and self.token.tokendomainpolicy_set.exclude(
- domain__isnull=True
- ).exists():
- raise ValidationError(
- {
- "domain": "Policy precedence: Can't delete default policy when there exist others."
- }
- )
- return super().delete()
- def save(self, *args, **kwargs):
- self.clean()
- super().save(*args, **kwargs)
|