123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317 |
- from __future__ import annotations
- import binascii
- import uuid
- import dns
- from django.contrib.postgres.constraints import ExclusionConstraint
- from django.contrib.postgres.fields import RangeOperators
- from django.core import validators
- from django.core.exceptions import ValidationError
- from django.db import models
- from django.db.models import Manager
- from django.db.models.expressions import RawSQL
- from django_prometheus.models import ExportModelOperationsMixin
- from dns import rdataclass, rdatatype
- from dns.rdtypes import ANY, IN
- from desecapi import pdns
- from desecapi.dns import AAAA, CERT, LongQuotedTXT, MX, NS, SRV
- from .base import validate_lower, validate_upper
# RR set types: the good, the bad, and the ugly

# Types that are known, but deliberately not supported.
RR_SET_TYPES_UNSUPPORTED = {
    # Requires signing at the frontend, hence unsupported in desec-stack
    "ALIAS",
    # broken in pdns, https://github.com/PowerDNS/pdns/issues/10589 TODO enable with pdns auth >= 4.7.0
    "IPSECKEY",
    # Application use restricted by RFC 3445, DNSSEC use replaced by DNSKEY and handled automatically
    "KEY",
    # General usage not recommended, "SHOULD NOT" be used in SMTP (RFC 1123)
    "WKS",
}

# Restricted types are managed on behalf of the API client and cannot be modified directly through the API.
RR_SET_TYPES_AUTOMATIC = {
    # corresponding functionality is automatically managed:
    "KEY",
    "NSEC",
    "NSEC3",
    "OPT",
    "RRSIG",
    # automatically managed by the API:
    "NSEC3PARAM",
    "SOA",
}

# Backend types are the types supported by the backend(s).
RR_SET_TYPES_BACKEND = pdns.SUPPORTED_RRSET_TYPES

# Validation types are the types supported by the validation backend, currently: dnspython.
# The four extra names are added by hand, see https://github.com/rthalley/dnspython/pull/751
RR_SET_TYPES_VALIDATION = {*ANY.__all__, *IN.__all__, "L32", "L64", "LP", "NID"}

# Manageable types are directly managed by the API client: everything that both the backend
# and the validation backend know, minus the unsupported and the automatically managed types.
RR_SET_TYPES_MANAGEABLE = (RR_SET_TYPES_BACKEND & RR_SET_TYPES_VALIDATION) - (
    RR_SET_TYPES_UNSUPPORTED | RR_SET_TYPES_AUTOMATIC
)
class RRsetManager(Manager):
    """Manager for RRset that can create an RR set together with its records."""

    def create(self, contents=None, **kwargs):
        """
        Creates an RRset from `kwargs` and, for each entry of `contents`, one RR attached to it.

        :param contents: optional iterable of record contents; None or an empty value creates no records
        :param kwargs: passed on unchanged to the default manager's create()
        :return: the newly created RRset
        """
        new_rrset = super().create(**kwargs)
        if not contents:
            return new_rrset
        for record_content in contents:
            RR.objects.create(rrset=new_rrset, content=record_content)
        return new_rrset
