|
@@ -1,5 +1,3 @@
|
|
|
-from contextlib import nullcontext
|
|
|
-
|
|
|
from django.db import transaction
|
|
|
from django.db.utils import IntegrityError
|
|
|
from rest_framework import status
|
|
@@ -49,7 +47,7 @@ class TokenDomainPolicyClient(APIClient):
|
|
|
|
|
|
class TokenDomainPolicyTestCase(DomainOwnerTestCase):
|
|
|
client_class = TokenDomainPolicyClient
|
|
|
- default_data = dict(perm_dyndns=False, perm_write=False)
|
|
|
+ default_data = dict(perm_write=False)
|
|
|
|
|
|
def setUp(self):
|
|
|
super().setUp()
|
|
@@ -59,7 +57,9 @@ class TokenDomainPolicyTestCase(DomainOwnerTestCase):
|
|
|
|
|
|
def test_get_policy(self):
|
|
|
def get_policy(domain, subname, type):
|
|
|
- return self.token.get_policy(domain=domain, subname=subname, type=type)
|
|
|
+ return self.token.get_policy(
|
|
|
+ models.RRset(domain=domain, subname=subname, type=type)
|
|
|
+ )
|
|
|
|
|
|
def assertPolicy(policy, domain, subname, type):
|
|
|
self.assertEqual(policy.domain, domain)
|
|
@@ -174,7 +174,7 @@ class TokenDomainPolicyTestCase(DomainOwnerTestCase):
|
|
|
self.assertStatus(response, status.HTTP_403_FORBIDDEN)
|
|
|
|
|
|
# Change
|
|
|
- data = dict(perm_dyndns=False, perm_write=True)
|
|
|
+ data = dict(perm_write=True)
|
|
|
policy = target.get_policy()
|
|
|
response = self.client.patch_policy(
|
|
|
target, using=self.token, policy_id=policy.pk, data=data
|
|
@@ -265,7 +265,6 @@ class TokenDomainPolicyTestCase(DomainOwnerTestCase):
|
|
|
"domain": self.my_domains[0].name,
|
|
|
"subname": None,
|
|
|
"type": None,
|
|
|
- "perm_dyndns": True,
|
|
|
}
|
|
|
response = self.client.create_policy(
|
|
|
self.token, using=self.token_manage, data=data
|
|
@@ -298,7 +297,6 @@ class TokenDomainPolicyTestCase(DomainOwnerTestCase):
|
|
|
domain=self.my_domains[1].name,
|
|
|
subname="_acme-challenge",
|
|
|
type="TXT",
|
|
|
- perm_dyndns=False,
|
|
|
perm_write=True,
|
|
|
)
|
|
|
response = self.client.patch_policy(
|
|
@@ -332,7 +330,7 @@ class TokenDomainPolicyTestCase(DomainOwnerTestCase):
|
|
|
)
|
|
|
|
|
|
## partially modify the default policy
|
|
|
- data = dict(perm_dyndns=True)
|
|
|
+ data = dict()
|
|
|
response = self.client.patch_policy(
|
|
|
self.token, using=self.token_manage, policy_id=default_policy_id, data=data
|
|
|
)
|
|
@@ -398,47 +396,6 @@ class TokenDomainPolicyTestCase(DomainOwnerTestCase):
|
|
|
setattr(policy, perm, False)
|
|
|
policy.save()
|
|
|
|
|
|
- def _perform_requests(name, perm, value, **kwargs):
|
|
|
- responses = []
|
|
|
- if value:
|
|
|
- pdns_name = self._normalize_name(name).lower()
|
|
|
- cm = self.assertNoRequestsBut(
|
|
|
- self.request_pdns_zone_update(name=pdns_name),
|
|
|
- self.request_pdns_zone_axfr(name=pdns_name),
|
|
|
- )
|
|
|
- else:
|
|
|
- cm = nullcontext()
|
|
|
-
|
|
|
- if perm == "perm_dyndns":
|
|
|
- data = {"username": name, "password": self.token.plain}
|
|
|
- with cm:
|
|
|
- responses.append(
|
|
|
- self.client.get(self.reverse("v1:dyndns12update"), data)
|
|
|
- )
|
|
|
- return responses
|
|
|
-
|
|
|
- if perm == "perm_write":
|
|
|
- url_detail = self.reverse("v1:rrset@", name=name, subname="", type="A")
|
|
|
- url_list = self.reverse("v1:rrsets", name=name)
|
|
|
-
|
|
|
- responses.append(self.client.get(url_list, **kwargs))
|
|
|
- responses.append(self.client.patch(url_list, [], **kwargs))
|
|
|
- responses.append(self.client.put(url_list, [], **kwargs))
|
|
|
- responses.append(self.client.post(url_list, [], **kwargs))
|
|
|
-
|
|
|
- data = {"subname": "", "type": "A", "ttl": 3600, "records": ["1.2.3.4"]}
|
|
|
- with cm:
|
|
|
- responses += [
|
|
|
- self.client.delete(url_detail, **kwargs),
|
|
|
- self.client.post(url_list, data=data, **kwargs),
|
|
|
- self.client.put(url_detail, data=data, **kwargs),
|
|
|
- self.client.patch(url_detail, data=data, **kwargs),
|
|
|
- self.client.get(url_detail, **kwargs),
|
|
|
- ]
|
|
|
- return responses
|
|
|
-
|
|
|
- raise ValueError(f"Unexpected permission: {perm}")
|
|
|
-
|
|
|
# Create
|
|
|
## default policy
|
|
|
data = {"domain": None, "subname": None, "type": None}
|
|
@@ -484,15 +441,6 @@ class TokenDomainPolicyTestCase(DomainOwnerTestCase):
|
|
|
setattr(policy, perm, value)
|
|
|
policy.save()
|
|
|
|
|
|
- # Perform requests that test this permission and inspect responses
|
|
|
- for response in _perform_requests(
|
|
|
- domain.name, perm, value, **kwargs
|
|
|
- ):
|
|
|
- if value:
|
|
|
- self.assertIn(response.status_code, range(200, 300))
|
|
|
- else:
|
|
|
- self.assertStatus(response, status.HTTP_403_FORBIDDEN)
|
|
|
-
|
|
|
# Can't create domain
|
|
|
data = {"name": self.random_domain_name()}
|
|
|
response = self.client.post(
|
|
@@ -500,10 +448,86 @@ class TokenDomainPolicyTestCase(DomainOwnerTestCase):
|
|
|
)
|
|
|
self.assertStatus(response, status.HTTP_403_FORBIDDEN)
|
|
|
|
|
|
+ # Can't delete domain
|
|
|
+ response = self.client.delete(
|
|
|
+ self.reverse("v1:domain-detail", name=domain), {}, **kwargs
|
|
|
+ )
|
|
|
+ self.assertStatus(response, status.HTTP_403_FORBIDDEN)
|
|
|
+
|
|
|
# Can't access account details
|
|
|
response = self.client.get(self.reverse("v1:account"), **kwargs)
|
|
|
self.assertStatus(response, status.HTTP_403_FORBIDDEN)
|
|
|
|
|
|
+ def test_dyndns_permission(self):
|
|
|
+ def _perform_request(**kwargs):
|
|
|
+ return self.client.get(
|
|
|
+ self.reverse("v1:dyndns12update"),
|
|
|
+ {
|
|
|
+ "username": self.my_domains[1].name,
|
|
|
+ "password": self.token.plain,
|
|
|
+ **kwargs,
|
|
|
+ },
|
|
|
+ )
|
|
|
+
|
|
|
+ def assert_allowed(**kwargs):
|
|
|
+ response = _perform_request(**kwargs)
|
|
|
+ self.assertStatus(response, status.HTTP_200_OK)
|
|
|
+ self.assertEqual(response.data, "good")
|
|
|
+
|
|
|
+ def assert_forbidden(**kwargs):
|
|
|
+ response = _perform_request(**kwargs)
|
|
|
+ self.assertStatus(response, status.HTTP_403_FORBIDDEN)
|
|
|
+ self.assertEqual(response.data["detail"], "Insufficient token permissions.")
|
|
|
+
|
|
|
+ # No policy
|
|
|
+ assert_allowed(
|
|
|
+ myipv4=""
|
|
|
+ ) # empty IPv4 and delete IPv6 (no-op, prevents pdns request)
|
|
|
+
|
|
|
+ # Default policy (deny)
|
|
|
+ qs = self.token.tokendomainpolicy_set
|
|
|
+ qs.create(domain=None, subname=None, type=None)
|
|
|
+ assert_forbidden(myipv4="")
|
|
|
+ assert_allowed(
|
|
|
+ myipv4="preserve", myipv6="preserve"
|
|
|
+ ) # no-op needs no permissions
|
|
|
+
|
|
|
+ # Only A permission
|
|
|
+ qs.create(domain=self.my_domains[1], subname=None, type="A", perm_write=True)
|
|
|
+ assert_forbidden(myipv4="")
|
|
|
+ assert_allowed(myipv4="", myipv6="preserve") # just IPv4
|
|
|
+
|
|
|
+ # Only A permission
|
|
|
+ qs.create(domain=self.my_domains[1], subname=None, type="AAAA")
|
|
|
+ assert_forbidden(myipv4="")
|
|
|
+ assert_allowed(myipv4="", myipv6="preserve") # just IPv4
|
|
|
+
|
|
|
+ # A + AAAA permission
|
|
|
+ qs.filter(domain=self.my_domains[1], type="AAAA").update(perm_write=True)
|
|
|
+ assert_allowed(myipv4="") # empty IPv4 and delete IPv6
|
|
|
+
|
|
|
+ # Only AAAA permission
|
|
|
+ qs.filter(domain=self.my_domains[1], type="A").update(perm_write=False)
|
|
|
+ assert_forbidden(myipv4="")
|
|
|
+ assert_allowed(myipv4="preserve", myipv6="") # just IPv6
|
|
|
+
|
|
|
+ # Update default policy to allow, but A deny policy overrides
|
|
|
+ qs.filter(domain__isnull=True).update(perm_write=True)
|
|
|
+ assert_forbidden(myipv4="")
|
|
|
+ assert_allowed(myipv4="preserve", myipv6="") # just IPv6
|
|
|
+
|
|
|
+ # AAAA (allow) and A (allow via default policy fallback)
|
|
|
+ qs.filter(domain=self.my_domains[1], type="A").delete()
|
|
|
+ assert_allowed(myipv4="", myipv6="")
|
|
|
+
|
|
|
+ # Default policy (allow)
|
|
|
+ qs.filter(domain=self.my_domains[1]).delete()
|
|
|
+ assert_allowed(myipv4="", myipv6="")
|
|
|
+
|
|
|
+ # No policy
|
|
|
+ qs.filter().delete()
|
|
|
+ assert_allowed(myipv4="", myipv6="")
|
|
|
+
|
|
|
def test_domain_owner_consistency(self):
|
|
|
models.TokenDomainPolicy(
|
|
|
token=self.token, domain=None, subname=None, type=None
|