|
- import re
- import django.core.exceptions
- from django.core.validators import RegexValidator
- from django.db import models, transaction
- from djoser import serializers as djoser_serializers
- from rest_framework import serializers
- from rest_framework.exceptions import ValidationError
- from rest_framework.fields import empty
- from rest_framework.settings import api_settings
- from rest_framework_bulk import BulkListSerializer, BulkSerializerMixin
- from desecapi.models import Domain, Donation, User, RR, RRset, Token
class TokenSerializer(serializers.ModelSerializer):
    """
    Serializer for API tokens.

    The token's ``key`` is exposed under the name ``value`` and the token's
    ``user_specific_id`` is exposed under the name ``id``. Both are
    read-only, as is ``created``.
    """
    # Exposes Token.key to API clients as "value".
    value = serializers.ReadOnlyField(source='key')
    # Deliberately shadows the model's own "id" (the database primary key)
    # with the per-user identifier.
    id = serializers.ReadOnlyField(source='user_specific_id')

    class Meta:
        model = Token
        fields = (
            'id',
            'created',
            'name',
            'value',
        )
        read_only_fields = (
            'created',
            'value',
            'id',
        )
class RRSerializer(serializers.ModelSerializer):
    """
    Serializer for a single resource record, exposing only ``content``.
    """

    class Meta:
        model = RR
        fields = ('content',)

    def to_internal_value(self, data):
        """
        Validate incoming record data.

        Input that is not a dict is treated as the bare record content and
        wrapped as ``{'content': data}`` before regular processing.
        """
        payload = data if isinstance(data, dict) else {'content': data}
        return super().to_internal_value(payload)
class RRsetBulkListSerializer(BulkListSerializer):
    """
    List serializer for bulk RRset operations.

    Both ``create`` and ``update`` delegate to the child serializer's
    ``_save`` method, handing it a list of instances (``None`` where no
    instance exists) aligned with the list of validated data items.
    """
    default_error_messages = {'not_a_list': 'Invalid input, expected a list of RRsets.'}

    @transaction.atomic
    def update(self, queryset, validated_data):
        """
        Match each validated item to an existing RRset (by subname and type)
        from ``queryset`` and pass the aligned lists on to the child's
        ``_save``. Items without a match are paired with ``None``.
        """
        # (subname, type) identifies an RRset; a missing subname means ''.
        keys = [(item.get('subname', ''), item['type']) for item in validated_data]

        # Seed the filter with a pk-is-null condition and OR in one
        # condition per requested (subname, type) pair.
        lookup = models.Q(pk__isnull=True)
        for subname, rr_type in keys:
            lookup |= models.Q(subname=subname, type=rr_type)

        existing = {}
        for rrset in queryset.filter(lookup):
            existing[(rrset.subname, rrset.type)] = rrset

        instances = [existing.get(key, None) for key in keys]
        # noinspection PyUnresolvedReferences,PyProtectedMember
        return self.child._save(instances, validated_data)

    @transaction.atomic
    def create(self, validated_data):
        """
        Create all RRsets in ``validated_data`` via the child's ``_save``,
        with no existing instance for any of them.
        """
        placeholders = [None] * len(validated_data)
        # noinspection PyUnresolvedReferences,PyProtectedMember
        return self.child._save(placeholders, validated_data)
class RequiredOnPartialUpdateCharField(serializers.CharField):
    """
    A character field that must always be supplied, including on partial
    updates (e.g. PATCH requests).
    """

    def validate_empty_values(self, data):
        """
        Fail with the ``required`` error if no value was supplied at all;
        otherwise defer to the standard empty-value handling.
        """
        if data is not empty:
            return super().validate_empty_values(data)
        # self.fail() raises, so nothing is returned on this path.
        self.fail('required')
class SlugRRField(serializers.SlugRelatedField):
    """
    Slug-related field for resource records, keyed on their ``content``.

    Incoming values are turned into new, unsaved ``RR`` objects rather than
    being looked up in the database.
    """

    def __init__(self, *args, **kwargs):
        # Slug field and queryset are fixed; caller-supplied values for
        # these two keys are overridden.
        kwargs.update(slug_field='content', queryset=RR.objects.all())
        super().__init__(*args, **kwargs)

    def to_internal_value(self, data):
        """Build an unsaved ``RR`` whose slug field is set to ``data``."""
        rr_kwargs = {self.slug_field: data}
        return RR(**rr_kwargs)
