123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172 |
- import dns.name
- import dns.zone
- from rest_framework import serializers
- from api import settings
- from desecapi.models import Domain, RR_SET_TYPES_AUTOMATIC
- from desecapi.validators import ReadOnlyOnUpdateValidator
- from .records import RRsetSerializer
- class DomainSerializer(serializers.ModelSerializer):
- default_error_messages = {
- **serializers.Serializer.default_error_messages,
- "name_unavailable": "This domain name conflicts with an existing zone, or is disallowed by policy.",
- }
- zonefile = serializers.CharField(write_only=True, required=False, allow_blank=True)
- class Meta:
- model = Domain
- fields = (
- "created",
- "published",
- "name",
- "keys",
- "minimum_ttl",
- "touched",
- "zonefile",
- )
- read_only_fields = (
- "published",
- "minimum_ttl",
- )
- extra_kwargs = {
- "name": {"trim_whitespace": False},
- }
- def __init__(self, *args, include_keys=False, **kwargs):
- self.include_keys = include_keys
- self.import_zone = None
- super().__init__(*args, **kwargs)
- def get_fields(self):
- fields = super().get_fields()
- if not self.include_keys:
- fields.pop("keys")
- fields["name"].validators.append(ReadOnlyOnUpdateValidator())
- return fields
- def validate_name(self, value):
- if not Domain(name=value, owner=self.context["request"].user).is_registrable():
- raise serializers.ValidationError(
- self.default_error_messages["name_unavailable"], code="name_unavailable"
- )
- return value
- def parse_zonefile(self, domain_name: str, zonefile: str):
- try:
- self.import_zone = dns.zone.from_text(
- zonefile,
- origin=dns.name.from_text(domain_name),
- allow_include=False,
- check_origin=False,
- relativize=False,
- )
- except dns.zonefile.CNAMEAndOtherData:
- raise serializers.ValidationError(
- {
- "zonefile": [
- "No other records with the same name are allowed alongside a CNAME record."
- ]
- }
- )
- except ValueError as e:
- if "has non-origin SOA" in str(e):
- raise serializers.ValidationError(
- {
- "zonefile": [
- f"Zonefile includes an SOA record for a name different from {domain_name}."
- ]
- }
- )
- raise e
- except dns.exception.SyntaxError as e:
- try:
- line = str(e).split(":")[1]
- raise serializers.ValidationError(
- {"zonefile": [f"Zonefile contains syntax error in line {line}."]}
- )
- except IndexError:
- raise serializers.ValidationError(
- {"zonefile": [f"Could not parse zonefile: {str(e)}"]}
- )
- def validate(self, attrs):
- if attrs.get("zonefile") is not None:
- self.parse_zonefile(attrs.get("name"), attrs.pop("zonefile"))
- return super().validate(attrs)
- def create(self, validated_data):
- # save domain
- if (
- "minimum_ttl" not in validated_data
- and Domain(name=validated_data["name"]).is_locally_registrable
- ):
- validated_data.update(minimum_ttl=60)
- domain: Domain = super().create(validated_data)
- # save RRsets if zonefile was given
- nodes = getattr(self.import_zone, "nodes", None)
- if nodes:
- zone_name = dns.name.from_text(validated_data["name"])
- min_ttl, max_ttl = domain.minimum_ttl, settings.MAXIMUM_TTL
- data = [
- {
- "type": dns.rdatatype.to_text(rrset.rdtype),
- "ttl": max(min_ttl, min(max_ttl, rrset.ttl)),
- "subname": (owner_name - zone_name).to_text()
- if owner_name - zone_name != dns.name.empty
- else "",
- "records": [rr.to_text() for rr in rrset],
- }
- for owner_name, node in nodes.items()
- for rrset in node.rdatasets
- if (
- dns.rdatatype.to_text(rrset.rdtype)
- not in (
- RR_SET_TYPES_AUTOMATIC
- | { # do not import automatically managed record types
- "CDS",
- "CDNSKEY",
- "DNSKEY",
- } # do not import these, as this would likely be unexpected
- )
- and not (
- owner_name - zone_name == dns.name.empty
- and rrset.rdtype == dns.rdatatype.NS
- ) # ignore apex NS
- )
- ]
- rrset_list_serializer = RRsetSerializer(
- data=data, context=dict(domain=domain), many=True
- )
- # The following line raises if data passed validation by dnspython during zone file parsing,
- # but is rejected by validation in RRsetSerializer. See also
- # test_create_domain_zonefile_import_validation
- try:
- rrset_list_serializer.is_valid(raise_exception=True)
- except serializers.ValidationError as e:
- if isinstance(e.detail, serializers.ReturnList):
- # match the order of error messages with the RRsets provided to the
- # serializer to make sense to the client
- def fqdn(idx):
- return (data[idx]["subname"] + "." + domain.name).lstrip(".")
- raise serializers.ValidationError(
- {
- "zonefile": [
- f"{fqdn(idx)}/{data[idx]['type']}: {err}"
- for idx, d in enumerate(e.detail)
- for _, errs in d.items()
- for err in errs
- ]
- }
- )
- raise e
- rrset_list_serializer.save()
- return domain
|