123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630 |
- from django.db import transaction
- from django.db.utils import IntegrityError
- from rest_framework import status
- from rest_framework.test import APIClient
- from desecapi import models
- from desecapi.tests.base import DomainOwnerTestCase
class TokenDomainPolicyClient(APIClient):
    """Test client with shortcuts for the token domain policy endpoints.

    In all helpers, ``target`` is the token whose policies are addressed and
    ``using`` is the token to authenticate with. When ``using`` is ``None``,
    no token header is added to the request.
    """

    def _request(self, method, url, *, using, **kwargs):
        """Invoke ``method(url, **kwargs)``, adding a token header for ``using``."""
        if using is None:
            return method(url, **kwargs)
        kwargs["HTTP_AUTHORIZATION"] = f"Token {using.plain}"
        return method(url, **kwargs)

    def _request_policy(self, method, target, *, using, policy_id, **kwargs):
        """Send a request to the detail endpoint of one policy of ``target``."""
        detail_url = DomainOwnerTestCase.reverse(
            "v1:token_domain_policies-detail", token_id=target.id, pk=policy_id
        )
        return self._request(method, detail_url, using=using, **kwargs)

    def _request_policies(self, method, target, *, using, **kwargs):
        """Send a request to the policy list endpoint of ``target``."""
        list_url = DomainOwnerTestCase.reverse(
            "v1:token_domain_policies-list", token_id=target.id
        )
        return self._request(method, list_url, using=using, **kwargs)

    def list_policies(self, target, *, using):
        """GET the policy list of ``target``."""
        return self._request_policies(self.get, target, using=using)

    def create_policy(self, target, *, using, **kwargs):
        """POST to the policy list of ``target``; ``kwargs`` go to the client call."""
        return self._request_policies(self.post, target, using=using, **kwargs)

    def get_policy(self, target, *, using, policy_id):
        """GET a single policy of ``target``."""
        return self._request_policy(self.get, target, using=using, policy_id=policy_id)

    def patch_policy(self, target, *, using, policy_id, **kwargs):
        """PATCH a single policy of ``target``; ``kwargs`` go to the client call."""
        return self._request_policy(
            self.patch, target, using=using, policy_id=policy_id, **kwargs
        )

    def delete_policy(self, target, *, using, policy_id):
        """DELETE a single policy of ``target``."""
        return self._request_policy(
            self.delete, target, using=using, policy_id=policy_id
        )
