# test_token_domain_policy.py
  1. from contextlib import nullcontext
  2. from django.db import transaction
  3. from django.db.utils import IntegrityError
  4. from rest_framework import status
  5. from rest_framework.test import APIClient
  6. from desecapi import models
  7. from desecapi.tests.base import DomainOwnerTestCase
  8. class TokenDomainPolicyClient(APIClient):
  9. def _request(self, method, url, *, using, **kwargs):
  10. if using is not None:
  11. kwargs.update(HTTP_AUTHORIZATION=f"Token {using.plain}")
  12. return method(url, **kwargs)
  13. def _request_policy(self, method, target, *, using, policy_id, **kwargs):
  14. url = DomainOwnerTestCase.reverse(
  15. "v1:token_domain_policies-detail", token_id=target.id, pk=policy_id
  16. )
  17. return self._request(method, url, using=using, **kwargs)
  18. def _request_policies(self, method, target, *, using, **kwargs):
  19. url = DomainOwnerTestCase.reverse(
  20. "v1:token_domain_policies-list", token_id=target.id
  21. )
  22. return self._request(method, url, using=using, **kwargs)
  23. def list_policies(self, target, *, using):
  24. return self._request_policies(self.get, target, using=using)
  25. def create_policy(self, target, *, using, **kwargs):
  26. return self._request_policies(self.post, target, using=using, **kwargs)
  27. def get_policy(self, target, *, using, policy_id):
  28. return self._request_policy(self.get, target, using=using, policy_id=policy_id)
  29. def patch_policy(self, target, *, using, policy_id, **kwargs):
  30. return self._request_policy(
  31. self.patch, target, using=using, policy_id=policy_id, **kwargs
  32. )
  33. def delete_policy(self, target, *, using, policy_id):
  34. return self._request_policy(
  35. self.delete, target, using=using, policy_id=policy_id
  36. )
  37. class TokenDomainPolicyTestCase(DomainOwnerTestCase):
  38. client_class = TokenDomainPolicyClient
  39. default_data = dict(perm_dyndns=False, perm_write=False)
  40. def setUp(self):
  41. super().setUp()
  42. self.client.credentials() # remove default credential (corresponding to domain owner)
  43. self.token_manage = self.create_token(self.owner, perm_manage_tokens=True)
  44. self.other_token = self.create_token(self.user)
  45. def test_policy_lifecycle_without_management_permission(self):
  46. # Prepare (with management token)
  47. data = {"domain": None, "subname": None, "type": None, "perm_write": True}
  48. response = self.client.create_policy(
  49. self.token, using=self.token_manage, data=data
  50. )
  51. self.assertStatus(response, status.HTTP_201_CREATED)
  52. response = self.client.create_policy(
  53. self.token_manage, using=self.token_manage, data=data
  54. )
  55. self.assertStatus(response, status.HTTP_201_CREATED)
  56. # Self-inspection is fine
  57. ## List
  58. response = self.client.list_policies(self.token, using=self.token)
  59. self.assertStatus(response, status.HTTP_200_OK)
  60. self.assertEqual(len(response.data), 1)
  61. default_policy_id = response.data[0]["id"]
  62. ## Get
  63. response = self.client.get_policy(
  64. self.token, using=self.token, policy_id=default_policy_id
  65. )
  66. self.assertStatus(response, status.HTTP_200_OK)
  67. self.assertEqual(
  68. response.data, self.default_data | data | {"id": default_policy_id}
  69. )
  70. # Inspection of other tokens forbidden
  71. ## List
  72. response = self.client.list_policies(self.token_manage, using=self.token)
  73. self.assertStatus(response, status.HTTP_403_FORBIDDEN)
  74. ## Get
  75. response = self.client.get_policy(
  76. self.token_manage, using=self.token, policy_id=default_policy_id
  77. )
  78. self.assertStatus(response, status.HTTP_403_FORBIDDEN)
  79. # Write operations forbidden (self and other)
  80. for target in [self.token, self.token_manage]:
  81. # Create
  82. response = self.client.create_policy(target, using=self.token)
  83. self.assertStatus(response, status.HTTP_403_FORBIDDEN)
  84. # Change
  85. data = dict(perm_dyndns=False, perm_write=True)
  86. policy_id = models.TokenDomainPolicy.objects.get(
  87. token=target, domain__isnull=True
  88. )
  89. response = self.client.patch_policy(
  90. target, using=self.token, policy_id=policy_id, data=data
  91. )
  92. self.assertStatus(response, status.HTTP_403_FORBIDDEN)
  93. # Delete
  94. response = self.client.delete_policy(
  95. target, using=self.token, policy_id=default_policy_id
  96. )
  97. self.assertStatus(response, status.HTTP_403_FORBIDDEN)
  98. def test_policy_lifecycle(self):
  99. # Can't do anything unauthorized
  100. response = self.client.list_policies(self.token, using=None)
  101. self.assertStatus(response, status.HTTP_401_UNAUTHORIZED)
  102. response = self.client.create_policy(self.token, using=None)
  103. self.assertStatus(response, status.HTTP_401_UNAUTHORIZED)
  104. # Create
  105. ## without required field
  106. response = self.client.create_policy(self.token, using=self.token_manage)
  107. self.assertStatus(response, status.HTTP_400_BAD_REQUEST)
  108. self.assertEqual(response.data["domain"], ["This field is required."])
  109. ## without a default policy
  110. data = {"domain": self.my_domains[0].name, "subname": None, "type": None}
  111. with transaction.atomic():
  112. response = self.client.create_policy(
  113. self.token, using=self.token_manage, data=data
  114. )
  115. self.assertStatus(response, status.HTTP_400_BAD_REQUEST)
  116. self.assertEqual(
  117. response.data["domain"],
  118. ["Policy precedence: The first policy must be the default policy."],
  119. )
  120. # List: still empty
  121. response = self.client.list_policies(self.token, using=self.token_manage)
  122. self.assertStatus(response, status.HTTP_200_OK)
  123. self.assertEqual(response.data, [])
  124. # Create
  125. ## default policy
  126. data = {"domain": None, "subname": None, "type": None, "perm_write": True}
  127. response = self.client.create_policy(
  128. self.token, using=self.token_manage, data=data
  129. )
  130. self.assertStatus(response, status.HTTP_201_CREATED)
  131. default_policy_id = response.data["id"]
  132. ## can't create another default policy
  133. with transaction.atomic():
  134. response = self.client.create_policy(
  135. self.token,
  136. using=self.token_manage,
  137. data={"domain": None, "subname": None, "type": None},
  138. )
  139. self.assertStatus(response, status.HTTP_409_CONFLICT)
  140. ## verify object creation
  141. response = self.client.get_policy(
  142. self.token, using=self.token_manage, policy_id=default_policy_id
  143. )
  144. self.assertStatus(response, status.HTTP_200_OK)
  145. self.assertEqual(
  146. response.data, self.default_data | data | {"id": default_policy_id}
  147. )
  148. ## can't create policy for other user's domain
  149. data = {
  150. "domain": self.other_domain.name,
  151. "subname": None,
  152. "type": None,
  153. "perm_dyndns": True,
  154. "perm_write": True,
  155. }
  156. response = self.client.create_policy(
  157. self.token, using=self.token_manage, data=data
  158. )
  159. self.assertStatus(response, status.HTTP_400_BAD_REQUEST)
  160. self.assertEqual(response.data["domain"][0].code, "does_not_exist")
  161. ## another policy
  162. data = {
  163. "domain": self.my_domains[0].name,
  164. "subname": None,
  165. "type": None,
  166. "perm_dyndns": True,
  167. }
  168. response = self.client.create_policy(
  169. self.token, using=self.token_manage, data=data
  170. )
  171. self.assertStatus(response, status.HTTP_201_CREATED)
  172. policy_id = response.data["id"]
  173. ## can't create policy for the same domain
  174. with transaction.atomic():
  175. response = self.client.create_policy(
  176. self.token, using=self.token_manage, data=data
  177. )
  178. self.assertStatus(response, status.HTTP_409_CONFLICT)
  179. ## verify object creation
  180. response = self.client.get_policy(
  181. self.token, using=self.token_manage, policy_id=policy_id
  182. )
  183. self.assertStatus(response, status.HTTP_200_OK)
  184. self.assertEqual(response.data, self.default_data | data | {"id": policy_id})
  185. # List: now has two elements
  186. response = self.client.list_policies(self.token, using=self.token_manage)
  187. self.assertStatus(response, status.HTTP_200_OK)
  188. self.assertEqual(len(response.data), 2)
  189. # Change
  190. ## all fields of a policy
  191. data = dict(
  192. domain=self.my_domains[1].name,
  193. subname="_acme-challenge",
  194. type="TXT",
  195. perm_dyndns=False,
  196. perm_write=True,
  197. )
  198. response = self.client.patch_policy(
  199. self.token,
  200. using=self.token_manage,
  201. policy_id=policy_id,
  202. data=data,
  203. )
  204. self.assertStatus(response, status.HTTP_200_OK)
  205. self.assertEqual(response.data, self.default_data | data | {"id": policy_id})
  206. ## verify modification
  207. response = self.client.get_policy(
  208. self.token, using=self.token_manage, policy_id=policy_id
  209. )
  210. self.assertStatus(response, status.HTTP_200_OK)
  211. self.assertEqual(response.data, self.default_data | data | {"id": policy_id})
  212. ## verify that the default policy can't be changed to a non-default policy
  213. with transaction.atomic():
  214. response = self.client.patch_policy(
  215. self.token,
  216. using=self.token_manage,
  217. policy_id=default_policy_id,
  218. data=data,
  219. )
  220. self.assertStatus(response, status.HTTP_400_BAD_REQUEST)
  221. self.assertEqual(
  222. response.data,
  223. {
  224. "domain": [
  225. "Policy precedence: Cannot disable default policy when others exist."
  226. ]
  227. },
  228. )
  229. ## partially modify the default policy
  230. data = dict(perm_dyndns=True)
  231. response = self.client.patch_policy(
  232. self.token, using=self.token_manage, policy_id=default_policy_id, data=data
  233. )
  234. self.assertStatus(response, status.HTTP_200_OK)
  235. self.assertEqual(
  236. response.data,
  237. {
  238. "id": default_policy_id,
  239. "domain": None,
  240. "subname": None,
  241. "type": None,
  242. "perm_write": True,
  243. }
  244. | data,
  245. )
  246. # Delete
  247. ## can't delete default policy while others exist
  248. with transaction.atomic():
  249. response = self.client.delete_policy(
  250. self.token, using=self.token_manage, policy_id=default_policy_id
  251. )
  252. self.assertStatus(response, status.HTTP_400_BAD_REQUEST)
  253. self.assertEqual(
  254. response.data,
  255. {
  256. "domain": [
  257. "Policy precedence: Can't delete default policy when there exist others."
  258. ]
  259. },
  260. )
  261. ## delete other policy
  262. response = self.client.delete_policy(
  263. self.token, using=self.token_manage, policy_id=policy_id
  264. )
  265. self.assertStatus(response, status.HTTP_204_NO_CONTENT)
  266. ## delete default policy
  267. response = self.client.delete_policy(
  268. self.token, using=self.token_manage, policy_id=default_policy_id
  269. )
  270. self.assertStatus(response, status.HTTP_204_NO_CONTENT)
  271. ## idempotence: delete a non-existing policy
  272. response = self.client.delete_policy(
  273. self.token, using=self.token_manage, policy_id=policy_id
  274. )
  275. self.assertStatus(response, status.HTTP_204_NO_CONTENT)
  276. ## verify that policies are gone
  277. for pid in [policy_id, default_policy_id]:
  278. response = self.client.get_policy(
  279. self.token, using=self.token_manage, policy_id=pid
  280. )
  281. self.assertStatus(response, status.HTTP_404_NOT_FOUND)
  282. # List: empty again
  283. response = self.client.list_policies(self.token, using=self.token_manage)
  284. self.assertStatus(response, status.HTTP_200_OK)
  285. self.assertEqual(response.data, [])
  286. def test_policy_permissions(self):
  287. def _reset_policies(token):
  288. for policy in token.tokendomainpolicy_set.all():
  289. for perm in self.default_data.keys():
  290. setattr(policy, perm, False)
  291. policy.save()
  292. def _perform_requests(name, perm, value, **kwargs):
  293. responses = []
  294. if value:
  295. pdns_name = self._normalize_name(name).lower()
  296. cm = self.assertNoRequestsBut(
  297. self.request_pdns_zone_update(name=pdns_name),
  298. self.request_pdns_zone_axfr(name=pdns_name),
  299. )
  300. else:
  301. cm = nullcontext()
  302. if perm == "perm_dyndns":
  303. data = {"username": name, "password": self.token.plain}
  304. with cm:
  305. responses.append(
  306. self.client.get(self.reverse("v1:dyndns12update"), data)
  307. )
  308. return responses
  309. if perm == "perm_write":
  310. url_detail = self.reverse("v1:rrset@", name=name, subname="", type="A")
  311. url_list = self.reverse("v1:rrsets", name=name)
  312. responses.append(self.client.get(url_list, **kwargs))
  313. responses.append(self.client.patch(url_list, [], **kwargs))
  314. responses.append(self.client.put(url_list, [], **kwargs))
  315. responses.append(self.client.post(url_list, [], **kwargs))
  316. data = {"subname": "", "type": "A", "ttl": 3600, "records": ["1.2.3.4"]}
  317. with cm:
  318. responses += [
  319. self.client.delete(url_detail, **kwargs),
  320. self.client.post(url_list, data=data, **kwargs),
  321. self.client.put(url_detail, data=data, **kwargs),
  322. self.client.patch(url_detail, data=data, **kwargs),
  323. self.client.get(url_detail, **kwargs),
  324. ]
  325. return responses
  326. raise ValueError(f"Unexpected permission: {perm}")
  327. # Create
  328. ## default policy
  329. data = {"domain": None, "subname": None, "type": None}
  330. response = self.client.create_policy(
  331. self.token, using=self.token_manage, data=data
  332. )
  333. self.assertStatus(response, status.HTTP_201_CREATED)
  334. ## another policy
  335. data = {"domain": self.my_domains[0].name, "subname": None, "type": None}
  336. response = self.client.create_policy(
  337. self.token, using=self.token_manage, data=data
  338. )
  339. self.assertStatus(response, status.HTTP_201_CREATED)
  340. policy_id = response.data["id"]
  341. ## verify object creation
  342. response = self.client.get_policy(
  343. self.token, using=self.token_manage, policy_id=policy_id
  344. )
  345. self.assertStatus(response, status.HTTP_200_OK)
  346. self.assertEqual(response.data, self.default_data | data | {"id": policy_id})
  347. policies = {
  348. self.my_domains[0]: self.token.tokendomainpolicy_set.get(
  349. domain__isnull=False
  350. ),
  351. self.my_domains[1]: self.token.tokendomainpolicy_set.get(
  352. domain__isnull=True
  353. ),
  354. }
  355. kwargs = dict(HTTP_AUTHORIZATION=f"Token {self.token.plain}")
  356. # For each permission type
  357. for perm in self.default_data.keys():
  358. # For the domain with specific policy and for the domain covered by the default policy
  359. for domain in policies.keys():
  360. # For both possible values of the permission
  361. for value in [True, False]:
  362. # Set only that permission for that domain (on its effective policy)
  363. _reset_policies(self.token)
  364. policy = policies[domain]
  365. setattr(policy, perm, value)
  366. policy.save()
  367. # Perform requests that test this permission and inspect responses
  368. for response in _perform_requests(
  369. domain.name, perm, value, **kwargs
  370. ):
  371. if value:
  372. self.assertIn(response.status_code, range(200, 300))
  373. else:
  374. self.assertStatus(response, status.HTTP_403_FORBIDDEN)
  375. # Can't create domain
  376. data = {"name": self.random_domain_name()}
  377. response = self.client.post(
  378. self.reverse("v1:domain-list"), data, **kwargs
  379. )
  380. self.assertStatus(response, status.HTTP_403_FORBIDDEN)
  381. # Can't access account details
  382. response = self.client.get(self.reverse("v1:account"), **kwargs)
  383. self.assertStatus(response, status.HTTP_403_FORBIDDEN)
  384. def test_domain_owner_consistency(self):
  385. models.TokenDomainPolicy(
  386. token=self.token, domain=None, subname=None, type=None
  387. ).save()
  388. domain = self.my_domains[0]
  389. policy = models.TokenDomainPolicy(
  390. token=self.token, domain=domain, subname=None, type=None
  391. )
  392. policy.save()
  393. domain.owner = self.other_domains[0].owner
  394. with self.assertRaises(IntegrityError):
  395. with transaction.atomic(): # https://stackoverflow.com/a/23326971/6867099
  396. domain.save()
  397. policy.delete()
  398. domain.save()
  399. def test_token_user_consistency(self):
  400. policy = models.TokenDomainPolicy(
  401. token=self.token, domain=None, subname=None, type=None
  402. )
  403. policy.save()
  404. self.token.user = self.other_domains[0].owner
  405. with self.assertRaises(IntegrityError):
  406. with transaction.atomic(): # https://stackoverflow.com/a/23326971/6867099
  407. self.token.save()
  408. policy.delete()
  409. self.token.save()
  410. def test_domain_owner_equals_token_user(self):
  411. models.TokenDomainPolicy(
  412. token=self.token, domain=None, subname=None, type=None
  413. ).save()
  414. with self.assertRaises(IntegrityError):
  415. with transaction.atomic(): # https://stackoverflow.com/a/23326971/6867099
  416. models.TokenDomainPolicy(
  417. token=self.token,
  418. domain=self.other_domains[0],
  419. subname=None,
  420. type=None,
  421. ).save()
  422. self.token.user = self.other_domain.owner
  423. with self.assertRaises(IntegrityError):
  424. with transaction.atomic(): # https://stackoverflow.com/a/23326971/6867099
  425. self.token.save()
  426. def test_domain_deletion(self):
  427. domains = [None] + self.my_domains[:2]
  428. for domain in domains:
  429. models.TokenDomainPolicy(
  430. token=self.token, domain=domain, subname=None, type=None
  431. ).save()
  432. domain = domains.pop()
  433. domain.delete()
  434. self.assertEqual(
  435. list(map(lambda x: x.domain, self.token.tokendomainpolicy_set.all())),
  436. domains,
  437. )
  438. def test_token_deletion(self):
  439. domains = [None] + self.my_domains[:2]
  440. policies = {}
  441. for domain in domains:
  442. policy = models.TokenDomainPolicy(
  443. token=self.token, domain=domain, subname=None, type=None
  444. )
  445. policies[domain] = policy
  446. policy.save()
  447. self.token.delete()
  448. for domain, policy in policies.items():
  449. self.assertFalse(
  450. models.TokenDomainPolicy.objects.filter(pk=policy.pk).exists()
  451. )
  452. if domain:
  453. self.assertTrue(models.Domain.objects.filter(pk=domain.pk).exists())
  454. def test_user_deletion(self):
  455. domains = [None] + self.my_domains[:2]
  456. for domain in domains:
  457. models.TokenDomainPolicy(
  458. token=self.token, domain=domain, subname=None, type=None
  459. ).save()
  460. # User can only be deleted when domains are deleted
  461. for domain in self.my_domains:
  462. domain.delete()
  463. # Only the default policy should be left, so get can simply get() it
  464. policy_pk = self.token.tokendomainpolicy_set.get().pk
  465. self.token.user.delete()
  466. self.assertFalse(models.TokenDomainPolicy.objects.filter(pk=policy_pk).exists())