testrrsets.py 21 KB


  1. from django.core.urlresolvers import reverse
  2. from rest_framework import status
  3. from rest_framework.test import APITestCase
  4. from .utils import utils
  5. import httpretty
  6. from django.conf import settings
  7. import json
  8. from django.core.management import call_command
  9. class UnauthenticatedDomainTests(APITestCase):
  10. def testExpectUnauthorizedOnGet(self):
  11. url = reverse('rrsets', args=('example.com',))
  12. response = self.client.get(url, format='json')
  13. self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED)
  14. def testExpectUnauthorizedOnPost(self):
  15. url = reverse('rrsets', args=('example.com',))
  16. response = self.client.post(url, format='json')
  17. self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED)
  18. def testExpectUnauthorizedOnPut(self):
  19. url = reverse('rrsets', args=('example.com',))
  20. response = self.client.put(url, format='json')
  21. self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED)
  22. def testExpectUnauthorizedOnDelete(self):
  23. url = reverse('rrsets', args=('example.com',))
  24. response = self.client.delete(url, format='json')
  25. self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED)
  26. class AuthenticatedRRsetTests(APITestCase):
  27. restricted_types = ('SOA', 'RRSIG', 'DNSKEY', 'NSEC3PARAM')
  28. def setUp(self):
  29. httpretty.reset()
  30. httpretty.disable()
  31. if not hasattr(self, 'owner'):
  32. self.owner = utils.createUser()
  33. self.ownedDomains = [utils.createDomain(self.owner), utils.createDomain(self.owner)]
  34. self.token = utils.createToken(user=self.owner)
  35. self.client.credentials(HTTP_AUTHORIZATION='Token ' + self.token)
  36. self.otherOwner = utils.createUser()
  37. self.otherDomains = [utils.createDomain(self.otherOwner), utils.createDomain()]
  38. self.otherToken = utils.createToken(user=self.otherOwner)
  39. def testCanGetOwnRRsets(self):
  40. url = reverse('rrsets', args=(self.ownedDomains[1].name,))
  41. response = self.client.get(url)
  42. self.assertEqual(response.status_code, status.HTTP_200_OK)
  43. self.assertEqual(len(response.data), 1) # don't forget NS RRset
  44. def testCantGetForeignRRsets(self):
  45. url = reverse('rrsets', args=(self.otherDomains[1].name,))
  46. response = self.client.get(url)
  47. self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND)
  48. def testCanGetOwnRRsetsEmptySubname(self):
  49. url = reverse('rrsets', args=(self.ownedDomains[1].name,))
  50. response = self.client.get(url + '?subname=')
  51. self.assertEqual(response.status_code, status.HTTP_200_OK)
  52. self.assertEqual(len(response.data), 1) # don't forget NS RRset
  53. def testCanGetOwnRRsetsFromSubname(self):
  54. url = reverse('rrsets', args=(self.ownedDomains[1].name,))
  55. data = {'records': ['1.2.3.4'], 'ttl': 120, 'type': 'A'}
  56. response = self.client.post(url, json.dumps(data), content_type='application/json')
  57. self.assertEqual(response.status_code, status.HTTP_201_CREATED)
  58. data = {'records': ['2.2.3.4'], 'ttl': 120, 'type': 'A', 'subname': 'test'}
  59. response = self.client.post(url, json.dumps(data), content_type='application/json')
  60. self.assertEqual(response.status_code, status.HTTP_201_CREATED)
  61. data = {'records': ['"test"'], 'ttl': 120, 'type': 'TXT', 'subname': 'test'}
  62. response = self.client.post(url, json.dumps(data), content_type='application/json')
  63. self.assertEqual(response.status_code, status.HTTP_201_CREATED)
  64. response = self.client.get(url)
  65. self.assertEqual(response.status_code, status.HTTP_200_OK)
  66. self.assertEqual(len(response.data), 3 + 1) # don't forget NS RRset
  67. response = self.client.get(url + '?subname=test')
  68. self.assertEqual(response.status_code, status.HTTP_200_OK)
  69. self.assertEqual(len(response.data), 2)
  70. def testCantGetForeignRRsetsFromSubname(self):
  71. url = reverse('rrsets', args=(self.otherDomains[1].name,))
  72. response = self.client.get(url + '?subname=test')
  73. self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND)
  74. def testCanGetOwnRRsetsFromType(self):
  75. url = reverse('rrsets', args=(self.ownedDomains[1].name,))
  76. data = {'records': ['1.2.3.4'], 'ttl': 120, 'type': 'A'}
  77. response = self.client.post(url, json.dumps(data), content_type='application/json')
  78. self.assertEqual(response.status_code, status.HTTP_201_CREATED)
  79. data = {'records': ['2.2.3.4'], 'ttl': 120, 'type': 'A', 'subname': 'test'}
  80. response = self.client.post(url, json.dumps(data), content_type='application/json')
  81. self.assertEqual(response.status_code, status.HTTP_201_CREATED)
  82. data = {'records': ['"test"'], 'ttl': 120, 'type': 'TXT', 'subname': 'test'}
  83. response = self.client.post(url, json.dumps(data), content_type='application/json')
  84. self.assertEqual(response.status_code, status.HTTP_201_CREATED)
  85. response = self.client.get(url)
  86. self.assertEqual(response.status_code, status.HTTP_200_OK)
  87. self.assertEqual(len(response.data), 3 + 1) # don't forget NS RRset
  88. response = self.client.get(url + '?type=A')
  89. self.assertEqual(response.status_code, status.HTTP_200_OK)
  90. self.assertEqual(len(response.data), 2)
  91. def testCantGetForeignRRsetsFromType(self):
  92. url = reverse('rrsets', args=(self.otherDomains[1].name,))
  93. response = self.client.get(url + '?test=A')
  94. self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND)
  95. def testCanPostOwnRRsets(self):
  96. url = reverse('rrsets', args=(self.ownedDomains[1].name,))
  97. data = {'records': ['1.2.3.4'], 'ttl': 60, 'type': 'A'}
  98. response = self.client.post(url, json.dumps(data), content_type='application/json')
  99. self.assertEqual(response.status_code, status.HTTP_201_CREATED)
  100. response = self.client.get(url)
  101. self.assertEqual(response.status_code, status.HTTP_200_OK)
  102. self.assertEqual(len(response.data), 1 + 1) # don't forget NS RRset
  103. url = reverse('rrset', args=(self.ownedDomains[1].name, '', 'A',))
  104. response = self.client.get(url)
  105. self.assertEqual(response.status_code, status.HTTP_200_OK)
  106. self.assertEqual(response.data['records'][0], '1.2.3.4')
  107. def testCantPostRestrictedTypes(self):
  108. for type_ in self.restricted_types:
  109. url = reverse('rrsets', args=(self.ownedDomains[1].name,))
  110. data = {'records': ['ns1.desec.io. peter.desec.io. 2584 10800 3600 604800 60'], 'ttl': 60, 'type': type_}
  111. response = self.client.post(url, json.dumps(data), content_type='application/json')
  112. self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN)
  113. def testCantPostForeignRRsets(self):
  114. url = reverse('rrsets', args=(self.otherDomains[1].name,))
  115. data = {'records': ['1.2.3.4'], 'ttl': 60, 'type': 'A'}
  116. response = self.client.post(url, json.dumps(data), content_type='application/json')
  117. self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND)
  118. def testCanGetOwnRRset(self):
  119. url = reverse('rrsets', args=(self.ownedDomains[1].name,))
  120. data = {'records': ['1.2.3.4'], 'ttl': 60, 'type': 'A'}
  121. response = self.client.post(url, json.dumps(data), content_type='application/json')
  122. self.assertEqual(response.status_code, status.HTTP_201_CREATED)
  123. url = reverse('rrset', args=(self.ownedDomains[1].name, '', 'A',))
  124. response = self.client.get(url)
  125. self.assertEqual(response.status_code, status.HTTP_200_OK)
  126. self.assertEqual(response.data['records'][0], '1.2.3.4')
  127. self.assertEqual(response.data['ttl'], 60)
  128. def testCantGetRestrictedTypes(self):
  129. for type_ in self.restricted_types:
  130. url = reverse('rrsets', args=(self.ownedDomains[1].name,))
  131. response = self.client.get(url + '?type=%s' % type_)
  132. self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN)
  133. url = reverse('rrset', args=(self.ownedDomains[1].name, '', type_,))
  134. response = self.client.get(url)
  135. self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN)
  136. def testCantGetForeignRRset(self):
  137. self.client.credentials(HTTP_AUTHORIZATION='Token ' + self.otherToken)
  138. url = reverse('rrsets', args=(self.otherDomains[0].name,))
  139. data = {'records': ['1.2.3.4'], 'ttl': 60, 'type': 'A'}
  140. response = self.client.post(url, json.dumps(data), content_type='application/json')
  141. self.assertEqual(response.status_code, status.HTTP_201_CREATED)
  142. self.client.credentials(HTTP_AUTHORIZATION='Token ' + self.token)
  143. url = reverse('rrset', args=(self.otherDomains[0].name, '', 'A',))
  144. response = self.client.get(url)
  145. self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND)
  146. def testCanGetOwnRRsetWithSubname(self):
  147. url = reverse('rrsets', args=(self.ownedDomains[1].name,))
  148. data = {'records': ['1.2.3.4'], 'ttl': 120, 'type': 'A'}
  149. response = self.client.post(url, json.dumps(data), content_type='application/json')
  150. self.assertEqual(response.status_code, status.HTTP_201_CREATED)
  151. data = {'records': ['2.2.3.4'], 'ttl': 120, 'type': 'A', 'subname': 'test'}
  152. response = self.client.post(url, json.dumps(data), content_type='application/json')
  153. self.assertEqual(response.status_code, status.HTTP_201_CREATED)
  154. data = {'records': ['"test"'], 'ttl': 120, 'type': 'TXT', 'subname': 'test'}
  155. response = self.client.post(url, json.dumps(data), content_type='application/json')
  156. self.assertEqual(response.status_code, status.HTTP_201_CREATED)
  157. response = self.client.get(url)
  158. self.assertEqual(response.status_code, status.HTTP_200_OK)
  159. self.assertEqual(len(response.data), 3 + 1) # don't forget NS RRset
  160. url = reverse('rrset', args=(self.ownedDomains[1].name, 'test', 'A',))
  161. response = self.client.get(url)
  162. self.assertEqual(response.status_code, status.HTTP_200_OK)
  163. self.assertEqual(response.data['records'][0], '2.2.3.4')
  164. self.assertEqual(response.data['ttl'], 120)
  165. self.assertEqual(response.data['name'], 'test.' + self.ownedDomains[1].name + '.')
  166. def testCanGetOwnRRsetWithWildcard(self):
  167. url = reverse('rrsets', args=(self.ownedDomains[1].name,))
  168. data = {'records': ['"barfoo"'], 'ttl': 120, 'type': 'TXT', 'subname': '*.foobar'}
  169. response = self.client.post(url, json.dumps(data), content_type='application/json')
  170. self.assertEqual(response.status_code, status.HTTP_201_CREATED)
  171. response1 = self.client.get(url + '?subname=*.foobar')
  172. self.assertEqual(response1.status_code, status.HTTP_200_OK)
  173. self.assertEqual(response1.data[0]['records'][0], '"barfoo"')
  174. self.assertEqual(response1.data[0]['ttl'], 120)
  175. self.assertEqual(response1.data[0]['name'], '*.foobar.' + self.ownedDomains[1].name + '.')
  176. url = reverse('rrset', args=(self.ownedDomains[1].name, '*.foobar', 'TXT',))
  177. response2 = self.client.get(url)
  178. self.assertEqual(response2.data, response1.data[0])
  179. def testCanPutOwnRRset(self):
  180. url = reverse('rrsets', args=(self.ownedDomains[1].name,))
  181. data = {'records': ['1.2.3.4'], 'ttl': 60, 'type': 'A'}
  182. response = self.client.post(url, json.dumps(data), content_type='application/json')
  183. self.assertEqual(response.status_code, status.HTTP_201_CREATED)
  184. url = reverse('rrset', args=(self.ownedDomains[1].name, '', 'A',))
  185. data = {'records': ['2.2.3.4'], 'ttl': 30, 'type': 'A'}
  186. response = self.client.put(url, json.dumps(data), content_type='application/json')
  187. self.assertEqual(response.status_code, status.HTTP_200_OK)
  188. response = self.client.get(url)
  189. self.assertEqual(response.status_code, status.HTTP_200_OK)
  190. self.assertEqual(response.data['records'][0], '2.2.3.4')
  191. self.assertEqual(response.data['ttl'], 30)
  192. def testCanPatchOwnRRset(self):
  193. url = reverse('rrsets', args=(self.ownedDomains[1].name,))
  194. data = {'records': ['1.2.3.4'], 'ttl': 60, 'type': 'A'}
  195. response = self.client.post(url, json.dumps(data), content_type='application/json')
  196. self.assertEqual(response.status_code, status.HTTP_201_CREATED)
  197. url = reverse('rrset', args=(self.ownedDomains[1].name, '', 'A',))
  198. data = {'records': ['3.2.3.4'], 'ttl': 32}
  199. response = self.client.patch(url, json.dumps(data), content_type='application/json')
  200. self.assertEqual(response.status_code, status.HTTP_200_OK)
  201. response = self.client.get(url)
  202. self.assertEqual(response.status_code, status.HTTP_200_OK)
  203. self.assertEqual(response.data['records'][0], '3.2.3.4')
  204. self.assertEqual(response.data['ttl'], 32)
  205. def testCantPatchOForeignRRset(self):
  206. self.client.credentials(HTTP_AUTHORIZATION='Token ' + self.otherToken)
  207. url = reverse('rrsets', args=(self.otherDomains[0].name,))
  208. data = {'records': ['1.2.3.4'], 'ttl': 60, 'type': 'A'}
  209. response = self.client.post(url, json.dumps(data), content_type='application/json')
  210. self.assertEqual(response.status_code, status.HTTP_201_CREATED)
  211. self.client.credentials(HTTP_AUTHORIZATION='Token ' + self.token)
  212. url = reverse('rrset', args=(self.otherDomains[0].name, '', 'A',))
  213. data = {'records': ['3.2.3.4'], 'ttl': 32}
  214. response = self.client.patch(url, json.dumps(data), content_type='application/json')
  215. self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND)
  216. def testCantPutForeignRRset(self):
  217. self.client.credentials(HTTP_AUTHORIZATION='Token ' + self.otherToken)
  218. url = reverse('rrsets', args=(self.otherDomains[0].name,))
  219. data = {'records': ['1.2.3.4'], 'ttl': 60, 'type': 'A'}
  220. response = self.client.post(url, json.dumps(data), content_type='application/json')
  221. self.assertEqual(response.status_code, status.HTTP_201_CREATED)
  222. self.client.credentials(HTTP_AUTHORIZATION='Token ' + self.token)
  223. url = reverse('rrset', args=(self.otherDomains[0].name, '', 'A',))
  224. data = {'records': ['3.2.3.4'], 'ttl': 30, 'type': 'A'}
  225. response = self.client.patch(url, json.dumps(data), content_type='application/json')
  226. self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND)
  227. def testCantChangeEssentialProperties(self):
  228. url = reverse('rrsets', args=(self.ownedDomains[1].name,))
  229. data = {'records': ['1.2.3.4'], 'ttl': 60, 'type': 'A', 'subname': 'test1'}
  230. response = self.client.post(url, json.dumps(data), content_type='application/json')
  231. self.assertEqual(response.status_code, status.HTTP_201_CREATED)
  232. # Changing the type is expected to cause an error
  233. url = reverse('rrset', args=(self.ownedDomains[1].name, 'test1', 'A',))
  234. data = {'records': ['3.2.3.4'], 'ttl': 120, 'subname': 'test2'}
  235. response = self.client.patch(url, json.dumps(data), content_type='application/json')
  236. self.assertEqual(response.status_code, status.HTTP_409_CONFLICT)
  237. # Changing the subname is expected to cause an error
  238. data = {'records': ['3.2.3.4'], 'ttl': 120, 'type': 'TXT'}
  239. response = self.client.patch(url, json.dumps(data), content_type='application/json')
  240. self.assertEqual(response.status_code, status.HTTP_409_CONFLICT)
  241. # Check that nothing changed
  242. response = self.client.get(url)
  243. self.assertEqual(response.status_code, status.HTTP_200_OK)
  244. self.assertEqual(response.data['records'][0], '1.2.3.4')
  245. self.assertEqual(response.data['ttl'], 60)
  246. self.assertEqual(response.data['name'], 'test1.' + self.ownedDomains[1].name + '.')
  247. self.assertEqual(response.data['subname'], 'test1')
  248. self.assertEqual(response.data['type'], 'A')
  249. # This is expected to work, but the fields are ignored
  250. data = {'records': ['3.2.3.4'], 'name': 'example.com.', 'domain': 'example.com'}
  251. response = self.client.patch(url, json.dumps(data), content_type='application/json')
  252. self.assertEqual(response.status_code, status.HTTP_200_OK)
  253. response = self.client.get(url)
  254. self.assertEqual(response.status_code, status.HTTP_200_OK)
  255. self.assertEqual(response.data['records'][0], '3.2.3.4')
  256. self.assertEqual(response.data['domain'], self.ownedDomains[1].name)
  257. self.assertEqual(response.data['name'], 'test1.' + self.ownedDomains[1].name + '.')
  258. def testCanDeleteOwnRRset(self):
  259. # Try PATCH with empty records
  260. url = reverse('rrsets', args=(self.ownedDomains[1].name,))
  261. data = {'records': ['1.2.3.4'], 'ttl': 60, 'type': 'A'}
  262. response = self.client.post(url, json.dumps(data), content_type='application/json')
  263. self.assertEqual(response.status_code, status.HTTP_201_CREATED)
  264. url = reverse('rrset', args=(self.ownedDomains[1].name, '', 'A',))
  265. data = {'records': []}
  266. response = self.client.patch(url, json.dumps(data), content_type='application/json')
  267. self.assertEqual(response.status_code, status.HTTP_204_NO_CONTENT)
  268. response = self.client.get(url)
  269. self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND)
  270. # Try DELETE
  271. url = reverse('rrsets', args=(self.ownedDomains[1].name,))
  272. data = {'records': ['1.2.3.4'], 'ttl': 60, 'type': 'A'}
  273. response = self.client.post(url, json.dumps(data), content_type='application/json')
  274. self.assertEqual(response.status_code, status.HTTP_201_CREATED)
  275. url = reverse('rrset', args=(self.ownedDomains[1].name, '', 'A',))
  276. response = self.client.delete(url)
  277. self.assertEqual(response.status_code, status.HTTP_204_NO_CONTENT)
  278. response = self.client.get(url)
  279. self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND)
  280. def testCantDeleteForeignRRset(self):
  281. self.client.credentials(HTTP_AUTHORIZATION='Token ' + self.otherToken)
  282. url = reverse('rrsets', args=(self.otherDomains[0].name,))
  283. data = {'records': ['1.2.3.4'], 'ttl': 60, 'type': 'A'}
  284. response = self.client.post(url, json.dumps(data), content_type='application/json')
  285. self.assertEqual(response.status_code, status.HTTP_201_CREATED)
  286. self.client.credentials(HTTP_AUTHORIZATION='Token ' + self.token)
  287. url = reverse('rrset', args=(self.otherDomains[0].name, '', 'A',))
  288. # Try PATCH with empty records
  289. data = {'records': []}
  290. response = self.client.patch(url, json.dumps(data), content_type='application/json')
  291. self.assertEqual(response.status_code, status.HTTP_204_NO_CONTENT)
  292. # Try DELETE
  293. response = self.client.delete(url)
  294. self.assertEqual(response.status_code, status.HTTP_204_NO_CONTENT)
  295. def testPostCausesPdnsAPICall(self):
  296. httpretty.enable()
  297. httpretty.register_uri(httpretty.PATCH, settings.NSLORD_PDNS_API + '/zones/' + self.ownedDomains[1].name + '.')
  298. httpretty.register_uri(httpretty.PUT, settings.NSLORD_PDNS_API + '/zones/' + self.ownedDomains[1].name + './notify')
  299. url = reverse('rrsets', args=(self.ownedDomains[1].name,))
  300. data = {'records': ['1.2.3.4'], 'ttl': 60, 'type': 'A'}
  301. response = self.client.post(url, json.dumps(data), content_type='application/json')
  302. result = json.loads(httpretty.httpretty.latest_requests[-2].parsed_body)
  303. self.assertEqual(result['rrsets'][0]['name'], self.ownedDomains[1].name + '.')
  304. self.assertEqual(result['rrsets'][0]['records'][0]['content'], '1.2.3.4')
  305. def testDeleteCausesPdnsAPICall(self):
  306. httpretty.enable()
  307. httpretty.register_uri(httpretty.PATCH, settings.NSLORD_PDNS_API + '/zones/' + self.ownedDomains[1].name + '.')
  308. httpretty.register_uri(httpretty.PUT, settings.NSLORD_PDNS_API + '/zones/' + self.ownedDomains[1].name + './notify')
  309. # Create record, should cause a pdns PATCH request and a notify
  310. url = reverse('rrsets', args=(self.ownedDomains[1].name,))
  311. data = {'records': ['1.2.3.4'], 'ttl': 60, 'type': 'A'}
  312. response = self.client.post(url, json.dumps(data), content_type='application/json')
  313. # Delete record, should cause a pdns PATCH request and a notify
  314. url = reverse('rrset', args=(self.ownedDomains[1].name, '', 'A',))
  315. response = self.client.delete(url)
  316. self.assertEqual(response.status_code, status.HTTP_204_NO_CONTENT)
  317. # Check pdns requests from creation
  318. result = json.loads(httpretty.httpretty.latest_requests[-4].parsed_body)
  319. self.assertEqual(result['rrsets'][0]['name'], self.ownedDomains[1].name + '.')
  320. self.assertEqual(result['rrsets'][0]['records'][0]['content'], '1.2.3.4')
  321. self.assertEqual(httpretty.httpretty.latest_requests[-3].method, 'PUT')
  322. # Check pdns requests from deletion
  323. result = json.loads(httpretty.httpretty.latest_requests[-2].parsed_body)
  324. self.assertEqual(result['rrsets'][0]['name'], self.ownedDomains[1].name + '.')
  325. self.assertEqual(result['rrsets'][0]['records'], [])
  326. self.assertEqual(httpretty.httpretty.latest_requests[-1].method, 'PUT')
  327. def testImportRRsets(self):
  328. url = reverse('rrsets', args=(self.ownedDomains[1].name,))
  329. data = {'records': ['1.2.3.4'], 'ttl': 60, 'type': 'A'}
  330. response = self.client.post(url, json.dumps(data), content_type='application/json')
  331. self.assertEqual(response.status_code, status.HTTP_201_CREATED)
  332. # Not checking anything here; errors will raise an exception
  333. call_command('sync-from-pdns', self.ownedDomains[1].name)