123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294 |
- from __future__ import annotations
- from functools import cached_property
- import dns
- import psl_dns
- from django.conf import settings
- from django.contrib.auth.models import AnonymousUser
- from django.core.exceptions import ValidationError
- from django.db import models
- from django.db.models import CharField, F, Manager, Q, Value
- from django.db.models.functions import Concat, Length
- from django_prometheus.models import ExportModelOperationsMixin
- from dns.exception import Timeout
- from dns.resolver import NoNameservers
- from rest_framework.exceptions import APIException
- from desecapi import logger, metrics, pdns
- from .base import validate_domain_name
- from .records import RRset
- psl = psl_dns.PSL(resolver=settings.PSL_RESOLVER, timeout=0.5)
- class DomainManager(Manager):
- def filter_qname(self, qname: str, **kwargs) -> models.query.QuerySet:
- qs = self.annotate(
- name_length=Length("name")
- ) # callers expect this to be present after returning
- try:
- Domain._meta.get_field("name").run_validators(
- qname.removeprefix("*.").lower()
- )
- except ValidationError:
- return qs.none()
- return qs.annotate(
- dotted_name=Concat(Value("."), "name", output_field=CharField()),
- dotted_qname=Value(f".{qname}", output_field=CharField()),
- ).filter(dotted_qname__endswith=F("dotted_name"), **kwargs)
- class Domain(ExportModelOperationsMixin("Domain"), models.Model):
- @staticmethod
- def _minimum_ttl_default():
- return settings.MINIMUM_TTL_DEFAULT
- class RenewalState(models.IntegerChoices):
- IMMORTAL = 0
- FRESH = 1
- NOTIFIED = 2
- WARNED = 3
- created = models.DateTimeField(auto_now_add=True)
- name = models.CharField(
- max_length=191, unique=True, validators=validate_domain_name
- )
- owner = models.ForeignKey("User", on_delete=models.PROTECT, related_name="domains")
- published = models.DateTimeField(null=True, blank=True)
- minimum_ttl = models.PositiveIntegerField(default=_minimum_ttl_default.__func__)
- renewal_state = models.IntegerField(
- choices=RenewalState.choices, default=RenewalState.IMMORTAL
- )
- renewal_changed = models.DateTimeField(auto_now_add=True)
- _keys = None
- objects = DomainManager()
- class Meta:
- constraints = [
- models.UniqueConstraint(fields=["id", "owner"], name="unique_id_owner")
- ]
- ordering = ("created",)
- def __init__(self, *args, **kwargs):
- if isinstance(kwargs.get("owner"), AnonymousUser):
- kwargs = {**kwargs, "owner": None} # make a copy and override
- # Avoid super().__init__(owner=None, ...) to not mess up *values instantiation in django.db.models.Model.from_db
- super().__init__(*args, **kwargs)
- if (
- self.pk is None
- and kwargs.get("renewal_state") is None
- and self.is_locally_registrable
- ):
- self.renewal_state = Domain.RenewalState.FRESH
- @cached_property
- def public_suffix(self):
- try:
- public_suffix = psl.get_public_suffix(self.name)
- is_public_suffix = psl.is_public_suffix(self.name)
- except (Timeout, NoNameservers):
- public_suffix = self.name.rpartition(".")[2]
- is_public_suffix = "." not in self.name # TLDs are public suffixes
- if is_public_suffix:
- return public_suffix
- # Take into account that any of the parent domains could be a local public suffix. To that
- # end, identify the longest local public suffix that is actually a suffix of domain_name.
- for local_public_suffix in settings.LOCAL_PUBLIC_SUFFIXES:
- has_local_public_suffix_parent = ("." + self.name).endswith(
- "." + local_public_suffix
- )
- if has_local_public_suffix_parent and len(local_public_suffix) > len(
- public_suffix
- ):
- public_suffix = local_public_suffix
- return public_suffix
- def is_covered_by_foreign_zone(self):
- # Generate a list of all domains connecting this one and its public suffix.
- # If another user owns a zone with one of these names, then the requested
- # domain is unavailable because it is part of the other user's zone.
- private_components = self.name.rsplit(self.public_suffix, 1)[0].rstrip(".")
- private_components = private_components.split(".") if private_components else []
- private_domains = [
- ".".join(private_components[i:]) for i in range(0, len(private_components))
- ]
- private_domains = [
- f"{private_domain}.{self.public_suffix}"
- for private_domain in private_domains
- ]
- assert self.name == next(iter(private_domains), self.public_suffix)
- # Determine whether domain is covered by other users' zones
- return Domain.objects.filter(
- Q(name__in=private_domains) & ~Q(owner=self._owner_or_none)
- ).exists()
- def covers_foreign_zone(self):
- # Note: This is not completely accurate: Ideally, we should only consider zones with identical public suffix.
- # (If a public suffix lies in between, it's ok.) However, as there could be many descendant zones, the accurate
- # check is expensive, so currently not implemented (PSL lookups for each of them).
- return Domain.objects.filter(
- Q(name__endswith=f".{self.name}") & ~Q(owner=self._owner_or_none)
- ).exists()
- def is_registrable(self):
- """
- Returns False if the domain name is reserved, a public suffix, or covered by / covers another user's domain.
- Otherwise, True is returned.
- """
- self.clean() # ensure .name is a domain name
- private_generation = self.name.count(".") - self.public_suffix.count(".")
- assert private_generation >= 0
- # .internal is reserved
- if f".{self.name}".endswith(".internal"):
- return False
- # Public suffixes can only be registered if they are local
- if private_generation == 0 and self.name not in settings.LOCAL_PUBLIC_SUFFIXES:
- return False
- # Disallow _acme-challenge.dedyn.io and the like. Rejects reserved direct children of public suffixes.
- reserved_prefixes = (
- "_",
- "autoconfig.",
- "autodiscover.",
- )
- if private_generation == 1 and any(
- self.name.startswith(prefix) for prefix in reserved_prefixes
- ):
- return False
- # Domains covered by another user's zone can't be registered
- if self.is_covered_by_foreign_zone():
- return False
- # Domains that would cover another user's zone can't be registered
- if self.covers_foreign_zone():
- return False
- return True
- @property
- def keys(self):
- if not self._keys:
- self._keys = [{**key, "managed": True} for key in pdns.get_keys(self)]
- try:
- unmanaged_keys = self.rrset_set.get(
- subname="", type="DNSKEY"
- ).records.all()
- except RRset.DoesNotExist:
- pass
- else:
- name = dns.name.from_text(self.name)
- for rr in unmanaged_keys:
- key = dns.rdata.from_text(
- dns.rdataclass.IN, dns.rdatatype.DNSKEY, rr.content
- )
- key_is_sep = key.flags & dns.rdtypes.ANY.DNSKEY.SEP
- self._keys.append(
- {
- "dnskey": rr.content,
- "ds": [
- dns.dnssec.make_ds(name, key, algo).to_text()
- for algo in (2, 4)
- ]
- if key_is_sep
- else [],
- "flags": key.flags, # deprecated
- "keytype": None, # deprecated
- "managed": False,
- }
- )
- return self._keys
- @property
- def touched(self):
- try:
- rrset_touched = max(
- updated for updated in self.rrset_set.values_list("touched", flat=True)
- )
- except ValueError: # no RRsets (but there should be at least NS)
- return self.published # may be None if the domain was never published
- return max(rrset_touched, self.published or rrset_touched)
- @property
- def is_locally_registrable(self):
- return self.parent_domain_name in settings.LOCAL_PUBLIC_SUFFIXES
- @property
- def _owner_or_none(self):
- try:
- return self.owner
- except Domain.owner.RelatedObjectDoesNotExist:
- return None
- @property
- def parent_domain_name(self):
- return self._partitioned_name[1]
- @property
- def _partitioned_name(self):
- subname, _, parent_name = self.name.partition(".")
- return subname, parent_name or None
- def save(self, *args, **kwargs):
- self.full_clean(validate_unique=False)
- super().save(*args, **kwargs)
- def update_delegation(self, child_domain: Domain):
- child_subname, child_domain_name = child_domain._partitioned_name
- if self.name != child_domain_name:
- raise ValueError(
- "Cannot update delegation of %s as it is not an immediate child domain of %s."
- % (child_domain.name, self.name)
- )
- # Always remove delegation so that we con properly recreate it
- for rrset in self.rrset_set.filter(
- subname=child_subname, type__in=["NS", "DS"]
- ):
- rrset.delete()
- if child_domain.pk:
- # Domain real: (re-)set delegation
- child_keys = child_domain.keys
- if not child_keys:
- raise APIException(
- "Cannot delegate %s, as it currently has no keys."
- % child_domain.name
- )
- RRset.objects.create(
- domain=self,
- subname=child_subname,
- type="NS",
- ttl=3600,
- contents=settings.DEFAULT_NS,
- )
- RRset.objects.create(
- domain=self,
- subname=child_subname,
- type="DS",
- ttl=300,
- contents=[ds for k in child_keys for ds in k["ds"]],
- )
- metrics.get("desecapi_autodelegation_created").inc()
- else:
- # Domain not real: that's it
- metrics.get("desecapi_autodelegation_deleted").inc()
- def delete(self):
- ret = super().delete()
- logger.warning(f"Domain {self.name} deleted (owner: {self.owner.pk})")
- return ret
- def __str__(self):
- return self.name
|