class RRset(ExportModelOperationsMixin("RRset"), models.Model):
    """
    A set of resource records sharing the same domain, subname, and type.

    The individual records are stored as RR objects and are reachable through the `records`
    reverse relation.
    """

    id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
    created = models.DateTimeField(auto_now_add=True)
    # Refreshed on every save() (auto_now).
    touched = models.DateTimeField(auto_now=True, db_index=True)
    domain = models.ForeignKey("Domain", on_delete=models.CASCADE)
    subname = models.CharField(
        max_length=178,
        blank=True,
        validators=[
            validate_lower,
            validators.RegexValidator(
                regex=r"^([*]|(([*][.])?([a-z0-9_-]{1,63}[.])*[a-z0-9_-]{1,63}))$",
                message="Subname can only use (lowercase) a-z, 0-9, ., -, and _, "
                "may start with a '*.', or just be '*'. Components may not exceed 63 characters.",
                code="invalid_subname",
            ),
        ],
    )
    type = models.CharField(
        max_length=10,
        validators=[
            validate_upper,
            validators.RegexValidator(
                regex=r"^[A-Z][A-Z0-9]*$",
                message="Type must be uppercase alphanumeric and start with a letter.",
                code="invalid_type",
            ),
        ],
    )
    ttl = models.PositiveIntegerField()

    objects = RRsetManager()

    class Meta:
        constraints = [
            # Rows with equal domain and subname must agree on whether their type is CNAME,
            # i.e. a CNAME RR set cannot coexist with an RR set of another type at the same name.
            ExclusionConstraint(
                name="cname_exclusivity",
                expressions=[
                    ("domain", RangeOperators.EQUAL),
                    ("subname", RangeOperators.EQUAL),
                    (RawSQL("int4(type = 'CNAME')", ()), RangeOperators.NOT_EQUAL),
                ],
            ),
        ]
        unique_together = (("domain", "subname", "type"),)

    @staticmethod
    def construct_name(subname, domain_name):
        """
        Joins subname and domain name with a dot and appends the trailing dot.
        Empty components are skipped, so an empty subname yields just "<domain_name>.".
        """
        return ".".join(filter(None, [subname, domain_name])) + "."

    @property
    def name(self):
        """Name of this RR set, built from its subname and its domain's name, with trailing dot."""
        return self.construct_name(self.subname, self.domain.name)

    def save(self, *args, **kwargs):
        """Runs field validation (without uniqueness checks) and then saves the RR set."""
        # TODO Enforce that subname and type aren't changed. https://github.com/desec-io/desec-stack/issues/553
        self.full_clean(validate_unique=False)
        super().save(*args, **kwargs)

    def clean_records(self, records_presentation_format):
        """
        Validates the records belonging to this set. Validation rules follow the DNS specification; some types may
        incur additional validation rules.
        Raises ValidationError if violation of DNS specification is found.
        Returns a set of records in canonical presentation format.
        :param records_presentation_format: iterable of records in presentation format
        """
        # Materialize once: the input is documented as an arbitrary iterable, but it is both
        # counted and iterated below, which would fail for generators or one-shot iterators.
        records = list(records_presentation_format)
        errors = []

        # Singletons
        if self.type in ("CNAME", "DNAME") and len(records) > 1:
            errors.append(f"{self.type} RRset cannot have multiple records.")

        # Non-apex
        if self.type in ("CNAME", "DS") and self.subname == "":
            errors.append(f"{self.type} RRset cannot have empty subname.")

        # Apex only
        if self.type in ("DNSKEY",) and self.subname != "":
            errors.append(f"{self.type} RRset must have empty subname.")

        def _error_msg(record, detail):
            return f"Record content of {self.type} {self.name} invalid: '{record}': {detail}"

        records_canonical_format = set()
        for r in records:
            try:
                r_canonical_format = RR.canonical_presentation_format(r, self.type)
            except ValueError as ex:
                errors.append(_error_msg(r, str(ex)))
                continue

            # Two inputs that differ textually but canonicalize to the same record are duplicates.
            if r_canonical_format in records_canonical_format:
                errors.append(
                    _error_msg(
                        r,
                        f"Duplicate record content: this is identical to "
                        f"'{r_canonical_format}'",
                    )
                )
            else:
                records_canonical_format.add(r_canonical_format)

        if errors:
            raise ValidationError(errors)
        return records_canonical_format

    def save_records(self, records):
        """
        Updates this RR set's resource records, discarding any old values.
        Records are expected in presentation format and are converted to canonical
        presentation format (e.g., 127.00.0.1 will be converted to 127.0.0.1).
        Raises if an invalid set of records is provided.
        This method triggers the following database queries:
        - one DELETE query
        - one SELECT query for comparison of old with new records
        - one INSERT query, if one or more records were added
        Changes are saved to the database immediately.
        :param records: list of records in presentation format
        """
        new_records = self.clean_records(records)

        # Delete RRs that are not in the new record list from the DB
        self.records.exclude(content__in=new_records).delete()  # one DELETE

        # Retrieve all remaining RRs from the DB
        unchanged_records = {r.content for r in self.records.all()}  # one SELECT

        # Save missing RRs from the new record list to the DB
        added_records = new_records - unchanged_records
        rrs = [RR(rrset=self, content=content) for content in added_records]
        RR.objects.bulk_create(rrs)  # One INSERT

    def __str__(self):
        return f"<RRSet {self.pk} domain={self.domain.name} type={self.type} subname={self.subname}>"
class RRManager(Manager):
    """Manager for RR whose bulk_create() also saves the affected RR sets."""

    def bulk_create(self, rrs, **kwargs):
        """
        Bulk-creates the given RRs and then saves each distinct RRset they belong to once.

        :param rrs: iterable of unsaved RR objects
        :param kwargs: passed on unchanged to the default manager's bulk_create()
        :return: whatever the default manager's bulk_create() returns
        """
        # Materialize first: `rrs` is iterated again below, and a one-shot iterable would
        # already be exhausted by then, silently skipping the RRset saves.
        rrs = list(rrs)
        ret = super().bulk_create(rrs, **kwargs)

        # For each rrset, save once to refresh the RRset.touched timestamp and trigger signal for post-save processing
        for rrset in {rr.rrset for rr in rrs}:
            rrset.save()
        return ret
