# testrrsets.py (19 KB)


import operator
from functools import reduce

from django.conf import settings
from django.core.exceptions import ValidationError
from django.core.management import call_command
from rest_framework import status

from desecapi.models import RRset
from desecapi.tests.base import DesecTestCase, DomainOwnerTestCase
  9. class UnauthenticatedRRSetTestCase(DesecTestCase):
  10. def test_unauthorized_access(self):
  11. url = self.reverse('v1:rrsets', name='example.com')
  12. for method in [
  13. self.client.get,
  14. self.client.post,
  15. self.client.put,
  16. self.client.delete,
  17. self.client.patch
  18. ]:
  19. response = method(url)
  20. self.assertStatus(response, status.HTTP_401_UNAUTHORIZED)
  21. class AuthenticatedRRSetTestCase(DomainOwnerTestCase):
  22. DEAD_TYPES = ['ALIAS', 'DNAME']
  23. RESTRICTED_TYPES = ['SOA', 'RRSIG', 'DNSKEY', 'NSEC3PARAM', 'OPT']
  24. # see https://doc.powerdns.com/md/types/
  25. PDNS_RR_TYPES = ['A', 'AAAA', 'AFSDB', 'ALIAS', 'CAA', 'CERT', 'CDNSKEY', 'CDS', 'CNAME', 'DNSKEY', 'DNAME', 'DS',
  26. 'HINFO', 'KEY', 'LOC', 'MX', 'NAPTR', 'NS', 'NSEC', 'NSEC3', 'NSEC3PARAM', 'OPENPGPKEY', 'PTR',
  27. 'RP', 'RRSIG', 'SOA', 'SPF', 'SSHFP', 'SRV', 'TKEY', 'TSIG', 'TLSA', 'SMIMEA', 'TXT', 'URI']
  28. ALLOWED_TYPES = ['A', 'AAAA', 'AFSDB', 'CAA', 'CERT', 'CDNSKEY', 'CDS', 'CNAME', 'DS', 'HINFO', 'KEY', 'LOC', 'MX',
  29. 'NAPTR', 'NS', 'NSEC', 'NSEC3', 'OPENPGPKEY', 'PTR', 'RP', 'SPF', 'SSHFP', 'SRV', 'TKEY', 'TSIG',
  30. 'TLSA', 'SMIMEA', 'TXT', 'URI']
  31. SUBNAMES = ['foo', 'bar.baz', 'q.w.e.r.t', '*', '*.foobar', '_', '-foo.test', '_bar']
  32. @classmethod
  33. def _test_rr_sets(cls, subname=None, type_=None, records=None, ttl=None):
  34. """
  35. Gives a list of example RR sets for testing.
  36. Args:
  37. subname: Filter by subname. None to allow any.
  38. type_: Filter by type. None to allow any.
  39. records: Filter by records. Must match exactly. None to allow any.
  40. ttl: Filter by ttl. None to allow any.
  41. Returns: Returns a list of tuples that represents example RR sets represented as 4-tuples consisting of
  42. subname, type_, records, ttl
  43. """
  44. # TODO add more examples of cls.ALLOWED_TYPES
  45. rr_sets = [
  46. ('', 'A', ['1.2.3.4'], 120),
  47. ('test', 'A', ['2.2.3.4'], 120),
  48. ('test', 'TXT', ['"foobar"'], 120),
  49. ] + [
  50. (subname, 'TXT', ['"hey ho, let\'s go!"'], 134)
  51. for subname in cls.SUBNAMES
  52. ] + [
  53. (subname, type_, ['"10 mx1.example.com."'], 101)
  54. for subname in cls.SUBNAMES
  55. for type_ in ['MX', 'SPF']
  56. ] + [
  57. (subname, 'A', ['"1.2.3.4"'], 187)
  58. for subname in cls.SUBNAMES
  59. ]
  60. if subname or type_ or records or ttl:
  61. rr_sets = [
  62. rr_set for rr_set in rr_sets
  63. if (
  64. (subname is None or subname == rr_set[0]) and
  65. (type_ is None or type_ == rr_set[1]) and
  66. (records is None or records == rr_set[2]) and
  67. (ttl is None or ttl == rr_set[3])
  68. )
  69. ]
  70. return rr_sets
  71. @classmethod
  72. def setUpTestDataWithPdns(cls):
  73. super().setUpTestDataWithPdns()
  74. # TODO this test does not cover "dyn" / auto delegation domains
  75. cls.my_empty_domain = cls.create_domain(suffix='', owner=cls.owner)
  76. cls.my_rr_set_domain = cls.create_domain(suffix='', owner=cls.owner)
  77. cls.other_rr_set_domain = cls.create_domain(suffix='')
  78. for domain in [cls.my_rr_set_domain, cls.other_rr_set_domain]:
  79. for (subname, type_, records, ttl) in cls._test_rr_sets():
  80. cls.create_rr_set(domain, subname=subname, type=type_, records=records, ttl=ttl)
  81. def assertRRSet(self, response_rr, domain=None, subname=None, records=None, type_=None, **kwargs):
  82. kwargs['domain'] = domain
  83. kwargs['subname'] = subname
  84. kwargs['records'] = records
  85. kwargs['type'] = type_
  86. for key, value in kwargs.items():
  87. if value is not None:
  88. self.assertEqual(
  89. response_rr[key], value,
  90. 'RR set did not have the expected %s: Expected "%s" but was "%s" in %s' % (
  91. key, value, response_rr[key], response_rr
  92. )
  93. )
  94. @staticmethod
  95. def _filter_rr_sets(rr_sets, **kwargs):
  96. return [
  97. rr_sets for rr_set in rr_sets
  98. if reduce(operator.and_, [rr_set.get(key, None) == value for key, value in kwargs.items()])
  99. ]
  100. def assertRRSetCount(self, rr_sets, count, **kwargs):
  101. filtered_rr_sets = self._filter_rr_sets(rr_sets, **kwargs)
  102. if len(filtered_rr_sets) != count:
  103. self.fail('Expected to find %i RR set(s) with %s, but only found %i in %s.' % (
  104. count, kwargs, len(filtered_rr_sets), rr_sets
  105. ))
  106. def assertContainsRRSet(self, rr_sets, **kwargs):
  107. filtered_rr_sets = self._filter_rr_sets(rr_sets, **kwargs)
  108. if not filtered_rr_sets:
  109. self.fail('Expected to find RR set with %s, but only found %s.' % (
  110. kwargs, rr_sets
  111. ))
  112. def test_uniqueness(self):
  113. RRset(domain=self.my_domain, subname='aeroport', ttl=60, type='A').save()
  114. with self.assertRaises(ValidationError):
  115. RRset(domain=self.my_domain, subname='aeroport', ttl=60, type='A').save()
  116. RRset(domain=self.my_domain, subname='AEROPORT', ttl=60, type='A').save()
  117. RRset(domain=self.my_domain, subname='aéroport', ttl=100, type='A').save()
  118. def test_retrieve_my_rr_sets(self):
  119. for response in [
  120. self.client.get_rr_sets(self.my_domain.name),
  121. self.client.get_rr_sets(self.my_domain.name, subname=''),
  122. ]:
  123. self.assertStatus(response, status.HTTP_200_OK)
  124. self.assertEqual(len(response.data), 2, response.data)
  125. self.assertContainsRRSet(response.data, subname='', records=settings.DEFAULT_NS, type='NS')
  126. def test_retrieve_other_rr_sets(self):
  127. self.assertStatus(self.client.get_rr_sets(self.other_domain.name), status.HTTP_404_NOT_FOUND)
  128. self.assertStatus(self.client.get_rr_sets(self.other_domain.name, subname='test'), status.HTTP_404_NOT_FOUND)
  129. self.assertStatus(self.client.get_rr_sets(self.other_domain.name, type='A'), status.HTTP_404_NOT_FOUND)
  130. def test_retrieve_my_rr_sets_filter(self):
  131. response = self.client.get_rr_sets(self.my_rr_set_domain.name)
  132. self.assertStatus(response, status.HTTP_200_OK)
  133. self.assertEqual(len(response.data), len(self._test_rr_sets()) + 1) # Don't forget about the NS type RR set
  134. for subname in self.SUBNAMES:
  135. response = self.client.get_rr_sets(self.my_rr_set_domain.name, subname=subname)
  136. self.assertStatus(response, status.HTTP_200_OK)
  137. self.assertRRSetCount(response.data, count=len(self._test_rr_sets(subname=subname)), subname=subname)
  138. for type_ in self.ALLOWED_TYPES:
  139. response = self.client.get_rr_sets(self.my_rr_set_domain.name, type=type_)
  140. self.assertStatus(response, status.HTTP_200_OK)
  141. if type_ != 'NS': # count does not match for NS, that's okay
  142. self.assertRRSetCount(response.data, count=len(self._test_rr_sets(type_=type_)), type=type_)
  143. def test_create_my_rr_sets(self):
  144. for subname in ['', 'create-my-rr-sets', 'foo.create-my-rr-sets', 'bar.baz.foo.create-my-rr-sets']:
  145. for data in [
  146. {'subname': subname, 'records': ['1.2.3.4'], 'ttl': 60, 'type': 'A'},
  147. {'subname': subname, 'records': ['desec.io.'], 'ttl': 900, 'type': 'PTR'},
  148. ]:
  149. with self.assertPdnsRequests(self.requests_desec_rr_sets_update(name=self.my_empty_domain.name)):
  150. response = self.client.post_rr_set(domain_name=self.my_empty_domain.name, **data)
  151. self.assertStatus(response, status.HTTP_201_CREATED)
  152. response = self.client.get_rr_sets(self.my_empty_domain.name)
  153. self.assertStatus(response, status.HTTP_200_OK)
  154. self.assertRRSetCount(response.data, count=1, **data)
  155. response = self.client.get_rr_set(self.my_empty_domain.name, data['subname'], data['type'])
  156. self.assertStatus(response, status.HTTP_200_OK)
  157. self.assertRRSet(response.data, **data)
  158. def test_create_my_rr_sets_type_restriction(self):
  159. for subname in ['', 'create-my-rr-sets', 'foo.create-my-rr-sets', 'bar.baz.foo.create-my-rr-sets']:
  160. for data in [
  161. {'subname': subname, 'ttl': 60, 'type': 'a'},
  162. {'subname': subname, 'records': ['10 example.com.'], 'ttl': 60, 'type': 'txt'}
  163. ] + [
  164. {'subname': subname, 'records': ['10 example.com.'], 'ttl': 60, 'type': type_}
  165. for type_ in self.DEAD_TYPES
  166. ] + [
  167. {'subname': subname, 'records': ['ns1.desec.io. peter.desec.io. 2584 10800 3600 604800 60'],
  168. 'ttl': 60, 'type': type_}
  169. for type_ in self.RESTRICTED_TYPES
  170. ]:
  171. response = self.client.post_rr_set(self.my_domain.name, **data)
  172. self.assertStatus(response, status.HTTP_400_BAD_REQUEST)
  173. response = self.client.get_rr_sets(self.my_domain.name)
  174. self.assertStatus(response, status.HTTP_200_OK)
  175. self.assertRRSetCount(response.data, count=0, **data)
  176. def test_create_my_rr_sets_without_records(self):
  177. for subname in ['', 'create-my-rr-sets', 'foo.create-my-rr-sets', 'bar.baz.foo.create-my-rr-sets']:
  178. for data in [
  179. {'subname': subname, 'records': [], 'ttl': 60, 'type': 'A'},
  180. {'subname': subname, 'ttl': 60, 'type': 'A'},
  181. ]:
  182. response = self.client.post_rr_set(self.my_empty_domain.name, **data)
  183. self.assertStatus(response, status.HTTP_400_BAD_REQUEST)
  184. response = self.client.get_rr_sets(self.my_empty_domain.name)
  185. self.assertStatus(response, status.HTTP_200_OK)
  186. self.assertRRSetCount(response.data, count=0, **data)
  187. def test_create_other_rr_sets(self):
  188. data = {'records': ['1.2.3.4'], 'ttl': 60, 'type': 'A'}
  189. response = self.client.post_rr_set(self.other_domain.name, **data)
  190. self.assertStatus(response, status.HTTP_404_NOT_FOUND)
  191. def test_create_my_rr_sets_twice(self):
  192. data = {'records': ['1.2.3.4'], 'ttl': 60, 'type': 'A'}
  193. with self.assertPdnsRequests(self.requests_desec_rr_sets_update(self.my_empty_domain.name)):
  194. response = self.client.post_rr_set(self.my_empty_domain.name, **data)
  195. self.assertStatus(response, status.HTTP_201_CREATED)
  196. data['records'][0] = ['3.2.2.1']
  197. response = self.client.post_rr_set(self.my_empty_domain.name, **data)
  198. self.assertStatus(response, status.HTTP_409_CONFLICT)
  199. def test_create_my_rr_sets_unknown_type(self):
  200. for _type in ['AA', 'ASDF']:
  201. with self.assertPdnsRequests(
  202. self.request_pdns_zone_update_unknown_type(name=self.my_domain.name, unknown_types=_type)
  203. ):
  204. response = self.client.post_rr_set(self.my_domain.name, records=['1234'], ttl=60, type=_type)
  205. self.assertStatus(response, status.HTTP_422_UNPROCESSABLE_ENTITY)
  206. def test_retrieve_my_rr_sets_apex(self):
  207. response = self.client.get_rr_set(self.my_rr_set_domain.name, subname='', type_='A')
  208. self.assertStatus(response, status.HTTP_200_OK)
  209. self.assertEqual(response.data['records'][0], '1.2.3.4')
  210. self.assertEqual(response.data['ttl'], 120)
  211. def test_retrieve_my_rr_sets_restricted_types(self):
  212. for type_ in self.RESTRICTED_TYPES:
  213. response = self.client.get_rr_sets(self.my_domain.name, type=type_)
  214. self.assertStatus(response, status.HTTP_403_FORBIDDEN)
  215. response = self.client.get_rr_sets(self.my_domain.name, type=type_, subname='')
  216. self.assertStatus(response, status.HTTP_403_FORBIDDEN)
  217. def test_update_my_rr_sets(self):
  218. for subname in self.SUBNAMES:
  219. with self.assertPdnsRequests(self.requests_desec_rr_sets_update(name=self.my_rr_set_domain.name)):
  220. response = self.client.put_rr_set(self.my_rr_set_domain.name, subname, 'A', records=['2.2.3.4'], ttl=30)
  221. self.assertStatus(response, status.HTTP_200_OK)
  222. response = self.client.get_rr_set(self.my_rr_set_domain.name, subname, 'A')
  223. self.assertStatus(response, status.HTTP_200_OK)
  224. self.assertEqual(response.data['records'], ['2.2.3.4'])
  225. self.assertEqual(response.data['ttl'], 30)
  226. response = self.client.put_rr_set(self.my_rr_set_domain.name, subname, 'A', records=['2.2.3.5'])
  227. self.assertStatus(response, status.HTTP_400_BAD_REQUEST)
  228. response = self.client.put_rr_set(self.my_rr_set_domain.name, subname, 'A', ttl=37)
  229. self.assertStatus(response, status.HTTP_400_BAD_REQUEST)
  230. def test_partially_update_my_rr_sets(self):
  231. for subname in self.SUBNAMES:
  232. current_rr_set = self.client.get_rr_set(self.my_rr_set_domain.name, subname, 'A').data
  233. for data in [
  234. {'records': ['2.2.3.4'], 'ttl': 30},
  235. {'records': ['3.2.3.4']},
  236. {'ttl': 37},
  237. ]:
  238. with self.assertPdnsRequests(self.requests_desec_rr_sets_update(name=self.my_rr_set_domain.name)):
  239. response = self.client.patch_rr_set(self.my_rr_set_domain.name, subname, 'A', **data)
  240. self.assertStatus(response, status.HTTP_200_OK)
  241. response = self.client.get_rr_set(self.my_rr_set_domain.name, subname, 'A')
  242. self.assertStatus(response, status.HTTP_200_OK)
  243. current_rr_set.update(data)
  244. self.assertEqual(response.data['records'], current_rr_set['records'])
  245. self.assertEqual(response.data['ttl'], current_rr_set['ttl'])
  246. data = {}
  247. response = self.client.patch_rr_set(self.my_rr_set_domain.name, subname, 'A', **data)
  248. self.assertStatus(response, status.HTTP_200_OK)
  249. def test_partially_update_other_rr_sets(self):
  250. for subname in self.SUBNAMES:
  251. response = self.client.patch_rr_set(self.other_rr_set_domain.name, subname=subname,
  252. type_='A', records=['3.2.3.4'], ttl=334)
  253. self.assertStatus(response, status.HTTP_404_NOT_FOUND)
  254. def test_update_other_rr_sets(self):
  255. for subname in self.SUBNAMES:
  256. response = self.client.patch_rr_set(self.other_rr_set_domain.name, subname=subname, type_='A', ttl=305)
  257. self.assertStatus(response, status.HTTP_404_NOT_FOUND)
  258. def test_update_essential_properties(self):
  259. # Changing the subname is expected to cause an error
  260. url = self.reverse('v1:rrset', name=self.my_rr_set_domain.name, subname='test', type='A')
  261. data = {'records': ['3.2.3.4'], 'ttl': 120, 'subname': 'test2'}
  262. response = self.client.patch(url, data)
  263. self.assertStatus(response, status.HTTP_400_BAD_REQUEST)
  264. response = self.client.put(url, data)
  265. self.assertStatus(response, status.HTTP_400_BAD_REQUEST)
  266. # Changing the type is expected to cause an error
  267. data = {'records': ['3.2.3.4'], 'ttl': 120, 'type': 'TXT'}
  268. response = self.client.patch(url, data)
  269. self.assertStatus(response, status.HTTP_400_BAD_REQUEST)
  270. response = self.client.put(url, data)
  271. self.assertStatus(response, status.HTTP_400_BAD_REQUEST)
  272. # Check that nothing changed
  273. response = self.client.get(url)
  274. self.assertStatus(response, status.HTTP_200_OK)
  275. self.assertEqual(response.data['records'][0], '2.2.3.4')
  276. self.assertEqual(response.data['ttl'], 120)
  277. self.assertEqual(response.data['name'], 'test.' + self.my_rr_set_domain.name + '.')
  278. self.assertEqual(response.data['subname'], 'test')
  279. self.assertEqual(response.data['type'], 'A')
  280. # This is expected to work, but the fields are ignored
  281. data = {'records': ['3.2.3.4'], 'name': 'example.com.', 'domain': 'example.com'}
  282. with self.assertPdnsRequests(self.requests_desec_rr_sets_update(name=self.my_rr_set_domain.name)):
  283. response = self.client.patch(url, data)
  284. self.assertStatus(response, status.HTTP_200_OK)
  285. response = self.client.get(url)
  286. self.assertStatus(response, status.HTTP_200_OK)
  287. self.assertEqual(response.data['records'][0], '3.2.3.4')
  288. self.assertEqual(response.data['domain'], self.my_rr_set_domain.name)
  289. self.assertEqual(response.data['name'], 'test.' + self.my_rr_set_domain.name + '.')
  290. def test_delete_my_rr_sets_with_patch(self):
  291. for subname in self.SUBNAMES:
  292. with self.assertPdnsRequests(self.requests_desec_rr_sets_update(name=self.my_rr_set_domain.name)):
  293. response = self.client.patch_rr_set(self.my_rr_set_domain.name, subname=subname, type_='A', records=[])
  294. self.assertStatus(response, status.HTTP_204_NO_CONTENT)
  295. response = self.client.get_rr_set(self.my_rr_set_domain.name, subname=subname, type_='A')
  296. self.assertStatus(response, status.HTTP_404_NOT_FOUND)
  297. def test_delete_my_rr_sets_with_delete(self):
  298. for subname in self.SUBNAMES:
  299. with self.assertPdnsRequests(self.requests_desec_rr_sets_update(name=self.my_rr_set_domain.name)):
  300. response = self.client.delete_rr_set(self.my_rr_set_domain.name, subname=subname, type_='A')
  301. self.assertStatus(response, status.HTTP_204_NO_CONTENT)
  302. response = self.client.get_rr_set(self.my_rr_set_domain.name, subname=subname, type_='A')
  303. self.assertStatus(response, status.HTTP_404_NOT_FOUND)
  304. def test_delete_other_rr_sets(self):
  305. for subname in self.SUBNAMES:
  306. # Try PATCH empty
  307. response = self.client.patch_rr_set(self.other_rr_set_domain.name, subname=subname, type_='A', records=[])
  308. self.assertStatus(response, status.HTTP_204_NO_CONTENT)
  309. # Try DELETE
  310. response = self.client.delete_rr_set(self.other_rr_set_domain.name, subname=subname, type_='A')
  311. self.assertStatus(response, status.HTTP_204_NO_CONTENT)
  312. # Make sure it actually is still there
  313. self.assertGreater(len(self.other_rr_set_domain.rrset_set.filter(subname=subname, type='A')), 0)
  314. def test_import_rr_sets(self):
  315. with self.assertPdnsRequests(self.request_pdns_zone_retrieve(name=self.my_domain.name)):
  316. call_command('sync-from-pdns', self.my_domain.name)
  317. for response in [
  318. self.client.get_rr_sets(self.my_domain.name),
  319. self.client.get_rr_sets(self.my_domain.name, subname=''),
  320. ]:
  321. self.assertStatus(response, status.HTTP_200_OK)
  322. self.assertEqual(len(response.data), 1, response.data)
  323. self.assertContainsRRSet(response.data, subname='', records=settings.DEFAULT_NS, type='NS')