test_token_domain_policy.py 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606
  1. from contextlib import nullcontext
  2. from django.db import transaction
  3. from django.db.utils import IntegrityError
  4. from rest_framework import status
  5. from rest_framework.test import APIClient
  6. from desecapi import models
  7. from desecapi.tests.base import DomainOwnerTestCase
  8. class TokenDomainPolicyClient(APIClient):
  9. def _request(self, method, url, *, using, **kwargs):
  10. if using is not None:
  11. kwargs.update(HTTP_AUTHORIZATION=f"Token {using.plain}")
  12. return method(url, **kwargs)
  13. def _request_policy(self, method, target, *, using, policy_id, **kwargs):
  14. url = DomainOwnerTestCase.reverse(
  15. "v1:token_domain_policies-detail", token_id=target.id, pk=policy_id
  16. )
  17. return self._request(method, url, using=using, **kwargs)
  18. def _request_policies(self, method, target, *, using, **kwargs):
  19. url = DomainOwnerTestCase.reverse(
  20. "v1:token_domain_policies-list", token_id=target.id
  21. )
  22. return self._request(method, url, using=using, **kwargs)
  23. def list_policies(self, target, *, using):
  24. return self._request_policies(self.get, target, using=using)
  25. def create_policy(self, target, *, using, **kwargs):
  26. return self._request_policies(self.post, target, using=using, **kwargs)
  27. def get_policy(self, target, *, using, policy_id):
  28. return self._request_policy(self.get, target, using=using, policy_id=policy_id)
  29. def patch_policy(self, target, *, using, policy_id, **kwargs):
  30. return self._request_policy(
  31. self.patch, target, using=using, policy_id=policy_id, **kwargs
  32. )
  33. def delete_policy(self, target, *, using, policy_id):
  34. return self._request_policy(
  35. self.delete, target, using=using, policy_id=policy_id
  36. )
  37. class TokenDomainPolicyTestCase(DomainOwnerTestCase):
  38. client_class = TokenDomainPolicyClient
  39. default_data = dict(perm_dyndns=False, perm_write=False)
  40. def setUp(self):
  41. super().setUp()
  42. self.client.credentials() # remove default credential (corresponding to domain owner)
  43. self.token_manage = self.create_token(self.owner, perm_manage_tokens=True)
  44. self.other_token = self.create_token(self.user)
  45. def test_get_policy(self):
  46. def get_policy(domain, subname, type):
  47. return self.token.get_policy(domain=domain, subname=subname, type=type)
  48. def assertPolicy(policy, domain, subname, type):
  49. self.assertEqual(policy.domain, domain)
  50. self.assertEqual(policy.subname, subname)
  51. self.assertEqual(policy.type, type)
  52. qs = self.token.tokendomainpolicy_set
  53. # Default policy is fallback for everything
  54. qs.create(domain=None, subname=None, type=None)
  55. for kwargs in [
  56. dict(subname=subname, type=type_)
  57. for subname in (None, "www")
  58. for type_ in (None, "A")
  59. ]:
  60. policy = get_policy(self.my_domain, **kwargs)
  61. assertPolicy(policy, None, None, None)
  62. # Type wins over default
  63. qs.create(domain=None, subname=None, type="A")
  64. policy = get_policy(self.my_domain, "www", "A")
  65. assertPolicy(policy, None, None, "A")
  66. # Subname wins over type
  67. qs.create(domain=None, subname="www", type=None)
  68. policy = get_policy(self.my_domain, "www", "A")
  69. assertPolicy(policy, None, "www", None)
  70. # Most specific wins
  71. qs.create(domain=None, subname="www", type="A")
  72. policy = get_policy(self.my_domain, "www", "A")
  73. assertPolicy(policy, None, "www", "A")
  74. # Domain wins over default and over subname and type
  75. qs.create(domain=self.my_domain, subname=None, type=None)
  76. policy = get_policy(self.my_domain, None, None)
  77. assertPolicy(policy, self.my_domain, None, None)
  78. # Subname wins over default or domain default
  79. qs.create(domain=self.my_domain, subname="www", type=None)
  80. for kwargs in [
  81. dict(subname="www", type=None),
  82. dict(subname="www", type="A"),
  83. ]:
  84. policy = get_policy(self.my_domain, **kwargs)
  85. assertPolicy(policy, self.my_domain, "www", None)
  86. # Type wins over default or domain default
  87. qs.create(domain=self.my_domain, subname=None, type="A")
  88. for kwargs in [
  89. dict(subname=None, type="A"),
  90. dict(subname="www2", type="A"),
  91. ]:
  92. policy = get_policy(self.my_domain, **kwargs)
  93. assertPolicy(policy, self.my_domain, None, "A")
  94. # Subname wins over type
  95. policy = get_policy(self.my_domain, "www", "A")
  96. assertPolicy(policy, self.my_domain, "www", None)
  97. # Subname + type wins over less specific
  98. qs.create(domain=self.my_domain, subname="www", type="A")
  99. policy = get_policy(self.my_domain, "www", "A")
  100. assertPolicy(policy, self.my_domain, "www", "A")
  101. # Check that we did all combinations
  102. self.assertEqual(qs.count(), 2**3)
  103. def test_policy_lifecycle_without_management_permission(self):
  104. # Prepare (with management token)
  105. data = {"domain": None, "subname": None, "type": None, "perm_write": True}
  106. response = self.client.create_policy(
  107. self.token, using=self.token_manage, data=data
  108. )
  109. self.assertStatus(response, status.HTTP_201_CREATED)
  110. response = self.client.create_policy(
  111. self.token_manage, using=self.token_manage, data=data
  112. )
  113. self.assertStatus(response, status.HTTP_201_CREATED)
  114. # Self-inspection is fine
  115. ## List
  116. response = self.client.list_policies(self.token, using=self.token)
  117. self.assertStatus(response, status.HTTP_200_OK)
  118. self.assertEqual(len(response.data), 1)
  119. default_policy_id = response.data[0]["id"]
  120. ## Get
  121. response = self.client.get_policy(
  122. self.token, using=self.token, policy_id=default_policy_id
  123. )
  124. self.assertStatus(response, status.HTTP_200_OK)
  125. self.assertEqual(
  126. response.data, self.default_data | data | {"id": default_policy_id}
  127. )
  128. # Inspection of other tokens forbidden
  129. ## List
  130. response = self.client.list_policies(self.token_manage, using=self.token)
  131. self.assertStatus(response, status.HTTP_403_FORBIDDEN)
  132. ## Get
  133. response = self.client.get_policy(
  134. self.token_manage, using=self.token, policy_id=default_policy_id
  135. )
  136. self.assertStatus(response, status.HTTP_403_FORBIDDEN)
  137. # Write operations forbidden (self and other)
  138. for target in [self.token, self.token_manage]:
  139. # Create
  140. response = self.client.create_policy(target, using=self.token)
  141. self.assertStatus(response, status.HTTP_403_FORBIDDEN)
  142. # Change
  143. data = dict(perm_dyndns=False, perm_write=True)
  144. policy = target.get_policy()
  145. response = self.client.patch_policy(
  146. target, using=self.token, policy_id=policy.pk, data=data
  147. )
  148. self.assertStatus(response, status.HTTP_403_FORBIDDEN)
  149. # Delete
  150. response = self.client.delete_policy(
  151. target, using=self.token, policy_id=default_policy_id
  152. )
  153. self.assertStatus(response, status.HTTP_403_FORBIDDEN)
  154. def test_policy_lifecycle(self):
  155. # Can't do anything unauthorized
  156. response = self.client.list_policies(self.token, using=None)
  157. self.assertStatus(response, status.HTTP_401_UNAUTHORIZED)
  158. response = self.client.create_policy(self.token, using=None)
  159. self.assertStatus(response, status.HTTP_401_UNAUTHORIZED)
  160. # Create
  161. ## without required field
  162. response = self.client.create_policy(self.token, using=self.token_manage)
  163. self.assertStatus(response, status.HTTP_400_BAD_REQUEST)
  164. for field in ["domain", "subname", "type"]:
  165. self.assertEqual(response.data[field], ["This field is required."])
  166. ## without a default policy
  167. data = {"domain": self.my_domains[0].name, "subname": None, "type": None}
  168. with transaction.atomic():
  169. response = self.client.create_policy(
  170. self.token, using=self.token_manage, data=data
  171. )
  172. self.assertStatus(response, status.HTTP_400_BAD_REQUEST)
  173. self.assertEqual(
  174. response.data["non_field_errors"],
  175. ["Policy precedence: The first policy must be the default policy."],
  176. )
  177. # List: still empty
  178. response = self.client.list_policies(self.token, using=self.token_manage)
  179. self.assertStatus(response, status.HTTP_200_OK)
  180. self.assertEqual(response.data, [])
  181. # Create
  182. ## default policy
  183. data = {"domain": None, "subname": None, "type": None, "perm_write": True}
  184. response = self.client.create_policy(
  185. self.token, using=self.token_manage, data=data
  186. )
  187. self.assertStatus(response, status.HTTP_201_CREATED)
  188. default_policy_id = response.data["id"]
  189. ## can't create another default policy
  190. with transaction.atomic():
  191. response = self.client.create_policy(
  192. self.token,
  193. using=self.token_manage,
  194. data={"domain": None, "subname": None, "type": None},
  195. )
  196. self.assertStatus(response, status.HTTP_409_CONFLICT)
  197. ## verify object creation
  198. response = self.client.get_policy(
  199. self.token, using=self.token_manage, policy_id=default_policy_id
  200. )
  201. self.assertStatus(response, status.HTTP_200_OK)
  202. self.assertEqual(
  203. response.data, self.default_data | data | {"id": default_policy_id}
  204. )
  205. ## can't create policy for other user's domain
  206. data = {
  207. "domain": self.other_domain.name,
  208. "subname": None,
  209. "type": None,
  210. "perm_dyndns": True,
  211. "perm_write": True,
  212. }
  213. response = self.client.create_policy(
  214. self.token, using=self.token_manage, data=data
  215. )
  216. self.assertStatus(response, status.HTTP_400_BAD_REQUEST)
  217. self.assertEqual(response.data["domain"][0].code, "does_not_exist")
  218. ## another policy
  219. data = {
  220. "domain": self.my_domains[0].name,
  221. "subname": None,
  222. "type": None,
  223. "perm_dyndns": True,
  224. }
  225. response = self.client.create_policy(
  226. self.token, using=self.token_manage, data=data
  227. )
  228. self.assertStatus(response, status.HTTP_201_CREATED)
  229. policy_id = response.data["id"]
  230. ## can't create policy for the same domain
  231. with transaction.atomic():
  232. response = self.client.create_policy(
  233. self.token, using=self.token_manage, data=data
  234. )
  235. self.assertStatus(response, status.HTTP_409_CONFLICT)
  236. ## verify object creation
  237. response = self.client.get_policy(
  238. self.token, using=self.token_manage, policy_id=policy_id
  239. )
  240. self.assertStatus(response, status.HTTP_200_OK)
  241. self.assertEqual(response.data, self.default_data | data | {"id": policy_id})
  242. # List: now has two elements
  243. response = self.client.list_policies(self.token, using=self.token_manage)
  244. self.assertStatus(response, status.HTTP_200_OK)
  245. self.assertEqual(len(response.data), 2)
  246. # Change
  247. ## all fields of a policy
  248. data = dict(
  249. domain=self.my_domains[1].name,
  250. subname="_acme-challenge",
  251. type="TXT",
  252. perm_dyndns=False,
  253. perm_write=True,
  254. )
  255. response = self.client.patch_policy(
  256. self.token,
  257. using=self.token_manage,
  258. policy_id=policy_id,
  259. data=data,
  260. )
  261. self.assertStatus(response, status.HTTP_200_OK)
  262. self.assertEqual(response.data, self.default_data | data | {"id": policy_id})
  263. ## verify modification
  264. response = self.client.get_policy(
  265. self.token, using=self.token_manage, policy_id=policy_id
  266. )
  267. self.assertStatus(response, status.HTTP_200_OK)
  268. self.assertEqual(response.data, self.default_data | data | {"id": policy_id})
  269. ## verify that the default policy can't be changed to a non-default policy
  270. with transaction.atomic():
  271. response = self.client.patch_policy(
  272. self.token,
  273. using=self.token_manage,
  274. policy_id=default_policy_id,
  275. data=data,
  276. )
  277. self.assertStatus(response, status.HTTP_400_BAD_REQUEST)
  278. self.assertEqual(
  279. response.data["non_field_errors"],
  280. ["When using policies, there must be exactly one default policy."],
  281. )
  282. ## partially modify the default policy
  283. data = dict(perm_dyndns=True)
  284. response = self.client.patch_policy(
  285. self.token, using=self.token_manage, policy_id=default_policy_id, data=data
  286. )
  287. self.assertStatus(response, status.HTTP_200_OK)
  288. self.assertEqual(
  289. response.data,
  290. {
  291. "id": default_policy_id,
  292. "domain": None,
  293. "subname": None,
  294. "type": None,
  295. "perm_write": True,
  296. }
  297. | data,
  298. )
  299. # Delete
  300. ## can't delete default policy while others exist
  301. with transaction.atomic():
  302. response = self.client.delete_policy(
  303. self.token, using=self.token_manage, policy_id=default_policy_id
  304. )
  305. self.assertStatus(response, status.HTTP_400_BAD_REQUEST)
  306. self.assertEqual(
  307. response.data["non_field_errors"],
  308. ["Policy precedence: Can't delete default policy when there exist others."],
  309. )
  310. ## delete other policy
  311. response = self.client.delete_policy(
  312. self.token, using=self.token_manage, policy_id=policy_id
  313. )
  314. self.assertStatus(response, status.HTTP_204_NO_CONTENT)
  315. ## delete default policy
  316. response = self.client.delete_policy(
  317. self.token, using=self.token_manage, policy_id=default_policy_id
  318. )
  319. self.assertStatus(response, status.HTTP_204_NO_CONTENT)
  320. ## idempotence: delete a non-existing policy
  321. response = self.client.delete_policy(
  322. self.token, using=self.token_manage, policy_id=policy_id
  323. )
  324. self.assertStatus(response, status.HTTP_204_NO_CONTENT)
  325. ## verify that policies are gone
  326. for pid in [policy_id, default_policy_id]:
  327. response = self.client.get_policy(
  328. self.token, using=self.token_manage, policy_id=pid
  329. )
  330. self.assertStatus(response, status.HTTP_404_NOT_FOUND)
  331. # List: empty again
  332. response = self.client.list_policies(self.token, using=self.token_manage)
  333. self.assertStatus(response, status.HTTP_200_OK)
  334. self.assertEqual(response.data, [])
  335. def test_policy_permissions(self):
  336. def _reset_policies(token):
  337. for policy in token.tokendomainpolicy_set.all():
  338. for perm in self.default_data.keys():
  339. setattr(policy, perm, False)
  340. policy.save()
  341. def _perform_requests(name, perm, value, **kwargs):
  342. responses = []
  343. if value:
  344. pdns_name = self._normalize_name(name).lower()
  345. cm = self.assertNoRequestsBut(
  346. self.request_pdns_zone_update(name=pdns_name),
  347. self.request_pdns_zone_axfr(name=pdns_name),
  348. )
  349. else:
  350. cm = nullcontext()
  351. if perm == "perm_dyndns":
  352. data = {"username": name, "password": self.token.plain}
  353. with cm:
  354. responses.append(
  355. self.client.get(self.reverse("v1:dyndns12update"), data)
  356. )
  357. return responses
  358. if perm == "perm_write":
  359. url_detail = self.reverse("v1:rrset@", name=name, subname="", type="A")
  360. url_list = self.reverse("v1:rrsets", name=name)
  361. responses.append(self.client.get(url_list, **kwargs))
  362. responses.append(self.client.patch(url_list, [], **kwargs))
  363. responses.append(self.client.put(url_list, [], **kwargs))
  364. responses.append(self.client.post(url_list, [], **kwargs))
  365. data = {"subname": "", "type": "A", "ttl": 3600, "records": ["1.2.3.4"]}
  366. with cm:
  367. responses += [
  368. self.client.delete(url_detail, **kwargs),
  369. self.client.post(url_list, data=data, **kwargs),
  370. self.client.put(url_detail, data=data, **kwargs),
  371. self.client.patch(url_detail, data=data, **kwargs),
  372. self.client.get(url_detail, **kwargs),
  373. ]
  374. return responses
  375. raise ValueError(f"Unexpected permission: {perm}")
  376. # Create
  377. ## default policy
  378. data = {"domain": None, "subname": None, "type": None}
  379. response = self.client.create_policy(
  380. self.token, using=self.token_manage, data=data
  381. )
  382. self.assertStatus(response, status.HTTP_201_CREATED)
  383. default_policy_id = response.data["id"]
  384. ## another policy
  385. data = {"domain": self.my_domains[0].name, "subname": None, "type": None}
  386. response = self.client.create_policy(
  387. self.token, using=self.token_manage, data=data
  388. )
  389. self.assertStatus(response, status.HTTP_201_CREATED)
  390. policy_id = response.data["id"]
  391. ## verify object creation
  392. response = self.client.get_policy(
  393. self.token, using=self.token_manage, policy_id=policy_id
  394. )
  395. self.assertStatus(response, status.HTTP_200_OK)
  396. self.assertEqual(response.data, self.default_data | data | {"id": policy_id})
  397. policy_id_by_domain = {
  398. self.my_domains[0]: policy_id,
  399. self.my_domains[1]: default_policy_id,
  400. }
  401. kwargs = dict(HTTP_AUTHORIZATION=f"Token {self.token.plain}")
  402. # For each permission type
  403. for perm in self.default_data.keys():
  404. # For the domain with specific policy and for the domain covered by the default policy
  405. for domain in policy_id_by_domain.keys():
  406. # For both possible values of the permission
  407. for value in [True, False]:
  408. # Set only that permission for that domain (on its effective policy)
  409. _reset_policies(self.token)
  410. policy = self.token.tokendomainpolicy_set.get(
  411. pk=policy_id_by_domain[domain]
  412. )
  413. setattr(policy, perm, value)
  414. policy.save()
  415. # Perform requests that test this permission and inspect responses
  416. for response in _perform_requests(
  417. domain.name, perm, value, **kwargs
  418. ):
  419. if value:
  420. self.assertIn(response.status_code, range(200, 300))
  421. else:
  422. self.assertStatus(response, status.HTTP_403_FORBIDDEN)
  423. # Can't create domain
  424. data = {"name": self.random_domain_name()}
  425. response = self.client.post(
  426. self.reverse("v1:domain-list"), data, **kwargs
  427. )
  428. self.assertStatus(response, status.HTTP_403_FORBIDDEN)
  429. # Can't access account details
  430. response = self.client.get(self.reverse("v1:account"), **kwargs)
  431. self.assertStatus(response, status.HTTP_403_FORBIDDEN)
  432. def test_domain_owner_consistency(self):
  433. models.TokenDomainPolicy(
  434. token=self.token, domain=None, subname=None, type=None
  435. ).save()
  436. domain = self.my_domains[0]
  437. policy = models.TokenDomainPolicy(
  438. token=self.token, domain=domain, subname=None, type=None
  439. )
  440. policy.save()
  441. domain.owner = self.other_domains[0].owner
  442. with self.assertRaises(IntegrityError):
  443. with transaction.atomic(): # https://stackoverflow.com/a/23326971/6867099
  444. domain.save()
  445. policy.delete()
  446. domain.save()
  447. def test_token_user_consistency(self):
  448. policy = models.TokenDomainPolicy(
  449. token=self.token, domain=None, subname=None, type=None
  450. )
  451. policy.save()
  452. self.token.user = self.other_domains[0].owner
  453. with self.assertRaises(IntegrityError):
  454. with transaction.atomic(): # https://stackoverflow.com/a/23326971/6867099
  455. self.token.save()
  456. policy.delete()
  457. self.token.save()
  458. def test_domain_owner_equals_token_user(self):
  459. models.TokenDomainPolicy(
  460. token=self.token, domain=None, subname=None, type=None
  461. ).save()
  462. with self.assertRaises(IntegrityError):
  463. with transaction.atomic(): # https://stackoverflow.com/a/23326971/6867099
  464. models.TokenDomainPolicy(
  465. token=self.token,
  466. domain=self.other_domains[0],
  467. subname=None,
  468. type=None,
  469. ).save()
  470. self.token.user = self.other_domain.owner
  471. with self.assertRaises(IntegrityError):
  472. with transaction.atomic(): # https://stackoverflow.com/a/23326971/6867099
  473. self.token.save()
  474. def test_domain_deletion(self):
  475. domains = [None] + self.my_domains[:2]
  476. for domain in domains:
  477. models.TokenDomainPolicy(
  478. token=self.token, domain=domain, subname=None, type=None
  479. ).save()
  480. domain = domains.pop()
  481. domain.delete()
  482. self.assertEqual(
  483. set(policy.domain for policy in self.token.tokendomainpolicy_set.all()),
  484. set(domains),
  485. )
  486. def test_token_deletion(self):
  487. domains = [None] + self.my_domains[:2]
  488. policies = {}
  489. for domain in domains:
  490. policy = models.TokenDomainPolicy(
  491. token=self.token, domain=domain, subname=None, type=None
  492. )
  493. policies[domain] = policy
  494. policy.save()
  495. self.token.delete()
  496. for domain, policy in policies.items():
  497. self.assertFalse(
  498. models.TokenDomainPolicy.objects.filter(pk=policy.pk).exists()
  499. )
  500. if domain:
  501. self.assertTrue(models.Domain.objects.filter(pk=domain.pk).exists())
  502. def test_user_deletion(self):
  503. domains = [None] + self.my_domains[:2]
  504. for domain in domains:
  505. models.TokenDomainPolicy(
  506. token=self.token, domain=domain, subname=None, type=None
  507. ).save()
  508. # User can only be deleted when domains are deleted
  509. for domain in self.my_domains:
  510. domain.delete()
  511. # Only the default policy should be left, so get can simply get() it
  512. policy_pk = self.token.tokendomainpolicy_set.get().pk
  513. self.token.user.delete()
  514. self.assertFalse(models.TokenDomainPolicy.objects.filter(pk=policy_pk).exists())