123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126 |
- import dns.name
- import dns.zone
- from rest_framework import serializers
- from api import settings
- from desecapi.models import Domain, RR_SET_TYPES_AUTOMATIC
- from desecapi.validators import ReadOnlyOnUpdateValidator
- from .records import RRsetSerializer
class DomainSerializer(serializers.ModelSerializer):
    """Serializer for ``Domain`` objects with optional zonefile import on creation.

    A write-only ``zonefile`` field may accompany the domain data. It is parsed
    with dnspython during ``validate()`` and, once the domain has been created,
    its record sets are handed to ``RRsetSerializer`` in ``create()``.
    """

    default_error_messages = {
        **serializers.Serializer.default_error_messages,
        'name_unavailable': 'This domain name conflicts with an existing zone, or is disallowed by policy.',
    }
    # Write-only input; removed from the attributes in validate() so that it never reaches the model.
    zonefile = serializers.CharField(write_only=True, required=False, allow_blank=True)

    class Meta:
        model = Domain
        fields = ('created', 'published', 'name', 'keys', 'minimum_ttl', 'touched', 'zonefile')
        read_only_fields = ('published', 'minimum_ttl',)
        extra_kwargs = {
            'name': {'trim_whitespace': False},
        }

    def __init__(self, *args, include_keys=False, **kwargs):
        """Initialize the serializer.

        Args:
            include_keys: When true, the ``keys`` field is kept in the field set
                (see ``get_fields``).
        """
        self.include_keys = include_keys
        # Holds the parsed dnspython zone once parse_zonefile() has succeeded.
        self.import_zone = None
        super().__init__(*args, **kwargs)

    def get_fields(self):
        """Return the field set, without ``keys`` unless ``include_keys`` was requested.

        A ``ReadOnlyOnUpdateValidator`` is appended to the ``name`` field's validators.
        """
        fields = super().get_fields()
        if not self.include_keys:
            fields.pop('keys')
        fields['name'].validators.append(ReadOnlyOnUpdateValidator())
        return fields

    def validate_name(self, value):
        """Reject names for which ``Domain.is_registrable()`` is false for the requesting user.

        Raises:
            serializers.ValidationError: with code ``name_unavailable``.
        """
        candidate = Domain(name=value, owner=self.context['request'].user)
        if not candidate.is_registrable():
            raise serializers.ValidationError(self.default_error_messages['name_unavailable'], code='name_unavailable')
        return value

    def parse_zonefile(self, domain_name: str, zonefile: str):
        """Parse ``zonefile`` with ``domain_name`` as origin and store it in ``self.import_zone``.

        Parser errors are translated into ``ValidationError``s keyed by ``zonefile``;
        the original exception is kept as the cause. A ``ValueError`` that is not
        about a non-origin SOA is propagated unchanged.

        Raises:
            serializers.ValidationError: if the zonefile is rejected by the parser.
        """
        try:
            self.import_zone = dns.zone.from_text(
                zonefile,
                origin=dns.name.from_text(domain_name),
                allow_include=False,
                check_origin=False,
                relativize=False,
            )
        except dns.zonefile.CNAMEAndOtherData as e:
            raise serializers.ValidationError(
                {'zonefile': ['No other records with the same name are allowed alongside a CNAME record.']}
            ) from e
        except ValueError as e:
            if 'has non-origin SOA' in str(e):
                raise serializers.ValidationError(
                    {'zonefile': [f'Zonefile includes an SOA record for a name different from {domain_name}.']}
                ) from e
            raise
        except dns.exception.SyntaxError as e:
            # The second colon-separated component of the message is reported as the line;
            # fall back to the full message if there is no such component.
            try:
                line = str(e).split(':')[1]
            except IndexError:
                message = f'Could not parse zonefile: {str(e)}'
            else:
                message = f'Zonefile contains syntax error in line {line}.'
            raise serializers.ValidationError({'zonefile': [message]}) from e

    def validate(self, attrs):
        """Parse the zonefile, if one was given, and drop it from ``attrs``."""
        if attrs.get('zonefile') is not None:
            self.parse_zonefile(attrs.get('name'), attrs.pop('zonefile'))
        return super().validate(attrs)

    def create(self, validated_data):
        """Create the domain and, if a zonefile was parsed, import its RRsets.

        Raises:
            serializers.ValidationError: if imported records are rejected by
                ``RRsetSerializer``.
        """
        # save domain
        if 'minimum_ttl' not in validated_data and Domain(name=validated_data['name']).is_locally_registrable:
            validated_data.update(minimum_ttl=60)
        domain: Domain = super().create(validated_data)

        # save RRsets if zonefile was given
        nodes = getattr(self.import_zone, 'nodes', None)
        if nodes:
            zone_name = dns.name.from_text(validated_data['name'])
            data = self._rrset_data_from_nodes(nodes, zone_name, domain.minimum_ttl, settings.MAXIMUM_TTL)
            self._save_imported_rrsets(domain, data)

        return domain

    @staticmethod
    def _rrset_data_from_nodes(nodes, zone_name, min_ttl, max_ttl):
        """Convert parsed zone nodes into a list of RRset dicts for ``RRsetSerializer``."""
        skipped_types = (
            RR_SET_TYPES_AUTOMATIC |  # do not import automatically managed record types
            {'CDS', 'CDNSKEY', 'DNSKEY'}  # do not import these, as this would likely be unexpected
        )
        data = []
        for owner_name, node in nodes.items():
            relative_name = owner_name - zone_name
            is_apex = relative_name == dns.name.empty
            subname = '' if is_apex else relative_name.to_text()
            for rrset in node.rdatasets:
                type_text = dns.rdatatype.to_text(rrset.rdtype)
                if type_text in skipped_types:
                    continue
                if is_apex and rrset.rdtype == dns.rdatatype.NS:  # ignore apex NS
                    continue
                data.append({
                    'type': type_text,
                    'ttl': max(min_ttl, min(max_ttl, rrset.ttl)),  # clamp into [min_ttl, max_ttl]
                    'subname': subname,
                    'records': [rr.to_text() for rr in rrset],
                })
        return data

    @staticmethod
    def _save_imported_rrsets(domain, data):
        """Validate and save ``data`` via ``RRsetSerializer``, reporting errors under ``zonefile``."""
        rrset_list_serializer = RRsetSerializer(data=data, context=dict(domain=domain), many=True)
        # The following line raises if data passed validation by dnspython during zone file parsing,
        # but is rejected by validation in RRsetSerializer. See also
        # test_create_domain_zonefile_import_validation
        try:
            rrset_list_serializer.is_valid(raise_exception=True)
        except serializers.ValidationError as e:
            if not isinstance(e.detail, serializers.ReturnList):
                raise

            def fqdn(idx):
                return (data[idx]['subname'] + "." + domain.name).lstrip('.')

            # match the order of error messages with the RRsets provided to the
            # serializer to make sense to the client
            raise serializers.ValidationError({
                'zonefile': [
                    f"{fqdn(idx)}/{data[idx]['type']}: {err}"
                    for idx, field_errors in enumerate(e.detail)
                    for errs in field_errors.values()
                    for err in errs
                ]
            }) from e

        rrset_list_serializer.save()
|