class RR(ExportModelOperationsMixin("RR"), models.Model):
    """A single resource record: one content string belonging to an RRset."""

    created = models.DateTimeField(auto_now_add=True)
    rrset = models.ForeignKey(RRset, on_delete=models.CASCADE, related_name="records")
    content = models.TextField()

    objects = RRManager()

    # Record types that are parsed with a custom class instead of the generic dns.rdata.
    _type_map = {
        dns.rdatatype.AAAA: AAAA,  # TODO remove when https://github.com/PowerDNS/pdns/issues/8182 is fixed
        dns.rdatatype.CERT: CERT,  # do DNS name validation the same way as pdns
        dns.rdatatype.MX: MX,  # do DNS name validation the same way as pdns
        dns.rdatatype.NS: NS,  # do DNS name validation the same way as pdns
        dns.rdatatype.SRV: SRV,  # do DNS name validation the same way as pdns
        dns.rdatatype.TXT: LongQuotedTXT,  # we slightly deviate from RFC 1035 and allow tokens longer than 255 bytes
        dns.rdatatype.SPF: LongQuotedTXT,  # we slightly deviate from RFC 1035 and allow tokens longer than 255 bytes
    }

    @staticmethod
    def canonical_presentation_format(any_presentation_format, type_):
        """
        Converts any valid presentation format for a RR into its canonical presentation format.
        Raises if provided presentation format is invalid.

        The input is parsed, serialized to wire format, parsed back from wire format, and finally
        rendered as text again.

        :param any_presentation_format: record content in presentation format
        :param type_: record type as text, e.g. "A"
        :return: record content in canonical presentation format
        :raises ValueError: if the content cannot be parsed; the message describes the problem
        :raises ValidationError: if the record exceeds 64000 bytes in wire format

        Errors raised while resolving `type_` itself are not translated and propagate as they are.
        """
        rdtype = rdatatype.from_text(type_)

        try:
            # Convert to wire format, ensuring input validation.
            cls = RR._type_map.get(rdtype, dns.rdata)
            wire = cls.from_text(
                rdclass=rdataclass.IN,
                rdtype=rdtype,
                tok=dns.tokenizer.Tokenizer(any_presentation_format),
                relativize=False,
            ).to_digestable()

            if len(wire) > 64000:
                # Not a ValueError, so it passes through the handlers below untranslated.
                raise ValidationError(
                    f"Ensure this value has no more than 64000 byte in wire format (it has {len(wire)})."
                )

            parser = dns.wire.Parser(wire, current=0)
            with parser.restrict_to(len(wire)):
                rdata = cls.from_wire_parser(
                    rdclass=rdataclass.IN, rdtype=rdtype, parser=parser
                )

            # Convert to canonical presentation format, disable chunking of records.
            # Exempt types which have chunksize hardcoded (prevents "got multiple values for keyword argument 'chunksize'").
            chunksize_exception_types = (
                dns.rdatatype.OPENPGPKEY,
                dns.rdatatype.EUI48,
                dns.rdatatype.EUI64,
            )
            if rdtype in chunksize_exception_types:
                return rdata.to_text()
            return rdata.to_text(chunksize=0)
        except binascii.Error as ex:
            # e.g., odd-length string
            raise ValueError(
                "Cannot parse hexadecimal or base64 record contents"
            ) from ex
        except dns.exception.SyntaxError as ex:
            # e.g., A/127.0.0.999
            # Stringify the arguments so that non-string exception arguments cannot break the
            # error translation itself.
            details = [str(arg) for arg in ex.args]
            if details and "quote" in details[0]:
                raise ValueError(
                    f"Data for {type_} records must be given using quotation marks."
                ) from ex
            raise ValueError(
                f'Record content for type {type_} malformed: {",".join(details)}'
            ) from ex
        except dns.name.NeedAbsoluteNameOrOrigin as ex:
            raise ValueError(
                'Hostname must be fully qualified (i.e., end in a dot: "example.com.")'
            ) from ex
        except ValueError as ex:
            # e.g., string ("asdf") cannot be parsed into int on base 10
            raise ValueError(f"Cannot parse record contents: {ex}") from ex
        # Any other exception propagates unchanged.
        # TODO see what exceptions raise here for faulty input

    def __str__(self):
        return f"<RR {self.pk} {self.content} rr_set={self.rrset.pk}>"
|