test_rrsets.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302
  1. from django.conf import settings
  2. from django.core.exceptions import ValidationError
  3. from django.core.management import call_command
  4. from rest_framework import status
  5. from desecapi.models import RRset
  6. from desecapi.tests.base import DesecTestCase, AuthenticatedRRSetBaseTestCase
  7. class UnauthenticatedRRSetTestCase(DesecTestCase):
  8. def test_unauthorized_access(self):
  9. url = self.reverse('v1:rrsets', name='example.com')
  10. for method in [
  11. self.client.get,
  12. self.client.post,
  13. self.client.put,
  14. self.client.delete,
  15. self.client.patch
  16. ]:
  17. response = method(url)
  18. self.assertStatus(response, status.HTTP_401_UNAUTHORIZED)
  19. class AuthenticatedRRSetTestCase(AuthenticatedRRSetBaseTestCase):
  20. def test_subname_validity(self):
  21. for subname in [
  22. 'aEroport',
  23. 'AEROPORT',
  24. 'aéroport'
  25. ]:
  26. with self.assertRaises(ValidationError):
  27. RRset(domain=self.my_domain, subname=subname, ttl=60, type='A').save()
  28. RRset(domain=self.my_domain, subname='aeroport', ttl=60, type='A').save()
  29. def test_retrieve_my_rr_sets(self):
  30. for response in [
  31. self.client.get_rr_sets(self.my_domain.name),
  32. self.client.get_rr_sets(self.my_domain.name, subname=''),
  33. ]:
  34. self.assertStatus(response, status.HTTP_200_OK)
  35. self.assertEqual(len(response.data), 1, response.data)
  36. def test_retrieve_other_rr_sets(self):
  37. self.assertStatus(self.client.get_rr_sets(self.other_domain.name), status.HTTP_404_NOT_FOUND)
  38. self.assertStatus(self.client.get_rr_sets(self.other_domain.name, subname='test'), status.HTTP_404_NOT_FOUND)
  39. self.assertStatus(self.client.get_rr_sets(self.other_domain.name, type='A'), status.HTTP_404_NOT_FOUND)
  40. def test_retrieve_my_rr_sets_filter(self):
  41. response = self.client.get_rr_sets(self.my_rr_set_domain.name)
  42. self.assertStatus(response, status.HTTP_200_OK)
  43. self.assertEqual(len(response.data), len(self._test_rr_sets()))
  44. for subname in self.SUBNAMES:
  45. response = self.client.get_rr_sets(self.my_rr_set_domain.name, subname=subname)
  46. self.assertStatus(response, status.HTTP_200_OK)
  47. self.assertRRSetsCount(response.data, [dict(subname=subname)],
  48. count=len(self._test_rr_sets(subname=subname)))
  49. for type_ in self.ALLOWED_TYPES:
  50. response = self.client.get_rr_sets(self.my_rr_set_domain.name, type=type_)
  51. self.assertStatus(response, status.HTTP_200_OK)
  52. def test_create_my_rr_sets(self):
  53. for subname in [None, 'create-my-rr-sets', 'foo.create-my-rr-sets', 'bar.baz.foo.create-my-rr-sets']:
  54. for data in [
  55. {'subname': subname, 'records': ['1.2.3.4'], 'ttl': 60, 'type': 'A'},
  56. {'subname': '' if subname is None else subname, 'records': ['desec.io.'], 'ttl': 900, 'type': 'PTR'},
  57. {'subname': '' if subname is None else subname, 'ttl': 50, 'type': 'TXT', 'records': ['"foo"']},
  58. ]:
  59. # Try POST with missing subname
  60. if data['subname'] is None:
  61. data.pop('subname')
  62. with self.assertPdnsRequests(self.requests_desec_rr_sets_update(name=self.my_empty_domain.name)):
  63. response = self.client.post_rr_set(domain_name=self.my_empty_domain.name, **data)
  64. self.assertStatus(response, status.HTTP_201_CREATED)
  65. # Check for uniqueness on second attempt
  66. response = self.client.post_rr_set(domain_name=self.my_empty_domain.name, **data)
  67. self.assertContains(response, 'Another RRset with the same subdomain and type exists for this domain.',
  68. status_code=status.HTTP_400_BAD_REQUEST)
  69. response = self.client.get_rr_sets(self.my_empty_domain.name)
  70. self.assertStatus(response, status.HTTP_200_OK)
  71. self.assertRRSetsCount(response.data, [data])
  72. response = self.client.get_rr_set(self.my_empty_domain.name, data.get('subname', ''), data['type'])
  73. self.assertStatus(response, status.HTTP_200_OK)
  74. self.assertRRSet(response.data, **data)
  75. def test_create_my_rr_sets_type_restriction(self):
  76. for subname in ['', 'create-my-rr-sets', 'foo.create-my-rr-sets', 'bar.baz.foo.create-my-rr-sets']:
  77. for data in [
  78. {'subname': subname, 'ttl': 60, 'type': 'a'},
  79. {'subname': subname, 'records': ['10 example.com.'], 'ttl': 60, 'type': 'txt'}
  80. ] + [
  81. {'subname': subname, 'records': ['10 example.com.'], 'ttl': 60, 'type': type_}
  82. for type_ in self.DEAD_TYPES
  83. ] + [
  84. {'subname': subname, 'records': ['ns1.desec.io. peter.desec.io. 2584 10800 3600 604800 60'],
  85. 'ttl': 60, 'type': type_}
  86. for type_ in self.RESTRICTED_TYPES
  87. ]:
  88. response = self.client.post_rr_set(self.my_domain.name, **data)
  89. self.assertStatus(response, status.HTTP_400_BAD_REQUEST)
  90. response = self.client.get_rr_sets(self.my_domain.name)
  91. self.assertStatus(response, status.HTTP_200_OK)
  92. self.assertRRSetsCount(response.data, [data], count=0)
  93. def test_create_my_rr_sets_without_records(self):
  94. for subname in ['', 'create-my-rr-sets', 'foo.create-my-rr-sets', 'bar.baz.foo.create-my-rr-sets']:
  95. for data in [
  96. {'subname': subname, 'records': [], 'ttl': 60, 'type': 'A'},
  97. {'subname': subname, 'ttl': 60, 'type': 'A'},
  98. ]:
  99. response = self.client.post_rr_set(self.my_empty_domain.name, **data)
  100. self.assertStatus(
  101. response,
  102. status.HTTP_400_BAD_REQUEST
  103. )
  104. response = self.client.get_rr_sets(self.my_empty_domain.name)
  105. self.assertStatus(response, status.HTTP_200_OK)
  106. self.assertRRSetsCount(response.data, [], count=0)
  107. def test_create_other_rr_sets(self):
  108. data = {'records': ['1.2.3.4'], 'ttl': 60, 'type': 'A'}
  109. response = self.client.post_rr_set(self.other_domain.name, **data)
  110. self.assertStatus(response, status.HTTP_404_NOT_FOUND)
  111. def test_create_my_rr_sets_twice(self):
  112. data = {'records': ['1.2.3.4'], 'ttl': 60, 'type': 'A'}
  113. with self.assertPdnsRequests(self.requests_desec_rr_sets_update(self.my_empty_domain.name)):
  114. response = self.client.post_rr_set(self.my_empty_domain.name, **data)
  115. self.assertStatus(response, status.HTTP_201_CREATED)
  116. data['records'][0] = '3.2.2.1'
  117. response = self.client.post_rr_set(self.my_empty_domain.name, **data)
  118. self.assertStatus(response, status.HTTP_400_BAD_REQUEST)
  119. def test_create_my_rr_sets_upper_case(self):
  120. for subname in ['asdF', 'cAse', 'asdf.FOO', '--F', 'ALLCAPS']:
  121. data = {'records': ['1.2.3.4'], 'ttl': 60, 'type': 'A', 'subname': subname}
  122. response = self.client.post_rr_set(self.my_empty_domain.name, **data)
  123. self.assertStatus(response, status.HTTP_400_BAD_REQUEST)
  124. self.assertIn('Subname can only use (lowercase)', str(response.data))
  125. def test_create_my_rr_sets_unknown_type(self):
  126. for _type in ['AA', 'ASDF']:
  127. with self.assertPdnsRequests(
  128. self.request_pdns_zone_update_unknown_type(name=self.my_domain.name, unknown_types=_type)
  129. ):
  130. response = self.client.post_rr_set(self.my_domain.name, records=['1234'], ttl=60, type=_type)
  131. self.assertStatus(response, status.HTTP_422_UNPROCESSABLE_ENTITY)
  132. def test_retrieve_my_rr_sets_apex(self):
  133. response = self.client.get_rr_set(self.my_rr_set_domain.name, subname='', type_='A')
  134. self.assertStatus(response, status.HTTP_200_OK)
  135. self.assertEqual(response.data['records'][0], '1.2.3.4')
  136. self.assertEqual(response.data['ttl'], 120)
  137. def test_retrieve_my_rr_sets_restricted_types(self):
  138. for type_ in self.RESTRICTED_TYPES:
  139. response = self.client.get_rr_sets(self.my_domain.name, type=type_)
  140. self.assertStatus(response, status.HTTP_403_FORBIDDEN)
  141. response = self.client.get_rr_sets(self.my_domain.name, type=type_, subname='')
  142. self.assertStatus(response, status.HTTP_403_FORBIDDEN)
  143. def test_update_my_rr_sets(self):
  144. for subname in self.SUBNAMES:
  145. with self.assertPdnsRequests(self.requests_desec_rr_sets_update(name=self.my_rr_set_domain.name)):
  146. response = self.client.put_rr_set(self.my_rr_set_domain.name, subname, 'A', records=['2.2.3.4'], ttl=30)
  147. self.assertStatus(response, status.HTTP_200_OK)
  148. response = self.client.get_rr_set(self.my_rr_set_domain.name, subname, 'A')
  149. self.assertStatus(response, status.HTTP_200_OK)
  150. self.assertEqual(response.data['records'], ['2.2.3.4'])
  151. self.assertEqual(response.data['ttl'], 30)
  152. response = self.client.put_rr_set(self.my_rr_set_domain.name, subname, 'A', records=['2.2.3.5'])
  153. self.assertStatus(response, status.HTTP_400_BAD_REQUEST)
  154. response = self.client.put_rr_set(self.my_rr_set_domain.name, subname, 'A', ttl=37)
  155. self.assertStatus(response, status.HTTP_400_BAD_REQUEST)
  156. def test_partially_update_my_rr_sets(self):
  157. for subname in self.SUBNAMES:
  158. current_rr_set = self.client.get_rr_set(self.my_rr_set_domain.name, subname, 'A').data
  159. for data in [
  160. {'records': ['2.2.3.4'], 'ttl': 30},
  161. {'records': ['3.2.3.4']},
  162. {'records': ['3.2.3.4', '9.8.8.7']},
  163. {'ttl': 37},
  164. ]:
  165. with self.assertPdnsRequests(self.requests_desec_rr_sets_update(name=self.my_rr_set_domain.name)):
  166. response = self.client.patch_rr_set(self.my_rr_set_domain.name, subname, 'A', **data)
  167. self.assertStatus(response, status.HTTP_200_OK)
  168. response = self.client.get_rr_set(self.my_rr_set_domain.name, subname, 'A')
  169. self.assertStatus(response, status.HTTP_200_OK)
  170. current_rr_set.update(data)
  171. self.assertEqual(response.data['records'], current_rr_set['records'])
  172. self.assertEqual(response.data['ttl'], current_rr_set['ttl'])
  173. response = self.client.patch_rr_set(self.my_rr_set_domain.name, subname, 'A')
  174. self.assertStatus(response, status.HTTP_200_OK)
  175. def test_partially_update_other_rr_sets(self):
  176. for subname in self.SUBNAMES:
  177. response = self.client.patch_rr_set(self.other_rr_set_domain.name, subname=subname,
  178. type_='A', records=['3.2.3.4'], ttl=334)
  179. self.assertStatus(response, status.HTTP_404_NOT_FOUND)
  180. def test_update_other_rr_sets(self):
  181. for subname in self.SUBNAMES:
  182. response = self.client.patch_rr_set(self.other_rr_set_domain.name, subname=subname, type_='A', ttl=305)
  183. self.assertStatus(response, status.HTTP_404_NOT_FOUND)
  184. def test_update_essential_properties(self):
  185. # Changing the subname is expected to cause an error
  186. url = self.reverse('v1:rrset', name=self.my_rr_set_domain.name, subname='test', type='A')
  187. data = {'records': ['3.2.3.4'], 'ttl': 120, 'subname': 'test2'}
  188. response = self.client.patch(url, data)
  189. self.assertStatus(response, status.HTTP_400_BAD_REQUEST)
  190. response = self.client.put(url, data)
  191. self.assertStatus(response, status.HTTP_400_BAD_REQUEST)
  192. # Changing the type is expected to cause an error
  193. data = {'records': ['3.2.3.4'], 'ttl': 120, 'type': 'TXT'}
  194. response = self.client.patch(url, data)
  195. self.assertStatus(response, status.HTTP_400_BAD_REQUEST)
  196. response = self.client.put(url, data)
  197. self.assertStatus(response, status.HTTP_400_BAD_REQUEST)
  198. # Check that nothing changed
  199. response = self.client.get(url)
  200. self.assertStatus(response, status.HTTP_200_OK)
  201. self.assertEqual(response.data['records'][0], '2.2.3.4')
  202. self.assertEqual(response.data['ttl'], 120)
  203. self.assertEqual(response.data['name'], 'test.' + self.my_rr_set_domain.name + '.')
  204. self.assertEqual(response.data['subname'], 'test')
  205. self.assertEqual(response.data['type'], 'A')
  206. # This is expected to work, but the fields are ignored
  207. data = {'records': ['3.2.3.4'], 'name': 'example.com.', 'domain': 'example.com'}
  208. with self.assertPdnsRequests(self.requests_desec_rr_sets_update(name=self.my_rr_set_domain.name)):
  209. response = self.client.patch(url, data)
  210. self.assertStatus(response, status.HTTP_200_OK)
  211. response = self.client.get(url)
  212. self.assertStatus(response, status.HTTP_200_OK)
  213. self.assertEqual(response.data['records'][0], '3.2.3.4')
  214. self.assertEqual(response.data['domain'], self.my_rr_set_domain.name)
  215. self.assertEqual(response.data['name'], 'test.' + self.my_rr_set_domain.name + '.')
  216. def test_delete_my_rr_sets_with_patch(self):
  217. for subname in self.SUBNAMES:
  218. with self.assertPdnsRequests(self.requests_desec_rr_sets_update(name=self.my_rr_set_domain.name)):
  219. response = self.client.patch_rr_set(self.my_rr_set_domain.name, subname=subname, type_='A', records=[])
  220. self.assertStatus(response, status.HTTP_204_NO_CONTENT)
  221. response = self.client.patch_rr_set(self.my_rr_set_domain.name, subname=subname, type_='A', records=[])
  222. self.assertStatus(response, status.HTTP_204_NO_CONTENT)
  223. response = self.client.get_rr_set(self.my_rr_set_domain.name, subname=subname, type_='A')
  224. self.assertStatus(response, status.HTTP_404_NOT_FOUND)
  225. def test_delete_my_rr_sets_with_delete(self):
  226. for subname in self.SUBNAMES:
  227. with self.assertPdnsRequests(self.requests_desec_rr_sets_update(name=self.my_rr_set_domain.name)):
  228. response = self.client.delete_rr_set(self.my_rr_set_domain.name, subname=subname, type_='A')
  229. self.assertStatus(response, status.HTTP_204_NO_CONTENT)
  230. response = self.client.get_rr_set(self.my_rr_set_domain.name, subname=subname, type_='A')
  231. self.assertStatus(response, status.HTTP_404_NOT_FOUND)
  232. def test_delete_other_rr_sets(self):
  233. for subname in self.SUBNAMES:
  234. # Try PATCH empty
  235. response = self.client.patch_rr_set(self.other_rr_set_domain.name, subname=subname, type_='A', records=[])
  236. self.assertStatus(response, status.HTTP_404_NOT_FOUND)
  237. # Try DELETE
  238. response = self.client.delete_rr_set(self.other_rr_set_domain.name, subname=subname, type_='A')
  239. self.assertStatus(response, status.HTTP_404_NOT_FOUND)
  240. # Make sure it actually is still there
  241. self.assertGreater(len(self.other_rr_set_domain.rrset_set.filter(subname=subname, type='A')), 0)
  242. def test_import_rr_sets(self):
  243. with self.assertPdnsRequests(self.request_pdns_zone_retrieve(name=self.my_domain.name)):
  244. call_command('sync-from-pdns', self.my_domain.name)
  245. for response in [
  246. self.client.get_rr_sets(self.my_domain.name),
  247. self.client.get_rr_sets(self.my_domain.name, subname=''),
  248. ]:
  249. self.assertStatus(response, status.HTTP_200_OK)
  250. self.assertEqual(len(response.data), 1, response.data)
  251. self.assertContainsRRSets(response.data, [dict(subname='', records=settings.DEFAULT_NS, type='NS')])