validators.py 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115
  1. from django.db import DataError
  2. from django.db.models import Model
  3. from rest_framework import exceptions, serializers
  4. from rest_framework.exceptions import ValidationError
  5. from rest_framework.validators import qs_exists, qs_filter, UniqueTogetherValidator
  6. from desecapi.permissions import TokenHasRRsetPermission
  7. def qs_exclude(queryset, **kwargs):
  8. try:
  9. return queryset.exclude(**kwargs)
  10. except (TypeError, ValueError, DataError):
  11. return queryset.none()
  12. class ExclusionConstraintValidator(UniqueTogetherValidator):
  13. """
  14. Validator that implements ExclusionConstraints, currently very basic with support for one field only.
  15. Should be applied to the serializer class, not to an individual field.
  16. No-op if parent serializer is a list serializer (many=True). We expect the list serializer to assure exclusivity.
  17. """
  18. message = "This field violates an exclusion constraint."
  19. def __init__(self, queryset, fields, exclusion_condition, message=None):
  20. super().__init__(queryset, fields, message)
  21. self.exclusion_condition = exclusion_condition
  22. def filter_queryset(self, attrs, queryset, serializer):
  23. qs = super().filter_queryset(attrs, queryset, serializer)
  24. # Determine the exclusion filters and prepare the queryset.
  25. field_name = self.exclusion_condition[0]
  26. value = self.exclusion_condition[1]
  27. source = serializer.fields[field_name].source
  28. if serializer.instance is not None:
  29. if source not in attrs:
  30. attrs[source] = getattr(serializer.instance, source)
  31. exclusion_method = qs_exclude if attrs[source] == value else qs_filter
  32. return exclusion_method(qs, **{field_name: value})
  33. def __call__(self, attrs, serializer, *args, **kwargs):
  34. # Ignore validation if the many flag is set
  35. if getattr(serializer.root, "many", False):
  36. return
  37. self.enforce_required_fields(attrs, serializer)
  38. queryset = self.queryset
  39. queryset = self.filter_queryset(attrs, queryset, serializer)
  40. queryset = self.exclude_current_instance(attrs, queryset, serializer.instance)
  41. # Ignore validation if any field is None
  42. checked_values = [
  43. value for field, value in attrs.items() if field in self.fields
  44. ]
  45. if None not in checked_values and qs_exists(queryset):
  46. types = queryset.values_list("type", flat=True)
  47. types = ", ".join(types)
  48. message = self.message.format(types=types)
  49. raise ValidationError(message, code="exclusive")
  50. class PermissionValidator:
  51. """
  52. Validator that checks write permission for an RRset.
  53. """
  54. requires_context = True
  55. def __call__(self, attrs, serializer):
  56. # On the RRsetDetail apex endpoint, subname is not in attrs
  57. subname = attrs.get("subname")
  58. if subname is None:
  59. subname = serializer.context["view"].kwargs["subname"]
  60. # On the RRsetDetail endpoint, the type is not in attrs
  61. type_ = attrs.get("type") or serializer.instance.type
  62. rrset = serializer.Meta.model(
  63. domain=serializer.domain, subname=subname, type=type_
  64. )
  65. permission = TokenHasRRsetPermission()
  66. if not permission.has_object_permission(
  67. serializer.context.get("request"), None, rrset
  68. ):
  69. raise exceptions.PermissionDenied(
  70. detail=getattr(permission, "message", None),
  71. code=getattr(permission, "code", None),
  72. )
  73. class Validator:
  74. message = "This field did not pass validation."
  75. def __init__(self, message=None):
  76. self.field_name = None
  77. self.message = message or self.message
  78. self.instance = None
  79. def __call__(self, value):
  80. raise NotImplementedError
  81. def __repr__(self):
  82. return "<%s>" % self.__class__.__name__
  83. class ReadOnlyOnUpdateValidator(Validator):
  84. message = "Can only be written on create."
  85. requires_context = True
  86. def __call__(self, value, serializer_field):
  87. field_name = serializer_field.source_attrs[-1]
  88. instance = getattr(serializer_field.parent, "instance", None)
  89. if isinstance(instance, Model) and value != getattr(instance, field_name):
  90. raise serializers.ValidationError(self.message, code="read-only-on-update")