class TokenDomainPolicyTestCase(DomainOwnerTestCase):
    """Tests for token domain policies.

    Covers policy precedence, the REST lifecycle of policies (with and without
    token management permission), the effect of policies on other endpoints,
    and database-level consistency between tokens, domains and policies.
    """

    client_class = TokenDomainPolicyClient
    # Permission fields and the values the API reports for them when a request
    # does not set them (expected responses are built as default_data | data).
    default_data = dict(perm_write=False)

    def setUp(self):
        """Drop the default credentials and create the extra tokens used below."""
        super().setUp()
        self.client.credentials()  # remove default credential (corresponding to domain owner)
        self.token_manage = self.create_token(self.owner, perm_manage_tokens=True)
        self.other_token = self.create_token(self.user)

    def test_get_policy(self):
        """Check which stored policy ``self.token.get_policy`` picks for an RRset."""

        def get_policy(domain, subname, rr_type):
            return self.token.get_policy(
                models.RRset(domain=domain, subname=subname, type=rr_type)
            )

        def assertPolicy(policy, domain, subname, rr_type):
            self.assertEqual(policy.domain, domain)
            self.assertEqual(policy.subname, subname)
            self.assertEqual(policy.type, rr_type)

        qs = self.token.tokendomainpolicy_set

        # Default policy is fallback for everything
        qs.create(domain=None, subname=None, type=None)
        for subname in (None, "www"):
            for rr_type in (None, "A"):
                policy = get_policy(self.my_domain, subname, rr_type)
                assertPolicy(policy, None, None, None)

        # Type wins over default
        qs.create(domain=None, subname=None, type="A")
        policy = get_policy(self.my_domain, "www", "A")
        assertPolicy(policy, None, None, "A")

        # Subname wins over type
        qs.create(domain=None, subname="www", type=None)
        policy = get_policy(self.my_domain, "www", "A")
        assertPolicy(policy, None, "www", None)

        # Most specific wins
        qs.create(domain=None, subname="www", type="A")
        policy = get_policy(self.my_domain, "www", "A")
        assertPolicy(policy, None, "www", "A")

        # Domain wins over default and over subname and type
        qs.create(domain=self.my_domain, subname=None, type=None)
        policy = get_policy(self.my_domain, None, None)
        assertPolicy(policy, self.my_domain, None, None)

        # Subname wins over default or domain default
        qs.create(domain=self.my_domain, subname="www", type=None)
        for rr_type in (None, "A"):
            policy = get_policy(self.my_domain, "www", rr_type)
            assertPolicy(policy, self.my_domain, "www", None)

        # Type wins over default or domain default
        qs.create(domain=self.my_domain, subname=None, type="A")
        for subname in (None, "www2"):
            policy = get_policy(self.my_domain, subname, "A")
            assertPolicy(policy, self.my_domain, None, "A")

        # Subname wins over type
        policy = get_policy(self.my_domain, "www", "A")
        assertPolicy(policy, self.my_domain, "www", None)

        # Subname + type wins over less specific
        qs.create(domain=self.my_domain, subname="www", type="A")
        policy = get_policy(self.my_domain, "www", "A")
        assertPolicy(policy, self.my_domain, "www", "A")

        # Check that we did all combinations
        self.assertEqual(qs.count(), 2**3)

    def test_policy_lifecycle_without_management_permission(self):
        """A token without manage permission may only read its own policies."""
        # Prepare (with management token)
        data = {"domain": None, "subname": None, "type": None, "perm_write": True}
        response = self.client.create_policy(
            self.token, using=self.token_manage, data=data
        )
        self.assertStatus(response, status.HTTP_201_CREATED)
        response = self.client.create_policy(
            self.token_manage, using=self.token_manage, data=data
        )
        self.assertStatus(response, status.HTTP_201_CREATED)

        # Self-inspection is fine
        ## List
        response = self.client.list_policies(self.token, using=self.token)
        self.assertStatus(response, status.HTTP_200_OK)
        self.assertEqual(len(response.data), 1)
        default_policy_id = response.data[0]["id"]

        ## Get
        response = self.client.get_policy(
            self.token, using=self.token, policy_id=default_policy_id
        )
        self.assertStatus(response, status.HTTP_200_OK)
        self.assertEqual(
            response.data, self.default_data | data | {"id": default_policy_id}
        )

        # Inspection of other tokens forbidden
        ## List
        response = self.client.list_policies(self.token_manage, using=self.token)
        self.assertStatus(response, status.HTTP_403_FORBIDDEN)

        ## Get
        response = self.client.get_policy(
            self.token_manage, using=self.token, policy_id=default_policy_id
        )
        self.assertStatus(response, status.HTTP_403_FORBIDDEN)

        # Write operations forbidden (self and other)
        patch_data = dict(perm_write=True)
        for target in [self.token, self.token_manage]:
            # Each target has exactly one (default) policy at this point
            policy = target.get_policy()

            # Create
            response = self.client.create_policy(target, using=self.token)
            self.assertStatus(response, status.HTTP_403_FORBIDDEN)

            # Change
            response = self.client.patch_policy(
                target, using=self.token, policy_id=policy.pk, data=patch_data
            )
            self.assertStatus(response, status.HTTP_403_FORBIDDEN)

            # Delete (address the target's own policy, like the change above)
            response = self.client.delete_policy(
                target, using=self.token, policy_id=policy.pk
            )
            self.assertStatus(response, status.HTTP_403_FORBIDDEN)

    def test_policy_lifecycle(self):
        """Walk through create/list/get/patch/delete using the management token."""
        # Can't do anything unauthorized
        response = self.client.list_policies(self.token, using=None)
        self.assertStatus(response, status.HTTP_401_UNAUTHORIZED)

        response = self.client.create_policy(self.token, using=None)
        self.assertStatus(response, status.HTTP_401_UNAUTHORIZED)

        # Create
        ## without required field
        response = self.client.create_policy(self.token, using=self.token_manage)
        self.assertStatus(response, status.HTTP_400_BAD_REQUEST)
        for field in ["domain", "subname", "type"]:
            self.assertEqual(response.data[field], ["This field is required."])

        ## without a default policy
        data = {"domain": self.my_domains[0].name, "subname": None, "type": None}
        # Rejected requests are wrapped in their own atomic block so that a
        # database error inside them cannot poison the test's transaction.
        with transaction.atomic():
            response = self.client.create_policy(
                self.token, using=self.token_manage, data=data
            )
        self.assertStatus(response, status.HTTP_400_BAD_REQUEST)
        self.assertEqual(
            response.data["non_field_errors"],
            ["Policy precedence: The first policy must be the default policy."],
        )

        # List: still empty
        response = self.client.list_policies(self.token, using=self.token_manage)
        self.assertStatus(response, status.HTTP_200_OK)
        self.assertEqual(response.data, [])

        # Create
        ## default policy
        data = {"domain": None, "subname": None, "type": None, "perm_write": True}
        response = self.client.create_policy(
            self.token, using=self.token_manage, data=data
        )
        self.assertStatus(response, status.HTTP_201_CREATED)
        default_policy_id = response.data["id"]

        ## can't create another default policy
        with transaction.atomic():
            response = self.client.create_policy(
                self.token,
                using=self.token_manage,
                data={"domain": None, "subname": None, "type": None},
            )
        self.assertStatus(response, status.HTTP_409_CONFLICT)

        ## verify object creation
        response = self.client.get_policy(
            self.token, using=self.token_manage, policy_id=default_policy_id
        )
        self.assertStatus(response, status.HTTP_200_OK)
        self.assertEqual(
            response.data, self.default_data | data | {"id": default_policy_id}
        )

        ## can't create policy for other user's domain
        data = {
            "domain": self.other_domain.name,
            "subname": None,
            "type": None,
            "perm_dyndns": True,
            "perm_write": True,
        }
        response = self.client.create_policy(
            self.token, using=self.token_manage, data=data
        )
        self.assertStatus(response, status.HTTP_400_BAD_REQUEST)
        self.assertEqual(response.data["domain"][0].code, "does_not_exist")

        ## another policy
        data = {
            "domain": self.my_domains[0].name,
            "subname": None,
            "type": None,
        }
        response = self.client.create_policy(
            self.token, using=self.token_manage, data=data
        )
        self.assertStatus(response, status.HTTP_201_CREATED)
        policy_id = response.data["id"]

        ## can't create policy for the same domain
        with transaction.atomic():
            response = self.client.create_policy(
                self.token, using=self.token_manage, data=data
            )
        self.assertStatus(response, status.HTTP_409_CONFLICT)

        ## verify object creation
        response = self.client.get_policy(
            self.token, using=self.token_manage, policy_id=policy_id
        )
        self.assertStatus(response, status.HTTP_200_OK)
        self.assertEqual(response.data, self.default_data | data | {"id": policy_id})

        # List: now has two elements
        response = self.client.list_policies(self.token, using=self.token_manage)
        self.assertStatus(response, status.HTTP_200_OK)
        self.assertEqual(len(response.data), 2)

        # Change
        ## all fields of a policy
        data = dict(
            domain=self.my_domains[1].name,
            subname="_acme-challenge",
            type="TXT",
            perm_write=True,
        )
        response = self.client.patch_policy(
            self.token,
            using=self.token_manage,
            policy_id=policy_id,
            data=data,
        )
        self.assertStatus(response, status.HTTP_200_OK)
        self.assertEqual(response.data, self.default_data | data | {"id": policy_id})

        ## verify modification
        response = self.client.get_policy(
            self.token, using=self.token_manage, policy_id=policy_id
        )
        self.assertStatus(response, status.HTTP_200_OK)
        self.assertEqual(response.data, self.default_data | data | {"id": policy_id})

        ## verify that the default policy can't be changed to a non-default policy
        with transaction.atomic():
            response = self.client.patch_policy(
                self.token,
                using=self.token_manage,
                policy_id=default_policy_id,
                data=data,
            )
        self.assertStatus(response, status.HTTP_400_BAD_REQUEST)
        self.assertEqual(
            response.data["non_field_errors"],
            ["When using policies, there must be exactly one default policy."],
        )

        ## partially modify the default policy
        data = dict()  # an empty PATCH must leave the policy as it is
        response = self.client.patch_policy(
            self.token, using=self.token_manage, policy_id=default_policy_id, data=data
        )
        self.assertStatus(response, status.HTTP_200_OK)
        self.assertEqual(
            response.data,
            {
                "id": default_policy_id,
                "domain": None,
                "subname": None,
                "type": None,
                "perm_write": True,
            }
            | data,
        )

        # Delete
        ## can't delete default policy while others exist
        with transaction.atomic():
            response = self.client.delete_policy(
                self.token, using=self.token_manage, policy_id=default_policy_id
            )
        self.assertStatus(response, status.HTTP_400_BAD_REQUEST)
        self.assertEqual(
            response.data["non_field_errors"],
            ["Policy precedence: Can't delete default policy when there exist others."],
        )

        ## delete other policy
        response = self.client.delete_policy(
            self.token, using=self.token_manage, policy_id=policy_id
        )
        self.assertStatus(response, status.HTTP_204_NO_CONTENT)

        ## delete default policy
        response = self.client.delete_policy(
            self.token, using=self.token_manage, policy_id=default_policy_id
        )
        self.assertStatus(response, status.HTTP_204_NO_CONTENT)

        ## idempotence: delete a non-existing policy
        response = self.client.delete_policy(
            self.token, using=self.token_manage, policy_id=policy_id
        )
        self.assertStatus(response, status.HTTP_204_NO_CONTENT)

        ## verify that policies are gone
        for pid in [policy_id, default_policy_id]:
            response = self.client.get_policy(
                self.token, using=self.token_manage, policy_id=pid
            )
            self.assertStatus(response, status.HTTP_404_NOT_FOUND)

        # List: empty again
        response = self.client.list_policies(self.token, using=self.token_manage)
        self.assertStatus(response, status.HTTP_200_OK)
        self.assertEqual(response.data, [])

    def test_policy_permissions(self):
        """No policy permission unlocks domain creation/deletion or account access."""

        def _reset_policies(token):
            # Switch every permission off on every policy of the token
            for policy in token.tokendomainpolicy_set.all():
                for perm in self.default_data:
                    setattr(policy, perm, False)
                policy.save()

        # Create
        ## default policy
        data = {"domain": None, "subname": None, "type": None}
        response = self.client.create_policy(
            self.token, using=self.token_manage, data=data
        )
        self.assertStatus(response, status.HTTP_201_CREATED)
        default_policy_id = response.data["id"]

        ## another policy
        data = {"domain": self.my_domains[0].name, "subname": None, "type": None}
        response = self.client.create_policy(
            self.token, using=self.token_manage, data=data
        )
        self.assertStatus(response, status.HTTP_201_CREATED)
        policy_id = response.data["id"]

        ## verify object creation
        response = self.client.get_policy(
            self.token, using=self.token_manage, policy_id=policy_id
        )
        self.assertStatus(response, status.HTTP_200_OK)
        self.assertEqual(response.data, self.default_data | data | {"id": policy_id})

        # Effective policy per domain: the specific one, or the default as fallback
        policy_id_by_domain = {
            self.my_domains[0]: policy_id,
            self.my_domains[1]: default_policy_id,
        }

        kwargs = dict(HTTP_AUTHORIZATION=f"Token {self.token.plain}")

        # For each permission type
        for perm in self.default_data:
            # For the domain with specific policy and for the domain covered by the default policy
            for domain, effective_policy_id in policy_id_by_domain.items():
                # For both possible values of the permission
                for value in [True, False]:
                    # Set only that permission for that domain (on its effective policy)
                    _reset_policies(self.token)
                    policy = self.token.tokendomainpolicy_set.get(
                        pk=effective_policy_id
                    )
                    setattr(policy, perm, value)
                    policy.save()

                    # Can't create domain
                    data = {"name": self.random_domain_name()}
                    response = self.client.post(
                        self.reverse("v1:domain-list"), data, **kwargs
                    )
                    self.assertStatus(response, status.HTTP_403_FORBIDDEN)

                    # Can't delete domain
                    response = self.client.delete(
                        self.reverse("v1:domain-detail", name=domain), {}, **kwargs
                    )
                    self.assertStatus(response, status.HTTP_403_FORBIDDEN)

                    # Can't access account details
                    response = self.client.get(self.reverse("v1:account"), **kwargs)
                    self.assertStatus(response, status.HTTP_403_FORBIDDEN)

    def test_dyndns_permission(self):
        """dynDNS updates need write permission for each record type they touch."""

        def _perform_request(**kwargs):
            return self.client.get(
                self.reverse("v1:dyndns12update"),
                {
                    "username": self.my_domains[1].name,
                    "password": self.token.plain,
                    **kwargs,
                },
            )

        def assert_allowed(**kwargs):
            response = _perform_request(**kwargs)
            self.assertStatus(response, status.HTTP_200_OK)
            self.assertEqual(response.data, "good")

        def assert_forbidden(**kwargs):
            response = _perform_request(**kwargs)
            self.assertStatus(response, status.HTTP_403_FORBIDDEN)
            self.assertEqual(response.data["detail"], "Insufficient token permissions.")

        # No policy
        assert_allowed(
            myipv4=""
        )  # empty IPv4 and delete IPv6 (no-op, prevents pdns request)

        # Default policy (deny)
        qs = self.token.tokendomainpolicy_set
        qs.create(domain=None, subname=None, type=None)
        assert_forbidden(myipv4="")
        assert_allowed(
            myipv4="preserve", myipv6="preserve"
        )  # no-op needs no permissions

        # Only A permission
        qs.create(domain=self.my_domains[1], subname=None, type="A", perm_write=True)
        assert_forbidden(myipv4="")
        assert_allowed(myipv4="", myipv6="preserve")  # just IPv4

        # A permission, plus an explicit AAAA policy created without perm_write
        qs.create(domain=self.my_domains[1], subname=None, type="AAAA")
        assert_forbidden(myipv4="")
        assert_allowed(myipv4="", myipv6="preserve")  # just IPv4

        # A + AAAA permission
        qs.filter(domain=self.my_domains[1], type="AAAA").update(perm_write=True)
        assert_allowed(myipv4="")  # empty IPv4 and delete IPv6

        # Only AAAA permission
        qs.filter(domain=self.my_domains[1], type="A").update(perm_write=False)
        assert_forbidden(myipv4="")
        assert_allowed(myipv4="preserve", myipv6="")  # just IPv6

        # Update default policy to allow, but A deny policy overrides
        qs.filter(domain__isnull=True).update(perm_write=True)
        assert_forbidden(myipv4="")
        assert_allowed(myipv4="preserve", myipv6="")  # just IPv6

        # AAAA (allow) and A (allow via default policy fallback)
        qs.filter(domain=self.my_domains[1], type="A").delete()
        assert_allowed(myipv4="", myipv6="")

        # Default policy (allow)
        qs.filter(domain=self.my_domains[1]).delete()
        assert_allowed(myipv4="", myipv6="")

        # No policy
        qs.filter().delete()
        assert_allowed(myipv4="", myipv6="")

    def test_domain_owner_consistency(self):
        """A domain can't change owner while a token policy refers to it."""
        models.TokenDomainPolicy(
            token=self.token, domain=None, subname=None, type=None
        ).save()

        domain = self.my_domains[0]
        policy = models.TokenDomainPolicy(
            token=self.token, domain=domain, subname=None, type=None
        )
        policy.save()

        domain.owner = self.other_domains[0].owner
        with self.assertRaises(IntegrityError):
            with transaction.atomic():  # https://stackoverflow.com/a/23326971/6867099
                domain.save()

        # Once the policy is gone, the ownership change goes through
        policy.delete()
        domain.save()

    def test_token_user_consistency(self):
        """A token can't change user while it has a policy."""
        policy = models.TokenDomainPolicy(
            token=self.token, domain=None, subname=None, type=None
        )
        policy.save()

        self.token.user = self.other_domains[0].owner
        with self.assertRaises(IntegrityError):
            with transaction.atomic():  # https://stackoverflow.com/a/23326971/6867099
                self.token.save()

        # Once the policy is gone, the user change goes through
        policy.delete()
        self.token.save()

    def test_domain_owner_equals_token_user(self):
        """A policy can't link a token to a domain owned by a different user."""
        models.TokenDomainPolicy(
            token=self.token, domain=None, subname=None, type=None
        ).save()

        with self.assertRaises(IntegrityError):
            with transaction.atomic():  # https://stackoverflow.com/a/23326971/6867099
                models.TokenDomainPolicy(
                    token=self.token,
                    domain=self.other_domains[0],
                    subname=None,
                    type=None,
                ).save()

        self.token.user = self.other_domain.owner
        with self.assertRaises(IntegrityError):
            with transaction.atomic():  # https://stackoverflow.com/a/23326971/6867099
                self.token.save()

    def test_domain_deletion(self):
        """Deleting a domain removes its policy and leaves the others in place."""
        domains = [None] + self.my_domains[:2]
        for domain in domains:
            models.TokenDomainPolicy(
                token=self.token, domain=domain, subname=None, type=None
            ).save()

        # Delete the last domain; policies for the remaining entries must survive
        domain = domains.pop()
        domain.delete()
        self.assertEqual(
            {policy.domain for policy in self.token.tokendomainpolicy_set.all()},
            set(domains),
        )

    def test_token_deletion(self):
        """Deleting a token removes its policies but not the domains."""
        domains = [None] + self.my_domains[:2]
        policies = {}
        for domain in domains:
            policy = models.TokenDomainPolicy(
                token=self.token, domain=domain, subname=None, type=None
            )
            policies[domain] = policy
            policy.save()

        self.token.delete()
        for domain, policy in policies.items():
            self.assertFalse(
                models.TokenDomainPolicy.objects.filter(pk=policy.pk).exists()
            )
            if domain:  # the default policy has no domain
                self.assertTrue(models.Domain.objects.filter(pk=domain.pk).exists())

    def test_user_deletion(self):
        """Deleting a user removes the remaining policy of the user's token."""
        domains = [None] + self.my_domains[:2]
        for domain in domains:
            models.TokenDomainPolicy(
                token=self.token, domain=domain, subname=None, type=None
            ).save()

        # User can only be deleted when domains are deleted
        for domain in self.my_domains:
            domain.delete()

        # Only the default policy should be left, so get can simply get() it
        policy_pk = self.token.tokendomainpolicy_set.get().pk
        self.token.user.delete()
        self.assertFalse(models.TokenDomainPolicy.objects.filter(pk=policy_pk).exists())
|