123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278 |
- import binascii
- import json
- from datetime import timedelta
- from rest_framework import fields, serializers
- from rest_framework.settings import api_settings
- from rest_framework.validators import UniqueValidator, qs_filter
- from api import settings
- from desecapi import crypto, models
- from .captcha import CaptchaSolutionSerializer
- class CustomFieldNameUniqueValidator(UniqueValidator):
- """
- Does exactly what rest_framework's UniqueValidator does, however allows to further customize the
- query that is used to determine the uniqueness.
- More specifically, we allow that the field name the value is queried against is passed when initializing
- this validator. (At the time of writing, UniqueValidator insists that the field's name is used for the
- database query field; only how the lookup must match is allowed to be changed.)
- """
- def __init__(self, queryset, message=None, lookup="exact", lookup_field=None):
- self.lookup_field = lookup_field
- super().__init__(queryset, message, lookup)
- def filter_queryset(self, value, queryset, field_name):
- """
- Filter the queryset to all instances matching the given value on the specified lookup field.
- """
- filter_kwargs = {
- "%s__%s" % (self.lookup_field or field_name, self.lookup): value
- }
- return qs_filter(queryset, **filter_kwargs)
- class AuthenticatedActionSerializer(serializers.ModelSerializer):
- state = serializers.CharField() # serializer read-write, but model read-only field
- validity_period = settings.VALIDITY_PERIOD_VERIFICATION_SIGNATURE
- _crypto_context = "desecapi.serializers.AuthenticatedActionSerializer"
- timestamp = None # is set to the code's timestamp during validation
- class Meta:
- model = models.AuthenticatedAction
- fields = ("state",)
- @classmethod
- def _pack_code(cls, data):
- payload = json.dumps(data).encode()
- code = crypto.encrypt(payload, context=cls._crypto_context).decode()
- return code.rstrip("=")
- @classmethod
- def _unpack_code(cls, code, *, ttl):
- code += -len(code) % 4 * "="
- try:
- timestamp, payload = crypto.decrypt(
- code.encode(), context=cls._crypto_context, ttl=ttl
- )
- return timestamp, json.loads(payload.decode())
- except (
- TypeError,
- UnicodeDecodeError,
- UnicodeEncodeError,
- json.JSONDecodeError,
- binascii.Error,
- ):
- raise ValueError
- def to_representation(self, instance: models.AuthenticatedAction):
- # do the regular business
- data = super().to_representation(instance)
- # encode into single string
- return {"code": self._pack_code(data)}
- def to_internal_value(self, data):
- # Allow injecting validity period from context. This is used, for example, for authentication, where the code's
- # integrity and timestamp is checked by AuthenticatedBasicUserActionSerializer with validity injected as needed.
- validity_period = self.context.get("validity_period", self.validity_period)
- # calculate code TTL
- try:
- ttl = validity_period.total_seconds()
- except AttributeError:
- ttl = None # infinite
- # decode from single string
- try:
- self.timestamp, unpacked_data = self._unpack_code(
- self.context["code"], ttl=ttl
- )
- except KeyError:
- raise serializers.ValidationError({"code": ["This field is required."]})
- except ValueError:
- if ttl is None:
- msg = "This code is invalid."
- else:
- msg = f"This code is invalid, possibly because it expired (validity: {validity_period})."
- raise serializers.ValidationError({api_settings.NON_FIELD_ERRORS_KEY: msg})
- # add extra fields added by the user, but give precedence to fields unpacked from the code
- data = {**data, **unpacked_data}
- # do the regular business
- return super().to_internal_value(data)
- def act(self):
- self.instance.act()
- return self.instance
- def save(self, **kwargs):
- raise ValueError
- class AuthenticatedBasicUserActionMixin:
- def save(self, **kwargs):
- context = {**self.context, "action_serializer": self}
- return self.action_user.send_email(self.reason, context=context, **kwargs)
- class AuthenticatedBasicUserActionSerializer(
- AuthenticatedBasicUserActionMixin, AuthenticatedActionSerializer
- ):
- user = serializers.PrimaryKeyRelatedField(
- queryset=models.User.objects.all(),
- error_messages={"does_not_exist": "This user does not exist."},
- pk_field=serializers.UUIDField(),
- )
- reason = None
- class Meta:
- model = models.AuthenticatedBasicUserAction
- fields = AuthenticatedActionSerializer.Meta.fields + ("user",)
- @property
- def action_user(self):
- return self.instance.user
- @classmethod
- def build_and_save(cls, **kwargs):
- action = cls.Meta.model(**kwargs)
- return cls(action).save()
- class AuthenticatedBasicUserActionListSerializer(
- AuthenticatedBasicUserActionMixin, serializers.ListSerializer
- ):
- @property
- def reason(self):
- return self.child.reason
- @property
- def action_user(self):
- user = self.instance[0].user
- if any(instance.user != user for instance in self.instance):
- raise ValueError("Actions must belong to the same user.")
- return user
- class AuthenticatedChangeOutreachPreferenceUserActionSerializer(
- AuthenticatedBasicUserActionSerializer
- ):
- reason = "change-outreach-preference"
- validity_period = None
- class Meta:
- model = models.AuthenticatedChangeOutreachPreferenceUserAction
- fields = AuthenticatedBasicUserActionSerializer.Meta.fields + (
- "outreach_preference",
- )
- class AuthenticatedActivateUserActionSerializer(AuthenticatedBasicUserActionSerializer):
- captcha = CaptchaSolutionSerializer(required=False)
- reason = "activate-account"
- class Meta(AuthenticatedBasicUserActionSerializer.Meta):
- model = models.AuthenticatedActivateUserAction
- fields = AuthenticatedBasicUserActionSerializer.Meta.fields + (
- "captcha",
- "domain",
- )
- extra_kwargs = {"domain": {"default": None, "allow_null": True}}
- def validate(self, attrs):
- try:
- attrs.pop(
- "captcha"
- ) # remove captcha from internal value to avoid passing to Meta.model(**kwargs)
- except KeyError:
- if attrs["user"].needs_captcha:
- raise serializers.ValidationError(
- {"captcha": fields.Field.default_error_messages["required"]}
- )
- return attrs
- class AuthenticatedChangeEmailUserActionSerializer(
- AuthenticatedBasicUserActionSerializer
- ):
- new_email = serializers.EmailField(
- validators=[
- CustomFieldNameUniqueValidator(
- queryset=models.User.objects.all(),
- lookup_field="email",
- message="You already have another account with this email address.",
- )
- ],
- required=True,
- )
- reason = "change-email"
- class Meta(AuthenticatedBasicUserActionSerializer.Meta):
- model = models.AuthenticatedChangeEmailUserAction
- fields = AuthenticatedBasicUserActionSerializer.Meta.fields + ("new_email",)
- def save(self):
- return super().save(recipient=self.instance.new_email)
- class AuthenticatedConfirmAccountUserActionSerializer(
- AuthenticatedBasicUserActionSerializer
- ):
- reason = "confirm-account"
- validity_period = timedelta(days=14)
- class Meta(AuthenticatedBasicUserActionSerializer.Meta):
- model = (
- models.AuthenticatedNoopUserAction
- ) # confirmation happens during authentication, so nothing left to do
- class AuthenticatedResetPasswordUserActionSerializer(
- AuthenticatedBasicUserActionSerializer
- ):
- new_password = serializers.CharField(write_only=True)
- reason = "reset-password"
- class Meta(AuthenticatedBasicUserActionSerializer.Meta):
- model = models.AuthenticatedResetPasswordUserAction
- fields = AuthenticatedBasicUserActionSerializer.Meta.fields + ("new_password",)
- class AuthenticatedDeleteUserActionSerializer(AuthenticatedBasicUserActionSerializer):
- reason = "delete-account"
- class Meta(AuthenticatedBasicUserActionSerializer.Meta):
- model = models.AuthenticatedDeleteUserAction
- class AuthenticatedDomainBasicUserActionSerializer(
- AuthenticatedBasicUserActionSerializer
- ):
- domain = serializers.PrimaryKeyRelatedField(
- queryset=models.Domain.objects.all(),
- error_messages={"does_not_exist": "This domain does not exist."},
- )
- class Meta:
- model = models.AuthenticatedDomainBasicUserAction
- fields = AuthenticatedBasicUserActionSerializer.Meta.fields + ("domain",)
- class AuthenticatedRenewDomainBasicUserActionSerializer(
- AuthenticatedDomainBasicUserActionSerializer
- ):
- reason = "renew-domain"
- validity_period = None
- class Meta(AuthenticatedDomainBasicUserActionSerializer.Meta):
- model = models.AuthenticatedRenewDomainBasicUserAction
- list_serializer_class = AuthenticatedBasicUserActionListSerializer
|