testrrsets.py 19 KB


  1. import operator
  2. from functools import reduce
  3. from django.conf import settings
  4. from django.core.management import call_command
  5. from rest_framework import status
  6. from desecapi.tests.base import DesecTestCase, DomainOwnerTestCase
  class UnauthenticatedRRSetTestCase(DesecTestCase):
      """RR set list endpoint as seen by a client that sends no credentials."""

      def test_unauthorized_access(self):
          """Each of GET, POST, PUT, DELETE and PATCH must be answered with 401."""
          rrsets_url = self.reverse('v1:rrsets', name='example.com')
          for verb in ('get', 'post', 'put', 'delete', 'patch'):
              # Resolve the client's request helper for this verb by name.
              send = getattr(self.client, verb)
              self.assertEqual(send(rrsets_url).status_code, status.HTTP_401_UNAUTHORIZED)
  class AuthenticatedRRSetTestCase(DomainOwnerTestCase):
      """
      RR set API tests performed with the credentials of a domain owner.

      The fixture (see setUpTestDataWithPdns) provides an empty domain and a populated domain owned by the
      test user, plus a populated domain that is created without passing the test user as owner.
      """

      # Types for which creating an RR set is expected to be rejected with 400
      # (see test_create_my_rr_sets_type_restriction).
      DEAD_TYPES = ['ALIAS', 'DNAME']
      # Types that are rejected with 400 on creation and with 403 when used as a list filter
      # (see test_create_my_rr_sets_type_restriction and test_retrieve_my_rr_sets_restricted_types).
      RESTRICTED_TYPES = ['SOA', 'RRSIG', 'DNSKEY', 'NSEC3PARAM', 'OPT']
      # see https://doc.powerdns.com/md/types/
      PDNS_RR_TYPES = ['A', 'AAAA', 'AFSDB', 'ALIAS', 'CAA', 'CERT', 'CDNSKEY', 'CDS', 'CNAME', 'DNSKEY', 'DNAME', 'DS',
                       'HINFO', 'KEY', 'LOC', 'MX', 'NAPTR', 'NS', 'NSEC', 'NSEC3', 'NSEC3PARAM', 'OPENPGPKEY', 'PTR',
                       'RP', 'RRSIG', 'SOA', 'SPF', 'SSHFP', 'SRV', 'TKEY', 'TSIG', 'TLSA', 'SMIMEA', 'TXT', 'URI']
      # Types that may be used as a list filter (see test_retrieve_my_rr_sets_filter).
      ALLOWED_TYPES = ['A', 'AAAA', 'AFSDB', 'CAA', 'CERT', 'CDNSKEY', 'CDS', 'CNAME', 'DS', 'HINFO', 'KEY', 'LOC', 'MX',
                       'NAPTR', 'NS', 'NSEC', 'NSEC3', 'OPENPGPKEY', 'PTR', 'RP', 'SPF', 'SSHFP', 'SRV', 'TKEY', 'TSIG',
                       'TLSA', 'SMIMEA', 'TXT', 'URI']
      # Non-apex subnames for which example RR sets are generated.
      SUBNAMES = ['foo', 'bar.baz', 'q.w.e.r.t', '*', '*.foobar']

      @classmethod
      def _test_rr_sets(cls, subname=None, type_=None, records=None, ttl=None):
          """
          Gives a list of example RR sets for testing.

          Args:
              subname: Filter by subname. None to allow any.
              type_: Filter by type. None to allow any.
              records: Filter by records. Must match exactly. None to allow any.
              ttl: Filter by ttl. None to allow any.

          Returns: Returns a list of tuples that represents example RR sets represented as 4-tuples consisting of
          subname, type_, records, ttl
          """
          # TODO add more examples of cls.ALLOWED_TYPES
          rr_sets = [
              ('', 'A', ['1.2.3.4'], 120),
              ('test', 'A', ['2.2.3.4'], 120),
              ('test', 'TXT', ['"foobar"'], 120),
          ] + [
              (name, 'TXT', ['"hey ho, let\'s go!"'], 134)
              for name in cls.SUBNAMES
          ] + [
              (name, rr_type, ['"10 mx1.example.com."'], 101)
              for name in cls.SUBNAMES
              for rr_type in ['MX', 'SPF']
          ] + [
              (name, 'A', ['"1.2.3.4"'], 187)
              for name in cls.SUBNAMES
          ]

          # Only None means "no filter". Falsy values such as subname='' (the apex), records=[] or ttl=0
          # are real filter values and must not disable filtering.
          return [
              rr_set for rr_set in rr_sets
              if (
                  (subname is None or subname == rr_set[0]) and
                  (type_ is None or type_ == rr_set[1]) and
                  (records is None or records == rr_set[2]) and
                  (ttl is None or ttl == rr_set[3])
              )
          ]

      @classmethod
      def setUpTestDataWithPdns(cls):
          """Create one empty and two populated domains; the populated ones hold all example RR sets."""
          super().setUpTestDataWithPdns()
          # TODO this test does not cover "dyn" / auto delegation domains
          cls.my_empty_domain = cls.create_domain(suffix='', owner=cls.owner)
          cls.my_rr_set_domain = cls.create_domain(suffix='', owner=cls.owner)
          cls.other_rr_set_domain = cls.create_domain(suffix='')

          for domain in [cls.my_rr_set_domain, cls.other_rr_set_domain]:
              for (subname, type_, records, ttl) in cls._test_rr_sets():
                  cls.create_rr_set(domain, subname=subname, type=type_, records=records, ttl=ttl)

      def assertRRSet(self, response_rr, domain=None, subname=None, records=None, type_=None, **kwargs):
          """
          Assert that an RR set taken from a response has the expected field values.

          Args:
              response_rr: Mapping that represents the RR set.
              domain, subname, records, type_: Expected values; type_ is compared against the key 'type'.
              **kwargs: Further expected key/value pairs.

          Expected values that are None are not checked.
          """
          kwargs['domain'] = domain
          kwargs['subname'] = subname
          kwargs['records'] = records
          kwargs['type'] = type_

          for key, value in kwargs.items():
              if value is not None:
                  self.assertEqual(
                      response_rr[key], value,
                      'RR set did not have the expected %s: Expected "%s" but was "%s" in %s' % (
                          key, value, response_rr[key], response_rr
                      )
                  )

      @staticmethod
      def _filter_rr_sets(rr_sets, **kwargs):
          """
          Return the RR sets from rr_sets whose fields equal all given key/value pairs.

          A key that is missing from an RR set is treated as None. Without any criteria, every RR set matches.
          """
          return [
              rr_set for rr_set in rr_sets
              if all(rr_set.get(key, None) == value for key, value in kwargs.items())
          ]

      def assertRRSetCount(self, rr_sets, count, **kwargs):
          """Fail unless exactly count RR sets in rr_sets match the criteria given as keyword arguments."""
          filtered_rr_sets = self._filter_rr_sets(rr_sets, **kwargs)
          if len(filtered_rr_sets) != count:
              self.fail('Expected to find %i RR set(s) with %s, but only found %i in %s.' % (
                  count, kwargs, len(filtered_rr_sets), rr_sets
              ))

      def assertContainsRRSet(self, rr_sets, **kwargs):
          """Fail unless at least one RR set in rr_sets matches the criteria given as keyword arguments."""
          filtered_rr_sets = self._filter_rr_sets(rr_sets, **kwargs)
          if not filtered_rr_sets:
              self.fail('Expected to find RR set with %s, but only found %s.' % (
                  kwargs, rr_sets
              ))

      def test_retrieve_my_rr_sets(self):
          """Listing the RR sets of an owned domain succeeds, with and without the apex subname filter."""
          for response in [
              self.client.get_rr_sets(self.my_domain.name),
              self.client.get_rr_sets(self.my_domain.name, subname=''),
          ]:
              self.assertEqual(response.status_code, status.HTTP_200_OK)
              self.assertEqual(len(response.data), 2, response.data)
              self.assertContainsRRSet(response.data, subname='', records=settings.DEFAULT_NS, type='NS')

      def test_retrieve_other_rr_sets(self):
          """Listing RR sets of a domain that is not ours yields 404, regardless of filters."""
          self.assertEqual(self.client.get_rr_sets(self.other_domain.name).status_code, status.HTTP_404_NOT_FOUND)
          self.assertEqual(
              self.client.get_rr_sets(self.other_domain.name, subname='test').status_code, status.HTTP_404_NOT_FOUND)
          self.assertEqual(
              self.client.get_rr_sets(self.other_domain.name, type='A').status_code, status.HTTP_404_NOT_FOUND)

      def test_retrieve_my_rr_sets_filter(self):
          """The subname and type list filters return as many RR sets as the example data predicts."""
          response = self.client.get_rr_sets(self.my_rr_set_domain.name)
          self.assertEqual(response.status_code, status.HTTP_200_OK)
          self.assertEqual(len(response.data), len(self._test_rr_sets()) + 1)  # Don't forget about the NS type RR set

          for subname in self.SUBNAMES:
              response = self.client.get_rr_sets(self.my_rr_set_domain.name, subname=subname)
              self.assertEqual(response.status_code, status.HTTP_200_OK)
              self.assertRRSetCount(response.data, count=len(self._test_rr_sets(subname=subname)), subname=subname)

          for type_ in self.ALLOWED_TYPES:
              response = self.client.get_rr_sets(self.my_rr_set_domain.name, type=type_)
              self.assertEqual(response.status_code, status.HTTP_200_OK)
              if type_ != 'NS':  # count does not match for NS, that's okay
                  self.assertRRSetCount(response.data, count=len(self._test_rr_sets(type_=type_)), type=type_)

      def test_create_my_rr_sets(self):
          """Created RR sets are returned by both the list and the detail endpoint."""
          for subname in ['', 'create-my-rr-sets', 'foo.create-my-rr-sets', 'bar.baz.foo.create-my-rr-sets']:
              for data in [
                  {'subname': subname, 'records': ['1.2.3.4'], 'ttl': 60, 'type': 'A'},
                  {'subname': subname, 'records': ['desec.io.'], 'ttl': 900, 'type': 'PTR'},
              ]:
                  with self.assertPdnsRequests(self.requests_desec_rr_sets_update(name=self.my_empty_domain.name)):
                      response = self.client.post_rr_set(domain_name=self.my_empty_domain.name, **data)
                      self.assertEqual(response.status_code, status.HTTP_201_CREATED, response.data)

                  response = self.client.get_rr_sets(self.my_empty_domain.name)
                  self.assertEqual(response.status_code, status.HTTP_200_OK)
                  self.assertRRSetCount(response.data, count=1, **data)

                  response = self.client.get_rr_set(self.my_empty_domain.name, data['subname'], data['type'])
                  self.assertEqual(response.status_code, status.HTTP_200_OK)
                  self.assertRRSet(response.data, **data)

      def test_create_my_rr_sets_type_restriction(self):
          """Lower-case, dead and restricted types are rejected with 400 and leave no RR set behind."""
          for subname in ['', 'create-my-rr-sets', 'foo.create-my-rr-sets', 'bar.baz.foo.create-my-rr-sets']:
              for data in [
                  {'subname': subname, 'ttl': 60, 'type': 'a'},
                  {'subname': subname, 'records': ['10 example.com.'], 'ttl': 60, 'type': 'txt'}
              ] + [
                  {'subname': subname, 'records': ['10 example.com.'], 'ttl': 60, 'type': type_}
                  for type_ in self.DEAD_TYPES
              ] + [
                  {'subname': subname, 'records': ['ns1.desec.io. peter.desec.io. 2584 10800 3600 604800 60'],
                   'ttl': 60, 'type': type_}
                  for type_ in self.RESTRICTED_TYPES
              ]:
                  response = self.client.post_rr_set(self.my_domain.name, **data)
                  self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST, response.data)

                  response = self.client.get_rr_sets(self.my_domain.name)
                  self.assertEqual(response.status_code, status.HTTP_200_OK)
                  self.assertRRSetCount(response.data, count=0, **data)

      def test_create_my_rr_sets_without_records(self):
          """Creating an RR set with empty or missing records is rejected with 400."""
          for subname in ['', 'create-my-rr-sets', 'foo.create-my-rr-sets', 'bar.baz.foo.create-my-rr-sets']:
              for data in [
                  {'subname': subname, 'records': [], 'ttl': 60, 'type': 'A'},
                  {'subname': subname, 'ttl': 60, 'type': 'A'},
              ]:
                  response = self.client.post_rr_set(self.my_empty_domain.name, **data)
                  self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST, response.data)

                  response = self.client.get_rr_sets(self.my_empty_domain.name)
                  self.assertEqual(response.status_code, status.HTTP_200_OK)
                  self.assertRRSetCount(response.data, count=0, **data)

      def test_create_other_rr_sets(self):
          """Creating an RR set in a domain that is not ours yields 404."""
          data = {'records': ['1.2.3.4'], 'ttl': 60, 'type': 'A'}
          response = self.client.post_rr_set(self.other_domain.name, **data)
          self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND)

      def test_create_my_rr_sets_twice(self):
          """A second POST for the same subname and type is answered with 409."""
          data = {'records': ['1.2.3.4'], 'ttl': 60, 'type': 'A'}
          with self.assertPdnsRequests(self.requests_desec_rr_sets_update(self.my_empty_domain.name)):
              response = self.client.post_rr_set(self.my_empty_domain.name, **data)
              self.assertEqual(response.status_code, status.HTTP_201_CREATED)

          # Replace the record content itself; 'records' must stay a flat list of strings.
          data['records'][0] = '3.2.2.1'
          response = self.client.post_rr_set(self.my_empty_domain.name, **data)
          self.assertEqual(response.status_code, status.HTTP_409_CONFLICT)

      def test_create_my_rr_sets_unknown_type(self):
          """Types unknown to the (mocked) pdns zone update are answered with 422."""
          for _type in ['AA', 'ASDF']:
              with self.assertPdnsRequests(
                  self.request_pdns_zone_update_unknown_type(name=self.my_domain.name, unknown_types=_type)
              ):
                  response = self.client.post_rr_set(self.my_domain.name, records=['1234'], ttl=60, type=_type)
                  self.assertEqual(response.status_code, status.HTTP_422_UNPROCESSABLE_ENTITY)

      def test_retrieve_my_rr_sets_apex(self):
          """The apex A RR set from the example data can be retrieved via the detail endpoint."""
          response = self.client.get_rr_set(self.my_rr_set_domain.name, subname='', type_='A')
          self.assertEqual(response.status_code, status.HTTP_200_OK)
          self.assertEqual(response.data['records'][0], '1.2.3.4')
          self.assertEqual(response.data['ttl'], 120)

      def test_retrieve_my_rr_sets_restricted_types(self):
          """Filtering the RR set list by a restricted type is answered with 403."""
          for type_ in self.RESTRICTED_TYPES:
              response = self.client.get_rr_sets(self.my_domain.name, type=type_)
              self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN)
              response = self.client.get_rr_sets(self.my_domain.name, type=type_, subname='')
              self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN)

      def test_update_my_rr_sets(self):
          """PUT with records and ttl replaces the RR set; PUT with only one of them is rejected with 400."""
          for subname in self.SUBNAMES:
              with self.assertPdnsRequests(self.requests_desec_rr_sets_update(name=self.my_rr_set_domain.name)):
                  response = self.client.put_rr_set(self.my_rr_set_domain.name, subname, 'A', records=['2.2.3.4'], ttl=30)
                  self.assertEqual(response.status_code, status.HTTP_200_OK)

              response = self.client.get_rr_set(self.my_rr_set_domain.name, subname, 'A')
              self.assertEqual(response.status_code, status.HTTP_200_OK)
              self.assertEqual(response.data['records'], ['2.2.3.4'])
              self.assertEqual(response.data['ttl'], 30)

              response = self.client.put_rr_set(self.my_rr_set_domain.name, subname, 'A', records=['2.2.3.5'])
              self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)

              response = self.client.put_rr_set(self.my_rr_set_domain.name, subname, 'A', ttl=37)
              self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)

      def test_partially_update_my_rr_sets(self):
          """PATCH changes only the submitted fields; an empty PATCH is accepted with 200."""
          for subname in self.SUBNAMES:
              current_rr_set = self.client.get_rr_set(self.my_rr_set_domain.name, subname, 'A').data
              for data in [
                  {'records': ['2.2.3.4'], 'ttl': 30},
                  {'records': ['3.2.3.4']},
                  {'ttl': 37},
              ]:
                  with self.assertPdnsRequests(self.requests_desec_rr_sets_update(name=self.my_rr_set_domain.name)):
                      response = self.client.patch_rr_set(self.my_rr_set_domain.name, subname, 'A', **data)
                      self.assertEqual(response.status_code, status.HTTP_200_OK)

                  response = self.client.get_rr_set(self.my_rr_set_domain.name, subname, 'A')
                  self.assertEqual(response.status_code, status.HTTP_200_OK)
                  # Track the expected state locally and compare it with what the API reports.
                  current_rr_set.update(data)
                  self.assertEqual(response.data['records'], current_rr_set['records'])
                  self.assertEqual(response.data['ttl'], current_rr_set['ttl'])

              data = {}
              response = self.client.patch_rr_set(self.my_rr_set_domain.name, subname, 'A', **data)
              self.assertEqual(response.status_code, status.HTTP_200_OK)

      def test_partially_update_other_rr_sets(self):
          """PATCH on an RR set of a domain that is not ours yields 404 for every example subname."""
          for subname in self.SUBNAMES:
              response = self.client.patch_rr_set(self.other_rr_set_domain.name, subname=subname,
                                                  type_='A', records=['3.2.3.4'], ttl=334)
              self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND)

      def test_update_other_rr_sets(self):
          """PUT on an RR set of a domain that is not ours yields 404 for every example subname."""
          for subname in self.SUBNAMES:
              # A complete payload is sent so that only the foreign domain can be the reason for the failure.
              response = self.client.put_rr_set(self.other_rr_set_domain.name, subname, 'A',
                                                records=['3.2.3.4'], ttl=305)
              self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND)

      def test_update_essential_properties(self):
          """Subname and type cannot be changed through PATCH/PUT; 'name' and 'domain' in the payload are ignored."""
          # Changing the subname is expected to cause an error
          url = self.reverse('v1:rrset', name=self.my_rr_set_domain.name, subname='test', type='A')
          data = {'records': ['3.2.3.4'], 'ttl': 120, 'subname': 'test2'}
          response = self.client.patch(url, data)
          self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
          response = self.client.put(url, data)
          self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)

          # Changing the type is expected to cause an error
          data = {'records': ['3.2.3.4'], 'ttl': 120, 'type': 'TXT'}
          response = self.client.patch(url, data)
          self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
          response = self.client.put(url, data)
          self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)

          # Check that nothing changed
          response = self.client.get(url)
          self.assertEqual(response.status_code, status.HTTP_200_OK)
          self.assertEqual(response.data['records'][0], '2.2.3.4')
          self.assertEqual(response.data['ttl'], 120)
          self.assertEqual(response.data['name'], 'test.' + self.my_rr_set_domain.name + '.')
          self.assertEqual(response.data['subname'], 'test')
          self.assertEqual(response.data['type'], 'A')

          # This is expected to work, but the fields are ignored
          data = {'records': ['3.2.3.4'], 'name': 'example.com.', 'domain': 'example.com'}
          with self.assertPdnsRequests(self.requests_desec_rr_sets_update(name=self.my_rr_set_domain.name)):
              response = self.client.patch(url, data)
              self.assertEqual(response.status_code, status.HTTP_200_OK)

          response = self.client.get(url)
          self.assertEqual(response.status_code, status.HTTP_200_OK)
          self.assertEqual(response.data['records'][0], '3.2.3.4')
          self.assertEqual(response.data['domain'], self.my_rr_set_domain.name)
          self.assertEqual(response.data['name'], 'test.' + self.my_rr_set_domain.name + '.')

      def test_delete_my_rr_sets_with_patch(self):
          """PATCH with an empty records list removes the RR set (204, then 404 on retrieval)."""
          for subname in self.SUBNAMES:
              with self.assertPdnsRequests(self.requests_desec_rr_sets_update(name=self.my_rr_set_domain.name)):
                  response = self.client.patch_rr_set(self.my_rr_set_domain.name, subname=subname, type_='A', records=[])
                  self.assertEqual(response.status_code, status.HTTP_204_NO_CONTENT)

              response = self.client.get_rr_set(self.my_rr_set_domain.name, subname=subname, type_='A')
              self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND)

      def test_delete_my_rr_sets_with_delete(self):
          """DELETE removes the RR set (204, then 404 on retrieval)."""
          for subname in self.SUBNAMES:
              with self.assertPdnsRequests(self.requests_desec_rr_sets_update(name=self.my_rr_set_domain.name)):
                  response = self.client.delete_rr_set(self.my_rr_set_domain.name, subname=subname, type_='A')
                  self.assertEqual(response.status_code, status.HTTP_204_NO_CONTENT)

              response = self.client.get_rr_set(self.my_rr_set_domain.name, subname=subname, type_='A')
              self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND)

      def test_delete_other_rr_sets(self):
          """Delete attempts on a domain that is not ours are answered with 204 but leave its RR sets in place."""
          for subname in self.SUBNAMES:
              # Try PATCH empty
              response = self.client.patch_rr_set(self.other_rr_set_domain.name, subname=subname, type_='A', records=[])
              self.assertEqual(response.status_code, status.HTTP_204_NO_CONTENT)

              # Try DELETE
              response = self.client.delete_rr_set(self.other_rr_set_domain.name, subname=subname, type_='A')
              self.assertEqual(response.status_code, status.HTTP_204_NO_CONTENT)

              # Make sure it actually is still there
              self.assertGreater(len(self.other_rr_set_domain.rrset_set.filter(subname=subname, type='A')), 0)

      def test_import_rr_sets(self):
          """After running the sync-from-pdns command for the domain, the API lists its NS RR set."""
          with self.assertPdnsRequests(self.request_pdns_zone_retrieve(name=self.my_domain.name)):
              call_command('sync-from-pdns', self.my_domain.name)

          for response in [
              self.client.get_rr_sets(self.my_domain.name),
              self.client.get_rr_sets(self.my_domain.name, subname=''),
          ]:
              self.assertEqual(response.status_code, status.HTTP_200_OK)
              self.assertEqual(len(response.data), 1, response.data)
              self.assertContainsRRSet(response.data, subname='', records=settings.DEFAULT_NS, type='NS')