# test_rrsets.py
  1. from ipaddress import IPv4Network
  2. import re
  3. from itertools import product
  4. from django.conf import settings
  5. from django.core.exceptions import ValidationError
  6. from django.core.management import call_command
  7. from rest_framework import status
  8. from desecapi.models import Domain, RRset, RR_SET_TYPES_AUTOMATIC, RR_SET_TYPES_UNSUPPORTED
  9. from desecapi.tests.base import DesecTestCase, AuthenticatedRRSetBaseTestCase
  10. class UnauthenticatedRRSetTestCase(DesecTestCase):
  11. def test_unauthorized_access(self):
  12. url = self.reverse('v1:rrsets', name='example.com')
  13. for method in [
  14. self.client.get,
  15. self.client.post,
  16. self.client.put,
  17. self.client.delete,
  18. self.client.patch
  19. ]:
  20. response = method(url)
  21. self.assertStatus(response, status.HTTP_401_UNAUTHORIZED)
  22. class AuthenticatedRRSetTestCase(AuthenticatedRRSetBaseTestCase):
  23. def test_subname_validity(self):
  24. for subname in [
  25. 'aEroport',
  26. 'AEROPORT',
  27. 'aéroport'
  28. ]:
  29. with self.assertRaises(ValidationError):
  30. RRset(domain=self.my_domain, subname=subname, ttl=60, type='A').save()
  31. RRset(domain=self.my_domain, subname='aeroport', ttl=60, type='A').save()
  32. def test_retrieve_my_rr_sets(self):
  33. for response in [
  34. self.client.get_rr_sets(self.my_domain.name),
  35. self.client.get_rr_sets(self.my_domain.name, subname=''),
  36. ]:
  37. self.assertStatus(response, status.HTTP_200_OK)
  38. self.assertEqual(len(response.data), 1, response.data)
  39. def test_retrieve_my_rr_sets_pagination(self):
  40. def convert_links(links):
  41. mapping = {}
  42. for link in links.split(', '):
  43. _url, label = link.split('; ')
  44. label = re.search('rel="(.*)"', label).group(1)
  45. _url = _url[1:-1]
  46. assert label not in mapping
  47. mapping[label] = _url
  48. return mapping
  49. def assertPaginationResponse(response, expected_length, expected_directional_links=[]):
  50. self.assertStatus(response, status.HTTP_200_OK)
  51. self.assertEqual(len(response.data), expected_length)
  52. _links = convert_links(response['Link'])
  53. self.assertEqual(len(_links), len(expected_directional_links) + 1) # directional links, plus "first"
  54. self.assertTrue(_links['first'].endswith('/?cursor='))
  55. for directional_link in expected_directional_links:
  56. self.assertEqual(_links['first'].find('/?cursor='), _links[directional_link].find('/?cursor='))
  57. self.assertTrue(len(_links[directional_link]) > len(_links['first']))
  58. # Prepare extra records so that we get three pages (total: n + 1)
  59. n = int(settings.REST_FRAMEWORK['PAGE_SIZE'] * 2.5)
  60. RRset.objects.bulk_create(
  61. [RRset(domain=self.my_domain, subname=str(i), ttl=123, type='A') for i in range(n)]
  62. )
  63. # No pagination
  64. response = self.client.get_rr_sets(self.my_domain.name)
  65. self.assertStatus(response, status.HTTP_400_BAD_REQUEST)
  66. self.assertEqual(response.data['detail'],
  67. f'Pagination required. You can query up to {settings.REST_FRAMEWORK["PAGE_SIZE"]} items at a time ({n+1} total). '
  68. 'Please use the `first` page link (see Link header).')
  69. links = convert_links(response['Link'])
  70. self.assertEqual(len(links), 1)
  71. self.assertTrue(links['first'].endswith('/?cursor='))
  72. # First page
  73. url = links['first']
  74. response = self.client.get(url)
  75. assertPaginationResponse(response, settings.REST_FRAMEWORK['PAGE_SIZE'], ['next'])
  76. # Next
  77. url = convert_links(response['Link'])['next']
  78. response = self.client.get(url)
  79. assertPaginationResponse(response, settings.REST_FRAMEWORK['PAGE_SIZE'], ['next', 'prev'])
  80. data_next = response.data.copy()
  81. # Next-next (last) page
  82. url = convert_links(response['Link'])['next']
  83. response = self.client.get(url)
  84. assertPaginationResponse(response, n/5 + 1, ['prev'])
  85. # Prev
  86. url = convert_links(response['Link'])['prev']
  87. response = self.client.get(url)
  88. assertPaginationResponse(response, settings.REST_FRAMEWORK['PAGE_SIZE'], ['next', 'prev'])
  89. # Make sure that one step forward equals two steps forward and one step back
  90. self.assertEqual(response.data, data_next)
  91. def test_retrieve_other_rr_sets(self):
  92. self.assertStatus(self.client.get_rr_sets(self.other_domain.name), status.HTTP_404_NOT_FOUND)
  93. self.assertStatus(self.client.get_rr_sets(self.other_domain.name, subname='test'), status.HTTP_404_NOT_FOUND)
  94. self.assertStatus(self.client.get_rr_sets(self.other_domain.name, type='A'), status.HTTP_404_NOT_FOUND)
  95. def test_retrieve_my_rr_sets_filter(self):
  96. response = self.client.get_rr_sets(self.my_rr_set_domain.name, query='?cursor=')
  97. self.assertStatus(response, status.HTTP_200_OK)
  98. expected_number_of_rrsets = min(len(self._test_rr_sets()), settings.REST_FRAMEWORK['PAGE_SIZE'])
  99. self.assertEqual(len(response.data), expected_number_of_rrsets)
  100. for subname in self.SUBNAMES:
  101. response = self.client.get_rr_sets(self.my_rr_set_domain.name, subname=subname)
  102. self.assertStatus(response, status.HTTP_200_OK)
  103. self.assertRRSetsCount(response.data, [dict(subname=subname)],
  104. count=len(self._test_rr_sets(subname=subname)))
  105. for type_ in self.ALLOWED_TYPES:
  106. response = self.client.get_rr_sets(self.my_rr_set_domain.name, type=type_)
  107. self.assertStatus(response, status.HTTP_200_OK)
  108. def test_create_my_rr_sets(self):
  109. for subname in [None, 'create-my-rr-sets', 'foo.create-my-rr-sets', 'bar.baz.foo.create-my-rr-sets']:
  110. for data in [
  111. {'subname': subname, 'records': ['1.2.3.4'], 'ttl': 3660, 'type': 'A'},
  112. {'subname': '' if subname is None else subname, 'records': ['desec.io.'], 'ttl': 36900, 'type': 'PTR'},
  113. {'subname': '' if subname is None else subname, 'ttl': 3650, 'type': 'TXT', 'records': ['"foo"']},
  114. ]:
  115. # Try POST with missing subname
  116. if data['subname'] is None:
  117. data.pop('subname')
  118. with self.assertPdnsRequests(self.requests_desec_rr_sets_update(name=self.my_empty_domain.name)):
  119. response = self.client.post_rr_set(domain_name=self.my_empty_domain.name, **data)
  120. self.assertTrue(all(field in response.data for field in
  121. ['created', 'domain', 'subname', 'name', 'records', 'ttl', 'type', 'touched']))
  122. self.assertEqual(self.my_empty_domain.touched,
  123. max(rrset.touched for rrset in self.my_empty_domain.rrset_set.all()))
  124. self.assertStatus(response, status.HTTP_201_CREATED)
  125. # Check for uniqueness on second attempt
  126. response = self.client.post_rr_set(domain_name=self.my_empty_domain.name, **data)
  127. self.assertContains(response, 'Another RRset with the same subdomain and type exists for this domain.',
  128. status_code=status.HTTP_400_BAD_REQUEST)
  129. response = self.client.get_rr_sets(self.my_empty_domain.name)
  130. self.assertStatus(response, status.HTTP_200_OK)
  131. self.assertRRSetsCount(response.data, [data])
  132. response = self.client.get_rr_set(self.my_empty_domain.name, data.get('subname', ''), data['type'])
  133. self.assertStatus(response, status.HTTP_200_OK)
  134. self.assertRRSet(response.data, **data)
  135. def test_create_my_rr_sets_type_restriction(self):
  136. for subname in ['', 'create-my-rr-sets', 'foo.create-my-rr-sets', 'bar.baz.foo.create-my-rr-sets']:
  137. for data in [
  138. {'subname': subname, 'ttl': 60, 'type': 'a'},
  139. {'subname': subname, 'records': ['10 example.com.'], 'ttl': 60, 'type': 'txt'}
  140. ] + [
  141. {'subname': subname, 'records': ['10 example.com.'], 'ttl': 60, 'type': type_}
  142. for type_ in self.UNSUPPORTED_TYPES
  143. ] + [
  144. {'subname': subname, 'records': ['set.an.example. get.desec.io. 2584 10800 3600 604800 60'],
  145. 'ttl': 60, 'type': type_}
  146. for type_ in self.AUTOMATIC_TYPES
  147. ]:
  148. response = self.client.post_rr_set(self.my_domain.name, **data)
  149. self.assertStatus(response, status.HTTP_400_BAD_REQUEST)
  150. response = self.client.get_rr_sets(self.my_domain.name)
  151. self.assertStatus(response, status.HTTP_200_OK)
  152. self.assertRRSetsCount(response.data, [data], count=0)
  153. def test_create_my_rr_sets_cname_at_apex(self):
  154. data = {'subname': '', 'ttl': 3600, 'type': 'CNAME', 'records': ['foobar.com.']}
  155. response = self.client.post_rr_set(self.my_empty_domain.name, **data)
  156. self.assertContains(response, 'CNAME RRset cannot have empty subname', status_code=status.HTTP_400_BAD_REQUEST)
  157. def test_create_my_rr_sets_without_records(self):
  158. for subname in ['', 'create-my-rr-sets', 'foo.create-my-rr-sets', 'bar.baz.foo.create-my-rr-sets']:
  159. for data in [
  160. {'subname': subname, 'records': [], 'ttl': 60, 'type': 'A'},
  161. {'subname': subname, 'ttl': 60, 'type': 'A'},
  162. ]:
  163. response = self.client.post_rr_set(self.my_empty_domain.name, **data)
  164. self.assertStatus(
  165. response,
  166. status.HTTP_400_BAD_REQUEST
  167. )
  168. response = self.client.get_rr_sets(self.my_empty_domain.name)
  169. self.assertStatus(response, status.HTTP_200_OK)
  170. self.assertRRSetsCount(response.data, [], count=0)
  171. def test_create_other_rr_sets(self):
  172. data = {'records': ['1.2.3.4'], 'ttl': 60, 'type': 'A'}
  173. response = self.client.post_rr_set(self.other_domain.name, **data)
  174. self.assertStatus(response, status.HTTP_404_NOT_FOUND)
  175. @staticmethod
  176. def _create_test_txt_record(record, type_='TXT'):
  177. return {'records': [f'{record}'], 'ttl': 3600, 'type': type_, 'subname': f'name{len(record)}'}
  178. def test_create_my_rr_sets_chunk_too_long(self):
  179. for l, t in product([1, 255, 256, 498], ['TXT', 'SPF']):
  180. with self.assertPdnsRequests(self.requests_desec_rr_sets_update(self.my_empty_domain.name)):
  181. response = self.client.post_rr_set(
  182. self.my_empty_domain.name,
  183. **self._create_test_txt_record(f'"{"A" * l}"', t)
  184. )
  185. self.assertStatus(response, status.HTTP_201_CREATED)
  186. with self.assertPdnsRequests(self.requests_desec_rr_sets_update(self.my_empty_domain.name)):
  187. self.client.delete_rr_set(self.my_empty_domain.name, type_=t, subname=f'name{l+2}')
  188. def test_create_my_rr_sets_too_long_content(self):
  189. for t in ['SPF', 'TXT']:
  190. response = self.client.post_rr_set(
  191. self.my_empty_domain.name,
  192. # record of wire length 501 bytes in chunks of max 255 each (RFC 4408)
  193. **self._create_test_txt_record(f'"{"A" * 255}" "{"A" * 244}"', t)
  194. )
  195. self.assertStatus(response, status.HTTP_400_BAD_REQUEST)
  196. self.assertIn(
  197. 'Ensure this value has no more than 500 byte in wire format (it has 501).',
  198. str(response.data)
  199. )
  200. with self.assertPdnsRequests(self.requests_desec_rr_sets_update(self.my_empty_domain.name)):
  201. response = self.client.post_rr_set(
  202. self.my_empty_domain.name,
  203. # record of wire length 500 bytes in chunks of max 255 each (RFC 4408)
  204. ** self._create_test_txt_record(f'"{"A" * 255}" "{"A" * 243}"')
  205. )
  206. self.assertStatus(response, status.HTTP_201_CREATED)
  207. def test_create_my_rr_sets_too_large_rrset(self):
  208. network = IPv4Network('127.0.0.0/20') # size: 4096 IP addresses
  209. data = {'records': [str(ip) for ip in network], 'ttl': 3600, 'type': 'A', 'subname': 'name'}
  210. response = self.client.post_rr_set(self.my_empty_domain.name, **data)
  211. self.assertStatus(response, status.HTTP_400_BAD_REQUEST)
  212. excess_length = 28743 + len(self.my_empty_domain.name)
  213. self.assertIn(f'Total length of RRset exceeds limit by {excess_length} bytes.', str(response.data))
  214. def test_create_my_rr_sets_twice(self):
  215. data = {'records': ['1.2.3.4'], 'ttl': 3660, 'type': 'A'}
  216. with self.assertPdnsRequests(self.requests_desec_rr_sets_update(self.my_empty_domain.name)):
  217. response = self.client.post_rr_set(self.my_empty_domain.name, **data)
  218. self.assertStatus(response, status.HTTP_201_CREATED)
  219. data['records'][0] = '3.2.2.1'
  220. response = self.client.post_rr_set(self.my_empty_domain.name, **data)
  221. self.assertStatus(response, status.HTTP_400_BAD_REQUEST)
  222. def test_create_my_rr_sets_duplicate_content(self):
  223. for records in [
  224. ['::1', '0::1'],
  225. # TODO add more examples
  226. ]:
  227. data = {'records': records, 'ttl': 3660, 'type': 'AAAA'}
  228. response = self.client.post_rr_set(self.my_empty_domain.name, **data)
  229. self.assertContains(response, 'Duplicate', status_code=status.HTTP_400_BAD_REQUEST)
  230. def test_create_my_rr_sets_upper_case(self):
  231. for subname in ['asdF', 'cAse', 'asdf.FOO', '--F', 'ALLCAPS']:
  232. data = {'records': ['1.2.3.4'], 'ttl': 60, 'type': 'A', 'subname': subname}
  233. response = self.client.post_rr_set(self.my_empty_domain.name, **data)
  234. self.assertStatus(response, status.HTTP_400_BAD_REQUEST)
  235. self.assertIn('Subname can only use (lowercase)', str(response.data))
  236. def test_create_my_rr_sets_subname_too_many_dots(self):
  237. for subname in ['dottest.', '.dottest', 'dot..test']:
  238. data = {'subname': subname, 'records': ['10 example.com.'], 'ttl': 3600, 'type': 'MX'}
  239. response = self.client.post_rr_set(self.my_domain.name, **data)
  240. self.assertStatus(response, status.HTTP_400_BAD_REQUEST)
  241. response = self.client.get_rr_sets(self.my_domain.name)
  242. self.assertStatus(response, status.HTTP_200_OK)
  243. self.assertRRSetsCount(response.data, [data], count=0)
  244. def test_create_my_rr_sets_empty_payload(self):
  245. response = self.client.post_rr_set(self.my_empty_domain.name)
  246. self.assertContains(response, 'No data provided', status_code=status.HTTP_400_BAD_REQUEST)
  247. def test_create_my_rr_sets_cname_two_records(self):
  248. data = {'subname': 'sub', 'records': ['example.com.', 'example.org.'], 'ttl': 3600, 'type': 'CNAME'}
  249. response = self.client.post_rr_set(self.my_domain.name, **data)
  250. self.assertStatus(response, status.HTTP_400_BAD_REQUEST)
  251. def test_create_my_rr_sets_canonical_content(self):
  252. # TODO fill in more examples
  253. datas = [
  254. # record type: (non-canonical input, canonical output expectation)
  255. ('A', ('127.0.0.1', '127.0.0.1')),
  256. ('AAAA', ('0000::0000:0001', '::1')),
  257. ('AFSDB', ('02 turquoise.FEMTO.edu.', '2 turquoise.femto.edu.')),
  258. ('CAA', ('0128 "issue" "letsencrypt.org"', '128 issue "letsencrypt.org"')),
  259. ('CERT', ('06 00 00 sadfdd==', '6 0 0 sadfdQ==')),
  260. ('CNAME', ('EXAMPLE.COM.', 'example.com.')),
  261. ('DHCID', ('xxxx', 'xxxx')),
  262. ('DLV', ('6454 8 2 5CBA665A006F6487625C6218522F09BD3673C25FA10F25CB18459AA1 0DF1F520',
  263. '6454 8 2 5CBA665A006F6487625C6218522F09BD3673C25FA10F25CB18459AA10DF1F520'.lower())),
  264. ('DLV', ('6454 8 2 5C BA665A006F6487625C6218522F09BD3673C25FA10F25CB18459AA1 0DF1F520',
  265. '6454 8 2 5CBA665A006F6487625C6218522F09BD3673C25FA10F25CB18459AA10DF1F520'.lower())),
  266. ('DS', ('6454 8 2 5CBA665A006F6487625C6218522F09BD3673C25FA10F25CB18459AA1 0DF1F520',
  267. '6454 8 2 5CBA665A006F6487625C6218522F09BD3673C25FA10F25CB18459AA10DF1F520'.lower())),
  268. ('DS', ('6454 8 2 5C BA665A006F6487625C6218522F09BD3673C25FA10F25CB18459AA1 0DF1F520',
  269. '6454 8 2 5CBA665A006F6487625C6218522F09BD3673C25FA10F25CB18459AA10DF1F520'.lower())),
  270. ('EUI48', ('AA-BB-CC-DD-EE-FF', 'aa-bb-cc-dd-ee-ff')),
  271. ('EUI64', ('AA-BB-CC-DD-EE-FF-aa-aa', 'aa-bb-cc-dd-ee-ff-aa-aa')),
  272. ('HINFO', ('cpu os', '"cpu" "os"')),
  273. ('HINFO', ('"cpu" "os"', '"cpu" "os"')),
  274. # ('IPSECKEY', ('01 00 02 . ASDFAA==', '1 0 2 . ASDFAF==')),
  275. # ('IPSECKEY', ('01 00 02 . 00000w==', '1 0 2 . 000000==')),
  276. ('KX', ('010 example.com.', '10 example.com.')),
  277. ('LOC', ('023 012 59 N 042 022 48.500 W 65.00m 20.00m 10.00m 10.00m',
  278. '23 12 59.000 N 42 22 48.500 W 65.00m 20.00m 10.00m 10.00m')),
  279. ('MX', ('10 010.1.1.1.', '10 010.1.1.1.')),
  280. ('MX', ('010 010.1.1.2.', '10 010.1.1.2.')),
  281. ('NAPTR', ('100 50 "s" "z3950+I2L+I2C" "" _z3950._tcp.gatech.edu.',
  282. '100 50 "s" "z3950+I2L+I2C" "" _z3950._tcp.gatech.edu.')),
  283. ('NS', ('EXaMPLE.COM.', 'example.com.')),
  284. ('OPENPGPKEY', ('mG8EXtVIsRMFK4EEACIDAwQSZPNqE4tS xLFJYhX+uabSgMrhOqUizJhkLx82',
  285. 'mG8EXtVIsRMFK4EEACIDAwQSZPNqE4tSxLFJYhX+uabSgMrhOqUizJhkLx82')),
  286. ('PTR', ('EXAMPLE.COM.', 'example.com.')),
  287. ('RP', ('hostmaster.EXAMPLE.com. .', 'hostmaster.example.com. .')),
  288. # ('SMIMEA', ('3 01 0 aaBBccddeeff', '3 1 0 aabbccddeeff')),
  289. ('SPF', ('"v=spf1 ip4:10.1" ".1.1 ip4:127" ".0.0.0/16 ip4:192.168.0.0/27 include:example.com -all"',
  290. '"v=spf1 ip4:10.1" ".1.1 ip4:127" ".0.0.0/16 ip4:192.168.0.0/27 include:example.com -all"')),
  291. ('SPF', ('"foo" "bar"', '"foo" "bar"')),
  292. ('SPF', ('"foobar"', '"foobar"')),
  293. ('SRV', ('0 000 0 .', '0 0 0 .')),
  294. # ('SRV', ('100 1 5061 EXAMPLE.com.', '100 1 5061 example.com.')), # TODO fixed in dnspython 5c58601
  295. ('SRV', ('100 1 5061 example.com.', '100 1 5061 example.com.')),
  296. ('SSHFP', ('2 2 aabbccEEddff', '2 2 aabbcceeddff')),
  297. ('TLSA', ('3 0001 1 000AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA', '3 1 1 000aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa')),
  298. ('TXT', ('"foo" "bar"', '"foo" "bar"')),
  299. ('TXT', ('"foobar"', '"foobar"')),
  300. ('TXT', ('"foo" "" "bar"', '"foo" "" "bar"')),
  301. ('TXT', ('"" "" "foo" "" "bar"', '"" "" "foo" "" "bar"')),
  302. ('URI', ('10 01 "ftp://ftp1.example.com/public"', '10 1 "ftp://ftp1.example.com/public"')),
  303. ]
  304. for t, (record, canonical_record) in datas:
  305. if not record:
  306. continue
  307. data = {'records': [record], 'ttl': 3660, 'type': t, 'subname': 'test'}
  308. with self.assertPdnsRequests(self.requests_desec_rr_sets_update(name=self.my_empty_domain.name)):
  309. response = self.client.post_rr_set(self.my_empty_domain.name, **data)
  310. self.assertStatus(response, status.HTTP_201_CREATED)
  311. self.assertEqual(canonical_record, response.data['records'][0],
  312. f'For RR set type {t}, expected \'{canonical_record}\' to be the canonical form of '
  313. f'\'{record}\', but saw \'{response.data["records"][0]}\'.')
  314. with self.assertPdnsRequests(self.requests_desec_rr_sets_update(name=self.my_empty_domain.name)):
  315. response = self.client.delete_rr_set(self.my_empty_domain.name, subname='test', type_=t)
  316. self.assertStatus(response, status.HTTP_204_NO_CONTENT)
  317. self.assertAllSupportedRRSetTypes(set(t for t, _ in datas))
  318. def test_create_my_rr_sets_known_type_benign(self):
  319. # TODO fill in more examples
  320. datas = {
  321. 'A': ['127.0.0.1', '127.0.0.2'],
  322. 'AAAA': ['::1', '::2'],
  323. 'AFSDB': ['2 turquoise.femto.edu.'],
  324. 'CAA': ['128 issue "letsencrypt.org"', '128 iodef "mailto:desec@example.com"', '1 issue "letsencrypt.org"'],
  325. 'CERT': ['6 0 0 sadfdd=='],
  326. 'CNAME': ['example.com.'],
  327. 'DHCID': ['aaaaaaaaaaaa', 'aa aaa aaaa a a a'],
  328. 'DLV': ['39556 13 1 aabbccddeeff'],
  329. 'DS': ['39556 13 1 aabbccddeeff'],
  330. 'EUI48': ['aa-bb-cc-dd-ee-ff', 'AA-BB-CC-DD-EE-FF'],
  331. 'EUI64': ['aa-bb-cc-dd-ee-ff-00-11', 'AA-BB-CC-DD-EE-FF-00-11'],
  332. 'HINFO': ['"ARMv8-A" "Linux"'],
  333. # 'IPSECKEY': ['12 0 2 . asdfdf==', '03 1 1 127.0.00.1 asdfdf==', '12 3 1 example.com. asdfdf==',],
  334. 'KX': ['4 example.com.', '28 io.'],
  335. 'LOC': ['23 12 59.000 N 42 22 48.500 W 65.00m 20.00m 10.00m 10.00m'],
  336. 'MX': ['10 example.com.', '20 1.1.1.1.'],
  337. 'NAPTR': ['100 50 "s" "z3950+I2L+I2C" "" _z3950._tcp.gatech.edu.'],
  338. 'NS': ['ns1.example.com.'],
  339. 'OPENPGPKEY': [
  340. 'mG8EXtVIsRMFK4EEACIDAwQSZPNqE4tSxLFJYhX+uabSgMrhOqUizJhkLx82', # key incomplete
  341. 'YWFh\xf0\x9f\x92\xa9YWFh', # valid as non-alphabet bytes will be ignored
  342. ],
  343. 'PTR': ['example.com.', '*.example.com.'],
  344. 'RP': ['hostmaster.example.com. .'],
  345. # 'SMIMEA': ['3 1 0 aabbccddeeff'],
  346. 'SPF': ['"v=spf1 include:example.com ~all"',
  347. '"v=spf1 ip4:10.1.1.1 ip4:127.0.0.0/16 ip4:192.168.0.0/27 include:example.com -all"',
  348. '"spf2.0/pra,mfrom ip6:2001:558:fe14:76:68:87:28:0/120 -all"'],
  349. 'SRV': ['0 0 0 .', '100 1 5061 example.com.'],
  350. 'SSHFP': ['2 2 aabbcceeddff'],
  351. 'TLSA': ['3 1 1 AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA'],
  352. 'TXT': ['"foobar"', '"foo" "bar"', '"“红色联合”对“四·二八兵团”总部大楼的攻击已持续了两天"', '"new\\010line"'
  353. '"🧥 👚 👕 👖 👔 👗 👙 👘 👠 👡 👢 👞 👟 🥾 🥿 🧦 🧤 🧣 🎩 🧢 👒 🎓 ⛑ 👑 👝 👛 👜 💼 🎒 👓 🕶 🥽 🥼 🌂 🧵"'],
  354. 'URI': ['10 1 "ftp://ftp1.example.com/public"'],
  355. }
  356. self.assertAllSupportedRRSetTypes(set(datas.keys()))
  357. for t, records in datas.items():
  358. for r in records:
  359. data = {'records': [r], 'ttl': 3660, 'type': t, 'subname': 'test'}
  360. with self.assertPdnsRequests(self.requests_desec_rr_sets_update(name=self.my_empty_domain.name)):
  361. response = self.client.post_rr_set(self.my_empty_domain.name, **data)
  362. self.assertStatus(response, status.HTTP_201_CREATED)
  363. with self.assertPdnsRequests(self.requests_desec_rr_sets_update(name=self.my_empty_domain.name)):
  364. response = self.client.delete_rr_set(self.my_empty_domain.name, subname='test', type_=t)
  365. self.assertStatus(response, status.HTTP_204_NO_CONTENT)
  366. def test_create_my_rr_sets_known_type_invalid(self):
  367. # TODO fill in more examples
  368. datas = {
  369. # recordtype: [list of examples expected to be rejected, individually]
  370. 'A': ['127.0.0.999', '127.000.0.01', '127.0.0.256', '::1', 'foobar', '10.0.1', '10!'],
  371. 'AAAA': ['::g', '1:1:1:1:1:1:1:1:', '1:1:1:1:1:1:1:1:1'],
  372. 'AFSDB': ['example.com.', '1 1', '1 de'],
  373. 'CAA': ['43235 issue "letsencrypt.org"'],
  374. 'CERT': ['6 0 sadfdd=='],
  375. 'CNAME': ['example.com', '10 example.com.'],
  376. 'DHCID': ['x', 'xx', 'xxx'],
  377. 'DLV': ['-34 13 1 aabbccddeeff'],
  378. 'DS': ['-34 13 1 aabbccddeeff'],
  379. 'EUI48': ['aa-bb-ccdd-ee-ff', 'AA-BB-CC-DD-EE-GG'],
  380. 'EUI64': ['aa-bb-cc-dd-ee-ff-gg-11', 'AA-BB-C C-DD-EE-FF-00-11'],
  381. 'HINFO': ['"ARMv8-A"', f'"a" "{"b"*256}"'],
  382. # 'IPSECKEY': [],
  383. 'KX': ['-1 example.com', '10 example.com'],
  384. 'LOC': ['23 12 61.000 N 42 22 48.500 W 65.00m 20.00m 10.00m 10.00m', 'foo', '1.1.1.1'],
  385. 'MX': ['10 example.com', 'example.com.', '-5 asdf.', '65537 asdf.'],
  386. 'NAPTR': ['100 50 "s" "z3950+I2L+I2C" "" _z3950._tcp.gatech.edu',
  387. '100 50 "s" "" _z3950._tcp.gatech.edu.',
  388. '100 50 3 2 "z3950+I2L+I2C" "" _z3950._tcp.gatech.edu.'],
  389. 'NS': ['ns1.example.com', '127.0.0.1'],
  390. 'OPENPGPKEY': ['1 2 3'],
  391. 'PTR': ['"example.com."', '10 *.example.com.'],
  392. 'RP': ['hostmaster.example.com.', '10 foo.'],
  393. # 'SMIMEA': ['3 1 0 aGVsbG8gd29ybGQh'],
  394. 'SPF': ['"v=spf1', 'v=spf1 include:example.com ~all'],
  395. 'SRV': ['0 0 0 0', '100 5061 example.com.'],
  396. 'SSHFP': ['aabbcceeddff'],
  397. 'TLSA': ['3 1 1 AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA'],
  398. 'TXT': ['foob"ar', 'v=spf1 include:example.com ~all', '"foo\nbar"', '"\x00" "NUL byte yo"'],
  399. 'URI': ['"1" "2" "3"'],
  400. }
  401. self.assertAllSupportedRRSetTypes(set(datas.keys()))
  402. for t, records in datas.items():
  403. for r in records:
  404. data = {'records': [r], 'ttl': 3660, 'type': t, 'subname': ''}
  405. response = self.client.post_rr_set(self.my_empty_domain.name, **data)
  406. self.assertNotContains(response, 'Duplicate', status_code=status.HTTP_400_BAD_REQUEST)
  407. def test_create_my_rr_sets_txt_splitting(self):
  408. for t in ['TXT', 'SPF']:
  409. for l in [200, 255, 256, 300, 400]:
  410. data = {'records': [f'"{"a"*l}"'], 'ttl': 3660, 'type': t, 'subname': f'x{l}'}
  411. with self.assertPdnsRequests(self.requests_desec_rr_sets_update(name=self.my_empty_domain.name)):
  412. response = self.client.post_rr_set(self.my_empty_domain.name, **data)
  413. self.assertStatus(response, status.HTTP_201_CREATED)
  414. response = self.client.get_rr_set(self.my_empty_domain.name, f'x{l}', t)
  415. num_tokens = response.data['records'][0].count(' ') + 1
  416. num_tokens_expected = l // 256 + 1
  417. self.assertEqual(num_tokens, num_tokens_expected,
  418. f'For a {t} record with a token of length of {l}, expected to see '
  419. f'{num_tokens_expected} tokens in the canonical format, but saw {num_tokens}.')
  420. self.assertEqual("".join(r.strip('" ') for r in response.data['records'][0]), 'a'*l)
  421. def test_create_my_rr_sets_unknown_type(self):
  422. for _type in ['AA', 'ASDF'] + list(RR_SET_TYPES_AUTOMATIC | RR_SET_TYPES_UNSUPPORTED):
  423. response = self.client.post_rr_set(self.my_domain.name, records=['1234'], ttl=3660, type=_type)
  424. self.assertContains(
  425. response,
  426. text='managed automatically' if _type in RR_SET_TYPES_AUTOMATIC else 'type is currently unsupported',
  427. status_code=status.HTTP_400_BAD_REQUEST
  428. )
  429. def test_create_my_rr_sets_insufficient_ttl(self):
  430. ttl = settings.MINIMUM_TTL_DEFAULT - 1
  431. response = self.client.post_rr_set(self.my_empty_domain.name, records=['1.2.3.4'], ttl=ttl, type='A')
  432. self.assertStatus(response, status.HTTP_400_BAD_REQUEST)
  433. detail = f'Ensure this value is greater than or equal to {self.my_empty_domain.minimum_ttl}.'
  434. self.assertEqual(response.data['ttl'][0], detail)
  435. ttl += 1
  436. with self.assertPdnsRequests(self.requests_desec_rr_sets_update(name=self.my_empty_domain.name)):
  437. response = self.client.post_rr_set(self.my_empty_domain.name, records=['1.2.23.4'], ttl=ttl, type='A')
  438. self.assertStatus(response, status.HTTP_201_CREATED)
  439. def test_retrieve_my_rr_sets_apex(self):
  440. response = self.client.get_rr_set(self.my_rr_set_domain.name, subname='', type_='A')
  441. self.assertStatus(response, status.HTTP_200_OK)
  442. self.assertEqual(response.data['records'][0], '1.2.3.4')
  443. self.assertEqual(response.data['ttl'], 3620)
  444. def test_retrieve_my_rr_sets_restricted_types(self):
  445. for type_ in self.AUTOMATIC_TYPES:
  446. response = self.client.get_rr_sets(self.my_domain.name, type=type_)
  447. self.assertStatus(response, status.HTTP_403_FORBIDDEN)
  448. response = self.client.get_rr_sets(self.my_domain.name, type=type_, subname='')
  449. self.assertStatus(response, status.HTTP_403_FORBIDDEN)
  450. def test_update_my_rr_sets(self):
  451. for subname in self.SUBNAMES:
  452. with self.assertPdnsRequests(self.requests_desec_rr_sets_update(name=self.my_rr_set_domain.name)):
  453. data = {'records': ['2.2.3.4'], 'ttl': 3630, 'type': 'A', 'subname': subname}
  454. response = self.client.put_rr_set(self.my_rr_set_domain.name, subname, 'A', data)
  455. self.assertStatus(response, status.HTTP_200_OK)
  456. response = self.client.get_rr_set(self.my_rr_set_domain.name, subname, 'A')
  457. self.assertStatus(response, status.HTTP_200_OK)
  458. self.assertEqual(response.data['records'], ['2.2.3.4'])
  459. self.assertEqual(response.data['ttl'], 3630)
  460. response = self.client.put_rr_set(self.my_rr_set_domain.name, subname, 'A', {'records': ['2.2.3.5']})
  461. self.assertStatus(response, status.HTTP_400_BAD_REQUEST)
  462. response = self.client.put_rr_set(self.my_rr_set_domain.name, subname, 'A', {'ttl': 3637})
  463. self.assertStatus(response, status.HTTP_400_BAD_REQUEST)
  464. def test_update_my_rr_set_with_invalid_payload_type(self):
  465. for subname in self.SUBNAMES:
  466. data = [{'records': ['2.2.3.4'], 'ttl': 30, 'type': 'A', 'subname': subname}]
  467. response = self.client.put_rr_set(self.my_rr_set_domain.name, subname, 'A', data)
  468. self.assertStatus(response, status.HTTP_400_BAD_REQUEST)
  469. self.assertEquals(response.data['non_field_errors'][0],
  470. 'Invalid data. Expected a dictionary, but got list.')
  471. data = 'foobar'
  472. response = self.client.put_rr_set(self.my_rr_set_domain.name, subname, 'A', data)
  473. self.assertStatus(response, status.HTTP_400_BAD_REQUEST)
  474. self.assertEquals(response.data['non_field_errors'][0],
  475. 'Invalid data. Expected a dictionary, but got str.')
  476. def test_partially_update_my_rr_sets(self):
  477. for subname in self.SUBNAMES:
  478. current_rr_set = self.client.get_rr_set(self.my_rr_set_domain.name, subname, 'A').data
  479. for data in [
  480. {'records': ['2.2.3.4'], 'ttl': 3630},
  481. {'records': ['3.2.3.4']},
  482. {'records': ['3.2.3.4', '9.8.8.7']},
  483. {'ttl': 3637},
  484. ]:
  485. with self.assertPdnsRequests(self.requests_desec_rr_sets_update(name=self.my_rr_set_domain.name)):
  486. response = self.client.patch_rr_set(self.my_rr_set_domain.name, subname, 'A', data)
  487. self.assertStatus(response, status.HTTP_200_OK)
  488. response = self.client.get_rr_set(self.my_rr_set_domain.name, subname, 'A')
  489. self.assertStatus(response, status.HTTP_200_OK)
  490. current_rr_set.update(data)
  491. self.assertEqual(response.data['records'], current_rr_set['records'])
  492. self.assertEqual(response.data['ttl'], current_rr_set['ttl'])
  493. response = self.client.patch_rr_set(self.my_rr_set_domain.name, subname, 'A', {})
  494. self.assertStatus(response, status.HTTP_200_OK)
  495. def test_rr_sets_touched_if_noop(self):
  496. for subname in self.SUBNAMES:
  497. touched_old = RRset.objects.get(domain=self.my_rr_set_domain, type='A', subname=subname).touched
  498. response = self.client.patch_rr_set(self.my_rr_set_domain.name, subname, 'A', {})
  499. self.assertStatus(response, status.HTTP_200_OK)
  500. touched_new = RRset.objects.get(domain=self.my_rr_set_domain, type='A', subname=subname).touched
  501. self.assertGreater(touched_new, touched_old)
  502. self.assertEqual(Domain.objects.get(name=self.my_rr_set_domain.name).touched, touched_new)
  def test_partially_update_other_rr_sets(self):
      """PATCHing A RRsets under ``self.other_rr_set_domain`` must return 404.

      The same records+ttl payload is sent for every subname in
      ``self.SUBNAMES``.
      """
      payload = {'records': ['3.2.3.4'], 'ttl': 334}
      other_domain_name = self.other_rr_set_domain.name
      for subname in self.SUBNAMES:
          self.assertStatus(
              self.client.patch_rr_set(other_domain_name, subname, 'A', payload),
              status.HTTP_404_NOT_FOUND,
          )
  def test_update_other_rr_sets(self):
      """A ttl-only change to A RRsets under ``self.other_rr_set_domain`` must return 404.

      NOTE(review): despite the name, this test sends PATCH rather than PUT;
      confirm whether a PUT variant was intended.
      """
      payload = {'ttl': 305}
      other_domain_name = self.other_rr_set_domain.name
      for subname in self.SUBNAMES:
          self.assertStatus(
              self.client.patch_rr_set(other_domain_name, subname, 'A', payload),
              status.HTTP_404_NOT_FOUND,
          )
  def test_update_essential_properties(self):
      """Check which fields of an existing RRset may be changed via PATCH/PUT.

      Covered in order:

      * ``subname`` and ``type`` in the payload are rejected with status 400
        and the error code ``'read-only-on-update'``;
      * a submitted ``created`` value is accepted (200) but the RRset is
        left unchanged;
      * ``name`` and ``domain`` in the payload are ignored while ``records``
        from the same payload is applied.
      """
      url = self.reverse('v1:rrset', name=self.my_rr_set_domain.name, subname='test', type='A')
      send_methods = (self.client.patch, self.client.put)

      # Changing the subname or the type is expected to cause an error
      read_only_cases = [
          ('subname', {'records': ['3.2.3.4'], 'ttl': 3620, 'subname': 'test2', 'type': 'A'}),
          ('type', {'records': ['3.2.3.4'], 'ttl': 3620, 'subname': 'test', 'type': 'TXT'}),
      ]
      for field, data in read_only_cases:
          for send in send_methods:
              response = send(url, data)
              self.assertStatus(response, status.HTTP_400_BAD_REQUEST)
              # assertEqual, not the deprecated assertEquals alias (removed in Python 3.12)
              self.assertEqual(response.data[field][0].code, 'read-only-on-update')

      # Changing "created" is no-op
      response = self.client.get(url)
      data = response.data
      created = data['created']
      data['created'] = '2019-07-19T17:22:49.575717Z'
      for send in send_methods:
          response = send(url, data)
          self.assertStatus(response, status.HTTP_200_OK)

      # Check that nothing changed
      response = self.client.get(url)
      self.assertStatus(response, status.HTTP_200_OK)
      self.assertEqual(response.data['records'][0], '2.2.3.4')
      self.assertEqual(response.data['ttl'], 3620)
      self.assertEqual(response.data['name'], 'test.' + self.my_rr_set_domain.name + '.')
      self.assertEqual(response.data['subname'], 'test')
      self.assertEqual(response.data['type'], 'A')
      self.assertEqual(response.data['created'], created)

      # This is expected to work, but the fields are ignored
      data = {'records': ['3.2.3.4'], 'name': 'example.com.', 'domain': 'example.com'}
      with self.assertPdnsRequests(self.requests_desec_rr_sets_update(name=self.my_rr_set_domain.name)):
          response = self.client.patch(url, data)
          self.assertStatus(response, status.HTTP_200_OK)

      response = self.client.get(url)
      self.assertStatus(response, status.HTTP_200_OK)
      self.assertEqual(response.data['records'][0], '3.2.3.4')
      self.assertEqual(response.data['domain'], self.my_rr_set_domain.name)
      self.assertEqual(response.data['name'], 'test.' + self.my_rr_set_domain.name + '.')
  def test_update_unknown_rrset(self):
      """PATCH and PUT against the subname 'doesnotexist' must both return 404."""
      url = self.reverse('v1:rrset', name=self.my_rr_set_domain.name, subname='doesnotexist', type='A')
      payload = {'records': ['3.2.3.4'], 'ttl': 3620}
      # Same request order as before: PATCH first, then PUT.
      for send in (self.client.patch, self.client.put):
          response = send(url, payload)
          self.assertStatus(response, status.HTTP_404_NOT_FOUND)
  def test_delete_my_rr_sets_with_patch(self):
      """PATCHing an empty ``records`` list deletes the RRset (204).

      A second identical PATCH and a subsequent GET must both return 404.
      """
      domain_name = self.my_rr_set_domain.name
      empty_records = {'records': []}
      for subname in self.SUBNAMES:
          with self.assertPdnsRequests(self.requests_desec_rr_sets_update(name=domain_name)):
              response = self.client.patch_rr_set(domain_name, subname, 'A', empty_records)
              self.assertStatus(response, status.HTTP_204_NO_CONTENT)

          # Deletion is only idempotent via DELETE. For PATCH/PUT, the view raises 404 if the instance does not
          # exist. By that time, the view has not parsed the payload yet and does not know it is a deletion.
          repeated = self.client.patch_rr_set(domain_name, subname, 'A', empty_records)
          self.assertStatus(repeated, status.HTTP_404_NOT_FOUND)

          fetched = self.client.get_rr_set(domain_name, subname, 'A')
          self.assertStatus(fetched, status.HTTP_404_NOT_FOUND)
  def test_delete_my_rr_sets_with_delete(self):
      """DELETE removes each A RRset (204) and can be repeated with the same result.

      After the first DELETE the domain's ``touched`` must equal its
      ``published`` value. A second DELETE must again return 204, and a
      subsequent GET must return 404.
      """
      domain_name = self.my_rr_set_domain.name
      for subname in self.SUBNAMES:
          with self.assertPdnsRequests(self.requests_desec_rr_sets_update(name=domain_name)):
              response = self.client.delete_rr_set(domain_name, subname=subname, type_='A')
              self.assertStatus(response, status.HTTP_204_NO_CONTENT)
              refreshed = Domain.objects.get(name=domain_name)
              self.assertEqual(refreshed.touched, refreshed.published)

          # Second DELETE of the now-missing RRset is expected to succeed as well.
          repeated = self.client.delete_rr_set(domain_name, subname=subname, type_='A')
          self.assertStatus(repeated, status.HTTP_204_NO_CONTENT)

          fetched = self.client.get_rr_set(domain_name, subname=subname, type_='A')
          self.assertStatus(fetched, status.HTTP_404_NOT_FOUND)
  def test_delete_other_rr_sets(self):
      """Neither an empty-records PATCH nor DELETE may remove RRsets of ``self.other_rr_set_domain``.

      Both requests must return 404 for every subname, and the matching A
      RRset must still be present in the domain's ``rrset_set`` afterwards.
      """
      empty_records = {'records': []}
      other_domain = self.other_rr_set_domain
      for subname in self.SUBNAMES:
          # Try PATCH empty
          patched = self.client.patch_rr_set(other_domain.name, subname, 'A', empty_records)
          self.assertStatus(patched, status.HTTP_404_NOT_FOUND)

          # Try DELETE
          deleted = self.client.delete_rr_set(other_domain.name, subname, 'A')
          self.assertStatus(deleted, status.HTTP_404_NOT_FOUND)

          # Make sure it actually is still there
          remaining = other_domain.rrset_set.filter(subname=subname, type='A')
          self.assertGreater(len(remaining), 0)
  600. def test_import_rr_sets(self):
  601. with self.assertPdnsRequests(self.request_pdns_zone_retrieve(name=self.my_domain.name)):
  602. call_command('sync-from-pdns', self.my_domain.name)
  603. for response in [
  604. self.client.get_rr_sets(self.my_domain.name),
  605. self.client.get_rr_sets(self.my_domain.name, subname=''),
  606. ]:
  607. self.assertStatus(response, status.HTTP_200_OK)
  608. self.assertEqual(len(response.data), 1, response.data)
  609. self.assertContainsRRSets(response.data, [dict(subname='', records=settings.DEFAULT_NS, type='NS')])