class RRsetSerializer(BulkSerializerMixin, serializers.ModelSerializer):
    """
    Serializer for RRsets, for single objects and (through
    ``RRsetBulkListSerializer``) for lists of them.

    ``create`` and ``update`` both funnel into ``_save``, which validates
    every RRset and its records first and only then writes them through
    ``domain.write_rrsets``.
    """
    domain = serializers.SlugRelatedField(read_only=True, slug_field='name')
    subname = serializers.CharField(
        allow_blank=True,
        required=False,
        validators=[
            RegexValidator(
                regex=r'^\*?[a-z\.\-_0-9]*$',
                message='Subname can only use (lowercase) a-z, 0-9, ., -, and _.',
                code='invalid_subname',
            ),
        ],
    )
    type = RequiredOnPartialUpdateCharField(
        allow_blank=False,
        required=True,
        validators=[
            RegexValidator(
                regex=r'^[A-Z][A-Z0-9]*$',
                message='Type must be uppercase alphanumeric and start with a letter.',
                code='invalid_type',
            ),
        ],
    )
    records = SlugRRField(many=True)

    class Meta:
        model = RRset
        fields = ('id', 'domain', 'subname', 'name', 'records', 'ttl', 'type',)
        list_serializer_class = RRsetBulkListSerializer

    def _save(self, instance, validated_data):
        """
        Validate and write one RRset or a list of RRsets.

        :param instance: an existing RRset, ``None`` (creation), or a list
            of these for bulk operation.
        :param validated_data: a dict of validated fields, or a list of
            such dicts aligned with ``instance`` for bulk operation.
        :return: what ``domain.write_rrsets`` returns for bulk operation,
            otherwise its first element.
        :raises ValidationError: if any item fails validation (a list of
            per-item error dicts for bulk operation, a single dict
            otherwise), or if writing the RRsets fails.
        """
        bulk = isinstance(instance, list)
        instances = instance if bulk else [instance]
        items = validated_data if bulk else [validated_data]

        name = self.context['view'].kwargs['name']
        domain = self.context['request'].user.domains.get(name=name)
        method = self.context['request'].method

        # Uniqueness is only checked on POST; an empty record list is only
        # acceptable on PATCH and PUT.
        validate_unique = (method == 'POST')
        allow_empty = (method in ('PATCH', 'PUT'))

        # One entry per input item, in input order; {} marks a clean item.
        errors = []
        # Maps each RRset to the list of RRs to write (None if not given).
        to_write = {}
        seen_keys = set()

        for rrset, data in zip(instances, items):
            records = data.pop('records', None)

            if rrset:
                # Known instance (update): take over whichever fields were given.
                for field_name in ('subname', 'type', 'ttl'):
                    current = getattr(rrset, field_name)
                    setattr(rrset, field_name, data.get(field_name, current))
            else:
                # No known instance (creation): ttl and records are mandatory.
                required = (
                    ('ttl', 'ttl' not in data),
                    ('records', records is None),
                )
                missing = {
                    field_name: ['This field is required for new RRsets.']
                    for field_name, absent in required
                    if absent
                }
                if missing:
                    errors.append(missing)
                    continue
                data.pop('id', None)
                data['domain'] = domain
                rrset = RRset(**data)

            # Each (subname, type) pair may occur only once per request.
            key = (rrset.subname, rrset.type)
            if key in seen_keys:
                errors.append({'__all__': ['RRset repeated with same subname and type.']})
                continue
            seen_keys.add(key)

            # Model-level validation. Raises error if type or subname have
            # been changed or if a new RRset is not unique.
            try:
                rrset.full_clean(exclude=['updated'],
                                 validate_unique=validate_unique)
            except django.core.exceptions.ValidationError as e:
                errors.append(e.message_dict)
                continue

            if records is None:
                # No records given: register the RRset without an RR list.
                to_write[rrset] = None
                errors.append({})
                continue

            # Run the record contents through RRSerializer for validation.
            rr_data = [{'content': record.content} for record in records]
            rr_serializer = RRSerializer(data=rr_data, many=True,
                                         allow_empty=allow_empty)
            if not rr_serializer.is_valid():
                error = rr_serializer.errors
                # Report list-level errors under the 'records' key.
                if api_settings.NON_FIELD_ERRORS_KEY in error:
                    error['records'] = error.pop(api_settings.NON_FIELD_ERRORS_KEY)
                errors.append(error)
                continue

            # Everything checked out; schedule the records for writing.
            to_write[rrset] = [RR(rrset=rrset, **rr_validated_data)
                               for rr_validated_data in rr_serializer.validated_data]
            errors.append({})

        if any(errors):
            raise ValidationError(errors if bulk else errors[0])

        # Validation passed for every item; now try to save the RRsets.
        try:
            written = domain.write_rrsets(to_write)
        except django.core.exceptions.ValidationError as e:
            # Use the first non-empty detail attribute, else the message text.
            candidates = (getattr(e, attr, None)
                          for attr in ['errors', 'error_dict', 'message'])
            detail = next((candidate for candidate in candidates if candidate), None)
            if detail:
                raise ValidationError(detail)
            raise ValidationError(str(e))
        except ValueError as e:
            raise ValidationError({'__all__': str(e)})

        return written if bulk else written[0]

    @transaction.atomic
    def update(self, instance, validated_data):
        """Update ``instance`` from ``validated_data`` inside a transaction."""
        return self._save(instance, validated_data)

    @transaction.atomic
    def create(self, validated_data):
        """Create an RRset from ``validated_data`` inside a transaction."""
        return self._save(None, validated_data)

    @staticmethod
    def validate_type(value):
        """
        Reject record types listed in ``RRset.DEAD_TYPES`` or
        ``RRset.RESTRICTED_TYPES`` and types starting with ``TYPE``;
        return any other value unchanged.
        """
        if value in RRset.DEAD_TYPES:
            message = "The %s RRset type is currently unsupported." % value
            raise serializers.ValidationError(message)
        if value in RRset.RESTRICTED_TYPES:
            message = "You cannot tinker with the %s RRset." % value
            raise serializers.ValidationError(message)
        if value.startswith('TYPE'):
            raise serializers.ValidationError(
                "Generic type format is not supported.")
        return value

    def to_representation(self, instance):
        """Serialize ``instance``, leaving the ``id`` field out of the output."""
        representation = super().to_representation(instance)
        # 'id' is listed in Meta.fields but is not shown to clients.
        del representation['id']
        return representation
