tokens.py 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293
  1. import django.core.exceptions
  2. from rest_framework import viewsets
  3. from rest_framework.exceptions import ValidationError
  4. from rest_framework.permissions import IsAuthenticated, SAFE_METHODS
  5. from rest_framework.response import Response
  6. from rest_framework.reverse import reverse
  7. from rest_framework.views import APIView
  8. from desecapi import permissions
  9. from desecapi.models import TokenDomainPolicy
  10. from desecapi.serializers import TokenDomainPolicySerializer, TokenSerializer
  11. from .base import IdempotentDestroyMixin
  12. from .domains import DomainViewSet
  13. class TokenViewSet(IdempotentDestroyMixin, viewsets.ModelViewSet):
  14. serializer_class = TokenSerializer
  15. permission_classes = (
  16. IsAuthenticated,
  17. permissions.HasManageTokensPermission,
  18. )
  19. throttle_scope = "account_management_passive"
  20. def get_queryset(self):
  21. return self.request.user.token_set.all()
  22. def get_serializer(self, *args, **kwargs):
  23. # When creating a new token, return the plaintext representation
  24. if self.action == "create":
  25. kwargs.setdefault("include_plain", True)
  26. return super().get_serializer(*args, **kwargs)
  27. def perform_create(self, serializer):
  28. serializer.save(user=self.request.user)
  29. class TokenPoliciesRoot(APIView):
  30. permission_classes = [
  31. IsAuthenticated,
  32. permissions.HasManageTokensPermission
  33. | permissions.AuthTokenCorrespondsToViewToken,
  34. ]
  35. def get(self, request, *args, **kwargs):
  36. return Response(
  37. {
  38. "domain": reverse(
  39. "token_domain_policies-list", request=request, kwargs=kwargs
  40. )
  41. }
  42. )
  43. class TokenDomainPolicyViewSet(IdempotentDestroyMixin, viewsets.ModelViewSet):
  44. lookup_field = "domain__name"
  45. lookup_value_regex = DomainViewSet.lookup_value_regex
  46. pagination_class = None
  47. serializer_class = TokenDomainPolicySerializer
  48. throttle_scope = "account_management_passive"
  49. @property
  50. def permission_classes(self):
  51. ret = [IsAuthenticated]
  52. if self.request.method in SAFE_METHODS:
  53. ret.append(
  54. permissions.HasManageTokensPermission
  55. | permissions.AuthTokenCorrespondsToViewToken
  56. )
  57. else:
  58. ret.append(permissions.HasManageTokensPermission)
  59. return ret
  60. def dispatch(self, request, *args, **kwargs):
  61. # map default policy onto domain_id IS NULL
  62. lookup_url_kwarg = self.lookup_url_kwarg or self.lookup_field
  63. try:
  64. if kwargs[lookup_url_kwarg] == "default":
  65. kwargs[lookup_url_kwarg] = None
  66. except KeyError:
  67. pass
  68. return super().dispatch(request, *args, **kwargs)
  69. def get_queryset(self):
  70. return TokenDomainPolicy.objects.filter(
  71. token_id=self.kwargs["token_id"], token__user=self.request.user
  72. )
  73. def perform_destroy(self, instance):
  74. try:
  75. super().perform_destroy(instance)
  76. except django.core.exceptions.ValidationError as exc:
  77. raise ValidationError(exc.message_dict, code="precedence")