import dns.name
import dns.zone
from rest_framework import serializers

from api import settings
from desecapi.models import Domain, RR_SET_TYPES_AUTOMATIC
from desecapi.validators import ReadOnlyOnUpdateValidator

from .records import RRsetSerializer


class DomainSerializer(serializers.ModelSerializer):
    """Serializer for ``Domain`` objects with optional zonefile import on creation.

    The write-only ``zonefile`` field is parsed during validation; the parsed
    zone is kept on the serializer instance and turned into RRsets in
    ``create``.
    """

    default_error_messages = {
        **serializers.Serializer.default_error_messages,
        'name_unavailable': 'This domain name conflicts with an existing zone, or is disallowed by policy.',
    }
    zonefile = serializers.CharField(write_only=True, required=False, allow_blank=True)

    class Meta:
        model = Domain
        fields = ('created', 'published', 'name', 'keys', 'minimum_ttl', 'touched', 'zonefile')
        read_only_fields = ('published', 'minimum_ttl',)
        extra_kwargs = {
            'name': {'trim_whitespace': False},
        }

    def __init__(self, *args, include_keys=False, **kwargs):
        """Remember whether ``keys`` should be exposed and reset the parsed zone."""
        self.import_zone = None  # set by parse_zonefile() once a zonefile was parsed
        self.include_keys = include_keys
        super().__init__(*args, **kwargs)

    def get_fields(self):
        """Return the serializer fields, dropping ``keys`` unless requested.

        A ``ReadOnlyOnUpdateValidator`` is appended to the ``name`` field's
        validators.
        """
        field_map = super().get_fields()
        if not self.include_keys:
            del field_map['keys']
        field_map['name'].validators.append(ReadOnlyOnUpdateValidator())
        return field_map

    def validate_name(self, value):
        """Reject names for which ``Domain.is_registrable()`` is false for the requesting user.

        Raises:
            serializers.ValidationError: with code ``name_unavailable``.
        """
        candidate = Domain(name=value, owner=self.context['request'].user)
        if candidate.is_registrable():
            return value
        raise serializers.ValidationError(self.default_error_messages['name_unavailable'], code='name_unavailable')

    def parse_zonefile(self, domain_name: str, zonefile: str):
        """Parse ``zonefile`` with ``domain_name`` as origin and store it in ``self.import_zone``.

        Raises:
            serializers.ValidationError: keyed by ``zonefile`` when the text
                cannot be parsed (CNAME conflicts, non-origin SOA, syntax errors).
            ValueError: any other ``ValueError`` from the parser is re-raised.
        """
        try:
            # The origin is built inside the try block so that failures while
            # parsing the domain name go through the same handlers below.
            origin = dns.name.from_text(domain_name)
            self.import_zone = dns.zone.from_text(
                zonefile,
                origin=origin,
                allow_include=False,
                check_origin=False,
                relativize=False,
            )
        except dns.zonefile.CNAMEAndOtherData:
            raise serializers.ValidationError(
                {'zonefile': ['No other records with the same name are allowed alongside a CNAME record.']})
        except ValueError as e:
            if 'has non-origin SOA' not in str(e):
                raise e
            raise serializers.ValidationError(
                {'zonefile': [f'Zonefile includes an SOA record for a name different from {domain_name}.']})
        except dns.exception.SyntaxError as e:
            # The second colon-separated piece of the message is reported as the line.
            pieces = str(e).split(':')
            if len(pieces) < 2:
                raise serializers.ValidationError({'zonefile': [f'Could not parse zonefile: {str(e)}']})
            line = pieces[1]
            raise serializers.ValidationError({'zonefile': [f'Zonefile contains syntax error in line {line}.']})

    def validate(self, attrs):
        """Parse a supplied zonefile (removing it from ``attrs``) before the default validation."""
        if attrs.get('zonefile') is not None:
            domain_name = attrs.get('name')
            zonefile_text = attrs.pop('zonefile')
            self.parse_zonefile(domain_name, zonefile_text)
        return super().validate(attrs)

    def create(self, validated_data):
        """Create the domain and, if a zonefile was parsed, import its RRsets.

        Raises:
            serializers.ValidationError: keyed by ``zonefile`` when the imported
                records are rejected by ``RRsetSerializer``.
        """
        # Persist the domain first.
        if 'minimum_ttl' not in validated_data and Domain(name=validated_data['name']).is_locally_registrable:
            validated_data['minimum_ttl'] = 60
        domain: Domain = super().create(validated_data)

        # Without parsed zone nodes there is nothing to import.
        nodes = getattr(self.import_zone, 'nodes', None)
        if not nodes:
            return domain

        zone_name = dns.name.from_text(validated_data['name'])
        min_ttl, max_ttl = domain.minimum_ttl, settings.MAXIMUM_TTL
        skipped_types = (
            RR_SET_TYPES_AUTOMATIC |  # automatically managed record types are never imported
            {'CDS', 'CDNSKEY', 'DNSKEY'}  # importing these would likely be unexpected
        )

        data = []
        for owner_name, node in nodes.items():
            relative_name = owner_name - zone_name
            at_apex = relative_name == dns.name.empty
            for rrset in node.rdatasets:
                type_text = dns.rdatatype.to_text(rrset.rdtype)
                if type_text in skipped_types:
                    continue
                if at_apex and rrset.rdtype == dns.rdatatype.NS:
                    continue  # apex NS records are ignored
                data.append({
                    'type': type_text,
                    'ttl': max(min_ttl, min(max_ttl, rrset.ttl)),  # clamp into [min_ttl, max_ttl]
                    'subname': '' if at_apex else relative_name.to_text(),
                    'records': [rr.to_text() for rr in rrset],
                })

        rrset_list_serializer = RRsetSerializer(data=data, context={'domain': domain}, many=True)
        # Validation below fails for data that dnspython accepted while parsing
        # the zonefile but that RRsetSerializer rejects.
        # See also test_create_domain_zonefile_import_validation
        try:
            rrset_list_serializer.is_valid(raise_exception=True)
        except serializers.ValidationError as e:
            if not isinstance(e.detail, serializers.ReturnList):
                raise e
            # Errors are positional, so each message can be prefixed with the
            # name and type of the RRset it belongs to.
            messages = []
            for idx, rrset_errors in enumerate(e.detail):
                for errs in rrset_errors.values():
                    for err in errs:
                        fqdn = (data[idx]['subname'] + "." + domain.name).lstrip('.')
                        messages.append(f"{fqdn}/{data[idx]['type']}: {err}")
            raise serializers.ValidationError({'zonefile': messages})

        rrset_list_serializer.save()
        return domain