# test_token_domain_policy.py
  1. from contextlib import nullcontext
  2. from django.db import transaction
  3. from django.db.utils import IntegrityError
  4. from rest_framework import status
  5. from rest_framework.test import APIClient
  6. from desecapi import models
  7. from desecapi.tests.base import DomainOwnerTestCase
  class TokenDomainPolicyClient(APIClient):
      """API test client with helpers for the token domain policy endpoints.

      In every helper, ``target`` is the token whose policies are addressed (its
      ``id`` is put into the URL) and ``using`` is the token that authenticates
      the request (its ``plain`` value is put into the ``Authorization`` header).
      With ``using=None`` no such header is added.
      """

      def _request(self, method, url, *, using, **kwargs):
          """Return ``method(url, **kwargs)``, adding a token header if ``using`` is set."""
          if using is not None:
              kwargs.update(HTTP_AUTHORIZATION=f"Token {using.plain}")
          return method(url, **kwargs)

      def _request_policy(self, method, target, *, using, domain, **kwargs):
          """Send a request to the detail URL of one policy of ``target``.

          A falsy ``domain`` (such as ``None``) is replaced by the name
          ``"default"`` before the URL is built.
          """
          domain = domain or "default"
          url = DomainOwnerTestCase.reverse(
              "v1:token_domain_policies-detail",
              token_id=target.id,
              domain__name=domain,
          )
          return self._request(method, url, using=using, **kwargs)

      def _request_policies(self, method, target, *, using, **kwargs):
          """Send a request to the policy list URL of ``target``."""
          url = DomainOwnerTestCase.reverse(
              "v1:token_domain_policies-list", token_id=target.id
          )
          return self._request(method, url, using=using, **kwargs)

      def list_policies(self, target, *, using):
          """GET the policy list of ``target``."""
          return self._request_policies(self.get, target, using=using)

      def create_policy(self, target, *, using, **kwargs):
          """POST to the policy list of ``target``; ``kwargs`` go to the request."""
          return self._request_policies(self.post, target, using=using, **kwargs)

      def get_policy(self, target, *, using, domain):
          """GET the policy of ``target`` for ``domain`` (falsy means default)."""
          return self._request_policy(self.get, target, using=using, domain=domain)

      def patch_policy(self, target, *, using, domain, **kwargs):
          """PATCH the policy of ``target`` for ``domain``; ``kwargs`` go to the request."""
          return self._request_policy(
              self.patch, target, using=using, domain=domain, **kwargs
          )

      def delete_policy(self, target, *, using, domain):
          """DELETE the policy of ``target`` for ``domain`` (falsy means default)."""
          return self._request_policy(self.delete, target, using=using, domain=domain)
  class TokenDomainPolicyTestCase(DomainOwnerTestCase):
      """Exercise token domain policies through the API and at the model level.

      Fixtures such as ``self.token``, ``self.owner``, ``self.user``,
      ``self.my_domains``, ``self.other_domain`` and ``self.other_domains`` are
      not defined in this class; presumably they come from
      ``DomainOwnerTestCase`` (TODO confirm).
      """

      client_class = TokenDomainPolicyClient
      # Permission flags as expected in API responses when a request omits them.
      default_data = dict(perm_dyndns=False, perm_rrsets=False)

      def setUp(self):
          """Clear the client's default credentials and create two extra tokens."""
          super().setUp()
          # remove default credential (corresponding to domain owner)
          self.client.credentials()
          self.token_manage = self.create_token(self.owner, perm_manage_tokens=True)
          self.other_token = self.create_token(self.user)

      def test_policy_lifecycle_without_management_permission(self):
          """``self.token`` may read its own policies; everything else yields 403."""
          # Prepare (with management token)
          data = dict(domain=None, perm_rrsets=True)
          response = self.client.create_policy(
              self.token, using=self.token_manage, data=data
          )
          self.assertStatus(response, status.HTTP_201_CREATED)
          response = self.client.create_policy(
              self.token_manage, using=self.token_manage, data=data
          )
          self.assertStatus(response, status.HTTP_201_CREATED)

          # Self-inspection is fine
          ## List
          response = self.client.list_policies(self.token, using=self.token)
          self.assertStatus(response, status.HTTP_200_OK)
          self.assertEqual(len(response.data), 1)

          ## Get
          response = self.client.get_policy(self.token, using=self.token, domain=None)
          self.assertStatus(response, status.HTTP_200_OK)
          self.assertEqual(response.data, self.default_data | data)

          # Inspection of other tokens forbidden
          ## List
          response = self.client.list_policies(self.token_manage, using=self.token)
          self.assertStatus(response, status.HTTP_403_FORBIDDEN)

          ## Get
          response = self.client.get_policy(
              self.token_manage, using=self.token, domain=None
          )
          self.assertStatus(response, status.HTTP_403_FORBIDDEN)

          # Write operations forbidden (self and other)
          for target in [self.token, self.token_manage]:
              # Create
              response = self.client.create_policy(target, using=self.token)
              self.assertStatus(response, status.HTTP_403_FORBIDDEN)

              # Change
              data = dict(
                  domain=self.my_domains[1].name, perm_dyndns=False, perm_rrsets=True
              )
              response = self.client.patch_policy(
                  target, using=self.token, domain=self.my_domains[0].name, data=data
              )
              self.assertStatus(response, status.HTTP_403_FORBIDDEN)

              # Delete
              response = self.client.delete_policy(
                  target, using=self.token, domain=None
              )
              self.assertStatus(response, status.HTTP_403_FORBIDDEN)

      def test_policy_lifecycle(self):
          """Create, read, change and delete policies using the management token.

          Requests that are expected to be rejected for conflicting with existing
          policies are issued inside ``transaction.atomic()``; the assertions on
          their responses follow the block.
          """
          # Can't do anything unauthorized
          response = self.client.list_policies(self.token, using=None)
          self.assertStatus(response, status.HTTP_401_UNAUTHORIZED)

          response = self.client.create_policy(self.token, using=None)
          self.assertStatus(response, status.HTTP_401_UNAUTHORIZED)

          # Create
          ## without required field
          response = self.client.create_policy(self.token, using=self.token_manage)
          self.assertStatus(response, status.HTTP_400_BAD_REQUEST)
          self.assertEqual(response.data["domain"], ["This field is required."])

          ## without a default policy
          data = dict(domain=self.my_domains[0].name)
          with transaction.atomic():
              response = self.client.create_policy(
                  self.token, using=self.token_manage, data=data
              )
          self.assertStatus(response, status.HTTP_400_BAD_REQUEST)
          self.assertEqual(
              response.data["domain"],
              ["Policy precedence: The first policy must be the default policy."],
          )

          # List: still empty
          response = self.client.list_policies(self.token, using=self.token_manage)
          self.assertStatus(response, status.HTTP_200_OK)
          self.assertEqual(response.data, [])

          # Create
          ## default policy
          data = dict(domain=None, perm_rrsets=True)
          response = self.client.create_policy(
              self.token, using=self.token_manage, data=data
          )
          self.assertStatus(response, status.HTTP_201_CREATED)

          ## can't create another default policy
          with transaction.atomic():
              response = self.client.create_policy(
                  self.token, using=self.token_manage, data=dict(domain=None)
              )
          self.assertStatus(response, status.HTTP_409_CONFLICT)

          ## verify object creation
          response = self.client.get_policy(
              self.token, using=self.token_manage, domain=None
          )
          self.assertStatus(response, status.HTTP_200_OK)
          self.assertEqual(response.data, self.default_data | data)

          ## can't create policy for other user's domain
          data = dict(
              domain=self.other_domain.name, perm_dyndns=True, perm_rrsets=True
          )
          response = self.client.create_policy(
              self.token, using=self.token_manage, data=data
          )
          self.assertStatus(response, status.HTTP_400_BAD_REQUEST)
          self.assertEqual(response.data["domain"][0].code, "does_not_exist")

          ## another policy
          data = dict(domain=self.my_domains[0].name, perm_dyndns=True)
          response = self.client.create_policy(
              self.token, using=self.token_manage, data=data
          )
          self.assertStatus(response, status.HTTP_201_CREATED)

          ## can't create policy for the same domain
          with transaction.atomic():
              response = self.client.create_policy(
                  self.token,
                  using=self.token_manage,
                  data=dict(domain=self.my_domains[0].name, perm_dyndns=False),
              )
          self.assertStatus(response, status.HTTP_409_CONFLICT)

          ## verify object creation
          response = self.client.get_policy(
              self.token, using=self.token_manage, domain=self.my_domains[0].name
          )
          self.assertStatus(response, status.HTTP_200_OK)
          self.assertEqual(response.data, self.default_data | data)

          # List: now has two elements
          response = self.client.list_policies(self.token, using=self.token_manage)
          self.assertStatus(response, status.HTTP_200_OK)
          self.assertEqual(len(response.data), 2)

          # Change
          ## all fields of a policy
          data = dict(
              domain=self.my_domains[1].name, perm_dyndns=False, perm_rrsets=True
          )
          response = self.client.patch_policy(
              self.token,
              using=self.token_manage,
              domain=self.my_domains[0].name,
              data=data,
          )
          self.assertStatus(response, status.HTTP_200_OK)
          self.assertEqual(response.data, self.default_data | data)

          ## verify modification
          response = self.client.get_policy(
              self.token, using=self.token_manage, domain=self.my_domains[1].name
          )
          self.assertStatus(response, status.HTTP_200_OK)
          self.assertEqual(response.data, self.default_data | data)

          ## verify that policy for former domain is gone
          response = self.client.get_policy(
              self.token, using=self.token_manage, domain=self.my_domains[0].name
          )
          self.assertStatus(response, status.HTTP_404_NOT_FOUND)

          ## verify that the default policy can't be changed to a non-default policy
          with transaction.atomic():
              response = self.client.patch_policy(
                  self.token, using=self.token_manage, domain=None, data=data
              )
          self.assertStatus(response, status.HTTP_400_BAD_REQUEST)
          self.assertEqual(
              response.data,
              {
                  "domain": [
                      "Policy precedence: Cannot disable default policy when others exist."
                  ]
              },
          )

          ## partially modify the default policy
          data = dict(perm_dyndns=True)
          response = self.client.patch_policy(
              self.token, using=self.token_manage, domain=None, data=data
          )
          self.assertStatus(response, status.HTTP_200_OK)
          self.assertEqual(
              response.data, {"domain": None, "perm_rrsets": True} | data
          )

          # Delete
          ## can't delete default policy while others exist
          with transaction.atomic():
              response = self.client.delete_policy(
                  self.token, using=self.token_manage, domain=None
              )
          self.assertStatus(response, status.HTTP_400_BAD_REQUEST)
          self.assertEqual(
              response.data,
              {
                  "domain": [
                      "Policy precedence: Can't delete default policy when there exist others."
                  ]
              },
          )

          ## delete other policy
          response = self.client.delete_policy(
              self.token, using=self.token_manage, domain=self.my_domains[1].name
          )
          self.assertStatus(response, status.HTTP_204_NO_CONTENT)

          ## delete default policy
          response = self.client.delete_policy(
              self.token, using=self.token_manage, domain=None
          )
          self.assertStatus(response, status.HTTP_204_NO_CONTENT)

          ## idempotence: delete a non-existing policy
          response = self.client.delete_policy(
              self.token, using=self.token_manage, domain=self.my_domains[0].name
          )
          self.assertStatus(response, status.HTTP_204_NO_CONTENT)

          ## verify that policies are gone
          for domain in [None, self.my_domains[0].name, self.my_domains[1].name]:
              response = self.client.get_policy(
                  self.token, using=self.token_manage, domain=domain
              )
              self.assertStatus(response, status.HTTP_404_NOT_FOUND)

          # List: empty again
          response = self.client.list_policies(self.token, using=self.token_manage)
          self.assertStatus(response, status.HTTP_200_OK)
          self.assertEqual(response.data, [])

      def test_policy_permissions(self):
          """Toggle each permission flag and check the endpoints it governs.

          One domain gets its own policy and a second one is looked up under the
          default policy. For every flag, domain and flag value, the requests
          from ``_perform_requests`` must all be 2xx (flag set) or all be 403
          (flag cleared).
          """

          def _reset_policies(token):
              """Set every flag named in ``default_data`` to False on all policies."""
              for policy in token.tokendomainpolicy_set.all():
                  for perm in self.default_data:
                      setattr(policy, perm, False)
                  policy.save()

          def _perform_requests(name, perm, value, **kwargs):
              """Send the requests governed by ``perm`` for domain ``name``.

              Returns the list of responses. When ``value`` is true, the requests
              wrapped in ``cm`` run under ``assertPdnsNoRequestsBut`` with a zone
              update and a zone AXFR request for the normalized, lower-cased name;
              otherwise ``cm`` is a no-op context.

              Raises ``ValueError`` for an unknown ``perm``.
              """
              responses = []
              if value:
                  pdns_name = self._normalize_name(name).lower()
                  cm = self.assertPdnsNoRequestsBut(
                      self.request_pdns_zone_update(name=pdns_name),
                      self.request_pdns_zone_axfr(name=pdns_name),
                  )
              else:
                  cm = nullcontext()

              if perm == "perm_dyndns":
                  # Credentials are passed as request data here, not via ``kwargs``
                  data = {"username": name, "password": self.token.plain}
                  with cm:
                      responses.append(
                          self.client.get(self.reverse("v1:dyndns12update"), data)
                      )
                  return responses

              if perm == "perm_rrsets":
                  url_detail = self.reverse(
                      "v1:rrset@", name=name, subname="", type="A"
                  )
                  url_list = self.reverse("v1:rrsets", name=name)

                  # List-level requests with an empty payload run outside ``cm``
                  responses.append(self.client.get(url_list, **kwargs))
                  responses.append(self.client.patch(url_list, [], **kwargs))
                  responses.append(self.client.put(url_list, [], **kwargs))
                  responses.append(self.client.post(url_list, [], **kwargs))

                  data = {
                      "subname": "",
                      "type": "A",
                      "ttl": 3600,
                      "records": ["1.2.3.4"],
                  }
                  with cm:
                      responses += [
                          self.client.delete(url_detail, **kwargs),
                          self.client.post(url_list, data=data, **kwargs),
                          self.client.put(url_detail, data=data, **kwargs),
                          self.client.patch(url_detail, data=data, **kwargs),
                          self.client.get(url_detail, **kwargs),
                      ]
                  return responses

              raise ValueError(f"Unexpected permission: {perm}")

          # Create
          ## default policy
          data = dict(domain=None)
          response = self.client.create_policy(
              self.token, using=self.token_manage, data=data
          )
          self.assertStatus(response, status.HTTP_201_CREATED)

          ## another policy
          data = dict(domain=self.my_domains[0].name)
          response = self.client.create_policy(
              self.token, using=self.token_manage, data=data
          )
          self.assertStatus(response, status.HTTP_201_CREATED)

          ## verify object creation
          response = self.client.get_policy(
              self.token, using=self.token_manage, domain=self.my_domains[0].name
          )
          self.assertStatus(response, status.HTTP_200_OK)
          self.assertEqual(response.data, self.default_data | data)

          # Map each tested domain to the policy object that is modified for it:
          # the domain-specific policy for the first, the default policy for the second
          policies = {
              self.my_domains[0]: self.token.tokendomainpolicy_set.get(
                  domain__isnull=False
              ),
              self.my_domains[1]: self.token.tokendomainpolicy_set.get(
                  domain__isnull=True
              ),
          }

          kwargs = dict(HTTP_AUTHORIZATION=f"Token {self.token.plain}")

          # For each permission type
          for perm in self.default_data:
              # For the domain with specific policy and for the domain covered by the default policy
              for domain in policies:
                  # For both possible values of the permission
                  for value in [True, False]:
                      # Set only that permission for that domain (on its effective policy)
                      _reset_policies(self.token)
                      policy = policies[domain]
                      setattr(policy, perm, value)
                      policy.save()

                      # Perform requests that test this permission and inspect responses
                      for response in _perform_requests(
                          domain.name, perm, value, **kwargs
                      ):
                          if value:
                              self.assertIn(response.status_code, range(200, 300))
                          else:
                              self.assertStatus(response, status.HTTP_403_FORBIDDEN)

                      # Can't create domain
                      data = {"name": self.random_domain_name()}
                      response = self.client.post(
                          self.reverse("v1:domain-list"), data, **kwargs
                      )
                      self.assertStatus(response, status.HTTP_403_FORBIDDEN)

                      # Can't access account details
                      response = self.client.get(self.reverse("v1:account"), **kwargs)
                      self.assertStatus(response, status.HTTP_403_FORBIDDEN)

      def test_domain_owner_consistency(self):
          """Changing a domain's owner fails while the token has a policy for it."""
          models.TokenDomainPolicy(token=self.token, domain=None).save()

          domain = self.my_domains[0]
          policy = models.TokenDomainPolicy(token=self.token, domain=domain)
          policy.save()

          domain.owner = self.other_domains[0].owner
          with self.assertRaises(IntegrityError):
              with transaction.atomic():  # https://stackoverflow.com/a/23326971/6867099
                  domain.save()

          # Once the policy is gone, the same save is expected to go through
          policy.delete()
          domain.save()

      def test_token_user_consistency(self):
          """Changing a token's user fails while the token still has a policy."""
          policy = models.TokenDomainPolicy(token=self.token, domain=None)
          policy.save()

          self.token.user = self.other_domains[0].owner
          with self.assertRaises(IntegrityError):
              with transaction.atomic():  # https://stackoverflow.com/a/23326971/6867099
                  self.token.save()

          # Once the policy is gone, the same save is expected to go through
          policy.delete()
          self.token.save()

      def test_domain_owner_equals_token_user(self):
          """A policy cannot link a token to a domain taken from ``other_domains``."""
          models.TokenDomainPolicy(token=self.token, domain=None).save()

          with self.assertRaises(IntegrityError):
              with transaction.atomic():  # https://stackoverflow.com/a/23326971/6867099
                  models.TokenDomainPolicy(
                      token=self.token, domain=self.other_domains[0]
                  ).save()

          self.token.user = self.other_domain.owner
          with self.assertRaises(IntegrityError):
              with transaction.atomic():  # https://stackoverflow.com/a/23326971/6867099
                  self.token.save()

      def test_domain_deletion(self):
          """Deleting a domain removes its policy and leaves the other policies."""
          domains = [None] + self.my_domains[:2]
          for domain in domains:
              models.TokenDomainPolicy(token=self.token, domain=domain).save()

          # Remove the last domain from the expectation list and from the database
          domain = domains.pop()
          domain.delete()
          # NOTE(review): the comparison is order-sensitive and the queryset has no
          # explicit order_by here; presumably the model defines an ordering. Confirm.
          self.assertEqual(
              [policy.domain for policy in self.token.tokendomainpolicy_set.all()],
              domains,
          )

      def test_token_deletion(self):
          """Deleting a token removes all of its policies but keeps the domains."""
          domains = [None] + self.my_domains[:2]
          policies = {}
          for domain in domains:
              policy = models.TokenDomainPolicy(token=self.token, domain=domain)
              policies[domain] = policy
              policy.save()

          self.token.delete()
          for domain, policy in policies.items():
              self.assertFalse(
                  models.TokenDomainPolicy.objects.filter(pk=policy.pk).exists()
              )
              # The default policy has no domain (None), so there is nothing to check
              if domain:
                  self.assertTrue(models.Domain.objects.filter(pk=domain.pk).exists())

      def test_user_deletion(self):
          """Deleting the token's user removes the remaining default policy."""
          domains = [None] + self.my_domains[:2]
          for domain in domains:
              models.TokenDomainPolicy(token=self.token, domain=domain).save()

          # User can only be deleted when domains are deleted
          for domain in self.my_domains:
              domain.delete()

          # Only the default policy should be left, so we can simply get() it
          policy_pk = self.token.tokendomainpolicy_set.get().pk
          self.token.user.delete()
          self.assertFalse(
              models.TokenDomainPolicy.objects.filter(pk=policy_pk).exists()
          )