testdyndns12update.py 14 KB


  1. from django.core.urlresolvers import reverse
  2. from rest_framework import status
  3. from rest_framework.test import APITestCase
  4. from .utils import utils
  5. from django.db import transaction
  6. import base64
  7. import httpretty
  8. from django.conf import settings
  9. class DynDNS12UpdateTest(APITestCase):
  10. owner = None
  11. token = None
  12. username = None
  13. password = None
  14. def setUp(self):
  15. self.owner = utils.createUser()
  16. self.token = utils.createToken(user=self.owner)
  17. self.domain = utils.generateDynDomainname()
  18. self.client.credentials(HTTP_AUTHORIZATION='Token ' + self.token)
  19. url = reverse('domain-list')
  20. data = {'name': self.domain}
  21. response = self.client.post(url, data)
  22. self.assertEqual(response.status_code, status.HTTP_201_CREATED)
  23. self.username = response.data['name']
  24. self.password = self.token
  25. self.client.credentials(HTTP_AUTHORIZATION='Basic ' + base64.b64encode((self.username + ':' + self.password).encode()).decode())
  26. httpretty.enable()
  27. httpretty.HTTPretty.allow_net_connect = False
  28. httpretty.register_uri(httpretty.POST, settings.NSLORD_PDNS_API + '/zones')
  29. httpretty.register_uri(httpretty.PATCH, settings.NSLORD_PDNS_API + '/zones/' + self.domain + '.')
  30. httpretty.register_uri(httpretty.GET,
  31. settings.NSLORD_PDNS_API + '/zones/' + self.domain + '.',
  32. body='{"rrsets": []}',
  33. content_type="application/json")
  34. httpretty.register_uri(httpretty.GET,
  35. settings.NSLORD_PDNS_API + '/zones/' + self.domain + './cryptokeys',
  36. body='[]',
  37. content_type="application/json")
  38. httpretty.register_uri(httpretty.PUT, settings.NSLORD_PDNS_API + '/zones/' + self.domain + './notify')
  39. def tearDown(self):
  40. httpretty.reset()
  41. httpretty.disable()
  42. def assertIP(self, ipv4=None, ipv6=None, name=None):
  43. old_credentials = self.client._credentials['HTTP_AUTHORIZATION']
  44. self.client.credentials(HTTP_AUTHORIZATION='Token ' + self.password)
  45. name = name or self.username
  46. if ipv4 is not None:
  47. url = reverse('rrset', args=(name, '', 'A',))
  48. response = self.client.get(url)
  49. self.assertEqual(response.data['records'][0], ipv4)
  50. if ipv6 is not None:
  51. url = reverse('rrset', args=(name, '', 'AAAA',))
  52. response = self.client.get(url)
  53. self.assertEqual(response.data['records'][0], ipv6)
  54. self.assertEqual(response.status_code, status.HTTP_200_OK)
  55. self.assertEqual(response.data['ttl'], 60)
  56. self.client.credentials(HTTP_AUTHORIZATION=old_credentials)
  57. def testDynDNS1UpdateDDClientSuccess(self):
  58. # /nic/dyndns?action=edit&started=1&hostname=YES&host_id=foobar.dedyn.io&myip=10.1.2.3
  59. url = reverse('dyndns12update')
  60. response = self.client.get(url,
  61. {
  62. 'action': 'edit',
  63. 'started': 1,
  64. 'hostname': 'YES',
  65. 'host_id': self.username,
  66. 'myip': '10.1.2.3'
  67. })
  68. self.assertEqual(response.status_code, status.HTTP_200_OK)
  69. self.assertEqual(response.data, 'good')
  70. self.assertIP(ipv4='10.1.2.3')
  71. def testDynDNS1UpdateDDClientIPv6Success(self):
  72. # /nic/dyndns?action=edit&started=1&hostname=YES&host_id=foobar.dedyn.io&myipv6=::1337
  73. url = reverse('dyndns12update')
  74. response = self.client.get(url,
  75. {
  76. 'action': 'edit',
  77. 'started': 1,
  78. 'hostname': 'YES',
  79. 'host_id': self.username,
  80. 'myipv6': '::1337'
  81. })
  82. self.assertEqual(response.status_code, status.HTTP_200_OK)
  83. self.assertEqual(response.data, 'good')
  84. self.assertIP(ipv6='::1337')
  85. def testDynDNS2UpdateDDClientIPv4Success(self):
  86. #/nic/update?system=dyndns&hostname=foobar.dedyn.io&myip=10.2.3.4
  87. url = reverse('dyndns12update')
  88. response = self.client.get(url,
  89. {
  90. 'system': 'dyndns',
  91. 'hostname': self.username,
  92. 'myip': '10.2.3.4'
  93. })
  94. self.assertEqual(response.status_code, status.HTTP_200_OK)
  95. self.assertEqual(response.data, 'good')
  96. self.assertIP(ipv4='10.2.3.4')
  97. def testDynDNS2UpdateDDClientIPv6Success(self):
  98. #/nic/update?system=dyndns&hostname=foobar.dedyn.io&myipv6=::1338
  99. url = reverse('dyndns12update')
  100. response = self.client.get(url,
  101. {
  102. 'system': 'dyndns',
  103. 'hostname': self.username,
  104. 'myipv6': '::1338'
  105. })
  106. self.assertEqual(response.status_code, status.HTTP_200_OK)
  107. self.assertEqual(response.data, 'good')
  108. self.assertIP(ipv6='::1338')
  109. def testFritzBoxIPv6(self):
  110. #/
  111. url = reverse('dyndns12update')
  112. response = self.client.get(url)
  113. self.assertEqual(response.status_code, status.HTTP_200_OK)
  114. self.assertEqual(response.data, 'good')
  115. self.assertIP(ipv4='127.0.0.1')
  116. def testIdentificationByUsernameDomainname(self):
  117. # To force identification by the provided username (which is the domain name)
  118. # we add a second domain for the current user.
  119. name = 'second-' + self.domain
  120. httpretty.register_uri(httpretty.GET,
  121. settings.NSLORD_PDNS_API + '/zones/' + name + '.',
  122. body='{"rrsets": []}',
  123. content_type="application/json")
  124. httpretty.register_uri(httpretty.GET,
  125. settings.NSLORD_PDNS_API + '/zones/' + name + './cryptokeys',
  126. body='[]',
  127. content_type="application/json")
  128. self.client.credentials(HTTP_AUTHORIZATION='Token ' + self.token)
  129. url = reverse('domain-list')
  130. response = self.client.post(url, {'name': name})
  131. self.assertEqual(response.status_code, status.HTTP_201_CREATED)
  132. self.client.credentials(HTTP_AUTHORIZATION='Basic ' + base64.b64encode((self.username + ':' + self.password).encode()).decode())
  133. url = reverse('dyndns12update')
  134. response = self.client.get(url, REMOTE_ADDR='10.5.5.5')
  135. self.assertEqual(response.status_code, status.HTTP_200_OK)
  136. self.assertEqual(response.data, 'good')
  137. self.assertIP(ipv4='10.5.5.5')
  138. self.client.credentials(HTTP_AUTHORIZATION='Basic ' + base64.b64encode((self.username + '.invalid:' + self.password).encode()).decode())
  139. url = reverse('dyndns12update')
  140. response = self.client.get(url, REMOTE_ADDR='10.5.5.5')
  141. self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED)
  142. def testIdentificationByTokenWithEmptyUser(self):
  143. self.client.credentials(HTTP_AUTHORIZATION='Basic ' + base64.b64encode((':' + self.password).encode()).decode())
  144. url = reverse('dyndns12update')
  145. response = self.client.get(url, REMOTE_ADDR='10.5.5.6')
  146. self.assertEqual(response.status_code, status.HTTP_200_OK)
  147. self.assertEqual(response.data, 'good')
  148. self.assertIP(ipv4='10.5.5.6')
  149. # Now make sure we get a conflict when the user has multiple domains. Thus,
  150. # we add a second domain for the current user.
  151. name = 'second-' + self.domain
  152. httpretty.register_uri(httpretty.GET,
  153. settings.NSLORD_PDNS_API + '/zones/' + name + '.',
  154. body='{"rrsets": []}',
  155. content_type="application/json")
  156. httpretty.register_uri(httpretty.GET,
  157. settings.NSLORD_PDNS_API + '/zones/' + name + './cryptokeys',
  158. body='[]',
  159. content_type="application/json")
  160. self.client.credentials(HTTP_AUTHORIZATION='Token ' + self.token)
  161. url = reverse('domain-list')
  162. response = self.client.post(url, {'name': name})
  163. self.assertEqual(response.status_code, status.HTTP_201_CREATED)
  164. url = reverse('dyndns12update')
  165. response = self.client.get(url, REMOTE_ADDR='10.5.5.7')
  166. self.assertEqual(response.status_code, status.HTTP_409_CONFLICT)
  167. def testManualIPv6(self):
  168. #/update?username=foobar.dedyn.io&password=secret
  169. self.client.credentials(HTTP_AUTHORIZATION='')
  170. url = reverse('dyndns12update')
  171. response = self.client.get(url,
  172. {
  173. 'username': self.username,
  174. 'password': self.token,
  175. })
  176. self.assertEqual(response.status_code, status.HTTP_200_OK)
  177. self.assertEqual(response.data, 'good')
  178. self.assertIP(ipv4='127.0.0.1')
  179. def testSuspendedUpdates(self):
  180. self.owner.captcha_required = True
  181. self.owner.save()
  182. url = reverse('dyndns12update')
  183. response = self.client.get(url,
  184. {
  185. 'system': 'dyndns',
  186. 'hostname': self.domain,
  187. 'myip': '10.1.1.1'
  188. })
  189. self.assertEqual(response.status_code, status.HTTP_200_OK)
  190. self.assertIP(ipv4='10.1.1.1')
  191. httpretty.register_uri(httpretty.POST, settings.NSLORD_PDNS_API + '/zones')
  192. httpretty.register_uri(httpretty.POST,
  193. settings.NSLORD_PDNS_API + '/zones',
  194. body='{"error": "Domain \'%s.\' already exists"}' % self.domain,
  195. content_type="application/json", status=422)
  196. httpretty.register_uri(httpretty.PATCH, settings.NSLORD_PDNS_API + '/zones/' + self.domain + '.')
  197. httpretty.register_uri(httpretty.GET,
  198. settings.NSLORD_PDNS_API + '/zones/' + self.domain + '.',
  199. body='{"rrsets": []}',
  200. content_type="application/json")
  201. httpretty.register_uri(httpretty.PUT, settings.NSLORD_PDNS_API + '/zones/' + self.domain + './notify', status=200)
  202. self.owner.unlock()
  203. self.assertEqual(httpretty.httpretty.latest_requests[-2].method, 'PATCH')
  204. self.assertTrue((settings.NSLORD_PDNS_API + '/zones/' + self.domain + '.').endswith(httpretty.httpretty.latest_requests[-2].path))
  205. self.assertTrue(self.domain in httpretty.httpretty.latest_requests[-2].parsed_body)
  206. self.assertTrue('10.1.1.1' in httpretty.httpretty.latest_requests[-2].parsed_body)
  207. def testSuspendedUpdatesDomainCreation(self):
  208. self.owner.captcha_required = True
  209. self.owner.save()
  210. url = reverse('domain-list')
  211. newdomain = utils.generateDynDomainname()
  212. data = {'name': newdomain}
  213. self.client.credentials(HTTP_AUTHORIZATION='Token ' + self.token)
  214. response = self.client.post(url, data)
  215. self.assertEqual(response.status_code, status.HTTP_201_CREATED)
  216. url = reverse('dyndns12update')
  217. response = self.client.get(url,
  218. {
  219. 'system': 'dyndns',
  220. 'hostname': newdomain,
  221. 'myip': '10.2.2.2'
  222. })
  223. self.assertEqual(response.status_code, status.HTTP_200_OK)
  224. self.assertIP(name=newdomain, ipv4='10.2.2.2')
  225. httpretty.register_uri(httpretty.POST, settings.NSLORD_PDNS_API + '/zones')
  226. httpretty.register_uri(httpretty.PATCH, settings.NSLORD_PDNS_API + '/zones/' + newdomain + '.')
  227. httpretty.register_uri(httpretty.GET,
  228. settings.NSLORD_PDNS_API + '/zones/' + newdomain + '.',
  229. body='{"rrsets": [{"comments": [], "name": "%s.", "records": [ { "content": "ns1.desec.io.", "disabled": false }, { "content": "ns2.desec.io.", "disabled": false } ], "ttl": 60, "type": "NS"}]}' % self.domain,
  230. content_type="application/json")
  231. httpretty.register_uri(httpretty.PUT, settings.NSLORD_PDNS_API + '/zones/' + newdomain + './notify', status=200)
  232. httpretty.register_uri(httpretty.PATCH, settings.NSLORD_PDNS_API + '/zones/' + self.domain + '.')
  233. httpretty.register_uri(httpretty.GET,
  234. settings.NSLORD_PDNS_API + '/zones/' + self.domain + '.',
  235. body='{"rrsets": [{"comments": [], "name": "%s.", "records": [ { "content": "ns1.desec.io.", "disabled": false }, { "content": "ns2.desec.io.", "disabled": false } ], "ttl": 60, "type": "NS"}]}' % self.domain,
  236. content_type="application/json")
  237. httpretty.register_uri(httpretty.PUT, settings.NSLORD_PDNS_API + '/zones/' + self.domain + './notify', status=200)
  238. self.owner.unlock()
  239. self.assertEqual(httpretty.httpretty.latest_requests[-3].method, 'PATCH')
  240. self.assertTrue((settings.NSLORD_PDNS_API + '/zones/' + newdomain + '.').endswith(httpretty.httpretty.latest_requests[-3].path))
  241. self.assertTrue('10.2.2.2' in httpretty.httpretty.latest_requests[-3].parsed_body)