test_rrsets.py 21 KB


  1. import re
  2. from django.conf import settings
  3. from django.core.exceptions import ValidationError
  4. from django.core.management import call_command
  5. from rest_framework import status
  6. from desecapi.models import RRset
  7. from desecapi.tests.base import DesecTestCase, AuthenticatedRRSetBaseTestCase
  8. class UnauthenticatedRRSetTestCase(DesecTestCase):
  9. def test_unauthorized_access(self):
  10. url = self.reverse('v1:rrsets', name='example.com')
  11. for method in [
  12. self.client.get,
  13. self.client.post,
  14. self.client.put,
  15. self.client.delete,
  16. self.client.patch
  17. ]:
  18. response = method(url)
  19. self.assertStatus(response, status.HTTP_401_UNAUTHORIZED)
  20. class AuthenticatedRRSetTestCase(AuthenticatedRRSetBaseTestCase):
  21. def test_subname_validity(self):
  22. for subname in [
  23. 'aEroport',
  24. 'AEROPORT',
  25. 'aéroport'
  26. ]:
  27. with self.assertRaises(ValidationError):
  28. RRset(domain=self.my_domain, subname=subname, ttl=60, type='A').save()
  29. RRset(domain=self.my_domain, subname='aeroport', ttl=60, type='A').save()
  30. def test_retrieve_my_rr_sets(self):
  31. for response in [
  32. self.client.get_rr_sets(self.my_domain.name),
  33. self.client.get_rr_sets(self.my_domain.name, subname=''),
  34. ]:
  35. self.assertStatus(response, status.HTTP_200_OK)
  36. self.assertEqual(len(response.data), 1, response.data)
  37. def test_retrieve_my_rr_sets_pagination(self):
  38. def convert_links(links):
  39. mapping = {}
  40. for link in links.split(', '):
  41. _url, label = link.split('; ')
  42. label = re.search('rel="(.*)"', label).group(1)
  43. _url = _url[1:-1]
  44. assert label not in mapping
  45. mapping[label] = _url
  46. return mapping
  47. def assertPaginationResponse(response, expected_length, expected_directional_links=[]):
  48. self.assertStatus(response, status.HTTP_200_OK)
  49. self.assertEqual(len(response.data), expected_length)
  50. _links = convert_links(response['Link'])
  51. self.assertEqual(len(_links), len(expected_directional_links) + 1) # directional links, plus "first"
  52. self.assertTrue(_links['first'].endswith('/?cursor='))
  53. for directional_link in expected_directional_links:
  54. self.assertEqual(_links['first'].find('/?cursor='), _links[directional_link].find('/?cursor='))
  55. self.assertTrue(len(_links[directional_link]) > len(_links['first']))
  56. # Prepare extra records so that we get three pages (total: n + 1)
  57. n = int(settings.REST_FRAMEWORK['PAGE_SIZE'] * 2.5)
  58. RRset.objects.bulk_create(
  59. [RRset(domain=self.my_domain, subname=str(i), ttl=123, type='A') for i in range(n)]
  60. )
  61. # No pagination
  62. response = self.client.get_rr_sets(self.my_domain.name)
  63. self.assertStatus(response, status.HTTP_400_BAD_REQUEST)
  64. self.assertEqual(response.data['detail'],
  65. f'Pagination required. You can query up to {settings.REST_FRAMEWORK["PAGE_SIZE"]} items at a time ({n+1} total). '
  66. 'Please use the `first` page link (see Link header).')
  67. links = convert_links(response['Link'])
  68. self.assertEqual(len(links), 1)
  69. self.assertTrue(links['first'].endswith('/?cursor='))
  70. # First page
  71. url = links['first']
  72. response = self.client.get(url)
  73. assertPaginationResponse(response, settings.REST_FRAMEWORK['PAGE_SIZE'], ['next'])
  74. # Next
  75. url = convert_links(response['Link'])['next']
  76. response = self.client.get(url)
  77. assertPaginationResponse(response, settings.REST_FRAMEWORK['PAGE_SIZE'], ['next', 'prev'])
  78. data_next = response.data.copy()
  79. # Next-next (last) page
  80. url = convert_links(response['Link'])['next']
  81. response = self.client.get(url)
  82. assertPaginationResponse(response, n/5 + 1, ['prev'])
  83. # Prev
  84. url = convert_links(response['Link'])['prev']
  85. response = self.client.get(url)
  86. assertPaginationResponse(response, settings.REST_FRAMEWORK['PAGE_SIZE'], ['next', 'prev'])
  87. # Make sure that one step forward equals two steps forward and one step back
  88. self.assertEqual(response.data, data_next)
  89. def test_retrieve_other_rr_sets(self):
  90. self.assertStatus(self.client.get_rr_sets(self.other_domain.name), status.HTTP_404_NOT_FOUND)
  91. self.assertStatus(self.client.get_rr_sets(self.other_domain.name, subname='test'), status.HTTP_404_NOT_FOUND)
  92. self.assertStatus(self.client.get_rr_sets(self.other_domain.name, type='A'), status.HTTP_404_NOT_FOUND)
  93. def test_retrieve_my_rr_sets_filter(self):
  94. response = self.client.get_rr_sets(self.my_rr_set_domain.name, query='?cursor=')
  95. self.assertStatus(response, status.HTTP_200_OK)
  96. expected_number_of_rrsets = min(len(self._test_rr_sets()), settings.REST_FRAMEWORK['PAGE_SIZE'])
  97. self.assertEqual(len(response.data), expected_number_of_rrsets)
  98. for subname in self.SUBNAMES:
  99. response = self.client.get_rr_sets(self.my_rr_set_domain.name, subname=subname)
  100. self.assertStatus(response, status.HTTP_200_OK)
  101. self.assertRRSetsCount(response.data, [dict(subname=subname)],
  102. count=len(self._test_rr_sets(subname=subname)))
  103. for type_ in self.ALLOWED_TYPES:
  104. response = self.client.get_rr_sets(self.my_rr_set_domain.name, type=type_)
  105. self.assertStatus(response, status.HTTP_200_OK)
  106. def test_create_my_rr_sets(self):
  107. for subname in [None, 'create-my-rr-sets', 'foo.create-my-rr-sets', 'bar.baz.foo.create-my-rr-sets']:
  108. for data in [
  109. {'subname': subname, 'records': ['1.2.3.4'], 'ttl': 3660, 'type': 'A'},
  110. {'subname': '' if subname is None else subname, 'records': ['desec.io.'], 'ttl': 36900, 'type': 'PTR'},
  111. {'subname': '' if subname is None else subname, 'ttl': 3650, 'type': 'TXT', 'records': ['"foo"']},
  112. ]:
  113. # Try POST with missing subname
  114. if data['subname'] is None:
  115. data.pop('subname')
  116. with self.assertPdnsRequests(self.requests_desec_rr_sets_update(name=self.my_empty_domain.name)):
  117. response = self.client.post_rr_set(domain_name=self.my_empty_domain.name, **data)
  118. self.assertStatus(response, status.HTTP_201_CREATED)
  119. # Check for uniqueness on second attempt
  120. response = self.client.post_rr_set(domain_name=self.my_empty_domain.name, **data)
  121. self.assertContains(response, 'Another RRset with the same subdomain and type exists for this domain.',
  122. status_code=status.HTTP_400_BAD_REQUEST)
  123. response = self.client.get_rr_sets(self.my_empty_domain.name)
  124. self.assertStatus(response, status.HTTP_200_OK)
  125. self.assertRRSetsCount(response.data, [data])
  126. response = self.client.get_rr_set(self.my_empty_domain.name, data.get('subname', ''), data['type'])
  127. self.assertStatus(response, status.HTTP_200_OK)
  128. self.assertRRSet(response.data, **data)
  129. def test_create_my_rr_sets_type_restriction(self):
  130. for subname in ['', 'create-my-rr-sets', 'foo.create-my-rr-sets', 'bar.baz.foo.create-my-rr-sets']:
  131. for data in [
  132. {'subname': subname, 'ttl': 60, 'type': 'a'},
  133. {'subname': subname, 'records': ['10 example.com.'], 'ttl': 60, 'type': 'txt'}
  134. ] + [
  135. {'subname': subname, 'records': ['10 example.com.'], 'ttl': 60, 'type': type_}
  136. for type_ in self.DEAD_TYPES
  137. ] + [
  138. {'subname': subname, 'records': ['set.an.example. get.desec.io. 2584 10800 3600 604800 60'],
  139. 'ttl': 60, 'type': type_}
  140. for type_ in self.RESTRICTED_TYPES
  141. ]:
  142. response = self.client.post_rr_set(self.my_domain.name, **data)
  143. self.assertStatus(response, status.HTTP_400_BAD_REQUEST)
  144. response = self.client.get_rr_sets(self.my_domain.name)
  145. self.assertStatus(response, status.HTTP_200_OK)
  146. self.assertRRSetsCount(response.data, [data], count=0)
  147. def test_create_my_rr_sets_without_records(self):
  148. for subname in ['', 'create-my-rr-sets', 'foo.create-my-rr-sets', 'bar.baz.foo.create-my-rr-sets']:
  149. for data in [
  150. {'subname': subname, 'records': [], 'ttl': 60, 'type': 'A'},
  151. {'subname': subname, 'ttl': 60, 'type': 'A'},
  152. ]:
  153. response = self.client.post_rr_set(self.my_empty_domain.name, **data)
  154. self.assertStatus(
  155. response,
  156. status.HTTP_400_BAD_REQUEST
  157. )
  158. response = self.client.get_rr_sets(self.my_empty_domain.name)
  159. self.assertStatus(response, status.HTTP_200_OK)
  160. self.assertRRSetsCount(response.data, [], count=0)
  161. def test_create_other_rr_sets(self):
  162. data = {'records': ['1.2.3.4'], 'ttl': 60, 'type': 'A'}
  163. response = self.client.post_rr_set(self.other_domain.name, **data)
  164. self.assertStatus(response, status.HTTP_404_NOT_FOUND)
  165. def test_create_my_rr_sets_twice(self):
  166. data = {'records': ['1.2.3.4'], 'ttl': 3660, 'type': 'A'}
  167. with self.assertPdnsRequests(self.requests_desec_rr_sets_update(self.my_empty_domain.name)):
  168. response = self.client.post_rr_set(self.my_empty_domain.name, **data)
  169. self.assertStatus(response, status.HTTP_201_CREATED)
  170. data['records'][0] = '3.2.2.1'
  171. response = self.client.post_rr_set(self.my_empty_domain.name, **data)
  172. self.assertStatus(response, status.HTTP_400_BAD_REQUEST)
  173. def test_create_my_rr_sets_upper_case(self):
  174. for subname in ['asdF', 'cAse', 'asdf.FOO', '--F', 'ALLCAPS']:
  175. data = {'records': ['1.2.3.4'], 'ttl': 60, 'type': 'A', 'subname': subname}
  176. response = self.client.post_rr_set(self.my_empty_domain.name, **data)
  177. self.assertStatus(response, status.HTTP_400_BAD_REQUEST)
  178. self.assertIn('Subname can only use (lowercase)', str(response.data))
  179. def test_create_my_rr_sets_unknown_type(self):
  180. for _type in ['AA', 'ASDF']:
  181. with self.assertPdnsRequests(
  182. self.request_pdns_zone_update_unknown_type(name=self.my_domain.name, unknown_types=_type)
  183. ):
  184. response = self.client.post_rr_set(self.my_domain.name, records=['1234'], ttl=3660, type=_type)
  185. self.assertStatus(response, status.HTTP_422_UNPROCESSABLE_ENTITY)
  186. def test_create_my_rr_sets_insufficient_ttl(self):
  187. ttl = settings.MINIMUM_TTL_DEFAULT - 1
  188. response = self.client.post_rr_set(self.my_empty_domain.name, records=['1.2.3.4'], ttl=ttl, type='A')
  189. self.assertStatus(response, status.HTTP_400_BAD_REQUEST)
  190. detail = f'Ensure this value is greater than or equal to {self.my_empty_domain.minimum_ttl}.'
  191. self.assertEqual(response.data['ttl'][0], detail)
  192. ttl += 1
  193. with self.assertPdnsRequests(self.requests_desec_rr_sets_update(name=self.my_empty_domain.name)):
  194. response = self.client.post_rr_set(self.my_empty_domain.name, records=['1.2.23.4'], ttl=ttl, type='A')
  195. self.assertStatus(response, status.HTTP_201_CREATED)
  196. def test_retrieve_my_rr_sets_apex(self):
  197. response = self.client.get_rr_set(self.my_rr_set_domain.name, subname='', type_='A')
  198. self.assertStatus(response, status.HTTP_200_OK)
  199. self.assertEqual(response.data['records'][0], '1.2.3.4')
  200. self.assertEqual(response.data['ttl'], 3620)
  201. def test_retrieve_my_rr_sets_restricted_types(self):
  202. for type_ in self.RESTRICTED_TYPES:
  203. response = self.client.get_rr_sets(self.my_domain.name, type=type_)
  204. self.assertStatus(response, status.HTTP_403_FORBIDDEN)
  205. response = self.client.get_rr_sets(self.my_domain.name, type=type_, subname='')
  206. self.assertStatus(response, status.HTTP_403_FORBIDDEN)
  207. def test_update_my_rr_sets(self):
  208. for subname in self.SUBNAMES:
  209. with self.assertPdnsRequests(self.requests_desec_rr_sets_update(name=self.my_rr_set_domain.name)):
  210. data = {'records': ['2.2.3.4'], 'ttl': 3630, 'type': 'A', 'subname': subname}
  211. response = self.client.put_rr_set(self.my_rr_set_domain.name, subname, 'A', data)
  212. self.assertStatus(response, status.HTTP_200_OK)
  213. response = self.client.get_rr_set(self.my_rr_set_domain.name, subname, 'A')
  214. self.assertStatus(response, status.HTTP_200_OK)
  215. self.assertEqual(response.data['records'], ['2.2.3.4'])
  216. self.assertEqual(response.data['ttl'], 3630)
  217. response = self.client.put_rr_set(self.my_rr_set_domain.name, subname, 'A', {'records': ['2.2.3.5']})
  218. self.assertStatus(response, status.HTTP_400_BAD_REQUEST)
  219. response = self.client.put_rr_set(self.my_rr_set_domain.name, subname, 'A', {'ttl': 3637})
  220. self.assertStatus(response, status.HTTP_400_BAD_REQUEST)
  221. def test_update_my_rr_set_with_invalid_payload_type(self):
  222. for subname in self.SUBNAMES:
  223. data = [{'records': ['2.2.3.4'], 'ttl': 30, 'type': 'A', 'subname': subname}]
  224. response = self.client.put_rr_set(self.my_rr_set_domain.name, subname, 'A', data)
  225. self.assertStatus(response, status.HTTP_400_BAD_REQUEST)
  226. self.assertEquals(response.data['non_field_errors'][0],
  227. 'Invalid data. Expected a dictionary, but got list.')
  228. data = 'foobar'
  229. response = self.client.put_rr_set(self.my_rr_set_domain.name, subname, 'A', data)
  230. self.assertStatus(response, status.HTTP_400_BAD_REQUEST)
  231. self.assertEquals(response.data['non_field_errors'][0],
  232. 'Invalid data. Expected a dictionary, but got str.')
  233. def test_partially_update_my_rr_sets(self):
  234. for subname in self.SUBNAMES:
  235. current_rr_set = self.client.get_rr_set(self.my_rr_set_domain.name, subname, 'A').data
  236. for data in [
  237. {'records': ['2.2.3.4'], 'ttl': 3630},
  238. {'records': ['3.2.3.4']},
  239. {'records': ['3.2.3.4', '9.8.8.7']},
  240. {'ttl': 3637},
  241. ]:
  242. with self.assertPdnsRequests(self.requests_desec_rr_sets_update(name=self.my_rr_set_domain.name)):
  243. response = self.client.patch_rr_set(self.my_rr_set_domain.name, subname, 'A', data)
  244. self.assertStatus(response, status.HTTP_200_OK)
  245. response = self.client.get_rr_set(self.my_rr_set_domain.name, subname, 'A')
  246. self.assertStatus(response, status.HTTP_200_OK)
  247. current_rr_set.update(data)
  248. self.assertEqual(response.data['records'], current_rr_set['records'])
  249. self.assertEqual(response.data['ttl'], current_rr_set['ttl'])
  250. response = self.client.patch_rr_set(self.my_rr_set_domain.name, subname, 'A', {})
  251. self.assertStatus(response, status.HTTP_200_OK)
  252. def test_partially_update_other_rr_sets(self):
  253. data = {'records': ['3.2.3.4'], 'ttl': 334}
  254. for subname in self.SUBNAMES:
  255. response = self.client.patch_rr_set(self.other_rr_set_domain.name, subname, 'A', data)
  256. self.assertStatus(response, status.HTTP_404_NOT_FOUND)
  257. def test_update_other_rr_sets(self):
  258. data = {'ttl': 305}
  259. for subname in self.SUBNAMES:
  260. response = self.client.patch_rr_set(self.other_rr_set_domain.name, subname, 'A', data)
  261. self.assertStatus(response, status.HTTP_404_NOT_FOUND)
  262. def test_update_essential_properties(self):
  263. # Changing the subname is expected to cause an error
  264. url = self.reverse('v1:rrset', name=self.my_rr_set_domain.name, subname='test', type='A')
  265. data = {'records': ['3.2.3.4'], 'ttl': 3620, 'subname': 'test2', 'type': 'A'}
  266. response = self.client.patch(url, data)
  267. self.assertStatus(response, status.HTTP_400_BAD_REQUEST)
  268. self.assertEquals(response.data['subname'][0].code, 'read-only-on-update')
  269. response = self.client.put(url, data)
  270. self.assertStatus(response, status.HTTP_400_BAD_REQUEST)
  271. self.assertEquals(response.data['subname'][0].code, 'read-only-on-update')
  272. # Changing the type is expected to cause an error
  273. data = {'records': ['3.2.3.4'], 'ttl': 3620, 'subname': 'test', 'type': 'TXT'}
  274. response = self.client.patch(url, data)
  275. self.assertStatus(response, status.HTTP_400_BAD_REQUEST)
  276. self.assertEquals(response.data['type'][0].code, 'read-only-on-update')
  277. response = self.client.put(url, data)
  278. self.assertStatus(response, status.HTTP_400_BAD_REQUEST)
  279. self.assertEquals(response.data['type'][0].code, 'read-only-on-update')
  280. # Check that nothing changed
  281. response = self.client.get(url)
  282. self.assertStatus(response, status.HTTP_200_OK)
  283. self.assertEqual(response.data['records'][0], '2.2.3.4')
  284. self.assertEqual(response.data['ttl'], 3620)
  285. self.assertEqual(response.data['name'], 'test.' + self.my_rr_set_domain.name + '.')
  286. self.assertEqual(response.data['subname'], 'test')
  287. self.assertEqual(response.data['type'], 'A')
  288. # This is expected to work, but the fields are ignored
  289. data = {'records': ['3.2.3.4'], 'name': 'example.com.', 'domain': 'example.com'}
  290. with self.assertPdnsRequests(self.requests_desec_rr_sets_update(name=self.my_rr_set_domain.name)):
  291. response = self.client.patch(url, data)
  292. self.assertStatus(response, status.HTTP_200_OK)
  293. response = self.client.get(url)
  294. self.assertStatus(response, status.HTTP_200_OK)
  295. self.assertEqual(response.data['records'][0], '3.2.3.4')
  296. self.assertEqual(response.data['domain'], self.my_rr_set_domain.name)
  297. self.assertEqual(response.data['name'], 'test.' + self.my_rr_set_domain.name + '.')
  298. def test_update_unknown_rrset(self):
  299. url = self.reverse('v1:rrset', name=self.my_rr_set_domain.name, subname='doesnotexist', type='A')
  300. data = {'records': ['3.2.3.4'], 'ttl': 3620}
  301. response = self.client.patch(url, data)
  302. self.assertStatus(response, status.HTTP_404_NOT_FOUND)
  303. response = self.client.put(url, data)
  304. self.assertStatus(response, status.HTTP_404_NOT_FOUND)
  305. def test_delete_my_rr_sets_with_patch(self):
  306. data = {'records': []}
  307. for subname in self.SUBNAMES:
  308. with self.assertPdnsRequests(self.requests_desec_rr_sets_update(name=self.my_rr_set_domain.name)):
  309. response = self.client.patch_rr_set(self.my_rr_set_domain.name, subname, 'A', data)
  310. self.assertStatus(response, status.HTTP_204_NO_CONTENT)
  311. # Deletion is only idempotent via DELETE. For PATCH/PUT, the view raises 404 if the instance does not
  312. # exist. By that time, the view has not parsed the payload yet and does not know it is a deletion.
  313. response = self.client.patch_rr_set(self.my_rr_set_domain.name, subname, 'A', data)
  314. self.assertStatus(response, status.HTTP_404_NOT_FOUND)
  315. response = self.client.get_rr_set(self.my_rr_set_domain.name, subname, 'A')
  316. self.assertStatus(response, status.HTTP_404_NOT_FOUND)
  317. def test_delete_my_rr_sets_with_delete(self):
  318. for subname in self.SUBNAMES:
  319. with self.assertPdnsRequests(self.requests_desec_rr_sets_update(name=self.my_rr_set_domain.name)):
  320. response = self.client.delete_rr_set(self.my_rr_set_domain.name, subname=subname, type_='A')
  321. self.assertStatus(response, status.HTTP_204_NO_CONTENT)
  322. response = self.client.delete_rr_set(self.my_rr_set_domain.name, subname=subname, type_='A')
  323. self.assertStatus(response, status.HTTP_204_NO_CONTENT)
  324. response = self.client.get_rr_set(self.my_rr_set_domain.name, subname=subname, type_='A')
  325. self.assertStatus(response, status.HTTP_404_NOT_FOUND)
  326. def test_delete_other_rr_sets(self):
  327. data = {'records': []}
  328. for subname in self.SUBNAMES:
  329. # Try PATCH empty
  330. response = self.client.patch_rr_set(self.other_rr_set_domain.name, subname, 'A', data)
  331. self.assertStatus(response, status.HTTP_404_NOT_FOUND)
  332. # Try DELETE
  333. response = self.client.delete_rr_set(self.other_rr_set_domain.name, subname, 'A')
  334. self.assertStatus(response, status.HTTP_404_NOT_FOUND)
  335. # Make sure it actually is still there
  336. self.assertGreater(len(self.other_rr_set_domain.rrset_set.filter(subname=subname, type='A')), 0)
  337. def test_import_rr_sets(self):
  338. with self.assertPdnsRequests(self.request_pdns_zone_retrieve(name=self.my_domain.name)):
  339. call_command('sync-from-pdns', self.my_domain.name)
  340. for response in [
  341. self.client.get_rr_sets(self.my_domain.name),
  342. self.client.get_rr_sets(self.my_domain.name, subname=''),
  343. ]:
  344. self.assertStatus(response, status.HTTP_200_OK)
  345. self.assertEqual(len(response.data), 1, response.data)
  346. self.assertContainsRRSets(response.data, [dict(subname='', records=settings.DEFAULT_NS, type='NS')])