# test_token_domain_policy.py (24 KB)
  1. from django.db import transaction
  2. from django.db.utils import IntegrityError
  3. from rest_framework import status
  4. from rest_framework.test import APIClient
  5. from desecapi import models
  6. from desecapi.tests.base import DomainOwnerTestCase
  7. class TokenDomainPolicyClient(APIClient):
  8. def _request(self, method, url, *, using, **kwargs):
  9. if using is not None:
  10. kwargs.update(HTTP_AUTHORIZATION=f"Token {using.plain}")
  11. return method(url, **kwargs)
  12. def _request_policy(self, method, target, *, using, policy_id, **kwargs):
  13. url = DomainOwnerTestCase.reverse(
  14. "v1:token_domain_policies-detail", token_id=target.id, pk=policy_id
  15. )
  16. return self._request(method, url, using=using, **kwargs)
  17. def _request_policies(self, method, target, *, using, **kwargs):
  18. url = DomainOwnerTestCase.reverse(
  19. "v1:token_domain_policies-list", token_id=target.id
  20. )
  21. return self._request(method, url, using=using, **kwargs)
  22. def list_policies(self, target, *, using):
  23. return self._request_policies(self.get, target, using=using)
  24. def create_policy(self, target, *, using, **kwargs):
  25. return self._request_policies(self.post, target, using=using, **kwargs)
  26. def get_policy(self, target, *, using, policy_id):
  27. return self._request_policy(self.get, target, using=using, policy_id=policy_id)
  28. def patch_policy(self, target, *, using, policy_id, **kwargs):
  29. return self._request_policy(
  30. self.patch, target, using=using, policy_id=policy_id, **kwargs
  31. )
  32. def delete_policy(self, target, *, using, policy_id):
  33. return self._request_policy(
  34. self.delete, target, using=using, policy_id=policy_id
  35. )
  36. class TokenDomainPolicyTestCase(DomainOwnerTestCase):
  37. client_class = TokenDomainPolicyClient
  38. default_data = dict(perm_write=False)
  39. def setUp(self):
  40. super().setUp()
  41. self.client.credentials() # remove default credential (corresponding to domain owner)
  42. self.token_manage = self.create_token(self.owner, perm_manage_tokens=True)
  43. self.other_token = self.create_token(self.user)
  44. def test_get_policy(self):
  45. def get_policy(domain, subname, type):
  46. return self.token.get_policy(
  47. models.RRset(domain=domain, subname=subname, type=type)
  48. )
  49. def assertPolicy(policy, domain, subname, type):
  50. self.assertEqual(policy.domain, domain)
  51. self.assertEqual(policy.subname, subname)
  52. self.assertEqual(policy.type, type)
  53. qs = self.token.tokendomainpolicy_set
  54. # Default policy is fallback for everything
  55. qs.create(domain=None, subname=None, type=None)
  56. for kwargs in [
  57. dict(subname=subname, type=type_)
  58. for subname in (None, "www")
  59. for type_ in (None, "A")
  60. ]:
  61. policy = get_policy(self.my_domain, **kwargs)
  62. assertPolicy(policy, None, None, None)
  63. # Type wins over default
  64. qs.create(domain=None, subname=None, type="A")
  65. policy = get_policy(self.my_domain, "www", "A")
  66. assertPolicy(policy, None, None, "A")
  67. # Subname wins over type
  68. qs.create(domain=None, subname="www", type=None)
  69. policy = get_policy(self.my_domain, "www", "A")
  70. assertPolicy(policy, None, "www", None)
  71. # Most specific wins
  72. qs.create(domain=None, subname="www", type="A")
  73. policy = get_policy(self.my_domain, "www", "A")
  74. assertPolicy(policy, None, "www", "A")
  75. # Domain wins over default and over subname and type
  76. qs.create(domain=self.my_domain, subname=None, type=None)
  77. policy = get_policy(self.my_domain, None, None)
  78. assertPolicy(policy, self.my_domain, None, None)
  79. # Subname wins over default or domain default
  80. qs.create(domain=self.my_domain, subname="www", type=None)
  81. for kwargs in [
  82. dict(subname="www", type=None),
  83. dict(subname="www", type="A"),
  84. ]:
  85. policy = get_policy(self.my_domain, **kwargs)
  86. assertPolicy(policy, self.my_domain, "www", None)
  87. # Type wins over default or domain default
  88. qs.create(domain=self.my_domain, subname=None, type="A")
  89. for kwargs in [
  90. dict(subname=None, type="A"),
  91. dict(subname="www2", type="A"),
  92. ]:
  93. policy = get_policy(self.my_domain, **kwargs)
  94. assertPolicy(policy, self.my_domain, None, "A")
  95. # Subname wins over type
  96. policy = get_policy(self.my_domain, "www", "A")
  97. assertPolicy(policy, self.my_domain, "www", None)
  98. # Subname + type wins over less specific
  99. qs.create(domain=self.my_domain, subname="www", type="A")
  100. policy = get_policy(self.my_domain, "www", "A")
  101. assertPolicy(policy, self.my_domain, "www", "A")
  102. # Check that we did all combinations
  103. self.assertEqual(qs.count(), 2**3)
  104. def test_policy_lifecycle_without_management_permission(self):
  105. # Prepare (with management token)
  106. data = {"domain": None, "subname": None, "type": None, "perm_write": True}
  107. response = self.client.create_policy(
  108. self.token, using=self.token_manage, data=data
  109. )
  110. self.assertStatus(response, status.HTTP_201_CREATED)
  111. response = self.client.create_policy(
  112. self.token_manage, using=self.token_manage, data=data
  113. )
  114. self.assertStatus(response, status.HTTP_201_CREATED)
  115. # Self-inspection is fine
  116. ## List
  117. response = self.client.list_policies(self.token, using=self.token)
  118. self.assertStatus(response, status.HTTP_200_OK)
  119. self.assertEqual(len(response.data), 1)
  120. default_policy_id = response.data[0]["id"]
  121. ## Get
  122. response = self.client.get_policy(
  123. self.token, using=self.token, policy_id=default_policy_id
  124. )
  125. self.assertStatus(response, status.HTTP_200_OK)
  126. self.assertEqual(
  127. response.data, self.default_data | data | {"id": default_policy_id}
  128. )
  129. # Inspection of other tokens forbidden
  130. ## List
  131. response = self.client.list_policies(self.token_manage, using=self.token)
  132. self.assertStatus(response, status.HTTP_403_FORBIDDEN)
  133. ## Get
  134. response = self.client.get_policy(
  135. self.token_manage, using=self.token, policy_id=default_policy_id
  136. )
  137. self.assertStatus(response, status.HTTP_403_FORBIDDEN)
  138. # Write operations forbidden (self and other)
  139. for target in [self.token, self.token_manage]:
  140. # Create
  141. response = self.client.create_policy(target, using=self.token)
  142. self.assertStatus(response, status.HTTP_403_FORBIDDEN)
  143. # Change
  144. data = dict(perm_write=True)
  145. policy = target.get_policy()
  146. response = self.client.patch_policy(
  147. target, using=self.token, policy_id=policy.pk, data=data
  148. )
  149. self.assertStatus(response, status.HTTP_403_FORBIDDEN)
  150. # Delete
  151. response = self.client.delete_policy(
  152. target, using=self.token, policy_id=default_policy_id
  153. )
  154. self.assertStatus(response, status.HTTP_403_FORBIDDEN)
  155. def test_policy_lifecycle(self):
  156. # Can't do anything unauthorized
  157. response = self.client.list_policies(self.token, using=None)
  158. self.assertStatus(response, status.HTTP_401_UNAUTHORIZED)
  159. response = self.client.create_policy(self.token, using=None)
  160. self.assertStatus(response, status.HTTP_401_UNAUTHORIZED)
  161. # Create
  162. ## without required field
  163. response = self.client.create_policy(self.token, using=self.token_manage)
  164. self.assertStatus(response, status.HTTP_400_BAD_REQUEST)
  165. for field in ["domain", "subname", "type"]:
  166. self.assertEqual(response.data[field], ["This field is required."])
  167. ## without a default policy
  168. data = {"domain": self.my_domains[0].name, "subname": None, "type": None}
  169. with transaction.atomic():
  170. response = self.client.create_policy(
  171. self.token, using=self.token_manage, data=data
  172. )
  173. self.assertStatus(response, status.HTTP_400_BAD_REQUEST)
  174. self.assertEqual(
  175. response.data["non_field_errors"],
  176. ["Policy precedence: The first policy must be the default policy."],
  177. )
  178. # List: still empty
  179. response = self.client.list_policies(self.token, using=self.token_manage)
  180. self.assertStatus(response, status.HTTP_200_OK)
  181. self.assertEqual(response.data, [])
  182. # Create
  183. ## default policy
  184. data = {"domain": None, "subname": None, "type": None, "perm_write": True}
  185. response = self.client.create_policy(
  186. self.token, using=self.token_manage, data=data
  187. )
  188. self.assertStatus(response, status.HTTP_201_CREATED)
  189. default_policy_id = response.data["id"]
  190. ## can't create another default policy
  191. with transaction.atomic():
  192. response = self.client.create_policy(
  193. self.token,
  194. using=self.token_manage,
  195. data={"domain": None, "subname": None, "type": None},
  196. )
  197. self.assertStatus(response, status.HTTP_409_CONFLICT)
  198. ## verify object creation
  199. response = self.client.get_policy(
  200. self.token, using=self.token_manage, policy_id=default_policy_id
  201. )
  202. self.assertStatus(response, status.HTTP_200_OK)
  203. self.assertEqual(
  204. response.data, self.default_data | data | {"id": default_policy_id}
  205. )
  206. ## can't create policy for other user's domain
  207. data = {
  208. "domain": self.other_domain.name,
  209. "subname": None,
  210. "type": None,
  211. "perm_dyndns": True,
  212. "perm_write": True,
  213. }
  214. response = self.client.create_policy(
  215. self.token, using=self.token_manage, data=data
  216. )
  217. self.assertStatus(response, status.HTTP_400_BAD_REQUEST)
  218. self.assertEqual(response.data["domain"][0].code, "does_not_exist")
  219. ## another policy
  220. data = {
  221. "domain": self.my_domains[0].name,
  222. "subname": None,
  223. "type": None,
  224. }
  225. response = self.client.create_policy(
  226. self.token, using=self.token_manage, data=data
  227. )
  228. self.assertStatus(response, status.HTTP_201_CREATED)
  229. policy_id = response.data["id"]
  230. ## can't create policy for the same domain
  231. with transaction.atomic():
  232. response = self.client.create_policy(
  233. self.token, using=self.token_manage, data=data
  234. )
  235. self.assertStatus(response, status.HTTP_409_CONFLICT)
  236. ## verify object creation
  237. response = self.client.get_policy(
  238. self.token, using=self.token_manage, policy_id=policy_id
  239. )
  240. self.assertStatus(response, status.HTTP_200_OK)
  241. self.assertEqual(response.data, self.default_data | data | {"id": policy_id})
  242. # List: now has two elements
  243. response = self.client.list_policies(self.token, using=self.token_manage)
  244. self.assertStatus(response, status.HTTP_200_OK)
  245. self.assertEqual(len(response.data), 2)
  246. # Change
  247. ## all fields of a policy
  248. data = dict(
  249. domain=self.my_domains[1].name,
  250. subname="_acme-challenge",
  251. type="TXT",
  252. perm_write=True,
  253. )
  254. response = self.client.patch_policy(
  255. self.token,
  256. using=self.token_manage,
  257. policy_id=policy_id,
  258. data=data,
  259. )
  260. self.assertStatus(response, status.HTTP_200_OK)
  261. self.assertEqual(response.data, self.default_data | data | {"id": policy_id})
  262. ## verify modification
  263. response = self.client.get_policy(
  264. self.token, using=self.token_manage, policy_id=policy_id
  265. )
  266. self.assertStatus(response, status.HTTP_200_OK)
  267. self.assertEqual(response.data, self.default_data | data | {"id": policy_id})
  268. ## verify that the default policy can't be changed to a non-default policy
  269. with transaction.atomic():
  270. response = self.client.patch_policy(
  271. self.token,
  272. using=self.token_manage,
  273. policy_id=default_policy_id,
  274. data=data,
  275. )
  276. self.assertStatus(response, status.HTTP_400_BAD_REQUEST)
  277. self.assertEqual(
  278. response.data["non_field_errors"],
  279. ["When using policies, there must be exactly one default policy."],
  280. )
  281. ## partially modify the default policy
  282. data = dict()
  283. response = self.client.patch_policy(
  284. self.token, using=self.token_manage, policy_id=default_policy_id, data=data
  285. )
  286. self.assertStatus(response, status.HTTP_200_OK)
  287. self.assertEqual(
  288. response.data,
  289. {
  290. "id": default_policy_id,
  291. "domain": None,
  292. "subname": None,
  293. "type": None,
  294. "perm_write": True,
  295. }
  296. | data,
  297. )
  298. # Delete
  299. ## can't delete default policy while others exist
  300. with transaction.atomic():
  301. response = self.client.delete_policy(
  302. self.token, using=self.token_manage, policy_id=default_policy_id
  303. )
  304. self.assertStatus(response, status.HTTP_400_BAD_REQUEST)
  305. self.assertEqual(
  306. response.data["non_field_errors"],
  307. ["Policy precedence: Can't delete default policy when there exist others."],
  308. )
  309. ## delete other policy
  310. response = self.client.delete_policy(
  311. self.token, using=self.token_manage, policy_id=policy_id
  312. )
  313. self.assertStatus(response, status.HTTP_204_NO_CONTENT)
  314. ## delete default policy
  315. response = self.client.delete_policy(
  316. self.token, using=self.token_manage, policy_id=default_policy_id
  317. )
  318. self.assertStatus(response, status.HTTP_204_NO_CONTENT)
  319. ## idempotence: delete a non-existing policy
  320. response = self.client.delete_policy(
  321. self.token, using=self.token_manage, policy_id=policy_id
  322. )
  323. self.assertStatus(response, status.HTTP_204_NO_CONTENT)
  324. ## verify that policies are gone
  325. for pid in [policy_id, default_policy_id]:
  326. response = self.client.get_policy(
  327. self.token, using=self.token_manage, policy_id=pid
  328. )
  329. self.assertStatus(response, status.HTTP_404_NOT_FOUND)
  330. # List: empty again
  331. response = self.client.list_policies(self.token, using=self.token_manage)
  332. self.assertStatus(response, status.HTTP_200_OK)
  333. self.assertEqual(response.data, [])
  334. def test_policy_permissions(self):
  335. def _reset_policies(token):
  336. for policy in token.tokendomainpolicy_set.all():
  337. for perm in self.default_data.keys():
  338. setattr(policy, perm, False)
  339. policy.save()
  340. # Create
  341. ## default policy
  342. data = {"domain": None, "subname": None, "type": None}
  343. response = self.client.create_policy(
  344. self.token, using=self.token_manage, data=data
  345. )
  346. self.assertStatus(response, status.HTTP_201_CREATED)
  347. default_policy_id = response.data["id"]
  348. ## another policy
  349. data = {"domain": self.my_domains[0].name, "subname": None, "type": None}
  350. response = self.client.create_policy(
  351. self.token, using=self.token_manage, data=data
  352. )
  353. self.assertStatus(response, status.HTTP_201_CREATED)
  354. policy_id = response.data["id"]
  355. ## verify object creation
  356. response = self.client.get_policy(
  357. self.token, using=self.token_manage, policy_id=policy_id
  358. )
  359. self.assertStatus(response, status.HTTP_200_OK)
  360. self.assertEqual(response.data, self.default_data | data | {"id": policy_id})
  361. policy_id_by_domain = {
  362. self.my_domains[0]: policy_id,
  363. self.my_domains[1]: default_policy_id,
  364. }
  365. kwargs = dict(HTTP_AUTHORIZATION=f"Token {self.token.plain}")
  366. # For each permission type
  367. for perm in self.default_data.keys():
  368. # For the domain with specific policy and for the domain covered by the default policy
  369. for domain in policy_id_by_domain.keys():
  370. # For both possible values of the permission
  371. for value in [True, False]:
  372. # Set only that permission for that domain (on its effective policy)
  373. _reset_policies(self.token)
  374. policy = self.token.tokendomainpolicy_set.get(
  375. pk=policy_id_by_domain[domain]
  376. )
  377. setattr(policy, perm, value)
  378. policy.save()
  379. # Can't create domain
  380. data = {"name": self.random_domain_name()}
  381. response = self.client.post(
  382. self.reverse("v1:domain-list"), data, **kwargs
  383. )
  384. self.assertStatus(response, status.HTTP_403_FORBIDDEN)
  385. # Can't delete domain
  386. response = self.client.delete(
  387. self.reverse("v1:domain-detail", name=domain), {}, **kwargs
  388. )
  389. self.assertStatus(response, status.HTTP_403_FORBIDDEN)
  390. # Can't access account details
  391. response = self.client.get(self.reverse("v1:account"), **kwargs)
  392. self.assertStatus(response, status.HTTP_403_FORBIDDEN)
  393. def test_dyndns_permission(self):
  394. def _perform_request(**kwargs):
  395. return self.client.get(
  396. self.reverse("v1:dyndns12update"),
  397. {
  398. "username": self.my_domains[1].name,
  399. "password": self.token.plain,
  400. **kwargs,
  401. },
  402. )
  403. def assert_allowed(**kwargs):
  404. response = _perform_request(**kwargs)
  405. self.assertStatus(response, status.HTTP_200_OK)
  406. self.assertEqual(response.data, "good")
  407. def assert_forbidden(**kwargs):
  408. response = _perform_request(**kwargs)
  409. self.assertStatus(response, status.HTTP_403_FORBIDDEN)
  410. self.assertEqual(response.data["detail"], "Insufficient token permissions.")
  411. # No policy
  412. assert_allowed(
  413. myipv4=""
  414. ) # empty IPv4 and delete IPv6 (no-op, prevents pdns request)
  415. # Default policy (deny)
  416. qs = self.token.tokendomainpolicy_set
  417. qs.create(domain=None, subname=None, type=None)
  418. assert_forbidden(myipv4="")
  419. assert_allowed(
  420. myipv4="preserve", myipv6="preserve"
  421. ) # no-op needs no permissions
  422. # Only A permission
  423. qs.create(domain=self.my_domains[1], subname=None, type="A", perm_write=True)
  424. assert_forbidden(myipv4="")
  425. assert_allowed(myipv4="", myipv6="preserve") # just IPv4
  426. # Only A permission
  427. qs.create(domain=self.my_domains[1], subname=None, type="AAAA")
  428. assert_forbidden(myipv4="")
  429. assert_allowed(myipv4="", myipv6="preserve") # just IPv4
  430. # A + AAAA permission
  431. qs.filter(domain=self.my_domains[1], type="AAAA").update(perm_write=True)
  432. assert_allowed(myipv4="") # empty IPv4 and delete IPv6
  433. # Only AAAA permission
  434. qs.filter(domain=self.my_domains[1], type="A").update(perm_write=False)
  435. assert_forbidden(myipv4="")
  436. assert_allowed(myipv4="preserve", myipv6="") # just IPv6
  437. # Update default policy to allow, but A deny policy overrides
  438. qs.filter(domain__isnull=True).update(perm_write=True)
  439. assert_forbidden(myipv4="")
  440. assert_allowed(myipv4="preserve", myipv6="") # just IPv6
  441. # AAAA (allow) and A (allow via default policy fallback)
  442. qs.filter(domain=self.my_domains[1], type="A").delete()
  443. assert_allowed(myipv4="", myipv6="")
  444. # Default policy (allow)
  445. qs.filter(domain=self.my_domains[1]).delete()
  446. assert_allowed(myipv4="", myipv6="")
  447. # No policy
  448. qs.filter().delete()
  449. assert_allowed(myipv4="", myipv6="")
  450. def test_domain_owner_consistency(self):
  451. models.TokenDomainPolicy(
  452. token=self.token, domain=None, subname=None, type=None
  453. ).save()
  454. domain = self.my_domains[0]
  455. policy = models.TokenDomainPolicy(
  456. token=self.token, domain=domain, subname=None, type=None
  457. )
  458. policy.save()
  459. domain.owner = self.other_domains[0].owner
  460. with self.assertRaises(IntegrityError):
  461. with transaction.atomic(): # https://stackoverflow.com/a/23326971/6867099
  462. domain.save()
  463. policy.delete()
  464. domain.save()
  465. def test_token_user_consistency(self):
  466. policy = models.TokenDomainPolicy(
  467. token=self.token, domain=None, subname=None, type=None
  468. )
  469. policy.save()
  470. self.token.user = self.other_domains[0].owner
  471. with self.assertRaises(IntegrityError):
  472. with transaction.atomic(): # https://stackoverflow.com/a/23326971/6867099
  473. self.token.save()
  474. policy.delete()
  475. self.token.save()
  476. def test_domain_owner_equals_token_user(self):
  477. models.TokenDomainPolicy(
  478. token=self.token, domain=None, subname=None, type=None
  479. ).save()
  480. with self.assertRaises(IntegrityError):
  481. with transaction.atomic(): # https://stackoverflow.com/a/23326971/6867099
  482. models.TokenDomainPolicy(
  483. token=self.token,
  484. domain=self.other_domains[0],
  485. subname=None,
  486. type=None,
  487. ).save()
  488. self.token.user = self.other_domain.owner
  489. with self.assertRaises(IntegrityError):
  490. with transaction.atomic(): # https://stackoverflow.com/a/23326971/6867099
  491. self.token.save()
  492. def test_domain_deletion(self):
  493. domains = [None] + self.my_domains[:2]
  494. for domain in domains:
  495. models.TokenDomainPolicy(
  496. token=self.token, domain=domain, subname=None, type=None
  497. ).save()
  498. domain = domains.pop()
  499. domain.delete()
  500. self.assertEqual(
  501. set(policy.domain for policy in self.token.tokendomainpolicy_set.all()),
  502. set(domains),
  503. )
  504. def test_token_deletion(self):
  505. domains = [None] + self.my_domains[:2]
  506. policies = {}
  507. for domain in domains:
  508. policy = models.TokenDomainPolicy(
  509. token=self.token, domain=domain, subname=None, type=None
  510. )
  511. policies[domain] = policy
  512. policy.save()
  513. self.token.delete()
  514. for domain, policy in policies.items():
  515. self.assertFalse(
  516. models.TokenDomainPolicy.objects.filter(pk=policy.pk).exists()
  517. )
  518. if domain:
  519. self.assertTrue(models.Domain.objects.filter(pk=domain.pk).exists())
  520. def test_user_deletion(self):
  521. domains = [None] + self.my_domains[:2]
  522. for domain in domains:
  523. models.TokenDomainPolicy(
  524. token=self.token, domain=domain, subname=None, type=None
  525. ).save()
  526. # User can only be deleted when domains are deleted
  527. for domain in self.my_domains:
  528. domain.delete()
  529. # Only the default policy should be left, so get can simply get() it
  530. policy_pk = self.token.tokendomainpolicy_set.get().pk
  531. self.token.user.delete()
  532. self.assertFalse(models.TokenDomainPolicy.objects.filter(pk=policy_pk).exists())