test_rrsets.py 42 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745
  1. from ipaddress import IPv4Network
  2. import re
  3. from itertools import product
  4. from math import ceil, floor
  5. from django.conf import settings
  6. from django.core.exceptions import ValidationError
  7. from django.core.management import call_command
  8. from rest_framework import status
  9. from desecapi.models import Domain, RRset, RR_SET_TYPES_AUTOMATIC, RR_SET_TYPES_UNSUPPORTED
  10. from desecapi.tests.base import DesecTestCase, AuthenticatedRRSetBaseTestCase
  class UnauthenticatedRRSetTestCase(DesecTestCase):
      """RRset endpoint checks for a client that has not authenticated."""

      def test_unauthorized_access(self):
          """GET, POST, PUT, DELETE and PATCH on the RRset list URL must each yield 401."""
          url = self.reverse('v1:rrsets', name='example.com')
          for verb in ('get', 'post', 'put', 'delete', 'patch'):
              send = getattr(self.client, verb)
              self.assertStatus(send(url), status.HTTP_401_UNAUTHORIZED)
  23. class AuthenticatedRRSetTestCase(AuthenticatedRRSetBaseTestCase):
  def test_subname_validity(self):
      """Saving an RRset raises ValidationError for the listed subnames; 'aeroport' saves fine."""
      def build(subname):
          # All RRsets in this test differ only in their subname.
          return RRset(domain=self.my_domain, subname=subname, ttl=60, type='A')

      for rejected_subname in ('aEroport', 'AEROPORT', 'aéroport'):
          with self.assertRaises(ValidationError):
              build(rejected_subname).save()

      build('aeroport').save()
  def test_retrieve_my_rr_sets(self):
      """Listing my domain's RRsets, with or without an empty-subname filter, returns one RRset."""
      domain_name = self.my_domain.name
      unfiltered = self.client.get_rr_sets(domain_name)
      empty_subname_only = self.client.get_rr_sets(domain_name, subname='')

      for response in (unfiltered, empty_subname_only):
          self.assertStatus(response, status.HTTP_200_OK)
          self.assertEqual(len(response.data), 1, response.data)
  def test_retrieve_my_rr_sets_pagination(self):
      """Page through an RRset listing that spans three pages.

      Checks that an unpaginated request is refused with a pointer to the
      ``first`` link, that each page carries the expected number of items and
      directional links, and that stepping back from the last page reproduces
      the middle page.
      """
      page_size = settings.REST_FRAMEWORK['PAGE_SIZE']

      def convert_links(links):
          """Turn a Link header value into a dict mapping each rel label to its URL."""
          mapping = {}
          for link in links.split(', '):
              _url, label = link.split('; ')
              label = re.search(r'rel="(.*)"', label).group(1)
              _url = _url[1:-1]  # drop the first and last character wrapping the URL
              # unittest assertion instead of a bare assert, which is stripped under -O
              self.assertNotIn(label, mapping)
              mapping[label] = _url
          return mapping

      def assertPaginationResponse(response, expected_length, expected_directional_links=()):
          """Check status, item count and the set of Link relations of one page."""
          self.assertStatus(response, status.HTTP_200_OK)
          self.assertEqual(len(response.data), expected_length)

          _links = convert_links(response['Link'])
          self.assertEqual(len(_links), len(expected_directional_links) + 1)  # directional links, plus "first"
          self.assertTrue(_links['first'].endswith('/?cursor='))
          for directional_link in expected_directional_links:
              # Directional links share the prefix of "first" but carry a non-empty cursor
              self.assertEqual(_links['first'].find('/?cursor='), _links[directional_link].find('/?cursor='))
              self.assertTrue(len(_links[directional_link]) > len(_links['first']))

      # Prepare extra records so that we get three pages (total: n + 1)
      n = int(page_size * 2.5)
      RRset.objects.bulk_create(
          [RRset(domain=self.my_domain, subname=str(i), ttl=123, type='A') for i in range(n)]
      )

      # No pagination
      response = self.client.get_rr_sets(self.my_domain.name)
      self.assertStatus(response, status.HTTP_400_BAD_REQUEST)
      self.assertEqual(response.data['detail'],
                       f'Pagination required. You can query up to {page_size} items at a time ({n+1} total). '
                       'Please use the `first` page link (see Link header).')
      links = convert_links(response['Link'])
      self.assertEqual(len(links), 1)
      self.assertTrue(links['first'].endswith('/?cursor='))

      # First page
      url = links['first']
      response = self.client.get(url)
      assertPaginationResponse(response, page_size, ['next'])

      # Next
      url = convert_links(response['Link'])['next']
      response = self.client.get(url)
      assertPaginationResponse(response, page_size, ['next', 'prev'])
      data_next = response.data.copy()

      # Next-next (last) page: whatever remains after two full pages.
      # Integer arithmetic keeps this exact even when PAGE_SIZE is odd.
      url = convert_links(response['Link'])['next']
      response = self.client.get(url)
      assertPaginationResponse(response, n + 1 - 2 * page_size, ['prev'])

      # Prev
      url = convert_links(response['Link'])['prev']
      response = self.client.get(url)
      assertPaginationResponse(response, page_size, ['next', 'prev'])

      # Make sure that one step forward equals two steps forward and one step back
      self.assertEqual(response.data, data_next)
  def test_retrieve_other_rr_sets(self):
      """Listing RRsets of the other domain yields 404, whatever filter is applied."""
      foreign_name = self.other_domain.name
      for filters in ({}, {'subname': 'test'}, {'type': 'A'}):
          response = self.client.get_rr_sets(foreign_name, **filters)
          self.assertStatus(response, status.HTTP_404_NOT_FOUND)
  def test_retrieve_my_rr_sets_filter(self):
      """List RRsets with an empty cursor, then filtered by each subname and by each allowed type."""
      domain_name = self.my_rr_set_domain.name

      # First page: capped by the configured page size
      first_page = self.client.get_rr_sets(domain_name, query='?cursor=')
      self.assertStatus(first_page, status.HTTP_200_OK)
      page_limit = settings.REST_FRAMEWORK['PAGE_SIZE']
      self.assertEqual(len(first_page.data), min(len(self._test_rr_sets()), page_limit))

      # Filter by subname
      for subname in self.SUBNAMES:
          by_subname = self.client.get_rr_sets(domain_name, subname=subname)
          self.assertStatus(by_subname, status.HTTP_200_OK)
          expected_count = len(self._test_rr_sets(subname=subname))
          self.assertRRSetsCount(by_subname.data, [dict(subname=subname)], count=expected_count)

      # Filter by type
      for type_ in self.ALLOWED_TYPES:
          by_type = self.client.get_rr_sets(domain_name, type=type_)
          self.assertStatus(by_type, status.HTTP_200_OK)
  def test_create_my_rr_sets(self):
      """Create A, PTR, TXT and CNAME RRsets at several subnames and read them back.

      Each payload is POSTed twice: the first attempt must succeed with 201,
      the second must be refused as a duplicate.
      """
      domain_name = self.my_empty_domain.name
      response_fields = ['created', 'domain', 'subname', 'name', 'records', 'ttl', 'type', 'touched']

      for subname in [None, 'create-my-rr-sets', 'foo.create-my-rr-sets', 'bar.baz.foo.create-my-rr-sets']:
          explicit_subname = '' if subname is None else subname
          payloads = [
              {'subname': subname, 'records': ['1.2.3.4'], 'ttl': 3660, 'type': 'A'},
              {'subname': explicit_subname, 'records': ['desec.io.'], 'ttl': 36900, 'type': 'PTR'},
              {'subname': explicit_subname, 'ttl': 3650, 'type': 'TXT', 'records': ['"foo"']},
              {'subname': f'{subname}.cname'.lower(), 'ttl': 3600, 'type': 'CNAME', 'records': ['example.com.']},
          ]
          for data in payloads:
              # Try POST with missing subname
              if data['subname'] is None:
                  del data['subname']

              with self.assertPdnsRequests(self.requests_desec_rr_sets_update(name=domain_name)):
                  created = self.client.post_rr_set(domain_name=domain_name, **data)
                  self.assertStatus(created, status.HTTP_201_CREATED)
                  self.assertTrue(all(field in created.data for field in response_fields))
                  latest_touched = max(rrset.touched for rrset in self.my_empty_domain.rrset_set.all())
                  self.assertEqual(self.my_empty_domain.touched, latest_touched)

              # Check for uniqueness on second attempt
              duplicate = self.client.post_rr_set(domain_name=domain_name, **data)
              self.assertContains(duplicate,
                                  'Another RRset with the same subdomain and type exists for this domain.',
                                  status_code=status.HTTP_400_BAD_REQUEST)

              listing = self.client.get_rr_sets(domain_name)
              self.assertStatus(listing, status.HTTP_200_OK)
              self.assertRRSetsCount(listing.data, [data])

              detail = self.client.get_rr_set(domain_name, data.get('subname', ''), data['type'])
              self.assertStatus(detail, status.HTTP_200_OK)
              self.assertRRSet(detail.data, **data)
  def test_create_my_rr_sets_type_restriction(self):
      """POSTs with lower-case, UNSUPPORTED_TYPES or AUTOMATIC_TYPES type values get 400 and create nothing."""
      domain_name = self.my_domain.name
      mx_like_records = ['10 example.com.']
      soa_like_records = ['get.desec.io. get.desec.io. 2584 10800 3600 604800 60']

      for subname in ['', 'create-my-rr-sets', 'foo.create-my-rr-sets', 'bar.baz.foo.create-my-rr-sets']:
          payloads = [
              {'subname': subname, 'ttl': 60, 'type': 'a'},
              {'subname': subname, 'records': ['10 example.com.'], 'ttl': 60, 'type': 'txt'},
          ]
          payloads.extend(
              {'subname': subname, 'records': list(mx_like_records), 'ttl': 60, 'type': type_}
              for type_ in self.UNSUPPORTED_TYPES
          )
          payloads.extend(
              {'subname': subname, 'records': list(soa_like_records), 'ttl': 60, 'type': type_}
              for type_ in self.AUTOMATIC_TYPES
          )

          for data in payloads:
              rejected = self.client.post_rr_set(domain_name, **data)
              self.assertStatus(rejected, status.HTTP_400_BAD_REQUEST)

              listing = self.client.get_rr_sets(domain_name)
              self.assertStatus(listing, status.HTTP_200_OK)
              self.assertRRSetsCount(listing.data, [data], count=0)
  def test_create_my_rr_sets_cname_at_apex(self):
      """A CNAME with an empty subname is refused with 400 and an explanatory message."""
      response = self.client.post_rr_set(
          self.my_empty_domain.name,
          subname='',
          ttl=3600,
          type='CNAME',
          records=['foobar.com.'],
      )
      self.assertContains(response, 'CNAME RRset cannot have empty subname',
                          status_code=status.HTTP_400_BAD_REQUEST)
  def test_create_my_rr_sets_cname_exclusivity(self):
      """A CNAME and another RRset type cannot be created at the same subname."""
      self.create_rr_set(self.my_domain, ['1.2.3.4'], type='A', ttl=3600, subname='a')
      self.create_rr_set(self.my_domain, ['example.com.'], type='CNAME', ttl=3600, subname='cname')

      conflicting_payloads = (
          # Can't add a CNAME where something else is
          {'subname': 'a', 'ttl': 3600, 'type': 'CNAME', 'records': ['foobar.com.']},
          # Can't add something else where a CNAME is
          {'subname': 'cname', 'ttl': 3600, 'type': 'A', 'records': ['4.3.2.1']},
      )
      for data in conflicting_payloads:
          response = self.client.post_rr_set(self.my_domain.name, **data)
          self.assertStatus(response, status.HTTP_400_BAD_REQUEST)
  def test_create_my_rr_sets_without_records(self):
      """POSTing an RRset with an empty or missing records list gets 400 and creates nothing."""
      domain_name = self.my_empty_domain.name
      for subname in ['', 'create-my-rr-sets', 'foo.create-my-rr-sets', 'bar.baz.foo.create-my-rr-sets']:
          empty_records = {'subname': subname, 'records': [], 'ttl': 60, 'type': 'A'}
          missing_records = {'subname': subname, 'ttl': 60, 'type': 'A'}
          for data in (empty_records, missing_records):
              rejected = self.client.post_rr_set(domain_name, **data)
              self.assertStatus(rejected, status.HTTP_400_BAD_REQUEST)

              listing = self.client.get_rr_sets(domain_name)
              self.assertStatus(listing, status.HTTP_200_OK)
              self.assertRRSetsCount(listing.data, [], count=0)
  def test_create_other_rr_sets(self):
      """POSTing an RRset to the other domain is answered with 404."""
      response = self.client.post_rr_set(
          self.other_domain.name,
          records=['1.2.3.4'],
          ttl=60,
          type='A',
      )
      self.assertStatus(response, status.HTTP_404_NOT_FOUND)
  @staticmethod
  def _create_test_txt_record(record, type_='TXT'):
      """Build an RRset payload holding ``record``; the subname is ``name`` plus ``len(record)``."""
      payload = {
          'records': [f'{record}'],
          'ttl': 3600,
          'type': type_,
      }
      payload['subname'] = f'name{len(record)}'
      return payload
  def test_create_my_rr_sets_chunk_too_long(self):
      """TXT/SPF records given as one quoted token of 1, 255, 256 or 498 characters are accepted."""
      domain_name = self.my_empty_domain.name
      for length, rr_type in product([1, 255, 256, 498], ['TXT', 'SPF']):
          payload = self._create_test_txt_record(f'"{"A" * length}"', rr_type)
          with self.assertPdnsRequests(self.requests_desec_rr_sets_update(domain_name)):
              response = self.client.post_rr_set(domain_name, **payload)
              self.assertStatus(response, status.HTTP_201_CREATED)

          # The helper derives the subname from the quoted record, hence the two extra characters.
          with self.assertPdnsRequests(self.requests_desec_rr_sets_update(domain_name)):
              self.client.delete_rr_set(domain_name, type_=rr_type, subname=f'name{length+2}')
  def test_create_my_rr_sets_too_long_content(self):
      """Content one character over the 64000-byte wire limit is refused; content at the limit is accepted."""
      def token(length):
          # Quoted run(s) of "A" totalling `length` characters, 255 per quoted chunk at most
          if length == 0:
              return ''
          if length <= 255:
              return f'"{"A" * length}"'
          full_chunks, remainder = divmod(length, 255)
          return f'{token(255)} ' * full_chunks + token(remainder)

      def p2w_length(length):
          # presentation length -> wire length (one extra byte per 255-byte chunk)
          return ceil(length / 255 * 256)

      def w2p_length(length):
          # wire length -> presentation length
          return floor(length / 256 * 255)

      max_wirelength = 64000
      max_preslength = w2p_length(max_wirelength)

      # Sanity checks on the length conversions used below
      assert max_preslength == 63750
      assert p2w_length(max_preslength) == 64000
      assert p2w_length(max_preslength + 1) == 64002

      domain_name = self.my_empty_domain.name
      too_long_wirelength = p2w_length(max_preslength + 1)
      for rr_type in ['SPF', 'TXT']:
          # One presentation character more than fits into the wire-format limit
          payload = self._create_test_txt_record(token(max_preslength + 1), rr_type)
          response = self.client.post_rr_set(domain_name, **payload)
          self.assertStatus(response, status.HTTP_400_BAD_REQUEST)
          self.assertIn(
              f'Ensure this value has no more than {max_wirelength} byte in wire format (it has {too_long_wirelength}).',
              str(response.data)
          )

      # Exactly at the limit (helper default type)
      with self.assertPdnsRequests(self.requests_desec_rr_sets_update(domain_name)):
          payload = self._create_test_txt_record(token(max_preslength))
          response = self.client.post_rr_set(domain_name, **payload)
          self.assertStatus(response, status.HTTP_201_CREATED)
  def test_create_my_rr_sets_too_large_rrset(self):
      """An A RRset listing every address of a /20 network is refused with the reported excess length."""
      domain_name = self.my_empty_domain.name
      addresses = list(map(str, IPv4Network('127.0.0.0/20')))  # size: 4096 IP addresses
      response = self.client.post_rr_set(
          domain_name, records=addresses, ttl=3600, type='A', subname='name',
      )
      self.assertStatus(response, status.HTTP_400_BAD_REQUEST)

      excess_length = 28743 + len(domain_name)
      self.assertIn(f'Total length of RRset exceeds limit by {excess_length} bytes.', str(response.data))
  def test_create_my_rr_sets_twice(self):
      """A second POST for the same subname and type is refused even when the record differs."""
      domain_name = self.my_empty_domain.name
      payload = {'records': ['1.2.3.4'], 'ttl': 3660, 'type': 'A'}

      with self.assertPdnsRequests(self.requests_desec_rr_sets_update(domain_name)):
          first_attempt = self.client.post_rr_set(domain_name, **payload)
          self.assertStatus(first_attempt, status.HTTP_201_CREATED)

      payload['records'][0] = '3.2.2.1'
      second_attempt = self.client.post_rr_set(domain_name, **payload)
      self.assertStatus(second_attempt, status.HTTP_400_BAD_REQUEST)
  def test_create_my_rr_sets_duplicate_content(self):
      """Two spellings of the same AAAA address in one RRset are refused as duplicates."""
      duplicate_record_sets = [
          ['::1', '0::1'],
          # TODO add more examples
      ]
      for records in duplicate_record_sets:
          response = self.client.post_rr_set(
              self.my_empty_domain.name, records=records, ttl=3660, type='AAAA',
          )
          self.assertContains(response, 'Duplicate', status_code=status.HTTP_400_BAD_REQUEST)
  def test_create_my_rr_sets_upper_case(self):
      """Subnames containing upper-case letters are refused with a message about allowed characters."""
      domain_name = self.my_empty_domain.name
      for subname in ('asdF', 'cAse', 'asdf.FOO', '--F', 'ALLCAPS'):
          response = self.client.post_rr_set(
              domain_name, records=['1.2.3.4'], ttl=60, type='A', subname=subname,
          )
          self.assertStatus(response, status.HTTP_400_BAD_REQUEST)
          self.assertIn('Subname can only use (lowercase)', str(response.data))
  def test_create_my_rr_sets_subname_too_many_dots(self):
      """Subnames with a leading, trailing or doubled dot are refused and nothing is created."""
      domain_name = self.my_domain.name
      for subname in ('dottest.', '.dottest', 'dot..test'):
          data = {'subname': subname, 'records': ['10 example.com.'], 'ttl': 3600, 'type': 'MX'}
          rejected = self.client.post_rr_set(domain_name, **data)
          self.assertStatus(rejected, status.HTTP_400_BAD_REQUEST)

          listing = self.client.get_rr_sets(domain_name)
          self.assertStatus(listing, status.HTTP_200_OK)
          self.assertRRSetsCount(listing.data, [data], count=0)
  def test_create_my_rr_sets_empty_payload(self):
      """A POST without any payload is refused with 400 and 'No data provided'."""
      domain_name = self.my_empty_domain.name
      empty_post = self.client.post_rr_set(domain_name)
      self.assertContains(empty_post, 'No data provided', status_code=status.HTTP_400_BAD_REQUEST)
  def test_create_my_rr_sets_cname_two_records(self):
      """A CNAME RRset carrying two records is refused with 400."""
      response = self.client.post_rr_set(
          self.my_domain.name,
          subname='sub',
          records=['example.com.', 'example.org.'],
          ttl=3600,
          type='CNAME',
      )
      self.assertStatus(response, status.HTTP_400_BAD_REQUEST)
  def test_create_my_rr_sets_canonical_content(self):
      """Records submitted in non-canonical form come back canonicalised in the response.

      Each example is created at subname ``test``, compared against its
      expected canonical form, and deleted again before the next one.
      """
      # Canonical form shared by all DLV/DS examples below
      digest_canonical = '6454 8 2 5CBA665A006F6487625C6218522F09BD3673C25FA10F25CB18459AA10DF1F520'.lower()

      # TODO fill in more examples
      datas = [
          # record type: (non-canonical input, canonical output expectation)
          ('A', ('127.0.0.1', '127.0.0.1')),
          ('AAAA', ('0000::0000:0001', '::1')),
          ('AFSDB', ('02 turquoise.FEMTO.edu.', '2 turquoise.femto.edu.')),
          ('CAA', ('0128 "issue" "letsencrypt.org"', '128 issue "letsencrypt.org"')),
          ('CERT', ('06 00 00 sadfdd==', '6 0 0 sadfdQ==')),
          ('CNAME', ('EXAMPLE.COM.', 'example.com.')),
          ('DHCID', ('xxxx', 'xxxx')),
          ('DLV', ('6454 8 2 5CBA665A006F6487625C6218522F09BD3673C25FA10F25CB18459AA1 0DF1F520',
                   digest_canonical)),
          ('DLV', ('6454 8 2 5C BA665A006F6487625C6218522F09BD3673C25FA10F25CB18459AA1 0DF1F520',
                   digest_canonical)),
          ('DS', ('6454 8 2 5CBA665A006F6487625C6218522F09BD3673C25FA10F25CB18459AA1 0DF1F520',
                  digest_canonical)),
          ('DS', ('6454 8 2 5C BA665A006F6487625C6218522F09BD3673C25FA10F25CB18459AA1 0DF1F520',
                  digest_canonical)),
          ('EUI48', ('AA-BB-CC-DD-EE-FF', 'aa-bb-cc-dd-ee-ff')),
          ('EUI64', ('AA-BB-CC-DD-EE-FF-aa-aa', 'aa-bb-cc-dd-ee-ff-aa-aa')),
          ('HINFO', ('cpu os', '"cpu" "os"')),
          ('HINFO', ('"cpu" "os"', '"cpu" "os"')),
          # ('IPSECKEY', ('01 00 02 . ASDFAF==', '1 0 2 . ASDFAA==')),
          # ('IPSECKEY', ('01 00 02 . 000000==', '1 0 2 . 00000w==')),
          ('KX', ('010 example.com.', '10 example.com.')),
          ('LOC', ('023 012 59 N 042 022 48.500 W 65.00m 20.00m 10.00m 10.00m',
                   '23 12 59.000 N 42 22 48.500 W 65.00m 20.00m 10.00m 10.00m')),
          ('MX', ('10 010.1.1.1.', '10 010.1.1.1.')),
          ('MX', ('010 010.1.1.2.', '10 010.1.1.2.')),
          ('NAPTR', ('100 50 "s" "z3950+I2L+I2C" "" _z3950._tcp.gatech.edu.',
                     '100 50 "s" "z3950+I2L+I2C" "" _z3950._tcp.gatech.edu.')),
          ('NS', ('EXaMPLE.COM.', 'example.com.')),
          ('OPENPGPKEY', ('mG8EXtVIsRMFK4EEACIDAwQSZPNqE4tS xLFJYhX+uabSgMrhOqUizJhkLx82',
                          'mG8EXtVIsRMFK4EEACIDAwQSZPNqE4tSxLFJYhX+uabSgMrhOqUizJhkLx82')),
          ('PTR', ('EXAMPLE.COM.', 'example.com.')),
          ('RP', ('hostmaster.EXAMPLE.com. .', 'hostmaster.example.com. .')),
          ('SMIMEA', ('3 01 0 aaBBccddeeff', '3 1 0 aabbccddeeff')),
          ('SPF', ('"v=spf1 ip4:10.1" ".1.1 ip4:127" ".0.0.0/16 ip4:192.168.0.0/27 include:example.com -all"',
                   '"v=spf1 ip4:10.1" ".1.1 ip4:127" ".0.0.0/16 ip4:192.168.0.0/27 include:example.com -all"')),
          ('SPF', ('"foo" "bar"', '"foo" "bar"')),
          ('SPF', ('"foobar"', '"foobar"')),
          ('SRV', ('0 000 0 .', '0 0 0 .')),
          ('SRV', ('100 1 5061 EXAMPLE.com.', '100 1 5061 example.com.')),
          ('SRV', ('100 1 5061 example.com.', '100 1 5061 example.com.')),
          ('SSHFP', ('2 2 aabbccEEddff', '2 2 aabbcceeddff')),
          ('TLSA', ('3 0001 1 000AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA', '3 1 1 000aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa')),
          ('TLSA', ('003 00 002 696B8F6B92A913560b23ef5720c378881faffe74432d04eb35db957c0a93987b47adf26abb5dac10ba482597ae16edb069b511bec3e26010d1927bf6392760dd',
                    '3 0 2 696b8f6b92a913560b23ef5720c378881faffe74432d04eb35db957c0a93987b47adf26abb5dac10ba482597ae16edb069b511bec3e26010d1927bf6392760dd')),
          ('TLSA', ('3 0 2 696B8F6B92A913560b23ef5720c378881faffe74432d04eb35db957c0a93987b47adf26abb5dac10ba482597ae16edb069b511bec3e26010d1927bf6392760dd696B8F6B92A913560b23ef5720c378881faffe74432d04eb35db957c0a93987b47adf26abb5dac10ba482597ae16edb069b511bec3e26010d1927bf6392760dd',
                    '3 0 2 696b8f6b92a913560b23ef5720c378881faffe74432d04eb35db957c0a93987b47adf26abb5dac10ba482597ae16edb069b511bec3e26010d1927bf6392760dd696b8f6b92a913560b23ef5720c378881faffe74432d04eb35db957c0a93987b47adf26abb5dac10ba482597ae16edb069b511bec3e26010d1927bf6392760dd')),
          ('TXT', ('"foo" "bar"', '"foo" "bar"')),
          ('TXT', ('"foobar"', '"foobar"')),
          ('TXT', ('"foo" "" "bar"', '"foo" "" "bar"')),
          ('TXT', ('"" "" "foo" "" "bar"', '"" "" "foo" "" "bar"')),
          ('URI', ('10 01 "ftp://ftp1.example.com/public"', '10 1 "ftp://ftp1.example.com/public"')),
      ]

      domain_name = self.my_empty_domain.name
      for rr_type, (record, canonical_record) in datas:
          if record:
              payload = {'records': [record], 'ttl': 3660, 'type': rr_type, 'subname': 'test'}
              with self.assertPdnsRequests(self.requests_desec_rr_sets_update(name=domain_name)):
                  response = self.client.post_rr_set(domain_name, **payload)
                  self.assertStatus(response, status.HTTP_201_CREATED)
                  self.assertEqual(canonical_record, response.data['records'][0],
                                   f'For RR set type {rr_type}, expected \'{canonical_record}\' to be the canonical form of '
                                   f'\'{record}\', but saw \'{response.data["records"][0]}\'.')

              # Clean up so the next example can reuse the same subname
              with self.assertPdnsRequests(self.requests_desec_rr_sets_update(name=domain_name)):
                  response = self.client.delete_rr_set(domain_name, subname='test', type_=rr_type)
                  self.assertStatus(response, status.HTTP_204_NO_CONTENT)

      # The example table must cover every supported type
      self.assertAllSupportedRRSetTypes({rr_type for rr_type, _ in datas})
  def test_create_my_rr_sets_known_type_benign(self):
      """Every listed example record of every supported type is accepted on its own.

      Each record is created at subname ``test`` (expecting 201) and deleted
      again (expecting 204) before the next example is tried.
      """
      # TODO fill in more examples
      datas = {
          'A': ['127.0.0.1', '127.0.0.2'],
          'AAAA': ['::1', '::2'],
          'AFSDB': ['2 turquoise.femto.edu.'],
          'CAA': ['128 issue "letsencrypt.org"', '128 iodef "mailto:desec@example.com"', '1 issue "letsencrypt.org"'],
          'CERT': ['6 0 0 sadfdd=='],
          'CNAME': ['example.com.'],
          'DHCID': ['aaaaaaaaaaaa', 'aa aaa aaaa a a a'],
          'DLV': ['39556 13 1 aabbccddeeff'],
          'DS': ['39556 13 1 aabbccddeeff'],
          'EUI48': ['aa-bb-cc-dd-ee-ff', 'AA-BB-CC-DD-EE-FF'],
          'EUI64': ['aa-bb-cc-dd-ee-ff-00-11', 'AA-BB-CC-DD-EE-FF-00-11'],
          'HINFO': ['"ARMv8-A" "Linux"'],
          # 'IPSECKEY': [
          #     '12 0 2 . asdfdf==',
          #     '03 1 1 127.0.0.1 asdfdf==',
          #     '10 02 2 bade::affe AQNRU3mG7TVTO2BkR47usntb102uFJtugbo6BSGvgqt4AQ==',
          #     '12 3 01 example.com. asdfdf==',
          # ],
          'KX': ['4 example.com.', '28 io.'],
          'LOC': ['23 12 59.000 N 42 22 48.500 W 65.00m 20.00m 10.00m 10.00m'],
          'MX': ['10 example.com.', '20 1.1.1.1.'],
          'NAPTR': ['100 50 "s" "z3950+I2L+I2C" "" _z3950._tcp.gatech.edu.'],
          'NS': ['ns1.example.com.'],
          'OPENPGPKEY': [
              'mG8EXtVIsRMFK4EEACIDAwQSZPNqE4tSxLFJYhX+uabSgMrhOqUizJhkLx82',  # key incomplete
              'YWFh\xf0\x9f\x92\xa9YWFh',  # valid as non-alphabet bytes will be ignored
          ],
          'PTR': ['example.com.', '*.example.com.'],
          'RP': ['hostmaster.example.com. .'],
          'SMIMEA': ['3 1 0 aabbccddeeff'],
          'SPF': ['"v=spf1 include:example.com ~all"',
                  '"v=spf1 ip4:10.1.1.1 ip4:127.0.0.0/16 ip4:192.168.0.0/27 include:example.com -all"',
                  '"spf2.0/pra,mfrom ip6:2001:558:fe14:76:68:87:28:0/120 -all"'],
          'SRV': ['0 0 0 .', '100 1 5061 example.com.'],
          'SSHFP': ['2 2 aabbcceeddff'],
          'TLSA': ['3 1 1 AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA',
                   '3 0 2 696b8f6b92a913560b23ef5720c378881faffe74432d04eb35db957c0a93987b47adf26abb5dac10ba482597ae16edb069b511bec3e26010d1927bf6392760dd',
                   '3 0 2 696b8f6b92a913560b23ef5720c378881faffe74432d04eb35db957c0a93987b47adf26abb5dac10ba482597ae16edb069b511bec3e26010d1927bf6392760dd696b8f6b92a913560b23ef5720c378881faffe74432d04eb35db957c0a93987b47adf26abb5dac10ba482597ae16edb069b511bec3e26010d1927bf6392760dd'],
          'TXT': [
              '"foobar"',
              '"foo" "bar"',
              '"“红色联合”对“四·二八兵团”总部大楼的攻击已持续了两天"',
              # The comma after the next entry was missing, which silently fused it with the
              # emoji example into a single string literal; they are two separate examples.
              '"new\\010line"',
              '"🧥 👚 👕 👖 👔 👗 👙 👘 👠 👡 👢 👞 👟 🥾 🥿 🧦 🧤 🧣 🎩 🧢 👒 🎓 ⛑ 👑 👝 👛 👜 💼 🎒 👓 🕶 🥽 🥼 🌂 🧵"',
          ],
          'URI': ['10 1 "ftp://ftp1.example.com/public"'],
      }
      # The example table must cover every supported type
      self.assertAllSupportedRRSetTypes(set(datas.keys()))

      domain_name = self.my_empty_domain.name
      for t, records in datas.items():
          for r in records:
              data = {'records': [r], 'ttl': 3660, 'type': t, 'subname': 'test'}
              with self.assertPdnsRequests(self.requests_desec_rr_sets_update(name=domain_name)):
                  response = self.client.post_rr_set(domain_name, **data)
                  self.assertStatus(response, status.HTTP_201_CREATED)

              # Clean up so the next example can reuse the same subname
              with self.assertPdnsRequests(self.requests_desec_rr_sets_update(name=domain_name)):
                  response = self.client.delete_rr_set(domain_name, subname='test', type_=t)
                  self.assertStatus(response, status.HTTP_204_NO_CONTENT)
  def test_create_my_rr_sets_known_type_invalid(self):
      """Every listed malformed record is refused with 400, and not because of a 'Duplicate' error."""
      # TODO fill in more examples
      datas = {
          # recordtype: [list of examples expected to be rejected, individually]
          'A': ['127.0.0.999', '127.000.0.01', '127.0.0.256', '::1', 'foobar', '10.0.1', '10!'],
          'AAAA': ['::g', '1:1:1:1:1:1:1:1:', '1:1:1:1:1:1:1:1:1'],
          'AFSDB': ['example.com.', '1 1', '1 de'],
          'CAA': ['43235 issue "letsencrypt.org"'],
          'CERT': ['6 0 sadfdd=='],
          'CNAME': ['example.com', '10 example.com.'],
          'DHCID': ['x', 'xx', 'xxx'],
          'DLV': ['-34 13 1 aabbccddeeff'],
          'DS': ['-34 13 1 aabbccddeeff'],
          'EUI48': ['aa-bb-ccdd-ee-ff', 'AA-BB-CC-DD-EE-GG'],
          'EUI64': ['aa-bb-cc-dd-ee-ff-gg-11', 'AA-BB-C C-DD-EE-FF-00-11'],
          'HINFO': ['"ARMv8-A"', f'"a" "{"b"*256}"'],
          # 'IPSECKEY': [],
          'KX': ['-1 example.com', '10 example.com'],
          'LOC': ['23 12 61.000 N 42 22 48.500 W 65.00m 20.00m 10.00m 10.00m', 'foo', '1.1.1.1'],
          'MX': ['10 example.com', 'example.com.', '-5 asdf.', '65537 asdf.'],
          'NAPTR': ['100 50 "s" "z3950+I2L+I2C" "" _z3950._tcp.gatech.edu',
                    '100 50 "s" "" _z3950._tcp.gatech.edu.',
                    '100 50 3 2 "z3950+I2L+I2C" "" _z3950._tcp.gatech.edu.'],
          'NS': ['ns1.example.com', '127.0.0.1'],
          'OPENPGPKEY': ['1 2 3'],
          'PTR': ['"example.com."', '10 *.example.com.'],
          'RP': ['hostmaster.example.com.', '10 foo.'],
          'SMIMEA': ['3 1 0 aGVsbG8gd29ybGQh'],
          'SPF': ['"v=spf1', 'v=spf1 include:example.com ~all'],
          'SRV': ['0 0 0 0', '100 5061 example.com.'],
          'SSHFP': ['aabbcceeddff'],
          'TLSA': ['3 1 1 AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA'],
          'TXT': ['foob"ar', 'v=spf1 include:example.com ~all', '"foo\nbar"', '"\x00" "NUL byte yo"'],
          'URI': ['"1" "2" "3"'],
      }
      # The example table must cover every supported type
      self.assertAllSupportedRRSetTypes(set(datas.keys()))

      domain_name = self.my_empty_domain.name
      for rr_type, invalid_records in datas.items():
          for invalid_record in invalid_records:
              response = self.client.post_rr_set(
                  domain_name, records=[invalid_record], ttl=3660, type=rr_type, subname='',
              )
              self.assertNotContains(response, 'Duplicate', status_code=status.HTTP_400_BAD_REQUEST)
  def test_create_my_rr_sets_txt_splitting(self):
      """Long single-token TXT/SPF records come back split into the expected number of tokens."""
      domain_name = self.my_empty_domain.name
      for rr_type in ['TXT', 'SPF']:
          for length in [200, 255, 256, 300, 400]:
              subname = f'x{length}'
              payload = {'records': [f'"{"a"*length}"'], 'ttl': 3660, 'type': rr_type, 'subname': subname}
              with self.assertPdnsRequests(self.requests_desec_rr_sets_update(name=domain_name)):
                  response = self.client.post_rr_set(domain_name, **payload)
                  self.assertStatus(response, status.HTTP_201_CREATED)

              response = self.client.get_rr_set(domain_name, subname, rr_type)
              stored_record = response.data['records'][0]

              # Tokens are separated by single spaces in the returned record
              num_tokens = stored_record.count(' ') + 1
              num_tokens_expected = length // 256 + 1
              self.assertEqual(num_tokens, num_tokens_expected,
                               f'For a {rr_type} record with a token of length of {length}, expected to see '
                               f'{num_tokens_expected} tokens in the canonical format, but saw {num_tokens}.')

              # With quotes and spaces removed, the original payload must reappear
              unquoted = stored_record.replace('"', '').replace(' ', '')
              self.assertEqual(unquoted, 'a'*length)
  def test_create_my_rr_sets_unknown_type(self):
      """Made-up, automatic and unsupported types are refused with the matching error text."""
      rejected_types = ['AA', 'ASDF']
      rejected_types += list(RR_SET_TYPES_AUTOMATIC | RR_SET_TYPES_UNSUPPORTED)

      for _type in rejected_types:
          if _type in RR_SET_TYPES_AUTOMATIC:
              expected_text = 'managed automatically'
          else:
              expected_text = 'type is currently unsupported'

          response = self.client.post_rr_set(self.my_domain.name, records=['1234'], ttl=3660, type=_type)
          self.assertContains(response, text=expected_text, status_code=status.HTTP_400_BAD_REQUEST)
  def test_create_my_rr_sets_insufficient_ttl(self):
      """A TTL one below MINIMUM_TTL_DEFAULT is refused; MINIMUM_TTL_DEFAULT itself is accepted."""
      domain_name = self.my_empty_domain.name
      too_low_ttl = settings.MINIMUM_TTL_DEFAULT - 1

      rejected = self.client.post_rr_set(domain_name, records=['1.2.3.4'], ttl=too_low_ttl, type='A')
      self.assertStatus(rejected, status.HTTP_400_BAD_REQUEST)
      expected_detail = f'Ensure this value is greater than or equal to {self.my_empty_domain.minimum_ttl}.'
      self.assertEqual(rejected.data['ttl'][0], expected_detail)

      lowest_accepted_ttl = too_low_ttl + 1
      with self.assertPdnsRequests(self.requests_desec_rr_sets_update(name=domain_name)):
          accepted = self.client.post_rr_set(domain_name, records=['1.2.23.4'], ttl=lowest_accepted_ttl, type='A')
          self.assertStatus(accepted, status.HTTP_201_CREATED)
  def test_retrieve_my_rr_sets_apex(self):
      """The A RRset with empty subname is retrievable and has the expected first record and TTL."""
      apex_rr_set = self.client.get_rr_set(self.my_rr_set_domain.name, subname='', type_='A')
      self.assertStatus(apex_rr_set, status.HTTP_200_OK)

      payload = apex_rr_set.data
      self.assertEqual(payload['records'][0], '1.2.3.4')
      self.assertEqual(payload['ttl'], 3620)
  def test_retrieve_my_rr_sets_restricted_types(self):
      """Filtering the listing by a type in AUTOMATIC_TYPES is answered with 403."""
      domain_name = self.my_domain.name
      for type_ in self.AUTOMATIC_TYPES:
          # Once filtered by type only, once by type and empty subname
          for extra_filters in ({}, {'subname': ''}):
              response = self.client.get_rr_sets(domain_name, type=type_, **extra_filters)
              self.assertStatus(response, status.HTTP_403_FORBIDDEN)
  def test_update_my_rr_sets(self):
      """PUT with a complete payload replaces the A RRset; PUT with a partial payload gets 400."""
      domain_name = self.my_rr_set_domain.name
      for subname in self.SUBNAMES:
          # Full replacement
          with self.assertPdnsRequests(self.requests_desec_rr_sets_update(name=domain_name)):
              full_payload = {'records': ['2.2.3.4'], 'ttl': 3630, 'type': 'A', 'subname': subname}
              updated = self.client.put_rr_set(domain_name, subname, 'A', full_payload)
              self.assertStatus(updated, status.HTTP_200_OK)

          fetched = self.client.get_rr_set(domain_name, subname, 'A')
          self.assertStatus(fetched, status.HTTP_200_OK)
          self.assertEqual(fetched.data['records'], ['2.2.3.4'])
          self.assertEqual(fetched.data['ttl'], 3630)

          # Payloads lacking either ttl or records are refused
          for partial_payload in ({'records': ['2.2.3.5']}, {'ttl': 3637}):
              refused = self.client.put_rr_set(domain_name, subname, 'A', partial_payload)
              self.assertStatus(refused, status.HTTP_400_BAD_REQUEST)
  def test_update_my_rr_set_with_invalid_payload_type(self):
      """PUT bodies that are not a dictionary (a list, then a bare string) are rejected with 400.

      The error is expected as the first entry of `non_field_errors` and names the offending type.
      Assertions use assertEqual; the assertEquals alias is deprecated and was removed in Python 3.12.
      """
      for subname in self.SUBNAMES:
          invalid_payloads = (
              (
                  [{'records': ['2.2.3.4'], 'ttl': 30, 'type': 'A', 'subname': subname}],
                  'Invalid data. Expected a dictionary, but got list.',
              ),
              (
                  'foobar',
                  'Invalid data. Expected a dictionary, but got str.',
              ),
          )
          for data, expected_message in invalid_payloads:
              response = self.client.put_rr_set(self.my_rr_set_domain.name, subname, 'A', data)
              self.assertStatus(response, status.HTTP_400_BAD_REQUEST)
              self.assertEqual(response.data['non_field_errors'][0], expected_message)
  def test_partially_update_my_rr_sets(self):
      """PATCH may change records, ttl, or both; fields left out of the payload keep their previous value."""
      domain_name = self.my_rr_set_domain.name
      for subname in self.SUBNAMES:
          # Running expectation: starts as the stored RRset and accumulates every patch applied below.
          expected = self.client.get_rr_set(domain_name, subname, 'A').data
          patches = [
              {'records': ['2.2.3.4'], 'ttl': 3630},
              {'records': ['3.2.3.4']},
              {'records': ['3.2.3.4', '9.8.8.7']},
              {'ttl': 3637},
          ]
          for patch in patches:
              with self.assertPdnsRequests(self.requests_desec_rr_sets_update(name=domain_name)):
                  patched = self.client.patch_rr_set(domain_name, subname, 'A', patch)
                  self.assertStatus(patched, status.HTTP_200_OK)

              fetched = self.client.get_rr_set(domain_name, subname, 'A')
              self.assertStatus(fetched, status.HTTP_200_OK)
              expected.update(patch)
              for field in ('records', 'ttl'):
                  self.assertEqual(fetched.data[field], expected[field])

          # An empty PATCH is accepted as well.
          empty_patch = self.client.patch_rr_set(domain_name, subname, 'A', {})
          self.assertStatus(empty_patch, status.HTTP_200_OK)
  def test_rr_sets_touched_if_noop(self):
      """An empty PATCH still advances the RRset's `touched` value, and the domain's `touched` equals it."""
      domain = self.my_rr_set_domain
      for subname in self.SUBNAMES:
          lookup = dict(domain=domain, type='A', subname=subname)
          touched_before = RRset.objects.get(**lookup).touched

          noop = self.client.patch_rr_set(domain.name, subname, 'A', {})
          self.assertStatus(noop, status.HTTP_200_OK)

          # Re-read from the database; the previously fetched object would be stale.
          touched_after = RRset.objects.get(**lookup).touched
          self.assertGreater(touched_after, touched_before)
          self.assertEqual(Domain.objects.get(name=domain.name).touched, touched_after)
  def test_partially_update_other_rr_sets(self):
      """PATCHing records and ttl of A RRsets in other_rr_set_domain is answered with 404."""
      payload = {'records': ['3.2.3.4'], 'ttl': 334}
      foreign_domain_name = self.other_rr_set_domain.name
      for subname in self.SUBNAMES:
          denied = self.client.patch_rr_set(foreign_domain_name, subname, 'A', payload)
          self.assertStatus(denied, status.HTTP_404_NOT_FOUND)
  def test_update_other_rr_sets(self):
      """PATCHing only the ttl of A RRsets in other_rr_set_domain is answered with 404."""
      ttl_only = {'ttl': 305}
      foreign_domain_name = self.other_rr_set_domain.name
      for subname in self.SUBNAMES:
          denied = self.client.patch_rr_set(foreign_domain_name, subname, 'A', ttl_only)
          self.assertStatus(denied, status.HTTP_404_NOT_FOUND)
  def test_update_essential_properties(self):
      """Check which RRset fields can be changed through PATCH/PUT on the `test` A RRset.

      - `subname` and `type`: a differing value is rejected with 400 and code `read-only-on-update`.
      - `created`: a differing value is accepted (200) but the stored value stays the same.
      - `name` and `domain`: accepted in a PATCH payload, but the served values do not change.

      Requests are issued in a fixed order (PATCH before PUT) because later assertions depend on the
      state left by earlier requests. Assertions use assertEqual; the assertEquals alias is deprecated
      and was removed in Python 3.12.
      """
      url = self.reverse('v1:rrset', name=self.my_rr_set_domain.name, subname='test', type='A')

      # Changing the subname is expected to cause an error
      data = {'records': ['3.2.3.4'], 'ttl': 3620, 'subname': 'test2', 'type': 'A'}
      for send in (self.client.patch, self.client.put):
          response = send(url, data)
          self.assertStatus(response, status.HTTP_400_BAD_REQUEST)
          self.assertEqual(response.data['subname'][0].code, 'read-only-on-update')

      # Changing the type is expected to cause an error
      data = {'records': ['3.2.3.4'], 'ttl': 3620, 'subname': 'test', 'type': 'TXT'}
      for send in (self.client.patch, self.client.put):
          response = send(url, data)
          self.assertStatus(response, status.HTTP_400_BAD_REQUEST)
          self.assertEqual(response.data['type'][0].code, 'read-only-on-update')

      # Changing "created" is no-op
      response = self.client.get(url)
      data = response.data
      created = data['created']  # remember the real value before overwriting it in the payload
      data['created'] = '2019-07-19T17:22:49.575717Z'
      for send in (self.client.patch, self.client.put):
          response = send(url, data)
          self.assertStatus(response, status.HTTP_200_OK)

      # Check that nothing changed
      response = self.client.get(url)
      self.assertStatus(response, status.HTTP_200_OK)
      self.assertEqual(response.data['records'][0], '2.2.3.4')
      self.assertEqual(response.data['ttl'], 3620)
      self.assertEqual(response.data['name'], 'test.' + self.my_rr_set_domain.name + '.')
      self.assertEqual(response.data['subname'], 'test')
      self.assertEqual(response.data['type'], 'A')
      self.assertEqual(response.data['created'], created)

      # This is expected to work, but the fields are ignored
      data = {'records': ['3.2.3.4'], 'name': 'example.com.', 'domain': 'example.com'}
      with self.assertPdnsRequests(self.requests_desec_rr_sets_update(name=self.my_rr_set_domain.name)):
          response = self.client.patch(url, data)
          self.assertStatus(response, status.HTTP_200_OK)

      response = self.client.get(url)
      self.assertStatus(response, status.HTTP_200_OK)
      self.assertEqual(response.data['records'][0], '3.2.3.4')
      self.assertEqual(response.data['domain'], self.my_rr_set_domain.name)
      self.assertEqual(response.data['name'], 'test.' + self.my_rr_set_domain.name + '.')
  def test_update_unknown_rrset(self):
      """PATCH and PUT against the URL of the `doesnotexist` A RRset both return 404."""
      missing_url = self.reverse('v1:rrset', name=self.my_rr_set_domain.name, subname='doesnotexist', type='A')
      payload = {'records': ['3.2.3.4'], 'ttl': 3620}

      patch_response = self.client.patch(missing_url, payload)
      self.assertStatus(patch_response, status.HTTP_404_NOT_FOUND)

      put_response = self.client.put(missing_url, payload)
      self.assertStatus(put_response, status.HTTP_404_NOT_FOUND)
  def test_delete_my_rr_sets_with_patch(self):
      """PATCHing an empty records list deletes the RRset (204); repeating the PATCH then yields 404."""
      empty_records = {'records': []}
      domain_name = self.my_rr_set_domain.name
      for subname in self.SUBNAMES:
          with self.assertPdnsRequests(self.requests_desec_rr_sets_update(name=domain_name)):
              deleted = self.client.patch_rr_set(domain_name, subname, 'A', empty_records)
              self.assertStatus(deleted, status.HTTP_204_NO_CONTENT)

          # Deletion is only idempotent via DELETE. For PATCH/PUT, the view raises 404 if the instance does not
          # exist. By that time, the view has not parsed the payload yet and does not know it is a deletion.
          repeated = self.client.patch_rr_set(domain_name, subname, 'A', empty_records)
          self.assertStatus(repeated, status.HTTP_404_NOT_FOUND)

          # The RRset can no longer be retrieved either.
          fetched = self.client.get_rr_set(domain_name, subname, 'A')
          self.assertStatus(fetched, status.HTTP_404_NOT_FOUND)
  def test_delete_my_rr_sets_with_delete(self):
      """DELETE removes each A RRset with 204, may be repeated with 204, and a later GET yields 404."""
      domain_name = self.my_rr_set_domain.name
      for subname in self.SUBNAMES:
          with self.assertPdnsRequests(self.requests_desec_rr_sets_update(name=domain_name)):
              first_delete = self.client.delete_rr_set(domain_name, subname=subname, type_='A')
              self.assertStatus(first_delete, status.HTTP_204_NO_CONTENT)

          # After the deletion, the domain's `touched` and `published` values must coincide.
          refreshed = Domain.objects.get(name=domain_name)
          self.assertEqual(refreshed.touched, refreshed.published)

          # Deleting again is still answered with 204 ...
          second_delete = self.client.delete_rr_set(domain_name, subname=subname, type_='A')
          self.assertStatus(second_delete, status.HTTP_204_NO_CONTENT)

          # ... and the RRset is gone.
          fetched = self.client.get_rr_set(domain_name, subname=subname, type_='A')
          self.assertStatus(fetched, status.HTTP_404_NOT_FOUND)
  def test_delete_other_rr_sets(self):
      """Neither an empty-records PATCH nor a DELETE removes A RRsets of other_rr_set_domain; both yield 404."""
      empty_records = {'records': []}
      foreign_domain = self.other_rr_set_domain
      for subname in self.SUBNAMES:
          # Try PATCH empty
          via_patch = self.client.patch_rr_set(foreign_domain.name, subname, 'A', empty_records)
          self.assertStatus(via_patch, status.HTTP_404_NOT_FOUND)

          # Try DELETE
          via_delete = self.client.delete_rr_set(foreign_domain.name, subname, 'A')
          self.assertStatus(via_delete, status.HTTP_404_NOT_FOUND)

          # Make sure it actually is still there
          remaining = foreign_domain.rrset_set.filter(subname=subname, type='A')
          self.assertGreater(len(remaining), 0)
  def test_import_rr_sets(self):
      """After running `sync-from-pdns` for my_domain, the only RRset listed is the apex NS set."""
      domain_name = self.my_domain.name
      with self.assertPdnsRequests(self.request_pdns_zone_retrieve(name=domain_name)):
          call_command('sync-from-pdns', domain_name)

      # Query the full listing and the listing filtered by the empty subname before checking either.
      listings = [
          self.client.get_rr_sets(domain_name),
          self.client.get_rr_sets(domain_name, subname=''),
      ]
      for listing in listings:
          self.assertStatus(listing, status.HTTP_200_OK)
          self.assertEqual(len(listing.data), 1, listing.data)
          self.assertContainsRRSets(listing.data, [dict(subname='', records=settings.DEFAULT_NS, type='NS')])