# test_token_domain_policy.py
from django.db import transaction
from django.db.utils import IntegrityError
from rest_framework import status
from rest_framework.test import APIClient

from desecapi import models
from desecapi.tests.base import DomainOwnerTestCase


  7. class TokenDomainPolicyClient(APIClient):
  8. def _request(self, method, url, *, using, **kwargs):
  9. if using is not None:
  10. kwargs.update(HTTP_AUTHORIZATION=f"Token {using.plain}")
  11. return method(url, **kwargs)
  12. def _request_policy(self, method, target, *, using, policy_id, **kwargs):
  13. url = DomainOwnerTestCase.reverse(
  14. "v1:token_domain_policies-detail", token_id=target.id, pk=policy_id
  15. )
  16. return self._request(method, url, using=using, **kwargs)
  17. def _request_policies(self, method, target, *, using, **kwargs):
  18. url = DomainOwnerTestCase.reverse(
  19. "v1:token_domain_policies-list", token_id=target.id
  20. )
  21. return self._request(method, url, using=using, **kwargs)
  22. def list_policies(self, target, *, using):
  23. return self._request_policies(self.get, target, using=using)
  24. def create_policy(self, target, *, using, **kwargs):
  25. return self._request_policies(self.post, target, using=using, **kwargs)
  26. def get_policy(self, target, *, using, policy_id):
  27. return self._request_policy(self.get, target, using=using, policy_id=policy_id)
  28. def patch_policy(self, target, *, using, policy_id, **kwargs):
  29. return self._request_policy(
  30. self.patch, target, using=using, policy_id=policy_id, **kwargs
  31. )
  32. def delete_policy(self, target, *, using, policy_id):
  33. return self._request_policy(
  34. self.delete, target, using=using, policy_id=policy_id
  35. )
  36. class TokenDomainPolicyTestCase(DomainOwnerTestCase):
  37. client_class = TokenDomainPolicyClient
  38. default_data = dict(perm_write=False)
  39. def setUp(self):
  40. super().setUp()
  41. self.client.credentials() # remove default credential (corresponding to domain owner)
  42. self.token_manage = self.create_token(self.owner, perm_manage_tokens=True)
  43. self.other_token = self.create_token(self.user)
  44. def test_get_policy(self):
  45. def get_policy(domain, subname, type):
  46. return self.token.get_policy(
  47. models.RRset(domain=domain, subname=subname, type=type)
  48. )
  49. def assertPolicy(policy, domain, subname, type):
  50. self.assertEqual(policy.domain, domain)
  51. self.assertEqual(policy.subname, subname)
  52. self.assertEqual(policy.type, type)
  53. qs = self.token.tokendomainpolicy_set
  54. # Default policy is fallback for everything
  55. qs.create(domain=None, subname=None, type=None)
  56. for kwargs in [
  57. dict(subname=subname, type=type_)
  58. for subname in (None, "www")
  59. for type_ in (None, "A")
  60. ]:
  61. policy = get_policy(self.my_domain, **kwargs)
  62. assertPolicy(policy, None, None, None)
  63. # Type wins over default
  64. qs.create(domain=None, subname=None, type="A")
  65. policy = get_policy(self.my_domain, "www", "A")
  66. assertPolicy(policy, None, None, "A")
  67. # Subname wins over type
  68. qs.create(domain=None, subname="www", type=None)
  69. policy = get_policy(self.my_domain, "www", "A")
  70. assertPolicy(policy, None, "www", None)
  71. # Most specific wins
  72. qs.create(domain=None, subname="www", type="A")
  73. policy = get_policy(self.my_domain, "www", "A")
  74. assertPolicy(policy, None, "www", "A")
  75. # Domain wins over default and over subname and type
  76. qs.create(domain=self.my_domain, subname=None, type=None)
  77. policy = get_policy(self.my_domain, None, None)
  78. assertPolicy(policy, self.my_domain, None, None)
  79. # Subname wins over default or domain default
  80. qs.create(domain=self.my_domain, subname="www", type=None)
  81. for kwargs in [
  82. dict(subname="www", type=None),
  83. dict(subname="www", type="A"),
  84. ]:
  85. policy = get_policy(self.my_domain, **kwargs)
  86. assertPolicy(policy, self.my_domain, "www", None)
  87. # Type wins over default or domain default
  88. qs.create(domain=self.my_domain, subname=None, type="A")
  89. for kwargs in [
  90. dict(subname=None, type="A"),
  91. dict(subname="www2", type="A"),
  92. ]:
  93. policy = get_policy(self.my_domain, **kwargs)
  94. assertPolicy(policy, self.my_domain, None, "A")
  95. # Subname wins over type
  96. policy = get_policy(self.my_domain, "www", "A")
  97. assertPolicy(policy, self.my_domain, "www", None)
  98. # Subname + type wins over less specific
  99. qs.create(domain=self.my_domain, subname="www", type="A")
  100. policy = get_policy(self.my_domain, "www", "A")
  101. assertPolicy(policy, self.my_domain, "www", "A")
  102. # Check that we did all combinations
  103. self.assertEqual(qs.count(), 2**3)
  104. def test_policy_lifecycle_without_management_permission(self):
  105. # Prepare (with management token)
  106. data = {"domain": None, "subname": None, "type": None, "perm_write": True}
  107. response = self.client.create_policy(
  108. self.token, using=self.token_manage, data=data
  109. )
  110. self.assertStatus(response, status.HTTP_201_CREATED)
  111. response = self.client.create_policy(
  112. self.token_manage, using=self.token_manage, data=data
  113. )
  114. self.assertStatus(response, status.HTTP_201_CREATED)
  115. # Self-inspection is fine
  116. ## List
  117. response = self.client.list_policies(self.token, using=self.token)
  118. self.assertStatus(response, status.HTTP_200_OK)
  119. self.assertEqual(len(response.data), 1)
  120. default_policy_id = response.data[0]["id"]
  121. ## Get
  122. response = self.client.get_policy(
  123. self.token, using=self.token, policy_id=default_policy_id
  124. )
  125. self.assertStatus(response, status.HTTP_200_OK)
  126. self.assertEqual(
  127. response.data, self.default_data | data | {"id": default_policy_id}
  128. )
  129. # Inspection of other tokens forbidden
  130. ## List
  131. response = self.client.list_policies(self.token_manage, using=self.token)
  132. self.assertStatus(response, status.HTTP_403_FORBIDDEN)
  133. ## Get
  134. response = self.client.get_policy(
  135. self.token_manage, using=self.token, policy_id=default_policy_id
  136. )
  137. self.assertStatus(response, status.HTTP_403_FORBIDDEN)
  138. # Write operations forbidden (self and other)
  139. for target in [self.token, self.token_manage]:
  140. # Create
  141. response = self.client.create_policy(target, using=self.token)
  142. self.assertStatus(response, status.HTTP_403_FORBIDDEN)
  143. # Change
  144. data = dict(perm_write=True)
  145. policy = target.get_policy()
  146. response = self.client.patch_policy(
  147. target, using=self.token, policy_id=policy.pk, data=data
  148. )
  149. self.assertStatus(response, status.HTTP_403_FORBIDDEN)
  150. # Delete
  151. response = self.client.delete_policy(
  152. target, using=self.token, policy_id=default_policy_id
  153. )
  154. self.assertStatus(response, status.HTTP_403_FORBIDDEN)
  155. def test_policy_lifecycle(self):
  156. # Can't do anything unauthorized
  157. response = self.client.list_policies(self.token, using=None)
  158. self.assertStatus(response, status.HTTP_401_UNAUTHORIZED)
  159. response = self.client.create_policy(self.token, using=None)
  160. self.assertStatus(response, status.HTTP_401_UNAUTHORIZED)
  161. # Create
  162. ## without required field
  163. response = self.client.create_policy(self.token, using=self.token_manage)
  164. self.assertStatus(response, status.HTTP_400_BAD_REQUEST)
  165. for field in ["domain", "subname", "type"]:
  166. self.assertEqual(response.data[field], ["This field is required."])
  167. ## without a default policy
  168. data = {"domain": self.my_domains[0].name, "subname": None, "type": None}
  169. with transaction.atomic():
  170. response = self.client.create_policy(
  171. self.token, using=self.token_manage, data=data
  172. )
  173. self.assertStatus(response, status.HTTP_400_BAD_REQUEST)
  174. self.assertEqual(
  175. response.data["non_field_errors"],
  176. ["Policy precedence: The first policy must be the default policy."],
  177. )
  178. # List: still empty
  179. response = self.client.list_policies(self.token, using=self.token_manage)
  180. self.assertStatus(response, status.HTTP_200_OK)
  181. self.assertEqual(response.data, [])
  182. # Other token gives 404
  183. other_token = self.create_token(user=self.create_user())
  184. response = self.client.list_policies(other_token, using=self.token_manage)
  185. self.assertStatus(response, status.HTTP_404_NOT_FOUND)
  186. # Create
  187. ## default policy
  188. data = {"domain": None, "subname": None, "type": None, "perm_write": True}
  189. # Other token gives 404
  190. response = self.client.create_policy(
  191. models.Token(), using=self.token_manage, data=data
  192. )
  193. self.assertStatus(response, status.HTTP_404_NOT_FOUND)
  194. # Existing token works
  195. response = self.client.create_policy(
  196. self.token, using=self.token_manage, data=data
  197. )
  198. self.assertStatus(response, status.HTTP_201_CREATED)
  199. default_policy_id = response.data["id"]
  200. ## can't create another default policy
  201. with transaction.atomic():
  202. response = self.client.create_policy(
  203. self.token,
  204. using=self.token_manage,
  205. data={"domain": None, "subname": None, "type": None},
  206. )
  207. self.assertStatus(response, status.HTTP_409_CONFLICT)
  208. ## verify object creation
  209. response = self.client.get_policy(
  210. self.token, using=self.token_manage, policy_id=default_policy_id
  211. )
  212. self.assertStatus(response, status.HTTP_200_OK)
  213. self.assertEqual(
  214. response.data, self.default_data | data | {"id": default_policy_id}
  215. )
  216. ## Non-existing policy gives 404
  217. response = self.client.get_policy(
  218. self.token, using=self.token_manage, policy_id=other_token.pk
  219. )
  220. self.assertStatus(response, status.HTTP_404_NOT_FOUND)
  221. ## can't create policy for other user's domain
  222. data = {
  223. "domain": self.other_domain.name,
  224. "subname": None,
  225. "type": None,
  226. "perm_dyndns": True,
  227. "perm_write": True,
  228. }
  229. response = self.client.create_policy(
  230. self.token, using=self.token_manage, data=data
  231. )
  232. self.assertStatus(response, status.HTTP_400_BAD_REQUEST)
  233. self.assertEqual(response.data["domain"][0].code, "does_not_exist")
  234. ## another policy
  235. data = {
  236. "domain": self.my_domains[0].name,
  237. "subname": None,
  238. "type": None,
  239. }
  240. response = self.client.create_policy(
  241. self.token, using=self.token_manage, data=data
  242. )
  243. self.assertStatus(response, status.HTTP_201_CREATED)
  244. policy_id = response.data["id"]
  245. ## can't create policy for the same domain
  246. with transaction.atomic():
  247. response = self.client.create_policy(
  248. self.token, using=self.token_manage, data=data
  249. )
  250. self.assertStatus(response, status.HTTP_409_CONFLICT)
  251. ## verify object creation
  252. response = self.client.get_policy(
  253. self.token, using=self.token_manage, policy_id=policy_id
  254. )
  255. self.assertStatus(response, status.HTTP_200_OK)
  256. self.assertEqual(response.data, self.default_data | data | {"id": policy_id})
  257. # List: now has two elements
  258. response = self.client.list_policies(self.token, using=self.token_manage)
  259. self.assertStatus(response, status.HTTP_200_OK)
  260. self.assertEqual(len(response.data), 2)
  261. # Change
  262. ## all fields of a policy
  263. data = dict(
  264. domain=self.my_domains[1].name,
  265. subname="_acme-challenge",
  266. type="TXT",
  267. perm_write=True,
  268. )
  269. response = self.client.patch_policy(
  270. self.token,
  271. using=self.token_manage,
  272. policy_id=policy_id,
  273. data=data,
  274. )
  275. self.assertStatus(response, status.HTTP_200_OK)
  276. self.assertEqual(response.data, self.default_data | data | {"id": policy_id})
  277. ## verify modification
  278. response = self.client.get_policy(
  279. self.token, using=self.token_manage, policy_id=policy_id
  280. )
  281. self.assertStatus(response, status.HTTP_200_OK)
  282. self.assertEqual(response.data, self.default_data | data | {"id": policy_id})
  283. ## verify that the default policy can't be changed to a non-default policy
  284. with transaction.atomic():
  285. response = self.client.patch_policy(
  286. self.token,
  287. using=self.token_manage,
  288. policy_id=default_policy_id,
  289. data=data,
  290. )
  291. self.assertStatus(response, status.HTTP_400_BAD_REQUEST)
  292. self.assertEqual(
  293. response.data["non_field_errors"],
  294. ["When using policies, there must be exactly one default policy."],
  295. )
  296. ## partially modify the default policy
  297. data = dict()
  298. response = self.client.patch_policy(
  299. self.token, using=self.token_manage, policy_id=default_policy_id, data=data
  300. )
  301. self.assertStatus(response, status.HTTP_200_OK)
  302. self.assertEqual(
  303. response.data,
  304. {
  305. "id": default_policy_id,
  306. "domain": None,
  307. "subname": None,
  308. "type": None,
  309. "perm_write": True,
  310. }
  311. | data,
  312. )
  313. # Delete
  314. ## can't delete default policy while others exist
  315. with transaction.atomic():
  316. response = self.client.delete_policy(
  317. self.token, using=self.token_manage, policy_id=default_policy_id
  318. )
  319. self.assertStatus(response, status.HTTP_400_BAD_REQUEST)
  320. self.assertEqual(
  321. response.data["non_field_errors"],
  322. ["Policy precedence: Can't delete default policy when there exist others."],
  323. )
  324. ## delete other policy
  325. response = self.client.delete_policy(
  326. self.token, using=self.token_manage, policy_id=policy_id
  327. )
  328. self.assertStatus(response, status.HTTP_204_NO_CONTENT)
  329. ## delete default policy
  330. response = self.client.delete_policy(
  331. self.token, using=self.token_manage, policy_id=default_policy_id
  332. )
  333. self.assertStatus(response, status.HTTP_204_NO_CONTENT)
  334. ## idempotence: delete a non-existing policy
  335. response = self.client.delete_policy(
  336. self.token, using=self.token_manage, policy_id=policy_id
  337. )
  338. self.assertStatus(response, status.HTTP_204_NO_CONTENT)
  339. ## verify that policies are gone
  340. for pid in [policy_id, default_policy_id]:
  341. response = self.client.get_policy(
  342. self.token, using=self.token_manage, policy_id=pid
  343. )
  344. self.assertStatus(response, status.HTTP_404_NOT_FOUND)
  345. # List: empty again
  346. response = self.client.list_policies(self.token, using=self.token_manage)
  347. self.assertStatus(response, status.HTTP_200_OK)
  348. self.assertEqual(response.data, [])
  349. def test_policy_permissions(self):
  350. def _reset_policies(token):
  351. for policy in token.tokendomainpolicy_set.all():
  352. for perm in self.default_data.keys():
  353. setattr(policy, perm, False)
  354. policy.save()
  355. # Create
  356. ## default policy
  357. data = {"domain": None, "subname": None, "type": None}
  358. response = self.client.create_policy(
  359. self.token, using=self.token_manage, data=data
  360. )
  361. self.assertStatus(response, status.HTTP_201_CREATED)
  362. default_policy_id = response.data["id"]
  363. ## another policy
  364. data = {"domain": self.my_domains[0].name, "subname": None, "type": None}
  365. response = self.client.create_policy(
  366. self.token, using=self.token_manage, data=data
  367. )
  368. self.assertStatus(response, status.HTTP_201_CREATED)
  369. policy_id = response.data["id"]
  370. ## verify object creation
  371. response = self.client.get_policy(
  372. self.token, using=self.token_manage, policy_id=policy_id
  373. )
  374. self.assertStatus(response, status.HTTP_200_OK)
  375. self.assertEqual(response.data, self.default_data | data | {"id": policy_id})
  376. policy_id_by_domain = {
  377. self.my_domains[0]: policy_id,
  378. self.my_domains[1]: default_policy_id,
  379. }
  380. kwargs = dict(HTTP_AUTHORIZATION=f"Token {self.token.plain}")
  381. # For each permission type
  382. for perm in self.default_data.keys():
  383. # For the domain with specific policy and for the domain covered by the default policy
  384. for domain in policy_id_by_domain.keys():
  385. # For both possible values of the permission
  386. for value in [True, False]:
  387. # Set only that permission for that domain (on its effective policy)
  388. _reset_policies(self.token)
  389. policy = self.token.tokendomainpolicy_set.get(
  390. pk=policy_id_by_domain[domain]
  391. )
  392. setattr(policy, perm, value)
  393. policy.save()
  394. # Can't create domain
  395. data = {"name": self.random_domain_name()}
  396. response = self.client.post(
  397. self.reverse("v1:domain-list"), data, **kwargs
  398. )
  399. self.assertStatus(response, status.HTTP_403_FORBIDDEN)
  400. # Can't delete domain
  401. response = self.client.delete(
  402. self.reverse("v1:domain-detail", name=domain), {}, **kwargs
  403. )
  404. self.assertStatus(response, status.HTTP_403_FORBIDDEN)
  405. # Can't access account details
  406. response = self.client.get(self.reverse("v1:account"), **kwargs)
  407. self.assertStatus(response, status.HTTP_403_FORBIDDEN)
  408. def test_dyndns_permission(self):
  409. def _perform_request(**kwargs):
  410. return self.client.get(
  411. self.reverse("v1:dyndns12update"),
  412. {
  413. "username": self.my_domains[1].name,
  414. "password": self.token.plain,
  415. **kwargs,
  416. },
  417. )
  418. def assert_allowed(**kwargs):
  419. response = _perform_request(**kwargs)
  420. self.assertStatus(response, status.HTTP_200_OK)
  421. self.assertEqual(response.data, "good")
  422. def assert_forbidden(**kwargs):
  423. response = _perform_request(**kwargs)
  424. self.assertStatus(response, status.HTTP_403_FORBIDDEN)
  425. self.assertEqual(response.data["detail"], "Insufficient token permissions.")
  426. # No policy
  427. assert_allowed(
  428. myipv4=""
  429. ) # empty IPv4 and delete IPv6 (no-op, prevents pdns request)
  430. # Default policy (deny)
  431. qs = self.token.tokendomainpolicy_set
  432. qs.create(domain=None, subname=None, type=None)
  433. assert_forbidden(myipv4="")
  434. assert_allowed(
  435. myipv4="preserve", myipv6="preserve"
  436. ) # no-op needs no permissions
  437. # Only A permission
  438. qs.create(domain=self.my_domains[1], subname=None, type="A", perm_write=True)
  439. assert_forbidden(myipv4="")
  440. assert_allowed(myipv4="", myipv6="preserve") # just IPv4
  441. # Only A permission
  442. qs.create(domain=self.my_domains[1], subname=None, type="AAAA")
  443. assert_forbidden(myipv4="")
  444. assert_allowed(myipv4="", myipv6="preserve") # just IPv4
  445. # A + AAAA permission
  446. qs.filter(domain=self.my_domains[1], type="AAAA").update(perm_write=True)
  447. assert_allowed(myipv4="") # empty IPv4 and delete IPv6
  448. # Only AAAA permission
  449. qs.filter(domain=self.my_domains[1], type="A").update(perm_write=False)
  450. assert_forbidden(myipv4="")
  451. assert_allowed(myipv4="preserve", myipv6="") # just IPv6
  452. # Update default policy to allow, but A deny policy overrides
  453. qs.filter(domain__isnull=True).update(perm_write=True)
  454. assert_forbidden(myipv4="")
  455. assert_allowed(myipv4="preserve", myipv6="") # just IPv6
  456. # AAAA (allow) and A (allow via default policy fallback)
  457. qs.filter(domain=self.my_domains[1], type="A").delete()
  458. assert_allowed(myipv4="", myipv6="")
  459. # Default policy (allow)
  460. qs.filter(domain=self.my_domains[1]).delete()
  461. assert_allowed(myipv4="", myipv6="")
  462. # No policy
  463. qs.filter().delete()
  464. assert_allowed(myipv4="", myipv6="")
  465. def test_domain_owner_consistency(self):
  466. models.TokenDomainPolicy(
  467. token=self.token, domain=None, subname=None, type=None
  468. ).save()
  469. domain = self.my_domains[0]
  470. policy = models.TokenDomainPolicy(
  471. token=self.token, domain=domain, subname=None, type=None
  472. )
  473. policy.save()
  474. domain.owner = self.other_domains[0].owner
  475. with self.assertRaises(IntegrityError):
  476. with transaction.atomic(): # https://stackoverflow.com/a/23326971/6867099
  477. domain.save()
  478. policy.delete()
  479. domain.save()
  480. def test_token_user_consistency(self):
  481. policy = models.TokenDomainPolicy(
  482. token=self.token, domain=None, subname=None, type=None
  483. )
  484. policy.save()
  485. self.token.user = self.other_domains[0].owner
  486. with self.assertRaises(IntegrityError):
  487. with transaction.atomic(): # https://stackoverflow.com/a/23326971/6867099
  488. self.token.save()
  489. policy.delete()
  490. self.token.save()
  491. def test_domain_owner_equals_token_user(self):
  492. models.TokenDomainPolicy(
  493. token=self.token, domain=None, subname=None, type=None
  494. ).save()
  495. with self.assertRaises(IntegrityError):
  496. with transaction.atomic(): # https://stackoverflow.com/a/23326971/6867099
  497. models.TokenDomainPolicy(
  498. token=self.token,
  499. domain=self.other_domains[0],
  500. subname=None,
  501. type=None,
  502. ).save()
  503. self.token.user = self.other_domain.owner
  504. with self.assertRaises(IntegrityError):
  505. with transaction.atomic(): # https://stackoverflow.com/a/23326971/6867099
  506. self.token.save()
  507. def test_domain_deletion(self):
  508. domains = [None] + self.my_domains[:2]
  509. for domain in domains:
  510. models.TokenDomainPolicy(
  511. token=self.token, domain=domain, subname=None, type=None
  512. ).save()
  513. domain = domains.pop()
  514. domain.delete()
  515. self.assertEqual(
  516. set(policy.domain for policy in self.token.tokendomainpolicy_set.all()),
  517. set(domains),
  518. )
  519. def test_token_deletion(self):
  520. domains = [None] + self.my_domains[:2]
  521. policies = {}
  522. for domain in domains:
  523. policy = models.TokenDomainPolicy(
  524. token=self.token, domain=domain, subname=None, type=None
  525. )
  526. policies[domain] = policy
  527. policy.save()
  528. self.token.delete()
  529. for domain, policy in policies.items():
  530. self.assertFalse(
  531. models.TokenDomainPolicy.objects.filter(pk=policy.pk).exists()
  532. )
  533. if domain:
  534. self.assertTrue(models.Domain.objects.filter(pk=domain.pk).exists())
  535. def test_user_deletion(self):
  536. domains = [None] + self.my_domains[:2]
  537. for domain in domains:
  538. models.TokenDomainPolicy(
  539. token=self.token, domain=domain, subname=None, type=None
  540. ).save()
  541. # User can only be deleted when domains are deleted
  542. for domain in self.my_domains:
  543. domain.delete()
  544. # Only the default policy should be left, so get can simply get() it
  545. policy_pk = self.token.tokendomainpolicy_set.get().pk
  546. self.token.user.delete()
  547. self.assertFalse(models.TokenDomainPolicy.objects.filter(pk=policy_pk).exists())