|
@@ -15,10 +15,9 @@ class TokenDomainPolicyClient(APIClient):
|
|
|
kwargs.update(HTTP_AUTHORIZATION=f"Token {using.plain}")
|
|
|
return method(url, **kwargs)
|
|
|
|
|
|
- def _request_policy(self, method, target, *, using, domain, **kwargs):
|
|
|
- domain = domain or "default"
|
|
|
+ def _request_policy(self, method, target, *, using, policy_id, **kwargs):
|
|
|
url = DomainOwnerTestCase.reverse(
|
|
|
- "v1:token_domain_policies-detail", token_id=target.id, domain__name=domain
|
|
|
+ "v1:token_domain_policies-detail", token_id=target.id, pk=policy_id
|
|
|
)
|
|
|
return self._request(method, url, using=using, **kwargs)
|
|
|
|
|
@@ -34,16 +33,18 @@ class TokenDomainPolicyClient(APIClient):
|
|
|
def create_policy(self, target, *, using, **kwargs):
|
|
|
return self._request_policies(self.post, target, using=using, **kwargs)
|
|
|
|
|
|
- def get_policy(self, target, *, using, domain):
|
|
|
- return self._request_policy(self.get, target, using=using, domain=domain)
|
|
|
+ def get_policy(self, target, *, using, policy_id):
|
|
|
+ return self._request_policy(self.get, target, using=using, policy_id=policy_id)
|
|
|
|
|
|
- def patch_policy(self, target, *, using, domain, **kwargs):
|
|
|
+ def patch_policy(self, target, *, using, policy_id, **kwargs):
|
|
|
return self._request_policy(
|
|
|
- self.patch, target, using=using, domain=domain, **kwargs
|
|
|
+ self.patch, target, using=using, policy_id=policy_id, **kwargs
|
|
|
)
|
|
|
|
|
|
- def delete_policy(self, target, *, using, domain):
|
|
|
- return self._request_policy(self.delete, target, using=using, domain=domain)
|
|
|
+ def delete_policy(self, target, *, using, policy_id):
|
|
|
+ return self._request_policy(
|
|
|
+ self.delete, target, using=using, policy_id=policy_id
|
|
|
+ )
|
|
|
|
|
|
|
|
|
class TokenDomainPolicyTestCase(DomainOwnerTestCase):
|
|
@@ -58,7 +59,7 @@ class TokenDomainPolicyTestCase(DomainOwnerTestCase):
|
|
|
|
|
|
def test_policy_lifecycle_without_management_permission(self):
|
|
|
# Prepare (with management token)
|
|
|
- data = dict(domain=None, perm_write=True)
|
|
|
+ data = {"domain": None, "subname": None, "type": None, "perm_write": True}
|
|
|
response = self.client.create_policy(
|
|
|
self.token, using=self.token_manage, data=data
|
|
|
)
|
|
@@ -73,11 +74,16 @@ class TokenDomainPolicyTestCase(DomainOwnerTestCase):
|
|
|
response = self.client.list_policies(self.token, using=self.token)
|
|
|
self.assertStatus(response, status.HTTP_200_OK)
|
|
|
self.assertEqual(len(response.data), 1)
|
|
|
+ default_policy_id = response.data[0]["id"]
|
|
|
|
|
|
## Get
|
|
|
- response = self.client.get_policy(self.token, using=self.token, domain=None)
|
|
|
+ response = self.client.get_policy(
|
|
|
+ self.token, using=self.token, policy_id=default_policy_id
|
|
|
+ )
|
|
|
self.assertStatus(response, status.HTTP_200_OK)
|
|
|
- self.assertEqual(response.data, self.default_data | data)
|
|
|
+ self.assertEqual(
|
|
|
+ response.data, self.default_data | data | {"id": default_policy_id}
|
|
|
+ )
|
|
|
|
|
|
# Inspection of other tokens forbidden
|
|
|
## List
|
|
@@ -86,7 +92,7 @@ class TokenDomainPolicyTestCase(DomainOwnerTestCase):
|
|
|
|
|
|
## Get
|
|
|
response = self.client.get_policy(
|
|
|
- self.token_manage, using=self.token, domain=None
|
|
|
+ self.token_manage, using=self.token, policy_id=default_policy_id
|
|
|
)
|
|
|
self.assertStatus(response, status.HTTP_403_FORBIDDEN)
|
|
|
|
|
@@ -97,16 +103,19 @@ class TokenDomainPolicyTestCase(DomainOwnerTestCase):
|
|
|
self.assertStatus(response, status.HTTP_403_FORBIDDEN)
|
|
|
|
|
|
# Change
|
|
|
- data = dict(
|
|
|
- domain=self.my_domains[1].name, perm_dyndns=False, perm_write=True
|
|
|
+ data = dict(perm_dyndns=False, perm_write=True)
|
|
|
+ policy_id = models.TokenDomainPolicy.objects.get(
|
|
|
+ token=target, domain__isnull=True
|
|
|
)
|
|
|
response = self.client.patch_policy(
|
|
|
- target, using=self.token, domain=self.my_domains[0].name, data=data
|
|
|
+ target, using=self.token, policy_id=policy_id, data=data
|
|
|
)
|
|
|
self.assertStatus(response, status.HTTP_403_FORBIDDEN)
|
|
|
|
|
|
# Delete
|
|
|
- response = self.client.delete_policy(target, using=self.token, domain=None)
|
|
|
+ response = self.client.delete_policy(
|
|
|
+ target, using=self.token, policy_id=default_policy_id
|
|
|
+ )
|
|
|
self.assertStatus(response, status.HTTP_403_FORBIDDEN)
|
|
|
|
|
|
def test_policy_lifecycle(self):
|
|
@@ -124,7 +133,7 @@ class TokenDomainPolicyTestCase(DomainOwnerTestCase):
|
|
|
self.assertEqual(response.data["domain"], ["This field is required."])
|
|
|
|
|
|
## without a default policy
|
|
|
- data = dict(domain=self.my_domains[0].name)
|
|
|
+ data = {"domain": self.my_domains[0].name, "subname": None, "type": None}
|
|
|
with transaction.atomic():
|
|
|
response = self.client.create_policy(
|
|
|
self.token, using=self.token_manage, data=data
|
|
@@ -142,28 +151,39 @@ class TokenDomainPolicyTestCase(DomainOwnerTestCase):
|
|
|
|
|
|
# Create
|
|
|
## default policy
|
|
|
- data = dict(domain=None, perm_write=True)
|
|
|
+ data = {"domain": None, "subname": None, "type": None, "perm_write": True}
|
|
|
response = self.client.create_policy(
|
|
|
self.token, using=self.token_manage, data=data
|
|
|
)
|
|
|
self.assertStatus(response, status.HTTP_201_CREATED)
|
|
|
+ default_policy_id = response.data["id"]
|
|
|
|
|
|
## can't create another default policy
|
|
|
with transaction.atomic():
|
|
|
response = self.client.create_policy(
|
|
|
- self.token, using=self.token_manage, data=dict(domain=None)
|
|
|
+ self.token,
|
|
|
+ using=self.token_manage,
|
|
|
+ data={"domain": None, "subname": None, "type": None},
|
|
|
)
|
|
|
self.assertStatus(response, status.HTTP_409_CONFLICT)
|
|
|
|
|
|
## verify object creation
|
|
|
response = self.client.get_policy(
|
|
|
- self.token, using=self.token_manage, domain=None
|
|
|
+ self.token, using=self.token_manage, policy_id=default_policy_id
|
|
|
)
|
|
|
self.assertStatus(response, status.HTTP_200_OK)
|
|
|
- self.assertEqual(response.data, self.default_data | data)
|
|
|
+ self.assertEqual(
|
|
|
+ response.data, self.default_data | data | {"id": default_policy_id}
|
|
|
+ )
|
|
|
|
|
|
## can't create policy for other user's domain
|
|
|
- data = dict(domain=self.other_domain.name, perm_dyndns=True, perm_write=True)
|
|
|
+ data = {
|
|
|
+ "domain": self.other_domain.name,
|
|
|
+ "subname": None,
|
|
|
+ "type": None,
|
|
|
+ "perm_dyndns": True,
|
|
|
+ "perm_write": True,
|
|
|
+ }
|
|
|
response = self.client.create_policy(
|
|
|
self.token, using=self.token_manage, data=data
|
|
|
)
|
|
@@ -171,27 +191,31 @@ class TokenDomainPolicyTestCase(DomainOwnerTestCase):
|
|
|
self.assertEqual(response.data["domain"][0].code, "does_not_exist")
|
|
|
|
|
|
## another policy
|
|
|
- data = dict(domain=self.my_domains[0].name, perm_dyndns=True)
|
|
|
+ data = {
|
|
|
+ "domain": self.my_domains[0].name,
|
|
|
+ "subname": None,
|
|
|
+ "type": None,
|
|
|
+ "perm_dyndns": True,
|
|
|
+ }
|
|
|
response = self.client.create_policy(
|
|
|
self.token, using=self.token_manage, data=data
|
|
|
)
|
|
|
self.assertStatus(response, status.HTTP_201_CREATED)
|
|
|
+ policy_id = response.data["id"]
|
|
|
|
|
|
## can't create policy for the same domain
|
|
|
with transaction.atomic():
|
|
|
response = self.client.create_policy(
|
|
|
- self.token,
|
|
|
- using=self.token_manage,
|
|
|
- data=dict(domain=self.my_domains[0].name, perm_dyndns=False),
|
|
|
+ self.token, using=self.token_manage, data=data
|
|
|
)
|
|
|
self.assertStatus(response, status.HTTP_409_CONFLICT)
|
|
|
|
|
|
## verify object creation
|
|
|
response = self.client.get_policy(
|
|
|
- self.token, using=self.token_manage, domain=self.my_domains[0].name
|
|
|
+ self.token, using=self.token_manage, policy_id=policy_id
|
|
|
)
|
|
|
self.assertStatus(response, status.HTTP_200_OK)
|
|
|
- self.assertEqual(response.data, self.default_data | data)
|
|
|
+ self.assertEqual(response.data, self.default_data | data | {"id": policy_id})
|
|
|
|
|
|
# List: now has two elements
|
|
|
response = self.client.list_policies(self.token, using=self.token_manage)
|
|
@@ -200,33 +224,36 @@ class TokenDomainPolicyTestCase(DomainOwnerTestCase):
|
|
|
|
|
|
# Change
|
|
|
## all fields of a policy
|
|
|
- data = dict(domain=self.my_domains[1].name, perm_dyndns=False, perm_write=True)
|
|
|
+ data = dict(
|
|
|
+ domain=self.my_domains[1].name,
|
|
|
+ subname="_acme-challenge",
|
|
|
+ type="TXT",
|
|
|
+ perm_dyndns=False,
|
|
|
+ perm_write=True,
|
|
|
+ )
|
|
|
response = self.client.patch_policy(
|
|
|
self.token,
|
|
|
using=self.token_manage,
|
|
|
- domain=self.my_domains[0].name,
|
|
|
+ policy_id=policy_id,
|
|
|
data=data,
|
|
|
)
|
|
|
self.assertStatus(response, status.HTTP_200_OK)
|
|
|
- self.assertEqual(response.data, self.default_data | data)
|
|
|
+ self.assertEqual(response.data, self.default_data | data | {"id": policy_id})
|
|
|
|
|
|
## verify modification
|
|
|
response = self.client.get_policy(
|
|
|
- self.token, using=self.token_manage, domain=self.my_domains[1].name
|
|
|
+ self.token, using=self.token_manage, policy_id=policy_id
|
|
|
)
|
|
|
self.assertStatus(response, status.HTTP_200_OK)
|
|
|
- self.assertEqual(response.data, self.default_data | data)
|
|
|
-
|
|
|
- ## verify that policy for former domain is gone
|
|
|
- response = self.client.get_policy(
|
|
|
- self.token, using=self.token_manage, domain=self.my_domains[0].name
|
|
|
- )
|
|
|
- self.assertStatus(response, status.HTTP_404_NOT_FOUND)
|
|
|
+ self.assertEqual(response.data, self.default_data | data | {"id": policy_id})
|
|
|
|
|
|
## verify that the default policy can't be changed to a non-default policy
|
|
|
with transaction.atomic():
|
|
|
response = self.client.patch_policy(
|
|
|
- self.token, using=self.token_manage, domain=None, data=data
|
|
|
+ self.token,
|
|
|
+ using=self.token_manage,
|
|
|
+ policy_id=default_policy_id,
|
|
|
+ data=data,
|
|
|
)
|
|
|
self.assertStatus(response, status.HTTP_400_BAD_REQUEST)
|
|
|
self.assertEqual(
|
|
@@ -241,16 +268,26 @@ class TokenDomainPolicyTestCase(DomainOwnerTestCase):
|
|
|
## partially modify the default policy
|
|
|
data = dict(perm_dyndns=True)
|
|
|
response = self.client.patch_policy(
|
|
|
- self.token, using=self.token_manage, domain=None, data=data
|
|
|
+ self.token, using=self.token_manage, policy_id=default_policy_id, data=data
|
|
|
)
|
|
|
self.assertStatus(response, status.HTTP_200_OK)
|
|
|
- self.assertEqual(response.data, {"domain": None, "perm_write": True} | data)
|
|
|
+ self.assertEqual(
|
|
|
+ response.data,
|
|
|
+ {
|
|
|
+ "id": default_policy_id,
|
|
|
+ "domain": None,
|
|
|
+ "subname": None,
|
|
|
+ "type": None,
|
|
|
+ "perm_write": True,
|
|
|
+ }
|
|
|
+ | data,
|
|
|
+ )
|
|
|
|
|
|
# Delete
|
|
|
## can't delete default policy while others exist
|
|
|
with transaction.atomic():
|
|
|
response = self.client.delete_policy(
|
|
|
- self.token, using=self.token_manage, domain=None
|
|
|
+ self.token, using=self.token_manage, policy_id=default_policy_id
|
|
|
)
|
|
|
self.assertStatus(response, status.HTTP_400_BAD_REQUEST)
|
|
|
self.assertEqual(
|
|
@@ -264,26 +301,26 @@ class TokenDomainPolicyTestCase(DomainOwnerTestCase):
|
|
|
|
|
|
## delete other policy
|
|
|
response = self.client.delete_policy(
|
|
|
- self.token, using=self.token_manage, domain=self.my_domains[1].name
|
|
|
+ self.token, using=self.token_manage, policy_id=policy_id
|
|
|
)
|
|
|
self.assertStatus(response, status.HTTP_204_NO_CONTENT)
|
|
|
|
|
|
## delete default policy
|
|
|
response = self.client.delete_policy(
|
|
|
- self.token, using=self.token_manage, domain=None
|
|
|
+ self.token, using=self.token_manage, policy_id=default_policy_id
|
|
|
)
|
|
|
self.assertStatus(response, status.HTTP_204_NO_CONTENT)
|
|
|
|
|
|
## idempotence: delete a non-existing policy
|
|
|
response = self.client.delete_policy(
|
|
|
- self.token, using=self.token_manage, domain=self.my_domains[0].name
|
|
|
+ self.token, using=self.token_manage, policy_id=policy_id
|
|
|
)
|
|
|
self.assertStatus(response, status.HTTP_204_NO_CONTENT)
|
|
|
|
|
|
## verify that policies are gone
|
|
|
- for domain in [None, self.my_domains[0].name, self.my_domains[1].name]:
|
|
|
+ for pid in [policy_id, default_policy_id]:
|
|
|
response = self.client.get_policy(
|
|
|
- self.token, using=self.token_manage, domain=domain
|
|
|
+ self.token, using=self.token_manage, policy_id=pid
|
|
|
)
|
|
|
self.assertStatus(response, status.HTTP_404_NOT_FOUND)
|
|
|
|
|
@@ -342,25 +379,26 @@ class TokenDomainPolicyTestCase(DomainOwnerTestCase):
|
|
|
|
|
|
# Create
|
|
|
## default policy
|
|
|
- data = dict(domain=None)
|
|
|
+ data = {"domain": None, "subname": None, "type": None}
|
|
|
response = self.client.create_policy(
|
|
|
self.token, using=self.token_manage, data=data
|
|
|
)
|
|
|
self.assertStatus(response, status.HTTP_201_CREATED)
|
|
|
|
|
|
## another policy
|
|
|
- data = dict(domain=self.my_domains[0].name)
|
|
|
+ data = {"domain": self.my_domains[0].name, "subname": None, "type": None}
|
|
|
response = self.client.create_policy(
|
|
|
self.token, using=self.token_manage, data=data
|
|
|
)
|
|
|
self.assertStatus(response, status.HTTP_201_CREATED)
|
|
|
+ policy_id = response.data["id"]
|
|
|
|
|
|
## verify object creation
|
|
|
response = self.client.get_policy(
|
|
|
- self.token, using=self.token_manage, domain=self.my_domains[0].name
|
|
|
+ self.token, using=self.token_manage, policy_id=policy_id
|
|
|
)
|
|
|
self.assertStatus(response, status.HTTP_200_OK)
|
|
|
- self.assertEqual(response.data, self.default_data | data)
|
|
|
+ self.assertEqual(response.data, self.default_data | data | {"id": policy_id})
|
|
|
|
|
|
policies = {
|
|
|
self.my_domains[0]: self.token.tokendomainpolicy_set.get(
|
|
@@ -406,10 +444,14 @@ class TokenDomainPolicyTestCase(DomainOwnerTestCase):
|
|
|
self.assertStatus(response, status.HTTP_403_FORBIDDEN)
|
|
|
|
|
|
def test_domain_owner_consistency(self):
|
|
|
- models.TokenDomainPolicy(token=self.token, domain=None).save()
|
|
|
+ models.TokenDomainPolicy(
|
|
|
+ token=self.token, domain=None, subname=None, type=None
|
|
|
+ ).save()
|
|
|
|
|
|
domain = self.my_domains[0]
|
|
|
- policy = models.TokenDomainPolicy(token=self.token, domain=domain)
|
|
|
+ policy = models.TokenDomainPolicy(
|
|
|
+ token=self.token, domain=domain, subname=None, type=None
|
|
|
+ )
|
|
|
policy.save()
|
|
|
|
|
|
domain.owner = self.other_domains[0].owner
|
|
@@ -421,7 +463,9 @@ class TokenDomainPolicyTestCase(DomainOwnerTestCase):
|
|
|
domain.save()
|
|
|
|
|
|
def test_token_user_consistency(self):
|
|
|
- policy = models.TokenDomainPolicy(token=self.token, domain=None)
|
|
|
+ policy = models.TokenDomainPolicy(
|
|
|
+ token=self.token, domain=None, subname=None, type=None
|
|
|
+ )
|
|
|
policy.save()
|
|
|
|
|
|
self.token.user = self.other_domains[0].owner
|
|
@@ -433,12 +477,17 @@ class TokenDomainPolicyTestCase(DomainOwnerTestCase):
|
|
|
self.token.save()
|
|
|
|
|
|
def test_domain_owner_equals_token_user(self):
|
|
|
- models.TokenDomainPolicy(token=self.token, domain=None).save()
|
|
|
+ models.TokenDomainPolicy(
|
|
|
+ token=self.token, domain=None, subname=None, type=None
|
|
|
+ ).save()
|
|
|
|
|
|
with self.assertRaises(IntegrityError):
|
|
|
with transaction.atomic(): # https://stackoverflow.com/a/23326971/6867099
|
|
|
models.TokenDomainPolicy(
|
|
|
- token=self.token, domain=self.other_domains[0]
|
|
|
+ token=self.token,
|
|
|
+ domain=self.other_domains[0],
|
|
|
+ subname=None,
|
|
|
+ type=None,
|
|
|
).save()
|
|
|
|
|
|
self.token.user = self.other_domain.owner
|
|
@@ -449,7 +498,9 @@ class TokenDomainPolicyTestCase(DomainOwnerTestCase):
|
|
|
def test_domain_deletion(self):
|
|
|
domains = [None] + self.my_domains[:2]
|
|
|
for domain in domains:
|
|
|
- models.TokenDomainPolicy(token=self.token, domain=domain).save()
|
|
|
+ models.TokenDomainPolicy(
|
|
|
+ token=self.token, domain=domain, subname=None, type=None
|
|
|
+ ).save()
|
|
|
|
|
|
domain = domains.pop()
|
|
|
domain.delete()
|
|
@@ -462,7 +513,9 @@ class TokenDomainPolicyTestCase(DomainOwnerTestCase):
|
|
|
domains = [None] + self.my_domains[:2]
|
|
|
policies = {}
|
|
|
for domain in domains:
|
|
|
- policy = models.TokenDomainPolicy(token=self.token, domain=domain)
|
|
|
+ policy = models.TokenDomainPolicy(
|
|
|
+ token=self.token, domain=domain, subname=None, type=None
|
|
|
+ )
|
|
|
policies[domain] = policy
|
|
|
policy.save()
|
|
|
|
|
@@ -477,7 +530,9 @@ class TokenDomainPolicyTestCase(DomainOwnerTestCase):
|
|
|
def test_user_deletion(self):
|
|
|
domains = [None] + self.my_domains[:2]
|
|
|
for domain in domains:
|
|
|
- models.TokenDomainPolicy(token=self.token, domain=domain).save()
|
|
|
+ models.TokenDomainPolicy(
|
|
|
+ token=self.token, domain=domain, subname=None, type=None
|
|
|
+ ).save()
|
|
|
|
|
|
# User can only be deleted when domains are deleted
|
|
|
for domain in self.my_domains:
|