testrrsets.py 22 KB


  1. from django.core.urlresolvers import reverse
  2. from rest_framework import status
  3. from rest_framework.test import APITestCase
  4. from .utils import utils
  5. import httpretty
  6. from django.conf import settings
  7. import json
  8. from django.core.management import call_command
  9. class UnauthenticatedDomainTests(APITestCase):
  10. def testExpectUnauthorizedOnGet(self):
  11. url = reverse('rrsets', args=('example.com',))
  12. response = self.client.get(url, format='json')
  13. self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED)
  14. def testExpectUnauthorizedOnPost(self):
  15. url = reverse('rrsets', args=('example.com',))
  16. response = self.client.post(url, format='json')
  17. self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED)
  18. def testExpectUnauthorizedOnPut(self):
  19. url = reverse('rrsets', args=('example.com',))
  20. response = self.client.put(url, format='json')
  21. self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED)
  22. def testExpectUnauthorizedOnDelete(self):
  23. url = reverse('rrsets', args=('example.com',))
  24. response = self.client.delete(url, format='json')
  25. self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED)
  26. class AuthenticatedRRsetTests(APITestCase):
  27. restricted_types = ('SOA', 'RRSIG', 'DNSKEY', 'NSEC3PARAM')
  28. def setUp(self):
  29. httpretty.reset()
  30. httpretty.disable()
  31. if not hasattr(self, 'owner'):
  32. self.owner = utils.createUser()
  33. self.ownedDomains = [utils.createDomain(self.owner), utils.createDomain(self.owner)]
  34. self.token = utils.createToken(user=self.owner)
  35. self.client.credentials(HTTP_AUTHORIZATION='Token ' + self.token)
  36. self.otherOwner = utils.createUser()
  37. self.otherDomains = [utils.createDomain(self.otherOwner), utils.createDomain()]
  38. self.otherToken = utils.createToken(user=self.otherOwner)
  39. def testCanGetOwnRRsets(self):
  40. url = reverse('rrsets', args=(self.ownedDomains[1].name,))
  41. response = self.client.get(url)
  42. self.assertEqual(response.status_code, status.HTTP_200_OK)
  43. self.assertEqual(len(response.data), 1) # don't forget NS RRset
  44. def testCantGetForeignRRsets(self):
  45. url = reverse('rrsets', args=(self.otherDomains[1].name,))
  46. response = self.client.get(url)
  47. self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND)
  48. def testCanGetOwnRRsetsEmptySubname(self):
  49. url = reverse('rrsets', args=(self.ownedDomains[1].name,))
  50. response = self.client.get(url + '?subname=')
  51. self.assertEqual(response.status_code, status.HTTP_200_OK)
  52. self.assertEqual(len(response.data), 1) # don't forget NS RRset
  53. def testCanGetOwnRRsetsFromSubname(self):
  54. url = reverse('rrsets', args=(self.ownedDomains[1].name,))
  55. data = {'records': ['1.2.3.4'], 'ttl': 120, 'type': 'A'}
  56. response = self.client.post(url, json.dumps(data), content_type='application/json')
  57. self.assertEqual(response.status_code, status.HTTP_201_CREATED)
  58. data = {'records': ['2.2.3.4'], 'ttl': 120, 'type': 'A', 'subname': 'test'}
  59. response = self.client.post(url, json.dumps(data), content_type='application/json')
  60. self.assertEqual(response.status_code, status.HTTP_201_CREATED)
  61. data = {'records': ['"test"'], 'ttl': 120, 'type': 'TXT', 'subname': 'test'}
  62. response = self.client.post(url, json.dumps(data), content_type='application/json')
  63. self.assertEqual(response.status_code, status.HTTP_201_CREATED)
  64. response = self.client.get(url)
  65. self.assertEqual(response.status_code, status.HTTP_200_OK)
  66. self.assertEqual(len(response.data), 3 + 1) # don't forget NS RRset
  67. response = self.client.get(url + '?subname=test')
  68. self.assertEqual(response.status_code, status.HTTP_200_OK)
  69. self.assertEqual(len(response.data), 2)
  70. def testCantGetForeignRRsetsFromSubname(self):
  71. url = reverse('rrsets', args=(self.otherDomains[1].name,))
  72. response = self.client.get(url + '?subname=test')
  73. self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND)
  74. def testCanGetOwnRRsetsFromType(self):
  75. url = reverse('rrsets', args=(self.ownedDomains[1].name,))
  76. data = {'records': ['1.2.3.4'], 'ttl': 120, 'type': 'A'}
  77. response = self.client.post(url, json.dumps(data), content_type='application/json')
  78. self.assertEqual(response.status_code, status.HTTP_201_CREATED)
  79. data = {'records': ['2.2.3.4'], 'ttl': 120, 'type': 'A', 'subname': 'test'}
  80. response = self.client.post(url, json.dumps(data), content_type='application/json')
  81. self.assertEqual(response.status_code, status.HTTP_201_CREATED)
  82. data = {'records': ['"test"'], 'ttl': 120, 'type': 'TXT', 'subname': 'test'}
  83. response = self.client.post(url, json.dumps(data), content_type='application/json')
  84. self.assertEqual(response.status_code, status.HTTP_201_CREATED)
  85. response = self.client.get(url)
  86. self.assertEqual(response.status_code, status.HTTP_200_OK)
  87. self.assertEqual(len(response.data), 3 + 1) # don't forget NS RRset
  88. response = self.client.get(url + '?type=A')
  89. self.assertEqual(response.status_code, status.HTTP_200_OK)
  90. self.assertEqual(len(response.data), 2)
  91. def testCantGetForeignRRsetsFromType(self):
  92. url = reverse('rrsets', args=(self.otherDomains[1].name,))
  93. response = self.client.get(url + '?test=A')
  94. self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND)
  95. def testCanPostOwnRRsets(self):
  96. url = reverse('rrsets', args=(self.ownedDomains[1].name,))
  97. data = {'records': ['1.2.3.4'], 'ttl': 60, 'type': 'A'}
  98. response = self.client.post(url, json.dumps(data), content_type='application/json')
  99. self.assertEqual(response.status_code, status.HTTP_201_CREATED)
  100. response = self.client.get(url)
  101. self.assertEqual(response.status_code, status.HTTP_200_OK)
  102. self.assertEqual(len(response.data), 1 + 1) # don't forget NS RRset
  103. url = reverse('rrset', args=(self.ownedDomains[1].name, '', 'A',))
  104. response = self.client.get(url)
  105. self.assertEqual(response.status_code, status.HTTP_200_OK)
  106. self.assertEqual(response.data['records'][0], '1.2.3.4')
  107. url = reverse('rrsets', args=(self.ownedDomains[1].name,))
  108. data = {'records': ['desec.io.'], 'ttl': 900, 'type': 'PTR'}
  109. response = self.client.post(url, json.dumps(data), content_type='application/json')
  110. self.assertEqual(response.status_code, status.HTTP_201_CREATED)
  111. def testCantPostRestrictedTypes(self):
  112. for type_ in self.restricted_types:
  113. url = reverse('rrsets', args=(self.ownedDomains[1].name,))
  114. data = {'records': ['ns1.desec.io. peter.desec.io. 2584 10800 3600 604800 60'], 'ttl': 60, 'type': type_}
  115. response = self.client.post(url, json.dumps(data), content_type='application/json')
  116. self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN)
  117. def testCantPostForeignRRsets(self):
  118. url = reverse('rrsets', args=(self.otherDomains[1].name,))
  119. data = {'records': ['1.2.3.4'], 'ttl': 60, 'type': 'A'}
  120. response = self.client.post(url, json.dumps(data), content_type='application/json')
  121. self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND)
  122. def testCanGetOwnRRset(self):
  123. url = reverse('rrsets', args=(self.ownedDomains[1].name,))
  124. data = {'records': ['1.2.3.4'], 'ttl': 60, 'type': 'A'}
  125. response = self.client.post(url, json.dumps(data), content_type='application/json')
  126. self.assertEqual(response.status_code, status.HTTP_201_CREATED)
  127. url = reverse('rrset', args=(self.ownedDomains[1].name, '', 'A',))
  128. response = self.client.get(url)
  129. self.assertEqual(response.status_code, status.HTTP_200_OK)
  130. self.assertEqual(response.data['records'][0], '1.2.3.4')
  131. self.assertEqual(response.data['ttl'], 60)
  132. def testCantGetRestrictedTypes(self):
  133. for type_ in self.restricted_types:
  134. url = reverse('rrsets', args=(self.ownedDomains[1].name,))
  135. response = self.client.get(url + '?type=%s' % type_)
  136. self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN)
  137. url = reverse('rrset', args=(self.ownedDomains[1].name, '', type_,))
  138. response = self.client.get(url)
  139. self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN)
  140. def testCantGetForeignRRset(self):
  141. self.client.credentials(HTTP_AUTHORIZATION='Token ' + self.otherToken)
  142. url = reverse('rrsets', args=(self.otherDomains[0].name,))
  143. data = {'records': ['1.2.3.4'], 'ttl': 60, 'type': 'A'}
  144. response = self.client.post(url, json.dumps(data), content_type='application/json')
  145. self.assertEqual(response.status_code, status.HTTP_201_CREATED)
  146. self.client.credentials(HTTP_AUTHORIZATION='Token ' + self.token)
  147. url = reverse('rrset', args=(self.otherDomains[0].name, '', 'A',))
  148. response = self.client.get(url)
  149. self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND)
  150. def testCanGetOwnRRsetWithSubname(self):
  151. url = reverse('rrsets', args=(self.ownedDomains[1].name,))
  152. data = {'records': ['1.2.3.4'], 'ttl': 120, 'type': 'A'}
  153. response = self.client.post(url, json.dumps(data), content_type='application/json')
  154. self.assertEqual(response.status_code, status.HTTP_201_CREATED)
  155. data = {'records': ['2.2.3.4'], 'ttl': 120, 'type': 'A', 'subname': 'test'}
  156. response = self.client.post(url, json.dumps(data), content_type='application/json')
  157. self.assertEqual(response.status_code, status.HTTP_201_CREATED)
  158. data = {'records': ['"test"'], 'ttl': 120, 'type': 'TXT', 'subname': 'test'}
  159. response = self.client.post(url, json.dumps(data), content_type='application/json')
  160. self.assertEqual(response.status_code, status.HTTP_201_CREATED)
  161. response = self.client.get(url)
  162. self.assertEqual(response.status_code, status.HTTP_200_OK)
  163. self.assertEqual(len(response.data), 3 + 1) # don't forget NS RRset
  164. url = reverse('rrset', args=(self.ownedDomains[1].name, 'test', 'A',))
  165. response = self.client.get(url)
  166. self.assertEqual(response.status_code, status.HTTP_200_OK)
  167. self.assertEqual(response.data['records'][0], '2.2.3.4')
  168. self.assertEqual(response.data['ttl'], 120)
  169. self.assertEqual(response.data['name'], 'test.' + self.ownedDomains[1].name + '.')
  170. def testCanGetOwnRRsetWithWildcard(self):
  171. for subname in ('*', '*.foobar'):
  172. url = reverse('rrsets', args=(self.ownedDomains[1].name,))
  173. data = {'records': ['"barfoo"'], 'ttl': 120, 'type': 'TXT', 'subname': subname}
  174. response = self.client.post(url, json.dumps(data), content_type='application/json')
  175. self.assertEqual(response.status_code, status.HTTP_201_CREATED)
  176. response1 = self.client.get(url + '?subname=' + subname)
  177. self.assertEqual(response1.status_code, status.HTTP_200_OK)
  178. self.assertEqual(response1.data[0]['records'][0], '"barfoo"')
  179. self.assertEqual(response1.data[0]['ttl'], 120)
  180. self.assertEqual(response1.data[0]['name'], subname + '.' + self.ownedDomains[1].name + '.')
  181. url = reverse('rrset', args=(self.ownedDomains[1].name, subname, 'TXT',))
  182. response2 = self.client.get(url)
  183. self.assertEqual(response2.data, response1.data[0])
  184. def testCanPutOwnRRset(self):
  185. url = reverse('rrsets', args=(self.ownedDomains[1].name,))
  186. data = {'records': ['1.2.3.4'], 'ttl': 60, 'type': 'A'}
  187. response = self.client.post(url, json.dumps(data), content_type='application/json')
  188. self.assertEqual(response.status_code, status.HTTP_201_CREATED)
  189. url = reverse('rrset', args=(self.ownedDomains[1].name, '', 'A',))
  190. data = {'records': ['2.2.3.4'], 'ttl': 30, 'type': 'A'}
  191. response = self.client.put(url, json.dumps(data), content_type='application/json')
  192. self.assertEqual(response.status_code, status.HTTP_200_OK)
  193. response = self.client.get(url)
  194. self.assertEqual(response.status_code, status.HTTP_200_OK)
  195. self.assertEqual(response.data['records'][0], '2.2.3.4')
  196. self.assertEqual(response.data['ttl'], 30)
  197. def testCanPatchOwnRRset(self):
  198. url = reverse('rrsets', args=(self.ownedDomains[1].name,))
  199. data = {'records': ['1.2.3.4'], 'ttl': 60, 'type': 'A'}
  200. response = self.client.post(url, json.dumps(data), content_type='application/json')
  201. self.assertEqual(response.status_code, status.HTTP_201_CREATED)
  202. url = reverse('rrset', args=(self.ownedDomains[1].name, '', 'A',))
  203. data = {'records': ['3.2.3.4'], 'ttl': 32}
  204. response = self.client.patch(url, json.dumps(data), content_type='application/json')
  205. self.assertEqual(response.status_code, status.HTTP_200_OK)
  206. response = self.client.get(url)
  207. self.assertEqual(response.status_code, status.HTTP_200_OK)
  208. self.assertEqual(response.data['records'][0], '3.2.3.4')
  209. self.assertEqual(response.data['ttl'], 32)
  210. def testCantPatchOForeignRRset(self):
  211. self.client.credentials(HTTP_AUTHORIZATION='Token ' + self.otherToken)
  212. url = reverse('rrsets', args=(self.otherDomains[0].name,))
  213. data = {'records': ['1.2.3.4'], 'ttl': 60, 'type': 'A'}
  214. response = self.client.post(url, json.dumps(data), content_type='application/json')
  215. self.assertEqual(response.status_code, status.HTTP_201_CREATED)
  216. self.client.credentials(HTTP_AUTHORIZATION='Token ' + self.token)
  217. url = reverse('rrset', args=(self.otherDomains[0].name, '', 'A',))
  218. data = {'records': ['3.2.3.4'], 'ttl': 32}
  219. response = self.client.patch(url, json.dumps(data), content_type='application/json')
  220. self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND)
  221. def testCantPutForeignRRset(self):
  222. self.client.credentials(HTTP_AUTHORIZATION='Token ' + self.otherToken)
  223. url = reverse('rrsets', args=(self.otherDomains[0].name,))
  224. data = {'records': ['1.2.3.4'], 'ttl': 60, 'type': 'A'}
  225. response = self.client.post(url, json.dumps(data), content_type='application/json')
  226. self.assertEqual(response.status_code, status.HTTP_201_CREATED)
  227. self.client.credentials(HTTP_AUTHORIZATION='Token ' + self.token)
  228. url = reverse('rrset', args=(self.otherDomains[0].name, '', 'A',))
  229. data = {'records': ['3.2.3.4'], 'ttl': 30, 'type': 'A'}
  230. response = self.client.patch(url, json.dumps(data), content_type='application/json')
  231. self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND)
  232. def testCantChangeEssentialProperties(self):
  233. url = reverse('rrsets', args=(self.ownedDomains[1].name,))
  234. data = {'records': ['1.2.3.4'], 'ttl': 60, 'type': 'A', 'subname': 'test1'}
  235. response = self.client.post(url, json.dumps(data), content_type='application/json')
  236. self.assertEqual(response.status_code, status.HTTP_201_CREATED)
  237. # Changing the type is expected to cause an error
  238. url = reverse('rrset', args=(self.ownedDomains[1].name, 'test1', 'A',))
  239. data = {'records': ['3.2.3.4'], 'ttl': 120, 'subname': 'test2'}
  240. response = self.client.patch(url, json.dumps(data), content_type='application/json')
  241. self.assertEqual(response.status_code, status.HTTP_409_CONFLICT)
  242. # Changing the subname is expected to cause an error
  243. data = {'records': ['3.2.3.4'], 'ttl': 120, 'type': 'TXT'}
  244. response = self.client.patch(url, json.dumps(data), content_type='application/json')
  245. self.assertEqual(response.status_code, status.HTTP_409_CONFLICT)
  246. # Check that nothing changed
  247. response = self.client.get(url)
  248. self.assertEqual(response.status_code, status.HTTP_200_OK)
  249. self.assertEqual(response.data['records'][0], '1.2.3.4')
  250. self.assertEqual(response.data['ttl'], 60)
  251. self.assertEqual(response.data['name'], 'test1.' + self.ownedDomains[1].name + '.')
  252. self.assertEqual(response.data['subname'], 'test1')
  253. self.assertEqual(response.data['type'], 'A')
  254. # This is expected to work, but the fields are ignored
  255. data = {'records': ['3.2.3.4'], 'name': 'example.com.', 'domain': 'example.com'}
  256. response = self.client.patch(url, json.dumps(data), content_type='application/json')
  257. self.assertEqual(response.status_code, status.HTTP_200_OK)
  258. response = self.client.get(url)
  259. self.assertEqual(response.status_code, status.HTTP_200_OK)
  260. self.assertEqual(response.data['records'][0], '3.2.3.4')
  261. self.assertEqual(response.data['domain'], self.ownedDomains[1].name)
  262. self.assertEqual(response.data['name'], 'test1.' + self.ownedDomains[1].name + '.')
  263. def testCanDeleteOwnRRset(self):
  264. # Try PATCH with empty records
  265. url = reverse('rrsets', args=(self.ownedDomains[1].name,))
  266. data = {'records': ['1.2.3.4'], 'ttl': 60, 'type': 'A'}
  267. response = self.client.post(url, json.dumps(data), content_type='application/json')
  268. self.assertEqual(response.status_code, status.HTTP_201_CREATED)
  269. url = reverse('rrset', args=(self.ownedDomains[1].name, '', 'A',))
  270. data = {'records': []}
  271. response = self.client.patch(url, json.dumps(data), content_type='application/json')
  272. self.assertEqual(response.status_code, status.HTTP_204_NO_CONTENT)
  273. response = self.client.get(url)
  274. self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND)
  275. # Try DELETE
  276. url = reverse('rrsets', args=(self.ownedDomains[1].name,))
  277. data = {'records': ['1.2.3.4'], 'ttl': 60, 'type': 'A'}
  278. response = self.client.post(url, json.dumps(data), content_type='application/json')
  279. self.assertEqual(response.status_code, status.HTTP_201_CREATED)
  280. url = reverse('rrset', args=(self.ownedDomains[1].name, '', 'A',))
  281. response = self.client.delete(url)
  282. self.assertEqual(response.status_code, status.HTTP_204_NO_CONTENT)
  283. response = self.client.get(url)
  284. self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND)
  285. def testCantDeleteForeignRRset(self):
  286. self.client.credentials(HTTP_AUTHORIZATION='Token ' + self.otherToken)
  287. url = reverse('rrsets', args=(self.otherDomains[0].name,))
  288. data = {'records': ['1.2.3.4'], 'ttl': 60, 'type': 'A'}
  289. response = self.client.post(url, json.dumps(data), content_type='application/json')
  290. self.assertEqual(response.status_code, status.HTTP_201_CREATED)
  291. self.client.credentials(HTTP_AUTHORIZATION='Token ' + self.token)
  292. url = reverse('rrset', args=(self.otherDomains[0].name, '', 'A',))
  293. # Try PATCH with empty records
  294. data = {'records': []}
  295. response = self.client.patch(url, json.dumps(data), content_type='application/json')
  296. self.assertEqual(response.status_code, status.HTTP_204_NO_CONTENT)
  297. # Try DELETE
  298. response = self.client.delete(url)
  299. self.assertEqual(response.status_code, status.HTTP_204_NO_CONTENT)
  300. def testPostCausesPdnsAPICall(self):
  301. httpretty.enable()
  302. httpretty.register_uri(httpretty.PATCH, settings.NSLORD_PDNS_API + '/zones/' + self.ownedDomains[1].name + '.')
  303. httpretty.register_uri(httpretty.PUT, settings.NSLORD_PDNS_API + '/zones/' + self.ownedDomains[1].name + './notify')
  304. url = reverse('rrsets', args=(self.ownedDomains[1].name,))
  305. data = {'records': ['1.2.3.4'], 'ttl': 60, 'type': 'A'}
  306. response = self.client.post(url, json.dumps(data), content_type='application/json')
  307. result = json.loads(httpretty.httpretty.latest_requests[-2].parsed_body)
  308. self.assertEqual(result['rrsets'][0]['name'], self.ownedDomains[1].name + '.')
  309. self.assertEqual(result['rrsets'][0]['records'][0]['content'], '1.2.3.4')
  310. def testDeleteCausesPdnsAPICall(self):
  311. httpretty.enable()
  312. httpretty.register_uri(httpretty.PATCH, settings.NSLORD_PDNS_API + '/zones/' + self.ownedDomains[1].name + '.')
  313. httpretty.register_uri(httpretty.PUT, settings.NSLORD_PDNS_API + '/zones/' + self.ownedDomains[1].name + './notify')
  314. # Create record, should cause a pdns PATCH request and a notify
  315. url = reverse('rrsets', args=(self.ownedDomains[1].name,))
  316. data = {'records': ['1.2.3.4'], 'ttl': 60, 'type': 'A'}
  317. response = self.client.post(url, json.dumps(data), content_type='application/json')
  318. # Delete record, should cause a pdns PATCH request and a notify
  319. url = reverse('rrset', args=(self.ownedDomains[1].name, '', 'A',))
  320. response = self.client.delete(url)
  321. self.assertEqual(response.status_code, status.HTTP_204_NO_CONTENT)
  322. # Check pdns requests from creation
  323. result = json.loads(httpretty.httpretty.latest_requests[-4].parsed_body)
  324. self.assertEqual(result['rrsets'][0]['name'], self.ownedDomains[1].name + '.')
  325. self.assertEqual(result['rrsets'][0]['records'][0]['content'], '1.2.3.4')
  326. self.assertEqual(httpretty.httpretty.latest_requests[-3].method, 'PUT')
  327. # Check pdns requests from deletion
  328. result = json.loads(httpretty.httpretty.latest_requests[-2].parsed_body)
  329. self.assertEqual(result['rrsets'][0]['name'], self.ownedDomains[1].name + '.')
  330. self.assertEqual(result['rrsets'][0]['records'], [])
  331. self.assertEqual(httpretty.httpretty.latest_requests[-1].method, 'PUT')
  332. def testImportRRsets(self):
  333. url = reverse('rrsets', args=(self.ownedDomains[1].name,))
  334. data = {'records': ['1.2.3.4'], 'ttl': 60, 'type': 'A'}
  335. response = self.client.post(url, json.dumps(data), content_type='application/json')
  336. self.assertEqual(response.status_code, status.HTTP_201_CREATED)
  337. # Not checking anything here; errors will raise an exception
  338. call_command('sync-from-pdns', self.ownedDomains[1].name)