class DomainSerializer(serializers.ModelSerializer):
    """
    Serializer for domains.

    ``name`` is restricted to lowercase letters, digits, underscore, dot and
    hyphen, with a maximum length of 191 characters. Whitespace is not
    trimmed, so the value is validated exactly as submitted.
    """
    # The pattern is anchored with \Z rather than $: in Python, $ also matches
    # just before a trailing newline, and since trim_whitespace is disabled a
    # name ending in a newline would otherwise pass validation.
    name = serializers.RegexField(regex=r'^[a-z0-9_.-]+\Z', max_length=191, trim_whitespace=False)

    class Meta:
        model = Domain
        fields = ('created', 'published', 'name', 'keys')
class DonationSerializer(serializers.ModelSerializer):
    """
    Serializer for donations.

    All whitespace is stripped from the ``bic`` and ``iban`` values during
    validation.
    """

    class Meta:
        model = Donation
        fields = (
            'name',
            'iban',
            'bic',
            'amount',
            'message',
            'email',
        )

    @staticmethod
    def validate_bic(value):
        """Return ``value`` with every whitespace character removed."""
        whitespace = re.compile(r'[\s]')
        return whitespace.sub('', value)

    @staticmethod
    def validate_iban(value):
        """Return ``value`` with every whitespace character removed."""
        whitespace = re.compile(r'[\s]')
        return whitespace.sub('', value)
class UserSerializer(djoser_serializers.UserSerializer):
    """
    User serializer extending djoser's with the ``dyn``, ``limit_domains``
    and ``locked`` fields, all of which are read-only.
    """
    # Computed by get_locked() below.
    locked = serializers.SerializerMethodField()

    class Meta(djoser_serializers.UserSerializer.Meta):
        fields = (
            tuple(User.REQUIRED_FIELDS)
            + (User.USERNAME_FIELD,)
            + ('dyn', 'limit_domains', 'locked')
        )
        read_only_fields = ('dyn', 'limit_domains', 'locked',)

    @staticmethod
    def get_locked(obj):
        """Report the truthiness of ``obj.locked`` as a plain boolean."""
        return True if obj.locked else False
class UserCreateSerializer(djoser_serializers.UserCreateSerializer):
    """
    User creation serializer extending djoser's; accepts the user model's
    required fields, the username field, ``password`` and ``dyn``.
    """

    class Meta(djoser_serializers.UserCreateSerializer.Meta):
        fields = (
            tuple(User.REQUIRED_FIELDS)
            + (User.USERNAME_FIELD,)
            + ('password', 'dyn')
        )
|