test_domains.py 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422
  1. import re
  2. from django.conf import settings
  3. from django.core import mail
  4. from django.core.exceptions import ValidationError
  5. from psl_dns.exceptions import UnsupportedRule
  6. from rest_framework import status
  7. from desecapi.models import Domain, Token
  8. from desecapi.pdns_change_tracker import PDNSChangeTracker
  9. from desecapi.tests.base import DesecTestCase, DomainOwnerTestCase, PublicSuffixMockMixin
  10. class IsRegistrableTestCase(DesecTestCase, PublicSuffixMockMixin):
  11. """ Tests which domains can be registered by whom, depending on domain ownership and public suffix
  12. configuration. Note that we use "global public suffix" to refer to public suffixes which appear on the
  13. Internet-wide Public Suffix List (accessible, e.g., via psl_dns), and "local public suffix" to public
  14. suffixes which are configured in the local Django settings.LOCAL_PUBLIC_SUFFIXES. Consequently, a
  15. public suffix can be just local, just global, or both. """
  16. def mock(self, global_public_suffixes, local_public_suffixes):
  17. self.setUpMockPatch()
  18. test_case = self
  19. class _MockSuffixLists:
  20. settings_mocker = None
  21. psl_mocker = None
  22. def __enter__(self):
  23. self.settings_mocker = test_case.settings(LOCAL_PUBLIC_SUFFIXES=local_public_suffixes)
  24. self.settings_mocker.__enter__()
  25. self.psl_mocker = test_case.get_psl_context_manager(global_public_suffixes)
  26. self.psl_mocker.__enter__()
  27. def __exit__(self, exc_type, exc_val, exc_tb):
  28. if exc_type or exc_val or exc_tb:
  29. raise exc_val
  30. self.settings_mocker.__exit__(None, None, None)
  31. self.psl_mocker.__exit__(None, None, None)
  32. return _MockSuffixLists()
  33. def assertRegistrable(self, domain_name, user=None):
  34. """ Raises if the given user (fresh if None) cannot register the given domain name. """
  35. self.assertTrue(Domain.is_registrable(domain_name, user or self.create_user()),
  36. f'{domain_name} was expected to be registrable for {user or "a new user"}, but wasn\'t.')
  37. def assertNotRegistrable(self, domain_name, user=None):
  38. """ Raises if the given user (fresh if None) can register the given domain name. """
  39. self.assertFalse(Domain.is_registrable(domain_name, user or self.create_user()),
  40. f'{domain_name} was expected to be not registrable for {user or "a new user"}, but was.')
  41. def test_cant_register_global_non_local_public_suffix(self):
  42. with self.mock(
  43. global_public_suffixes=['com', 'de', 'xxx', 'com.uk'],
  44. local_public_suffixes=['something.else'],
  45. ):
  46. self.assertNotRegistrable('xxx')
  47. self.assertNotRegistrable('com.uk')
  48. self.assertRegistrable('something.else')
  49. def test_can_register_local_public_suffix(self):
  50. with self.mock(
  51. global_public_suffixes=['com', 'de', 'xxx', 'com.uk'],
  52. local_public_suffixes=['something.else', 'our.public.suffix', 'com', 'com.uk'],
  53. ):
  54. self.assertRegistrable('something.else')
  55. self.assertRegistrable('out.public.suffix')
  56. self.assertRegistrable('com')
  57. self.assertRegistrable('com.uk')
  58. self.assertRegistrable('foo.bar.com')
  59. def test_cant_register_descendants_of_children_of_public_suffixes(self):
  60. with self.mock(
  61. global_public_suffixes={'public.suffix'},
  62. local_public_suffixes={'public.suffix'},
  63. ):
  64. # let A own a.public.suffix
  65. user_a = self.create_user()
  66. self.assertRegistrable('a.public.suffix', user_a)
  67. self.create_domain(owner=user_a, name='a.public.suffix')
  68. # user B shall not register b.a.public.suffix, but A may
  69. user_b = self.create_user()
  70. self.assertNotRegistrable('b.a.public.suffix', user_b)
  71. self.assertRegistrable('b.a.public.suffix', user_a)
  72. def test_can_register_public_suffixes_under_private_domains(self):
  73. with self.mock(
  74. global_public_suffixes={'public.suffix'},
  75. local_public_suffixes={'another.public.suffix.private.public.suffix', 'public.suffix'},
  76. ):
  77. # let A own public.suffix
  78. user_a = self.create_user()
  79. self.assertRegistrable('public.suffix', user_a)
  80. self.create_domain(owner=user_a, name='public.suffix')
  81. # user B may register private.public.suffix
  82. user_b = self.create_user()
  83. self.assertRegistrable('private.public.suffix', user_b)
  84. self.create_domain(owner=user_b, name='private.public.suffix')
  85. # user C may register b.another.public.suffix.private.public.suffix,
  86. # or long.silly.prefix.another.public.suffix.private.public.suffix,
  87. # but not b.private.public.suffix.
  88. user_c = self.create_user()
  89. self.assertRegistrable('b.another.public.suffix.private.public.suffix', user_c)
  90. self.assertRegistrable('long.silly.prefix.another.public.suffix.private.public.suffix', user_c)
  91. self.assertNotRegistrable('b.private.public.suffix', user_c)
  92. self.assertRegistrable('b.private.public.suffix', user_b)
  93. class UnauthenticatedDomainTests(DesecTestCase):
  94. def test_unauthorized_access(self):
  95. for url in [
  96. self.reverse('v1:domain-list'),
  97. self.reverse('v1:domain-detail', name='example.com.')
  98. ]:
  99. for method in [self.client.put, self.client.delete]:
  100. self.assertStatus(method(url), status.HTTP_401_UNAUTHORIZED)
  101. class DomainOwnerTestCase1(DomainOwnerTestCase):
  102. def test_name_validity(self):
  103. for name in [
  104. 'FOO.BAR.com',
  105. 'tEst.dedyn.io',
  106. 'ORG',
  107. '--BLAH.example.com',
  108. '_ASDF.jp',
  109. ]:
  110. with self.assertRaises(ValidationError):
  111. Domain(owner=self.owner, name=name).save()
  112. for name in [
  113. '_example.com', '_.example.com',
  114. '-dedyn.io', '--dedyn.io', '-.dedyn123.io',
  115. 'foobar.io', 'exam_ple.com',
  116. ]:
  117. with self.assertPdnsRequests(
  118. self.requests_desec_domain_creation(name=name)[:-1] # no serializer, no cryptokeys API call
  119. ), PDNSChangeTracker():
  120. Domain(owner=self.owner, name=name).save()
  121. def test_list_domains(self):
  122. with self.assertPdnsNoRequestsBut(self.request_pdns_zone_retrieve_crypto_keys()):
  123. response = self.client.get(self.reverse('v1:domain-list'))
  124. self.assertStatus(response, status.HTTP_200_OK)
  125. self.assertEqual(len(response.data), self.NUM_OWNED_DOMAINS)
  126. response_set = {data['name'] for data in response.data}
  127. expected_set = {domain.name for domain in self.my_domains}
  128. self.assertEqual(response_set, expected_set)
  129. def test_delete_my_domain(self):
  130. url = self.reverse('v1:domain-detail', name=self.my_domain.name)
  131. with self.assertPdnsRequests(self.requests_desec_domain_deletion()):
  132. response = self.client.delete(url)
  133. self.assertStatus(response, status.HTTP_204_NO_CONTENT)
  134. self.assertFalse(Domain.objects.filter(pk=self.my_domain.pk).exists())
  135. response = self.client.get(url)
  136. self.assertStatus(response, status.HTTP_404_NOT_FOUND)
  137. def test_delete_other_domain(self):
  138. url = self.reverse('v1:domain-detail', name=self.other_domain.name)
  139. response = self.client.delete(url)
  140. self.assertStatus(response, status.HTTP_204_NO_CONTENT)
  141. self.assertTrue(Domain.objects.filter(pk=self.other_domain.pk).exists())
  142. def test_retrieve_my_domain(self):
  143. url = self.reverse('v1:domain-detail', name=self.my_domain.name)
  144. with self.assertPdnsRequests(
  145. self.request_pdns_zone_retrieve_crypto_keys(name=self.my_domain.name)
  146. ):
  147. response = self.client.get(url)
  148. self.assertStatus(response, status.HTTP_200_OK)
  149. self.assertEqual(response.data['name'], self.my_domain.name)
  150. self.assertTrue(isinstance(response.data['keys'], list))
  151. def test_retrieve_other_domains(self):
  152. for domain in self.other_domains:
  153. response = self.client.get(self.reverse('v1:domain-detail', name=domain.name))
  154. self.assertStatus(response, status.HTTP_404_NOT_FOUND)
  155. def test_update_my_domain_name(self):
  156. url = self.reverse('v1:domain-detail', name=self.my_domain.name)
  157. with self.assertPdnsRequests(self.request_pdns_zone_retrieve_crypto_keys(name=self.my_domain.name)):
  158. response = self.client.get(url)
  159. self.assertStatus(response, status.HTTP_200_OK)
  160. response.data['name'] = self.random_domain_name()
  161. response = self.client.put(url, response.data, format='json')
  162. self.assertStatus(response, status.HTTP_400_BAD_REQUEST)
  163. with self.assertPdnsRequests(self.request_pdns_zone_retrieve_crypto_keys(name=self.my_domain.name)):
  164. response = self.client.get(url)
  165. self.assertStatus(response, status.HTTP_200_OK)
  166. self.assertEqual(response.data['name'], self.my_domain.name)
  167. def test_update_my_domain_immutable(self):
  168. url = self.reverse('v1:domain-detail', name=self.my_domain.name)
  169. with self.assertPdnsRequests(self.request_pdns_zone_retrieve_crypto_keys(name=self.my_domain.name)):
  170. response = self.client.get(url)
  171. self.assertStatus(response, status.HTTP_200_OK)
  172. created = response.data['created']
  173. keys = response.data['keys']
  174. published = response.data['published']
  175. response.data['created'] = '2019-08-07T18:34:39.249227Z'
  176. response.data['published'] = '2019-08-07T18:34:39.249227Z'
  177. response.data['keys'] = [{'dnskey': '257 3 13 badefefe'}]
  178. self.assertNotEqual(response.data['created'], created)
  179. self.assertNotEqual(response.data['published'], published)
  180. self.assertNotEqual(response.data['keys'], keys)
  181. with self.assertPdnsRequests(self.request_pdns_zone_retrieve_crypto_keys(name=self.my_domain.name)):
  182. response = self.client.put(url, response.data, format='json')
  183. self.assertStatus(response, status.HTTP_200_OK)
  184. self.assertEqual(response.data['created'], created)
  185. self.assertEqual(response.data['published'], published)
  186. self.assertEqual(response.data['keys'], keys)
  187. def test_update_other_domains(self):
  188. url = self.reverse('v1:domain-detail', name=self.other_domain.name)
  189. response = self.client.put(url, {}, format='json')
  190. self.assertStatus(response, status.HTTP_404_NOT_FOUND)
  191. def test_create_domains(self):
  192. self.owner.limit_domains = 100
  193. self.owner.save()
  194. for name in [
  195. '0.8.0.0.0.1.c.a.2.4.6.0.c.e.e.d.4.4.0.1.a.0.1.0.8.f.4.0.1.0.a.2.ip6.arpa',
  196. 'very.long.domain.name.' + self.random_domain_name(),
  197. self.random_domain_name(),
  198. 'very.long.domain.name.with_underscore.' + self.random_domain_name(),
  199. ]:
  200. with self.assertPdnsRequests(self.requests_desec_domain_creation(name)):
  201. response = self.client.post(self.reverse('v1:domain-list'), {'name': name})
  202. self.assertStatus(response, status.HTTP_201_CREATED)
  203. self.assertEqual(len(mail.outbox), 0)
  204. with self.assertPdnsRequests(self.request_pdns_zone_retrieve_crypto_keys(name)):
  205. self.assertStatus(
  206. self.client.get(self.reverse('v1:domain-detail', name=name), {'name': name}),
  207. status.HTTP_200_OK
  208. )
  209. response = self.client.get_rr_sets(name, type='NS', subname='')
  210. self.assertStatus(response, status.HTTP_200_OK)
  211. self.assertContainsRRSets(response.data, [dict(subname='', records=settings.DEFAULT_NS, type='NS')])
  212. def test_create_api_known_domain(self):
  213. url = self.reverse('v1:domain-list')
  214. for name in [
  215. self.random_domain_name(),
  216. 'www.' + self.my_domain.name,
  217. ]:
  218. with self.assertPdnsRequests(self.requests_desec_domain_creation(name)):
  219. response = self.client.post(url, {'name': name})
  220. self.assertStatus(response, status.HTTP_201_CREATED)
  221. response = self.client.post(url, {'name': name})
  222. self.assertStatus(response, status.HTTP_400_BAD_REQUEST)
  223. def test_create_domain_with_whitespace(self):
  224. for name in [
  225. ' ' + self.random_domain_name(),
  226. self.random_domain_name() + ' ',
  227. ]:
  228. self.assertResponse(
  229. self.client.post(self.reverse('v1:domain-list'), {'name': name}),
  230. status.HTTP_400_BAD_REQUEST,
  231. {'name': ['Invalid value (not a DNS name).']},
  232. )
  233. def test_create_public_suffixes(self):
  234. for name in self.PUBLIC_SUFFIXES:
  235. response = self.client.post(self.reverse('v1:domain-list'), {'name': name})
  236. self.assertStatus(response, status.HTTP_400_BAD_REQUEST)
  237. self.assertEqual(response.data['name'][0].code, 'name_unavailable')
  238. def test_create_domain_under_public_suffix_with_private_parent(self):
  239. name = 'amazonaws.com'
  240. with self.assertPdnsRequests(self.requests_desec_domain_creation(name)[:-1]), PDNSChangeTracker():
  241. Domain(owner=self.create_user(), name=name).save()
  242. self.assertTrue(Domain.objects.filter(name=name).exists())
  243. # If amazonaws.com is owned by another user, we cannot register test.s4.amazonaws.com
  244. name = 'test.s4.amazonaws.com'
  245. response = self.client.post(self.reverse('v1:domain-list'), {'name': name})
  246. self.assertStatus(response, status.HTTP_400_BAD_REQUEST)
  247. self.assertEqual(response.data['name'][0].code, 'name_unavailable')
  248. # s3.amazonaws.com is a public suffix. Therefore, test.s3.amazonaws.com can be
  249. # registered even if the parent zone amazonaws.com is owned by another user
  250. name = 'test.s3.amazonaws.com'
  251. psl_cm = self.get_psl_context_manager('s3.amazonaws.com')
  252. with psl_cm, self.assertPdnsRequests(self.requests_desec_domain_creation(name)):
  253. response = self.client.post(self.reverse('v1:domain-list'), {'name': name})
  254. self.assertStatus(response, status.HTTP_201_CREATED)
  255. def test_create_domain_under_unsupported_public_suffix_rule(self):
  256. # Show lenience if the PSL library produces an UnsupportedRule exception
  257. name = 'unsupported.wildcard.test'
  258. psl_cm = self.get_psl_context_manager(UnsupportedRule)
  259. with psl_cm, self.assertPdnsRequests():
  260. response = self.client.post(self.reverse('v1:domain-list'), {'name': name})
  261. self.assertStatus(response, status.HTTP_503_SERVICE_UNAVAILABLE)
  262. def test_create_domain_policy(self):
  263. for name in ['1.2.3..4.test.dedyn.io', 'test..de', '*.' + self.random_domain_name(), 'a' * 64 + '.bla.test']:
  264. response = self.client.post(self.reverse('v1:domain-list'), {'name': name})
  265. self.assertStatus(response, status.HTTP_400_BAD_REQUEST)
  266. self.assertTrue("Invalid value (not a DNS name)." in response.data['name'][0])
  267. def test_create_domain_other_parent(self):
  268. name = 'something.' + self.other_domain.name
  269. response = self.client.post(self.reverse('v1:domain-list'), {'name': name})
  270. self.assertStatus(response, status.HTTP_400_BAD_REQUEST)
  271. self.assertEqual(response.data['name'][0].code, 'name_unavailable')
  272. def test_create_domain_atomicity(self):
  273. name = self.random_domain_name()
  274. with self.assertPdnsRequests(self.request_pdns_zone_create_422()):
  275. self.client.post(self.reverse('v1:domain-list'), {'name': name})
  276. self.assertFalse(Domain.objects.filter(name=name).exists())
  277. def test_create_domain_punycode(self):
  278. names = ['公司.cn', 'aéroport.ci']
  279. for name in names:
  280. self.assertStatus(
  281. self.client.post(self.reverse('v1:domain-list'), {'name': name}),
  282. status.HTTP_400_BAD_REQUEST
  283. )
  284. for name in [n.encode('idna').decode('ascii') for n in names]:
  285. with self.assertPdnsRequests(self.requests_desec_domain_creation(name=name)):
  286. self.assertStatus(
  287. self.client.post(self.reverse('v1:domain-list'), {'name': name}),
  288. status.HTTP_201_CREATED
  289. )
  290. def test_create_domain_name_validation(self):
  291. for name in [
  292. 'with space.dedyn.io',
  293. 'another space.de',
  294. ' spaceatthebeginning.com',
  295. 'percentage%sign.com',
  296. '%percentagesign.dedyn.io',
  297. 'slash/desec.io',
  298. '/slashatthebeginning.dedyn.io',
  299. '\\backslashatthebeginning.dedyn.io',
  300. 'backslash\\inthemiddle.at',
  301. '@atsign.com',
  302. 'at@sign.com',
  303. ]:
  304. response = self.client.post(self.reverse('v1:domain-list'), {'name': name})
  305. self.assertStatus(response, status.HTTP_400_BAD_REQUEST)
  306. self.assertEqual(len(mail.outbox), 0)
  307. def test_domain_minimum_ttl(self):
  308. url = self.reverse('v1:domain-list')
  309. name = self.random_domain_name()
  310. with self.assertPdnsRequests(self.requests_desec_domain_creation(name=name)):
  311. response = self.client.post(url, {'name': name})
  312. self.assertStatus(response, status.HTTP_201_CREATED)
  313. self.assertEqual(response.data['minimum_ttl'], settings.MINIMUM_TTL_DEFAULT)
  314. class AutoDelegationDomainOwnerTests(DomainOwnerTestCase):
  315. DYN = True
  316. def test_delete_my_domain(self):
  317. url = self.reverse('v1:domain-detail', name=self.my_domain.name)
  318. with self.assertPdnsRequests(
  319. self.requests_desec_domain_deletion_auto_delegation(name=self.my_domain.name)
  320. ):
  321. response = self.client.delete(url)
  322. self.assertStatus(response, status.HTTP_204_NO_CONTENT)
  323. response = self.client.get(url)
  324. self.assertStatus(response, status.HTTP_404_NOT_FOUND)
  325. def test_delete_other_domains(self):
  326. url = self.reverse('v1:domain-detail', name=self.other_domain.name)
  327. with self.assertPdnsRequests():
  328. response = self.client.delete(url)
  329. self.assertStatus(response, status.HTTP_204_NO_CONTENT)
  330. self.assertTrue(Domain.objects.filter(pk=self.other_domain.pk).exists())
  331. def test_create_auto_delegated_domains(self):
  332. for i, suffix in enumerate(self.AUTO_DELEGATION_DOMAINS):
  333. name = self.random_domain_name(suffix)
  334. with self.assertPdnsRequests(self.requests_desec_domain_creation_auto_delegation(name=name)):
  335. response = self.client.post(self.reverse('v1:domain-list'), {'name': name})
  336. self.assertStatus(response, status.HTTP_201_CREATED)
  337. self.assertFalse(mail.outbox) # do not send email
  338. def test_domain_limit(self):
  339. url = self.reverse('v1:domain-list')
  340. user_quota = settings.LIMIT_USER_DOMAIN_COUNT_DEFAULT - self.NUM_OWNED_DOMAINS
  341. for i in range(user_quota):
  342. name = self.random_domain_name(self.AUTO_DELEGATION_DOMAINS)
  343. with self.assertPdnsRequests(self.requests_desec_domain_creation_auto_delegation(name)):
  344. response = self.client.post(url, {'name': name})
  345. self.assertStatus(response, status.HTTP_201_CREATED)
  346. response = self.client.post(url, {'name': self.random_domain_name(self.AUTO_DELEGATION_DOMAINS)})
  347. self.assertContains(response, 'Domain limit', status_code=status.HTTP_403_FORBIDDEN)
  348. self.assertFalse(mail.outbox) # do not send email
  349. def test_domain_minimum_ttl(self):
  350. url = self.reverse('v1:domain-list')
  351. name = self.random_domain_name(self.AUTO_DELEGATION_DOMAINS)
  352. with self.assertPdnsRequests(self.requests_desec_domain_creation_auto_delegation(name)):
  353. response = self.client.post(url, {'name': name})
  354. self.assertStatus(response, status.HTTP_201_CREATED)
  355. self.assertEqual(response.data['minimum_ttl'], 60)