# File: test_token_domain_policy.py (18 KB)

  1. from contextlib import nullcontext
  2. from django.db import transaction
  3. from django.db.utils import IntegrityError
  4. from rest_framework import status
  5. from rest_framework.test import APIClient
  6. from desecapi import models
  7. from desecapi.tests.base import DomainOwnerTestCase
  8. class TokenDomainPolicyClient(APIClient):
  9. def _request(self, method, url, *, using, **kwargs):
  10. if using is not None:
  11. kwargs.update(HTTP_AUTHORIZATION=f'Token {using.plain}')
  12. return method(url, **kwargs)
  13. def _request_policy(self, method, target, *, using, domain, **kwargs):
  14. domain = domain or 'default'
  15. url = DomainOwnerTestCase.reverse('v1:token_domain_policies-detail', token_id=target.id, domain__name=domain)
  16. return self._request(method, url, using=using, **kwargs)
  17. def _request_policies(self, method, target, *, using, **kwargs):
  18. url = DomainOwnerTestCase.reverse('v1:token_domain_policies-list', token_id=target.id)
  19. return self._request(method, url, using=using, **kwargs)
  20. def list_policies(self, target, *, using):
  21. return self._request_policies(self.get, target, using=using)
  22. def create_policy(self, target, *, using, **kwargs):
  23. return self._request_policies(self.post, target, using=using, **kwargs)
  24. def get_policy(self, target, *, using, domain):
  25. return self._request_policy(self.get, target, using=using, domain=domain)
  26. def patch_policy(self, target, *, using, domain, **kwargs):
  27. return self._request_policy(self.patch, target, using=using, domain=domain, **kwargs)
  28. def delete_policy(self, target, *, using, domain):
  29. return self._request_policy(self.delete, target, using=using, domain=domain)
  30. class TokenDomainPolicyTestCase(DomainOwnerTestCase):
  31. client_class = TokenDomainPolicyClient
  32. default_data = dict(perm_dyndns=False, perm_rrsets=False)
  33. def setUp(self):
  34. super().setUp()
  35. self.client.credentials() # remove default credential (corresponding to domain owner)
  36. self.token_manage = self.create_token(self.owner, perm_manage_tokens=True)
  37. self.other_token = self.create_token(self.user)
  38. def test_policy_lifecycle_without_management_permission(self):
  39. # Prepare (with management token)
  40. data = dict(domain=None, perm_rrsets=True)
  41. response = self.client.create_policy(self.token, using=self.token_manage, data=data)
  42. self.assertStatus(response, status.HTTP_201_CREATED)
  43. response = self.client.create_policy(self.token_manage, using=self.token_manage, data=data)
  44. self.assertStatus(response, status.HTTP_201_CREATED)
  45. # Self-inspection is fine
  46. ## List
  47. response = self.client.list_policies(self.token, using=self.token)
  48. self.assertStatus(response, status.HTTP_200_OK)
  49. self.assertEqual(len(response.data), 1)
  50. ## Get
  51. response = self.client.get_policy(self.token, using=self.token, domain=None)
  52. self.assertStatus(response, status.HTTP_200_OK)
  53. self.assertEqual(response.data, self.default_data | data)
  54. # Inspection of other tokens forbitten
  55. ## List
  56. response = self.client.list_policies(self.token_manage, using=self.token)
  57. self.assertStatus(response, status.HTTP_403_FORBIDDEN)
  58. ## Get
  59. response = self.client.get_policy(self.token_manage, using=self.token, domain=None)
  60. self.assertStatus(response, status.HTTP_403_FORBIDDEN)
  61. # Write operations forbidden (self and other)
  62. for target in [self.token, self.token_manage]:
  63. # Create
  64. response = self.client.create_policy(target, using=self.token)
  65. self.assertStatus(response, status.HTTP_403_FORBIDDEN)
  66. # Change
  67. data = dict(domain=self.my_domains[1].name, perm_dyndns=False, perm_rrsets=True)
  68. response = self.client.patch_policy(target, using=self.token, domain=self.my_domains[0].name, data=data)
  69. self.assertStatus(response, status.HTTP_403_FORBIDDEN)
  70. # Delete
  71. response = self.client.delete_policy(target, using=self.token, domain=None)
  72. self.assertStatus(response, status.HTTP_403_FORBIDDEN)
  73. def test_policy_lifecycle(self):
  74. # Can't do anything unauthorized
  75. response = self.client.list_policies(self.token, using=None)
  76. self.assertStatus(response, status.HTTP_401_UNAUTHORIZED)
  77. response = self.client.create_policy(self.token, using=None)
  78. self.assertStatus(response, status.HTTP_401_UNAUTHORIZED)
  79. # Create
  80. ## without required field
  81. response = self.client.create_policy(self.token, using=self.token_manage)
  82. self.assertStatus(response, status.HTTP_400_BAD_REQUEST)
  83. self.assertEqual(response.data['domain'], ['This field is required.'])
  84. ## without a default policy
  85. data = dict(domain=self.my_domains[0].name)
  86. with transaction.atomic():
  87. response = self.client.create_policy(self.token, using=self.token_manage, data=data)
  88. self.assertStatus(response, status.HTTP_400_BAD_REQUEST)
  89. self.assertEqual(response.data['domain'], ['Policy precedence: The first policy must be the default policy.'])
  90. # List: still empty
  91. response = self.client.list_policies(self.token, using=self.token_manage)
  92. self.assertStatus(response, status.HTTP_200_OK)
  93. self.assertEqual(response.data, [])
  94. # Create
  95. ## default policy
  96. data = dict(domain=None, perm_rrsets=True)
  97. response = self.client.create_policy(self.token, using=self.token_manage, data=data)
  98. self.assertStatus(response, status.HTTP_201_CREATED)
  99. ## can't create another default policy
  100. with transaction.atomic():
  101. response = self.client.create_policy(self.token, using=self.token_manage, data=dict(domain=None))
  102. self.assertStatus(response, status.HTTP_409_CONFLICT)
  103. ## verify object creation
  104. response = self.client.get_policy(self.token, using=self.token_manage, domain=None)
  105. self.assertStatus(response, status.HTTP_200_OK)
  106. self.assertEqual(response.data, self.default_data | data)
  107. ## can't create policy for other user's domain
  108. data = dict(domain=self.other_domain.name, perm_dyndns=True, perm_rrsets=True)
  109. response = self.client.create_policy(self.token, using=self.token_manage, data=data)
  110. self.assertStatus(response, status.HTTP_400_BAD_REQUEST)
  111. self.assertEqual(response.data['domain'][0].code, 'does_not_exist')
  112. ## another policy
  113. data = dict(domain=self.my_domains[0].name, perm_dyndns=True)
  114. response = self.client.create_policy(self.token, using=self.token_manage, data=data)
  115. self.assertStatus(response, status.HTTP_201_CREATED)
  116. ## can't create policy for the same domain
  117. with transaction.atomic():
  118. response = self.client.create_policy(self.token, using=self.token_manage,
  119. data=dict(domain=self.my_domains[0].name, perm_dyndns=False))
  120. self.assertStatus(response, status.HTTP_409_CONFLICT)
  121. ## verify object creation
  122. response = self.client.get_policy(self.token, using=self.token_manage, domain=self.my_domains[0].name)
  123. self.assertStatus(response, status.HTTP_200_OK)
  124. self.assertEqual(response.data, self.default_data | data)
  125. # List: now has two elements
  126. response = self.client.list_policies(self.token, using=self.token_manage)
  127. self.assertStatus(response, status.HTTP_200_OK)
  128. self.assertEqual(len(response.data), 2)
  129. # Change
  130. ## all fields of a policy
  131. data = dict(domain=self.my_domains[1].name, perm_dyndns=False, perm_rrsets=True)
  132. response = self.client.patch_policy(self.token, using=self.token_manage, domain=self.my_domains[0].name,
  133. data=data)
  134. self.assertStatus(response, status.HTTP_200_OK)
  135. self.assertEqual(response.data, self.default_data | data)
  136. ## verify modification
  137. response = self.client.get_policy(self.token, using=self.token_manage, domain=self.my_domains[1].name)
  138. self.assertStatus(response, status.HTTP_200_OK)
  139. self.assertEqual(response.data, self.default_data | data)
  140. ## verify that policy for former domain is gone
  141. response = self.client.get_policy(self.token, using=self.token_manage, domain=self.my_domains[0].name)
  142. self.assertStatus(response, status.HTTP_404_NOT_FOUND)
  143. ## verify that the default policy can't be changed to a non-default policy
  144. with transaction.atomic():
  145. response = self.client.patch_policy(self.token, using=self.token_manage, domain=None, data=data)
  146. self.assertStatus(response, status.HTTP_400_BAD_REQUEST)
  147. self.assertEqual(response.data,
  148. {'domain': ['Policy precedence: Cannot disable default policy when others exist.']})
  149. ## partially modify the default policy
  150. data = dict(perm_dyndns=True)
  151. response = self.client.patch_policy(self.token, using=self.token_manage, domain=None, data=data)
  152. self.assertStatus(response, status.HTTP_200_OK)
  153. self.assertEqual(response.data, {'domain': None, 'perm_rrsets': True} | data)
  154. # Delete
  155. ## can't delete default policy while others exist
  156. with transaction.atomic():
  157. response = self.client.delete_policy(self.token, using=self.token_manage, domain=None)
  158. self.assertStatus(response, status.HTTP_400_BAD_REQUEST)
  159. self.assertEqual(response.data,
  160. {'domain': ["Policy precedence: Can't delete default policy when there exist others."]})
  161. ## delete other policy
  162. response = self.client.delete_policy(self.token, using=self.token_manage, domain=self.my_domains[1].name)
  163. self.assertStatus(response, status.HTTP_204_NO_CONTENT)
  164. ## delete default policy
  165. response = self.client.delete_policy(self.token, using=self.token_manage, domain=None)
  166. self.assertStatus(response, status.HTTP_204_NO_CONTENT)
  167. ## idempotence: delete a non-existing policy
  168. response = self.client.delete_policy(self.token, using=self.token_manage, domain=self.my_domains[0].name)
  169. self.assertStatus(response, status.HTTP_204_NO_CONTENT)
  170. ## verify that policies are gone
  171. for domain in [None, self.my_domains[0].name, self.my_domains[1].name]:
  172. response = self.client.get_policy(self.token, using=self.token_manage, domain=domain)
  173. self.assertStatus(response, status.HTTP_404_NOT_FOUND)
  174. # List: empty again
  175. response = self.client.list_policies(self.token, using=self.token_manage)
  176. self.assertStatus(response, status.HTTP_200_OK)
  177. self.assertEqual(response.data, [])
  178. def test_policy_permissions(self):
  179. def _reset_policies(token):
  180. for policy in token.tokendomainpolicy_set.all():
  181. for perm in self.default_data.keys():
  182. setattr(policy, perm, False)
  183. policy.save()
  184. def _perform_requests(name, perm, value, **kwargs):
  185. responses = []
  186. if value:
  187. pdns_name = self._normalize_name(name).lower()
  188. cm = self.assertPdnsNoRequestsBut(self.request_pdns_zone_update(name=pdns_name),
  189. self.request_pdns_zone_axfr(name=pdns_name))
  190. else:
  191. cm = nullcontext()
  192. if perm == 'perm_dyndns':
  193. data = {'username': name, 'password': self.token.plain}
  194. with cm:
  195. responses.append(self.client.get(self.reverse('v1:dyndns12update'), data))
  196. return responses
  197. if perm == 'perm_rrsets':
  198. url_detail = self.reverse('v1:rrset@', name=name, subname='', type='A')
  199. url_list = self.reverse('v1:rrsets', name=name)
  200. responses.append(self.client.get(url_list, **kwargs))
  201. responses.append(self.client.patch(url_list, [], **kwargs))
  202. responses.append(self.client.put(url_list, [], **kwargs))
  203. responses.append(self.client.post(url_list, [], **kwargs))
  204. data = {'subname': '', 'type': 'A', 'ttl': 3600, 'records': ['1.2.3.4']}
  205. with cm:
  206. responses += [
  207. self.client.delete(url_detail, **kwargs),
  208. self.client.post(url_list, data=data, **kwargs),
  209. self.client.put(url_detail, data=data, **kwargs),
  210. self.client.patch(url_detail, data=data, **kwargs),
  211. self.client.get(url_detail, **kwargs),
  212. ]
  213. return responses
  214. raise ValueError(f'Unexpected permission: {perm}')
  215. # Create
  216. ## default policy
  217. data = dict(domain=None)
  218. response = self.client.create_policy(self.token, using=self.token_manage, data=data)
  219. self.assertStatus(response, status.HTTP_201_CREATED)
  220. ## another policy
  221. data = dict(domain=self.my_domains[0].name)
  222. response = self.client.create_policy(self.token, using=self.token_manage, data=data)
  223. self.assertStatus(response, status.HTTP_201_CREATED)
  224. ## verify object creation
  225. response = self.client.get_policy(self.token, using=self.token_manage, domain=self.my_domains[0].name)
  226. self.assertStatus(response, status.HTTP_200_OK)
  227. self.assertEqual(response.data, self.default_data | data)
  228. policies = {
  229. self.my_domains[0]: self.token.tokendomainpolicy_set.get(domain__isnull=False),
  230. self.my_domains[1]: self.token.tokendomainpolicy_set.get(domain__isnull=True),
  231. }
  232. kwargs = dict(HTTP_AUTHORIZATION=f'Token {self.token.plain}')
  233. # For each permission type
  234. for perm in self.default_data.keys():
  235. # For the domain with specific policy and for the domain covered by the default policy
  236. for domain in policies.keys():
  237. # For both possible values of the permission
  238. for value in [True, False]:
  239. # Set only that permission for that domain (on its effective policy)
  240. _reset_policies(self.token)
  241. policy = policies[domain]
  242. setattr(policy, perm, value)
  243. policy.save()
  244. # Perform requests that test this permission and inspect responses
  245. for response in _perform_requests(domain.name, perm, value, **kwargs):
  246. if value:
  247. self.assertIn(response.status_code, range(200, 300))
  248. else:
  249. self.assertStatus(response, status.HTTP_403_FORBIDDEN)
  250. # Can't create domain
  251. data = {'name': self.random_domain_name()}
  252. response = self.client.post(self.reverse('v1:domain-list'), data, **kwargs)
  253. self.assertStatus(response, status.HTTP_403_FORBIDDEN)
  254. # Can't access account details
  255. response = self.client.get(self.reverse('v1:account'), **kwargs)
  256. self.assertStatus(response, status.HTTP_403_FORBIDDEN)
  257. def test_domain_owner_consistency(self):
  258. models.TokenDomainPolicy(token=self.token, domain=None).save()
  259. domain = self.my_domains[0]
  260. policy = models.TokenDomainPolicy(token=self.token, domain=domain)
  261. policy.save()
  262. domain.owner = self.other_domains[0].owner
  263. with self.assertRaises(IntegrityError):
  264. with transaction.atomic(): # https://stackoverflow.com/a/23326971/6867099
  265. domain.save()
  266. policy.delete()
  267. domain.save()
  268. def test_token_user_consistency(self):
  269. policy = models.TokenDomainPolicy(token=self.token, domain=None)
  270. policy.save()
  271. self.token.user = self.other_domains[0].owner
  272. with self.assertRaises(IntegrityError):
  273. with transaction.atomic(): # https://stackoverflow.com/a/23326971/6867099
  274. self.token.save()
  275. policy.delete()
  276. self.token.save()
  277. def test_domain_owner_equals_token_user(self):
  278. models.TokenDomainPolicy(token=self.token, domain=None).save()
  279. with self.assertRaises(IntegrityError):
  280. with transaction.atomic(): # https://stackoverflow.com/a/23326971/6867099
  281. models.TokenDomainPolicy(token=self.token, domain=self.other_domains[0]).save()
  282. self.token.user = self.other_domain.owner
  283. with self.assertRaises(IntegrityError):
  284. with transaction.atomic(): # https://stackoverflow.com/a/23326971/6867099
  285. self.token.save()
  286. def test_domain_deletion(self):
  287. domains = [None] + self.my_domains[:2]
  288. for domain in domains:
  289. models.TokenDomainPolicy(token=self.token, domain=domain).save()
  290. domain = domains.pop()
  291. domain.delete()
  292. self.assertEqual(list(map(lambda x: x.domain, self.token.tokendomainpolicy_set.all())), domains)
  293. def test_token_deletion(self):
  294. domains = [None] + self.my_domains[:2]
  295. policies = {}
  296. for domain in domains:
  297. policy = models.TokenDomainPolicy(token=self.token, domain=domain)
  298. policies[domain] = policy
  299. policy.save()
  300. self.token.delete()
  301. for domain, policy in policies.items():
  302. self.assertFalse(models.TokenDomainPolicy.objects.filter(pk=policy.pk).exists())
  303. if domain:
  304. self.assertTrue(models.Domain.objects.filter(pk=domain.pk).exists())
  305. def test_user_deletion(self):
  306. domains = [None] + self.my_domains[:2]
  307. for domain in domains:
  308. models.TokenDomainPolicy(token=self.token, domain=domain).save()
  309. # User can only be deleted when domains are deleted
  310. for domain in self.my_domains:
  311. domain.delete()
  312. # Only the default policy should be left, so get can simply get() it
  313. policy_pk = self.token.tokendomainpolicy_set.get().pk
  314. self.token.user.delete()
  315. self.assertFalse(models.TokenDomainPolicy.objects.filter(pk=policy_pk).exists())