test_rrsets.py 25 KB


  1. from ipaddress import IPv4Network
  2. import re
  3. from django.conf import settings
  4. from django.core.exceptions import ValidationError
  5. from django.core.management import call_command
  6. from rest_framework import status
  7. from desecapi.models import Domain, RRset
  8. from desecapi.tests.base import DesecTestCase, AuthenticatedRRSetBaseTestCase
  9. class UnauthenticatedRRSetTestCase(DesecTestCase):
  10. def test_unauthorized_access(self):
  11. url = self.reverse('v1:rrsets', name='example.com')
  12. for method in [
  13. self.client.get,
  14. self.client.post,
  15. self.client.put,
  16. self.client.delete,
  17. self.client.patch
  18. ]:
  19. response = method(url)
  20. self.assertStatus(response, status.HTTP_401_UNAUTHORIZED)
  21. class AuthenticatedRRSetTestCase(AuthenticatedRRSetBaseTestCase):
  22. def test_subname_validity(self):
  23. for subname in [
  24. 'aEroport',
  25. 'AEROPORT',
  26. 'aéroport'
  27. ]:
  28. with self.assertRaises(ValidationError):
  29. RRset(domain=self.my_domain, subname=subname, ttl=60, type='A').save()
  30. RRset(domain=self.my_domain, subname='aeroport', ttl=60, type='A').save()
  31. def test_retrieve_my_rr_sets(self):
  32. for response in [
  33. self.client.get_rr_sets(self.my_domain.name),
  34. self.client.get_rr_sets(self.my_domain.name, subname=''),
  35. ]:
  36. self.assertStatus(response, status.HTTP_200_OK)
  37. self.assertEqual(len(response.data), 1, response.data)
  38. def test_retrieve_my_rr_sets_pagination(self):
  39. def convert_links(links):
  40. mapping = {}
  41. for link in links.split(', '):
  42. _url, label = link.split('; ')
  43. label = re.search('rel="(.*)"', label).group(1)
  44. _url = _url[1:-1]
  45. assert label not in mapping
  46. mapping[label] = _url
  47. return mapping
  48. def assertPaginationResponse(response, expected_length, expected_directional_links=[]):
  49. self.assertStatus(response, status.HTTP_200_OK)
  50. self.assertEqual(len(response.data), expected_length)
  51. _links = convert_links(response['Link'])
  52. self.assertEqual(len(_links), len(expected_directional_links) + 1) # directional links, plus "first"
  53. self.assertTrue(_links['first'].endswith('/?cursor='))
  54. for directional_link in expected_directional_links:
  55. self.assertEqual(_links['first'].find('/?cursor='), _links[directional_link].find('/?cursor='))
  56. self.assertTrue(len(_links[directional_link]) > len(_links['first']))
  57. # Prepare extra records so that we get three pages (total: n + 1)
  58. n = int(settings.REST_FRAMEWORK['PAGE_SIZE'] * 2.5)
  59. RRset.objects.bulk_create(
  60. [RRset(domain=self.my_domain, subname=str(i), ttl=123, type='A') for i in range(n)]
  61. )
  62. # No pagination
  63. response = self.client.get_rr_sets(self.my_domain.name)
  64. self.assertStatus(response, status.HTTP_400_BAD_REQUEST)
  65. self.assertEqual(response.data['detail'],
  66. f'Pagination required. You can query up to {settings.REST_FRAMEWORK["PAGE_SIZE"]} items at a time ({n+1} total). '
  67. 'Please use the `first` page link (see Link header).')
  68. links = convert_links(response['Link'])
  69. self.assertEqual(len(links), 1)
  70. self.assertTrue(links['first'].endswith('/?cursor='))
  71. # First page
  72. url = links['first']
  73. response = self.client.get(url)
  74. assertPaginationResponse(response, settings.REST_FRAMEWORK['PAGE_SIZE'], ['next'])
  75. # Next
  76. url = convert_links(response['Link'])['next']
  77. response = self.client.get(url)
  78. assertPaginationResponse(response, settings.REST_FRAMEWORK['PAGE_SIZE'], ['next', 'prev'])
  79. data_next = response.data.copy()
  80. # Next-next (last) page
  81. url = convert_links(response['Link'])['next']
  82. response = self.client.get(url)
  83. assertPaginationResponse(response, n/5 + 1, ['prev'])
  84. # Prev
  85. url = convert_links(response['Link'])['prev']
  86. response = self.client.get(url)
  87. assertPaginationResponse(response, settings.REST_FRAMEWORK['PAGE_SIZE'], ['next', 'prev'])
  88. # Make sure that one step forward equals two steps forward and one step back
  89. self.assertEqual(response.data, data_next)
  90. def test_retrieve_other_rr_sets(self):
  91. self.assertStatus(self.client.get_rr_sets(self.other_domain.name), status.HTTP_404_NOT_FOUND)
  92. self.assertStatus(self.client.get_rr_sets(self.other_domain.name, subname='test'), status.HTTP_404_NOT_FOUND)
  93. self.assertStatus(self.client.get_rr_sets(self.other_domain.name, type='A'), status.HTTP_404_NOT_FOUND)
  94. def test_retrieve_my_rr_sets_filter(self):
  95. response = self.client.get_rr_sets(self.my_rr_set_domain.name, query='?cursor=')
  96. self.assertStatus(response, status.HTTP_200_OK)
  97. expected_number_of_rrsets = min(len(self._test_rr_sets()), settings.REST_FRAMEWORK['PAGE_SIZE'])
  98. self.assertEqual(len(response.data), expected_number_of_rrsets)
  99. for subname in self.SUBNAMES:
  100. response = self.client.get_rr_sets(self.my_rr_set_domain.name, subname=subname)
  101. self.assertStatus(response, status.HTTP_200_OK)
  102. self.assertRRSetsCount(response.data, [dict(subname=subname)],
  103. count=len(self._test_rr_sets(subname=subname)))
  104. for type_ in self.ALLOWED_TYPES:
  105. response = self.client.get_rr_sets(self.my_rr_set_domain.name, type=type_)
  106. self.assertStatus(response, status.HTTP_200_OK)
  107. def test_create_my_rr_sets(self):
  108. for subname in [None, 'create-my-rr-sets', 'foo.create-my-rr-sets', 'bar.baz.foo.create-my-rr-sets']:
  109. for data in [
  110. {'subname': subname, 'records': ['1.2.3.4'], 'ttl': 3660, 'type': 'A'},
  111. {'subname': '' if subname is None else subname, 'records': ['desec.io.'], 'ttl': 36900, 'type': 'PTR'},
  112. {'subname': '' if subname is None else subname, 'ttl': 3650, 'type': 'TXT', 'records': ['"foo"']},
  113. ]:
  114. # Try POST with missing subname
  115. if data['subname'] is None:
  116. data.pop('subname')
  117. with self.assertPdnsRequests(self.requests_desec_rr_sets_update(name=self.my_empty_domain.name)):
  118. response = self.client.post_rr_set(domain_name=self.my_empty_domain.name, **data)
  119. self.assertTrue(all(field in response.data for field in
  120. ['created', 'domain', 'subname', 'name', 'records', 'ttl', 'type', 'touched']))
  121. self.assertEqual(self.my_empty_domain.touched,
  122. max(rrset.touched for rrset in self.my_empty_domain.rrset_set.all()))
  123. self.assertStatus(response, status.HTTP_201_CREATED)
  124. # Check for uniqueness on second attempt
  125. response = self.client.post_rr_set(domain_name=self.my_empty_domain.name, **data)
  126. self.assertContains(response, 'Another RRset with the same subdomain and type exists for this domain.',
  127. status_code=status.HTTP_400_BAD_REQUEST)
  128. response = self.client.get_rr_sets(self.my_empty_domain.name)
  129. self.assertStatus(response, status.HTTP_200_OK)
  130. self.assertRRSetsCount(response.data, [data])
  131. response = self.client.get_rr_set(self.my_empty_domain.name, data.get('subname', ''), data['type'])
  132. self.assertStatus(response, status.HTTP_200_OK)
  133. self.assertRRSet(response.data, **data)
  134. def test_create_my_rr_sets_type_restriction(self):
  135. for subname in ['', 'create-my-rr-sets', 'foo.create-my-rr-sets', 'bar.baz.foo.create-my-rr-sets']:
  136. for data in [
  137. {'subname': subname, 'ttl': 60, 'type': 'a'},
  138. {'subname': subname, 'records': ['10 example.com.'], 'ttl': 60, 'type': 'txt'}
  139. ] + [
  140. {'subname': subname, 'records': ['10 example.com.'], 'ttl': 60, 'type': type_}
  141. for type_ in self.DEAD_TYPES
  142. ] + [
  143. {'subname': subname, 'records': ['set.an.example. get.desec.io. 2584 10800 3600 604800 60'],
  144. 'ttl': 60, 'type': type_}
  145. for type_ in self.RESTRICTED_TYPES
  146. ]:
  147. response = self.client.post_rr_set(self.my_domain.name, **data)
  148. self.assertStatus(response, status.HTTP_400_BAD_REQUEST)
  149. response = self.client.get_rr_sets(self.my_domain.name)
  150. self.assertStatus(response, status.HTTP_200_OK)
  151. self.assertRRSetsCount(response.data, [data], count=0)
  152. def test_create_my_rr_sets_without_records(self):
  153. for subname in ['', 'create-my-rr-sets', 'foo.create-my-rr-sets', 'bar.baz.foo.create-my-rr-sets']:
  154. for data in [
  155. {'subname': subname, 'records': [], 'ttl': 60, 'type': 'A'},
  156. {'subname': subname, 'ttl': 60, 'type': 'A'},
  157. ]:
  158. response = self.client.post_rr_set(self.my_empty_domain.name, **data)
  159. self.assertStatus(
  160. response,
  161. status.HTTP_400_BAD_REQUEST
  162. )
  163. response = self.client.get_rr_sets(self.my_empty_domain.name)
  164. self.assertStatus(response, status.HTTP_200_OK)
  165. self.assertRRSetsCount(response.data, [], count=0)
  166. def test_create_other_rr_sets(self):
  167. data = {'records': ['1.2.3.4'], 'ttl': 60, 'type': 'A'}
  168. response = self.client.post_rr_set(self.other_domain.name, **data)
  169. self.assertStatus(response, status.HTTP_404_NOT_FOUND)
  170. def test_create_my_rr_sets_too_long_content(self):
  171. def _create_data(length):
  172. content_string = 'A' * (length - 2) # we have two quotes
  173. return {'records': [f'"{content_string}"'], 'ttl': 3600, 'type': 'TXT', 'subname': f'name{length}'}
  174. with self.assertPdnsRequests(self.requests_desec_rr_sets_update(self.my_empty_domain.name)):
  175. response = self.client.post_rr_set(self.my_empty_domain.name, **_create_data(500))
  176. self.assertStatus(response, status.HTTP_201_CREATED)
  177. response = self.client.post_rr_set(self.my_empty_domain.name, **_create_data(501))
  178. self.assertStatus(response, status.HTTP_400_BAD_REQUEST)
  179. self.assertIn('Ensure this field has no more than 500 characters.', str(response.data))
  180. def test_create_my_rr_sets_too_large_rrset(self):
  181. network = IPv4Network('127.0.0.0/20') # size: 4096 IP addresses
  182. data = {'records': [str(ip) for ip in network], 'ttl': 3600, 'type': 'A', 'subname': 'name'}
  183. response = self.client.post_rr_set(self.my_empty_domain.name, **data)
  184. self.assertStatus(response, status.HTTP_400_BAD_REQUEST)
  185. excess_length = 28743 + len(self.my_empty_domain.name)
  186. self.assertIn(f'Total length of RRset exceeds limit by {excess_length} bytes.', str(response.data))
  187. def test_create_my_rr_sets_twice(self):
  188. data = {'records': ['1.2.3.4'], 'ttl': 3660, 'type': 'A'}
  189. with self.assertPdnsRequests(self.requests_desec_rr_sets_update(self.my_empty_domain.name)):
  190. response = self.client.post_rr_set(self.my_empty_domain.name, **data)
  191. self.assertStatus(response, status.HTTP_201_CREATED)
  192. data['records'][0] = '3.2.2.1'
  193. response = self.client.post_rr_set(self.my_empty_domain.name, **data)
  194. self.assertStatus(response, status.HTTP_400_BAD_REQUEST)
  195. def test_create_my_rr_sets_upper_case(self):
  196. for subname in ['asdF', 'cAse', 'asdf.FOO', '--F', 'ALLCAPS']:
  197. data = {'records': ['1.2.3.4'], 'ttl': 60, 'type': 'A', 'subname': subname}
  198. response = self.client.post_rr_set(self.my_empty_domain.name, **data)
  199. self.assertStatus(response, status.HTTP_400_BAD_REQUEST)
  200. self.assertIn('Subname can only use (lowercase)', str(response.data))
  201. def test_create_my_rr_sets_empty_payload(self):
  202. response = self.client.post_rr_set(self.my_empty_domain.name)
  203. self.assertContains(response, 'No data provided', status_code=status.HTTP_400_BAD_REQUEST)
  204. def test_create_my_rr_sets_unknown_type(self):
  205. for _type in ['AA', 'ASDF']:
  206. with self.assertPdnsRequests(
  207. self.request_pdns_zone_update_unknown_type(name=self.my_domain.name, unknown_types=_type)
  208. ):
  209. response = self.client.post_rr_set(self.my_domain.name, records=['1234'], ttl=3660, type=_type)
  210. self.assertStatus(response, status.HTTP_400_BAD_REQUEST)
  211. def test_create_my_rr_sets_insufficient_ttl(self):
  212. ttl = settings.MINIMUM_TTL_DEFAULT - 1
  213. response = self.client.post_rr_set(self.my_empty_domain.name, records=['1.2.3.4'], ttl=ttl, type='A')
  214. self.assertStatus(response, status.HTTP_400_BAD_REQUEST)
  215. detail = f'Ensure this value is greater than or equal to {self.my_empty_domain.minimum_ttl}.'
  216. self.assertEqual(response.data['ttl'][0], detail)
  217. ttl += 1
  218. with self.assertPdnsRequests(self.requests_desec_rr_sets_update(name=self.my_empty_domain.name)):
  219. response = self.client.post_rr_set(self.my_empty_domain.name, records=['1.2.23.4'], ttl=ttl, type='A')
  220. self.assertStatus(response, status.HTTP_201_CREATED)
  221. def test_retrieve_my_rr_sets_apex(self):
  222. response = self.client.get_rr_set(self.my_rr_set_domain.name, subname='', type_='A')
  223. self.assertStatus(response, status.HTTP_200_OK)
  224. self.assertEqual(response.data['records'][0], '1.2.3.4')
  225. self.assertEqual(response.data['ttl'], 3620)
  226. def test_retrieve_my_rr_sets_restricted_types(self):
  227. for type_ in self.RESTRICTED_TYPES:
  228. response = self.client.get_rr_sets(self.my_domain.name, type=type_)
  229. self.assertStatus(response, status.HTTP_403_FORBIDDEN)
  230. response = self.client.get_rr_sets(self.my_domain.name, type=type_, subname='')
  231. self.assertStatus(response, status.HTTP_403_FORBIDDEN)
  232. def test_update_my_rr_sets(self):
  233. for subname in self.SUBNAMES:
  234. with self.assertPdnsRequests(self.requests_desec_rr_sets_update(name=self.my_rr_set_domain.name)):
  235. data = {'records': ['2.2.3.4'], 'ttl': 3630, 'type': 'A', 'subname': subname}
  236. response = self.client.put_rr_set(self.my_rr_set_domain.name, subname, 'A', data)
  237. self.assertStatus(response, status.HTTP_200_OK)
  238. response = self.client.get_rr_set(self.my_rr_set_domain.name, subname, 'A')
  239. self.assertStatus(response, status.HTTP_200_OK)
  240. self.assertEqual(response.data['records'], ['2.2.3.4'])
  241. self.assertEqual(response.data['ttl'], 3630)
  242. response = self.client.put_rr_set(self.my_rr_set_domain.name, subname, 'A', {'records': ['2.2.3.5']})
  243. self.assertStatus(response, status.HTTP_400_BAD_REQUEST)
  244. response = self.client.put_rr_set(self.my_rr_set_domain.name, subname, 'A', {'ttl': 3637})
  245. self.assertStatus(response, status.HTTP_400_BAD_REQUEST)
  246. def test_update_my_rr_set_with_invalid_payload_type(self):
  247. for subname in self.SUBNAMES:
  248. data = [{'records': ['2.2.3.4'], 'ttl': 30, 'type': 'A', 'subname': subname}]
  249. response = self.client.put_rr_set(self.my_rr_set_domain.name, subname, 'A', data)
  250. self.assertStatus(response, status.HTTP_400_BAD_REQUEST)
  251. self.assertEquals(response.data['non_field_errors'][0],
  252. 'Invalid data. Expected a dictionary, but got list.')
  253. data = 'foobar'
  254. response = self.client.put_rr_set(self.my_rr_set_domain.name, subname, 'A', data)
  255. self.assertStatus(response, status.HTTP_400_BAD_REQUEST)
  256. self.assertEquals(response.data['non_field_errors'][0],
  257. 'Invalid data. Expected a dictionary, but got str.')
  258. def test_partially_update_my_rr_sets(self):
  259. for subname in self.SUBNAMES:
  260. current_rr_set = self.client.get_rr_set(self.my_rr_set_domain.name, subname, 'A').data
  261. for data in [
  262. {'records': ['2.2.3.4'], 'ttl': 3630},
  263. {'records': ['3.2.3.4']},
  264. {'records': ['3.2.3.4', '9.8.8.7']},
  265. {'ttl': 3637},
  266. ]:
  267. with self.assertPdnsRequests(self.requests_desec_rr_sets_update(name=self.my_rr_set_domain.name)):
  268. response = self.client.patch_rr_set(self.my_rr_set_domain.name, subname, 'A', data)
  269. self.assertStatus(response, status.HTTP_200_OK)
  270. response = self.client.get_rr_set(self.my_rr_set_domain.name, subname, 'A')
  271. self.assertStatus(response, status.HTTP_200_OK)
  272. current_rr_set.update(data)
  273. self.assertEqual(response.data['records'], current_rr_set['records'])
  274. self.assertEqual(response.data['ttl'], current_rr_set['ttl'])
  275. response = self.client.patch_rr_set(self.my_rr_set_domain.name, subname, 'A', {})
  276. self.assertStatus(response, status.HTTP_200_OK)
  277. def test_rr_sets_touched_if_noop(self):
  278. for subname in self.SUBNAMES:
  279. touched_old = RRset.objects.get(domain=self.my_rr_set_domain, type='A', subname=subname).touched
  280. response = self.client.patch_rr_set(self.my_rr_set_domain.name, subname, 'A', {})
  281. self.assertStatus(response, status.HTTP_200_OK)
  282. touched_new = RRset.objects.get(domain=self.my_rr_set_domain, type='A', subname=subname).touched
  283. self.assertGreater(touched_new, touched_old)
  284. self.assertEqual(Domain.objects.get(name=self.my_rr_set_domain.name).touched, touched_new)
  285. def test_partially_update_other_rr_sets(self):
  286. data = {'records': ['3.2.3.4'], 'ttl': 334}
  287. for subname in self.SUBNAMES:
  288. response = self.client.patch_rr_set(self.other_rr_set_domain.name, subname, 'A', data)
  289. self.assertStatus(response, status.HTTP_404_NOT_FOUND)
  290. def test_update_other_rr_sets(self):
  291. data = {'ttl': 305}
  292. for subname in self.SUBNAMES:
  293. response = self.client.patch_rr_set(self.other_rr_set_domain.name, subname, 'A', data)
  294. self.assertStatus(response, status.HTTP_404_NOT_FOUND)
  295. def test_update_essential_properties(self):
  296. # Changing the subname is expected to cause an error
  297. url = self.reverse('v1:rrset', name=self.my_rr_set_domain.name, subname='test', type='A')
  298. data = {'records': ['3.2.3.4'], 'ttl': 3620, 'subname': 'test2', 'type': 'A'}
  299. response = self.client.patch(url, data)
  300. self.assertStatus(response, status.HTTP_400_BAD_REQUEST)
  301. self.assertEquals(response.data['subname'][0].code, 'read-only-on-update')
  302. response = self.client.put(url, data)
  303. self.assertStatus(response, status.HTTP_400_BAD_REQUEST)
  304. self.assertEquals(response.data['subname'][0].code, 'read-only-on-update')
  305. # Changing the type is expected to cause an error
  306. data = {'records': ['3.2.3.4'], 'ttl': 3620, 'subname': 'test', 'type': 'TXT'}
  307. response = self.client.patch(url, data)
  308. self.assertStatus(response, status.HTTP_400_BAD_REQUEST)
  309. self.assertEquals(response.data['type'][0].code, 'read-only-on-update')
  310. response = self.client.put(url, data)
  311. self.assertStatus(response, status.HTTP_400_BAD_REQUEST)
  312. self.assertEquals(response.data['type'][0].code, 'read-only-on-update')
  313. # Changing "created" is no-op
  314. response = self.client.get(url)
  315. data = response.data
  316. created = data['created']
  317. data['created'] = '2019-07-19T17:22:49.575717Z'
  318. response = self.client.patch(url, data)
  319. self.assertStatus(response, status.HTTP_200_OK)
  320. response = self.client.put(url, data)
  321. self.assertStatus(response, status.HTTP_200_OK)
  322. # Check that nothing changed
  323. response = self.client.get(url)
  324. self.assertStatus(response, status.HTTP_200_OK)
  325. self.assertEqual(response.data['records'][0], '2.2.3.4')
  326. self.assertEqual(response.data['ttl'], 3620)
  327. self.assertEqual(response.data['name'], 'test.' + self.my_rr_set_domain.name + '.')
  328. self.assertEqual(response.data['subname'], 'test')
  329. self.assertEqual(response.data['type'], 'A')
  330. self.assertEqual(response.data['created'], created)
  331. # This is expected to work, but the fields are ignored
  332. data = {'records': ['3.2.3.4'], 'name': 'example.com.', 'domain': 'example.com'}
  333. with self.assertPdnsRequests(self.requests_desec_rr_sets_update(name=self.my_rr_set_domain.name)):
  334. response = self.client.patch(url, data)
  335. self.assertStatus(response, status.HTTP_200_OK)
  336. response = self.client.get(url)
  337. self.assertStatus(response, status.HTTP_200_OK)
  338. self.assertEqual(response.data['records'][0], '3.2.3.4')
  339. self.assertEqual(response.data['domain'], self.my_rr_set_domain.name)
  340. self.assertEqual(response.data['name'], 'test.' + self.my_rr_set_domain.name + '.')
  341. def test_update_unknown_rrset(self):
  342. url = self.reverse('v1:rrset', name=self.my_rr_set_domain.name, subname='doesnotexist', type='A')
  343. data = {'records': ['3.2.3.4'], 'ttl': 3620}
  344. response = self.client.patch(url, data)
  345. self.assertStatus(response, status.HTTP_404_NOT_FOUND)
  346. response = self.client.put(url, data)
  347. self.assertStatus(response, status.HTTP_404_NOT_FOUND)
  348. def test_delete_my_rr_sets_with_patch(self):
  349. data = {'records': []}
  350. for subname in self.SUBNAMES:
  351. with self.assertPdnsRequests(self.requests_desec_rr_sets_update(name=self.my_rr_set_domain.name)):
  352. response = self.client.patch_rr_set(self.my_rr_set_domain.name, subname, 'A', data)
  353. self.assertStatus(response, status.HTTP_204_NO_CONTENT)
  354. # Deletion is only idempotent via DELETE. For PATCH/PUT, the view raises 404 if the instance does not
  355. # exist. By that time, the view has not parsed the payload yet and does not know it is a deletion.
  356. response = self.client.patch_rr_set(self.my_rr_set_domain.name, subname, 'A', data)
  357. self.assertStatus(response, status.HTTP_404_NOT_FOUND)
  358. response = self.client.get_rr_set(self.my_rr_set_domain.name, subname, 'A')
  359. self.assertStatus(response, status.HTTP_404_NOT_FOUND)
  360. def test_delete_my_rr_sets_with_delete(self):
  361. for subname in self.SUBNAMES:
  362. with self.assertPdnsRequests(self.requests_desec_rr_sets_update(name=self.my_rr_set_domain.name)):
  363. response = self.client.delete_rr_set(self.my_rr_set_domain.name, subname=subname, type_='A')
  364. self.assertStatus(response, status.HTTP_204_NO_CONTENT)
  365. domain = Domain.objects.get(name=self.my_rr_set_domain.name)
  366. self.assertEqual(domain.touched, domain.published)
  367. response = self.client.delete_rr_set(self.my_rr_set_domain.name, subname=subname, type_='A')
  368. self.assertStatus(response, status.HTTP_204_NO_CONTENT)
  369. response = self.client.get_rr_set(self.my_rr_set_domain.name, subname=subname, type_='A')
  370. self.assertStatus(response, status.HTTP_404_NOT_FOUND)
  371. def test_delete_other_rr_sets(self):
  372. data = {'records': []}
  373. for subname in self.SUBNAMES:
  374. # Try PATCH empty
  375. response = self.client.patch_rr_set(self.other_rr_set_domain.name, subname, 'A', data)
  376. self.assertStatus(response, status.HTTP_404_NOT_FOUND)
  377. # Try DELETE
  378. response = self.client.delete_rr_set(self.other_rr_set_domain.name, subname, 'A')
  379. self.assertStatus(response, status.HTTP_404_NOT_FOUND)
  380. # Make sure it actually is still there
  381. self.assertGreater(len(self.other_rr_set_domain.rrset_set.filter(subname=subname, type='A')), 0)
  382. def test_import_rr_sets(self):
  383. with self.assertPdnsRequests(self.request_pdns_zone_retrieve(name=self.my_domain.name)):
  384. call_command('sync-from-pdns', self.my_domain.name)
  385. for response in [
  386. self.client.get_rr_sets(self.my_domain.name),
  387. self.client.get_rr_sets(self.my_domain.name, subname=''),
  388. ]:
  389. self.assertStatus(response, status.HTTP_200_OK)
  390. self.assertEqual(len(response.data), 1, response.data)
  391. self.assertContainsRRSets(response.data, [dict(subname='', records=settings.DEFAULT_NS, type='NS')])