tokens.py 3.3 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798
  1. import django.core.exceptions
  2. from django.http import Http404
  3. from rest_framework import viewsets
  4. from rest_framework.exceptions import ValidationError
  5. from rest_framework.generics import get_object_or_404, RetrieveAPIView
  6. from rest_framework.permissions import IsAuthenticated, SAFE_METHODS
  7. from rest_framework.response import Response
  8. from rest_framework.reverse import reverse
  9. from desecapi import permissions
  10. from desecapi.models import Token
  11. from desecapi.serializers import TokenDomainPolicySerializer, TokenSerializer
  12. from .base import IdempotentDestroyMixin
  13. class TokenViewSet(IdempotentDestroyMixin, viewsets.ModelViewSet):
  14. serializer_class = TokenSerializer
  15. permission_classes = (
  16. IsAuthenticated,
  17. permissions.IsAPIToken | permissions.MFARequiredIfEnabled,
  18. permissions.HasManageTokensPermission,
  19. )
  20. throttle_scope = "account_management_passive"
  21. def get_queryset(self):
  22. return self.request.user.token_set.all()
  23. def get_serializer(self, *args, **kwargs):
  24. # When creating a new token, return the plaintext representation
  25. if self.action == "create":
  26. kwargs.setdefault("include_plain", True)
  27. return super().get_serializer(*args, **kwargs)
  28. def perform_create(self, serializer):
  29. serializer.save(user=self.request.user)
  30. class TokenPoliciesRoot(RetrieveAPIView):
  31. serializer_class = TokenSerializer
  32. lookup_url_kwarg = "token_id"
  33. throttle_scope = "account_management_passive"
  34. permission_classes = [
  35. IsAuthenticated,
  36. permissions.IsAPIToken | permissions.MFARequiredIfEnabled,
  37. permissions.HasManageTokensPermission
  38. | permissions.AuthTokenCorrespondsToViewToken,
  39. ]
  40. def get_queryset(self):
  41. return self.request.user.token_set.all()
  42. def get(self, request, *args, **kwargs):
  43. self.get_object() # raises if token does not exist
  44. return Response(
  45. {
  46. "domain": reverse(
  47. "token_domain_policies-list", request=request, kwargs=kwargs
  48. )
  49. }
  50. )
  51. class TokenDomainPolicyViewSet(IdempotentDestroyMixin, viewsets.ModelViewSet):
  52. pagination_class = None
  53. serializer_class = TokenDomainPolicySerializer
  54. throttle_scope = "account_management_passive"
  55. @property
  56. def permission_classes(self):
  57. ret = [
  58. IsAuthenticated,
  59. permissions.IsAPIToken | permissions.MFARequiredIfEnabled,
  60. ]
  61. if self.request.method in SAFE_METHODS:
  62. ret.append(
  63. permissions.HasManageTokensPermission
  64. | permissions.AuthTokenCorrespondsToViewToken
  65. )
  66. else:
  67. ret.append(permissions.HasManageTokensPermission)
  68. return ret
  69. def create(self, request, *args, **kwargs):
  70. try:
  71. return super().create(request, *args, **kwargs)
  72. except Token.DoesNotExist:
  73. raise Http404
  74. def get_queryset(self):
  75. qs = Token.objects.filter(user=self.request.user)
  76. return get_object_or_404(qs, pk=self.kwargs["token_id"]).tokendomainpolicy_set
  77. def perform_destroy(self, instance):
  78. try:
  79. super().perform_destroy(instance)
  80. except django.core.exceptions.ValidationError as exc:
  81. raise ValidationError(exc.message_dict, code="precedence")