testdyndns12update.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302
  1. from django.core.urlresolvers import reverse
  2. from rest_framework import status
  3. from rest_framework.test import APITestCase
  4. from .utils import utils
  5. from django.db import transaction
  6. import base64
  7. import httpretty
  8. from django.conf import settings
  class DynDNS12UpdateTest(APITestCase):
      """Tests for the 'dyndns12update' endpoint.

      Each test starts with one user, one API token and one dynDNS domain
      created through the 'domain-list' endpoint.  Requests to the pdns API
      (``settings.NSLORD_PDNS_API``) are mocked with httpretty once the
      initial domain has been created.
      """

      owner = None
      token = None
      username = None
      password = None

      # Mocked pdns zone listing containing a single NS rrset; ``%s`` is the
      # rrset name without the trailing dot.
      _NS_RRSETS_BODY = '{"rrsets": [{"comments": [], "name": "%s.", "records": [ { "content": "ns1.desec.io.", "disabled": false }, { "content": "ns2.desec.io.", "disabled": false } ], "ttl": 60, "type": "NS"}]}'

      # ------------------------------------------------------------------
      # helpers
      # ------------------------------------------------------------------

      @staticmethod
      def _basic_auth(username, password):
          """Return an HTTP Basic ``Authorization`` header value."""
          raw = (username + ':' + password).encode()
          return 'Basic ' + base64.b64encode(raw).decode()

      @staticmethod
      def _zone_url(name):
          """Return the mocked pdns API URL of zone ``name`` (trailing dot added)."""
          return settings.NSLORD_PDNS_API + '/zones/' + name + '.'

      def _register_zone_reads(self, name):
          """Mock the read-only pdns endpoints of zone ``name``.

          Registers an empty rrset listing for the zone itself and an empty
          list for its ``/cryptokeys`` sub-resource, in that order.
          """
          httpretty.register_uri(httpretty.GET,
                                 self._zone_url(name),
                                 body='{"rrsets": []}',
                                 content_type="application/json")
          httpretty.register_uri(httpretty.GET,
                                 self._zone_url(name) + '/cryptokeys',
                                 body='[]',
                                 content_type="application/json")

      def _create_second_domain(self):
          """Create 'second-<domain>' for the current user and return its name.

          Side effect: the test client is left with *token* credentials; callers
          that need Basic auth afterwards must set it again.
          """
          name = 'second-' + self.domain
          self._register_zone_reads(name)
          self.client.credentials(HTTP_AUTHORIZATION='Token ' + self.token)
          response = self.client.post(reverse('domain-list'), {'name': name})
          self.assertEqual(response.status_code, status.HTTP_201_CREATED)
          return name

      # ------------------------------------------------------------------
      # fixtures
      # ------------------------------------------------------------------

      def setUp(self):
          """Create user, token and domain, then switch to Basic auth and mock pdns."""
          self.owner = utils.createUser()
          self.token = utils.createToken(user=self.owner)
          self.domain = utils.generateDynDomainname()

          # The domain is created with token auth, before httpretty is enabled.
          self.client.credentials(HTTP_AUTHORIZATION='Token ' + self.token)
          response = self.client.post(reverse('domain-list'), {'name': self.domain})
          self.assertEqual(response.status_code, status.HTTP_201_CREATED)

          # dynDNS clients authenticate with <domain name>:<token>.
          self.username = response.data['name']
          self.password = self.token
          self.client.credentials(
              HTTP_AUTHORIZATION=self._basic_auth(self.username, self.password))

          httpretty.enable()
          httpretty.HTTPretty.allow_net_connect = False
          httpretty.register_uri(httpretty.POST, settings.NSLORD_PDNS_API + '/zones')
          httpretty.register_uri(httpretty.PATCH, self._zone_url(self.domain))
          self._register_zone_reads(self.domain)
          httpretty.register_uri(httpretty.PUT, self._zone_url(self.domain) + '/notify')

      def tearDown(self):
          """Drop all registered mocks and stop intercepting HTTP traffic."""
          httpretty.reset()
          httpretty.disable()

      def assertIP(self, ipv4=None, ipv6=None, name=None):
          """Assert the A / AAAA rrsets stored for ``name``.

          For each record type the 'rrset' endpoint is queried with token auth.
          If the expected address is ``None`` the rrset must not exist (404);
          otherwise it must exist (200), have a TTL of 60 and carry the expected
          address as its first record.

          :param ipv4: expected first A record, or ``None`` for "no A rrset".
          :param ipv6: expected first AAAA record, or ``None`` for "no AAAA rrset".
          :param name: domain to inspect; defaults to ``self.username``.
          """
          name = name or self.username
          old_credentials = self.client._credentials['HTTP_AUTHORIZATION']
          self.client.credentials(HTTP_AUTHORIZATION='Token ' + self.password)

          def verify_response(type_, ip):
              response = self.client.get(reverse('rrset', args=(name, '', type_,)))
              if ip is None:
                  self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND)
                  return
              # Check the status first so that an error response fails with a
              # readable message instead of a KeyError on response.data.
              self.assertEqual(response.status_code, status.HTTP_200_OK)
              self.assertEqual(response.data['records'][0], ip)
              self.assertEqual(response.data['ttl'], 60)

          try:
              verify_response('A', ipv4)
              verify_response('AAAA', ipv6)
          finally:
              # Restore the caller's credentials even when an assertion failed.
              self.client.credentials(HTTP_AUTHORIZATION=old_credentials)

      # ------------------------------------------------------------------
      # tests
      # ------------------------------------------------------------------

      def testDynDNS1UpdateDDClientSuccess(self):
          """dyndns1-style request with ``host_id`` and ``myip`` sets the A record."""
          # /nic/dyndns?action=edit&started=1&hostname=YES&host_id=foobar.dedyn.io&myip=10.1.2.3
          params = {
              'action': 'edit',
              'started': 1,
              'hostname': 'YES',
              'host_id': self.username,
              'myip': '10.1.2.3'
          }
          response = self.client.get(reverse('dyndns12update'), params)
          self.assertEqual(response.status_code, status.HTTP_200_OK)
          self.assertEqual(response.data, 'good')
          self.assertIP(ipv4='10.1.2.3')

      def testDynDNS1UpdateDDClientIPv6Success(self):
          """dyndns1-style request with ``myipv6`` sets AAAA; A is expected to be 127.0.0.1."""
          # /nic/dyndns?action=edit&started=1&hostname=YES&host_id=foobar.dedyn.io&myipv6=::1337
          params = {
              'action': 'edit',
              'started': 1,
              'hostname': 'YES',
              'host_id': self.username,
              'myipv6': '::1337'
          }
          response = self.client.get(reverse('dyndns12update'), params)
          self.assertEqual(response.status_code, status.HTTP_200_OK)
          self.assertEqual(response.data, 'good')
          self.assertIP(ipv4='127.0.0.1', ipv6='::1337')

      def testDynDNS2UpdateDDClientIPv4Success(self):
          """dyndns2-style request with ``hostname`` and ``myip`` sets the A record."""
          # /nic/update?system=dyndns&hostname=foobar.dedyn.io&myip=10.2.3.4
          params = {
              'system': 'dyndns',
              'hostname': self.username,
              'myip': '10.2.3.4'
          }
          response = self.client.get(reverse('dyndns12update'), params)
          self.assertEqual(response.status_code, status.HTTP_200_OK)
          self.assertEqual(response.data, 'good')
          self.assertIP(ipv4='10.2.3.4')

      def testDynDNS2UpdateDDClientIPv6Success(self):
          """dyndns2-style request with ``myipv6`` sets AAAA; A is expected to be 127.0.0.1."""
          # /nic/update?system=dyndns&hostname=foobar.dedyn.io&myipv6=::1338
          params = {
              'system': 'dyndns',
              'hostname': self.username,
              'myipv6': '::1338'
          }
          response = self.client.get(reverse('dyndns12update'), params)
          self.assertEqual(response.status_code, status.HTTP_200_OK)
          self.assertEqual(response.data, 'good')
          self.assertIP(ipv4='127.0.0.1', ipv6='::1338')

      def testFritzBox(self):
          """A request without any parameters is expected to yield A = 127.0.0.1."""
          # /
          response = self.client.get(reverse('dyndns12update'))
          self.assertEqual(response.status_code, status.HTTP_200_OK)
          self.assertEqual(response.data, 'good')
          self.assertIP(ipv4='127.0.0.1')

      def testUnsetIP(self):
          """Empty address parameters are expected to remove the matching rrset."""
          url = reverse('dyndns12update')

          def testVariant(params, **kwargs):
              response = self.client.get(url, params)
              self.assertEqual(response.status_code, status.HTTP_200_OK)
              self.assertEqual(response.data, 'good')
              self.assertIP(**kwargs)

          testVariant({'ipv6': '::1337'}, ipv4='127.0.0.1', ipv6='::1337')
          testVariant({'ipv6': '::1337', 'myip': ''}, ipv4=None, ipv6='::1337')
          testVariant({'ipv6': '', 'ip': '1.2.3.4'}, ipv4='1.2.3.4', ipv6=None)
          testVariant({'ipv6': '', 'myipv4': ''}, ipv4=None, ipv6=None)

      def testIdentificationByUsernameDomainname(self):
          """With two domains, the Basic-auth username selects the domain to update."""
          # To force identification by the provided username (which is the domain name)
          # we add a second domain for the current user.
          self._create_second_domain()

          self.client.credentials(
              HTTP_AUTHORIZATION=self._basic_auth(self.username, self.password))
          response = self.client.get(reverse('dyndns12update'), REMOTE_ADDR='10.5.5.5')
          self.assertEqual(response.status_code, status.HTTP_200_OK)
          self.assertEqual(response.data, 'good')
          self.assertIP(ipv4='10.5.5.5')

          # A username that does not match the token is expected to be rejected.
          self.client.credentials(
              HTTP_AUTHORIZATION=self._basic_auth(self.username + '.invalid', self.password))
          response = self.client.get(reverse('dyndns12update'), REMOTE_ADDR='10.5.5.5')
          self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED)

      def testIdentificationByTokenWithEmptyUser(self):
          """An empty Basic-auth username works with one domain and conflicts with two."""
          self.client.credentials(HTTP_AUTHORIZATION=self._basic_auth('', self.password))
          response = self.client.get(reverse('dyndns12update'), REMOTE_ADDR='10.5.5.6')
          self.assertEqual(response.status_code, status.HTTP_200_OK)
          self.assertEqual(response.data, 'good')
          self.assertIP(ipv4='10.5.5.6')

          # Now make sure we get a conflict when the user has multiple domains. Thus,
          # we add a second domain for the current user.
          self._create_second_domain()

          # The client still carries the token credentials set by the helper.
          response = self.client.get(reverse('dyndns12update'), REMOTE_ADDR='10.5.5.7')
          self.assertEqual(response.status_code, status.HTTP_409_CONFLICT)

      def testManual(self):
          """Credentials passed as ``username``/``password`` query parameters are accepted."""
          # /update?username=foobar.dedyn.io&password=secret
          self.client.credentials(HTTP_AUTHORIZATION='')
          params = {
              'username': self.username,
              'password': self.token,
          }
          response = self.client.get(reverse('dyndns12update'), params)
          self.assertEqual(response.status_code, status.HTTP_200_OK)
          self.assertEqual(response.data, 'good')
          self.assertIP(ipv4='127.0.0.1')

      def testSuspendedUpdates(self):
          """An update made while ``captcha_required`` is set reaches pdns on unlock."""
          self.owner.captcha_required = True
          self.owner.save()

          params = {
              'system': 'dyndns',
              'hostname': self.domain,
              'myip': '10.1.1.1'
          }
          response = self.client.get(reverse('dyndns12update'), params)
          self.assertEqual(response.status_code, status.HTTP_200_OK)
          self.assertIP(ipv4='10.1.1.1')

          # Both POST registrations are kept deliberately: how httpretty treats
          # repeated registrations of one URI depends on the call order.
          httpretty.register_uri(httpretty.POST, settings.NSLORD_PDNS_API + '/zones')
          httpretty.register_uri(httpretty.POST,
                                 settings.NSLORD_PDNS_API + '/zones',
                                 body='{"error": "Domain \'%s.\' already exists"}' % self.domain,
                                 content_type="application/json", status=422)
          httpretty.register_uri(httpretty.PATCH, self._zone_url(self.domain))
          httpretty.register_uri(httpretty.GET,
                                 self._zone_url(self.domain),
                                 body='{"rrsets": []}',
                                 content_type="application/json")
          httpretty.register_uri(httpretty.PUT, self._zone_url(self.domain) + '/notify', status=200)

          # presumably unlock() replays the suspended changes against pdns -- confirm
          self.owner.unlock()

          # The second-to-last request must be the PATCH carrying the update.
          patch_request = httpretty.httpretty.latest_requests[-2]
          self.assertEqual(patch_request.method, 'PATCH')
          self.assertTrue(self._zone_url(self.domain).endswith(patch_request.path))
          self.assertIn(self.domain, patch_request.parsed_body)
          self.assertIn('10.1.1.1', patch_request.parsed_body)

      def testSuspendedUpdatesDomainCreation(self):
          """A domain created and updated while locked is patched in pdns on unlock."""
          self.owner.captcha_required = True
          self.owner.save()

          newdomain = utils.generateDynDomainname()
          self.client.credentials(HTTP_AUTHORIZATION='Token ' + self.token)
          httpretty.register_uri(httpretty.GET,
                                 self._zone_url(newdomain) + '/cryptokeys',
                                 body='[]',
                                 content_type="application/json")
          response = self.client.post(reverse('domain-list'), {'name': newdomain})
          self.assertEqual(response.status_code, status.HTTP_201_CREATED)

          params = {
              'system': 'dyndns',
              'hostname': newdomain,
              'myip': '10.2.2.2'
          }
          response = self.client.get(reverse('dyndns12update'), params)
          self.assertEqual(response.status_code, status.HTTP_200_OK)
          self.assertIP(name=newdomain, ipv4='10.2.2.2')

          httpretty.register_uri(httpretty.POST, settings.NSLORD_PDNS_API + '/zones')
          httpretty.register_uri(httpretty.PATCH, self._zone_url(newdomain))
          # NOTE(review): the rrset name in this body is self.domain although the
          # zone is newdomain -- looks like a copy-paste slip; behaviour kept as is.
          httpretty.register_uri(httpretty.GET,
                                 self._zone_url(newdomain),
                                 body=self._NS_RRSETS_BODY % self.domain,
                                 content_type="application/json")
          httpretty.register_uri(httpretty.PUT, self._zone_url(newdomain) + '/notify', status=200)
          httpretty.register_uri(httpretty.PATCH, self._zone_url(self.domain))
          httpretty.register_uri(httpretty.GET,
                                 self._zone_url(self.domain),
                                 body=self._NS_RRSETS_BODY % self.domain,
                                 content_type="application/json")
          httpretty.register_uri(httpretty.PUT, self._zone_url(self.domain) + '/notify', status=200)

          # presumably unlock() replays the suspended changes against pdns -- confirm
          self.owner.unlock()

          # The third-to-last request must be the PATCH for the new domain.
          patch_request = httpretty.httpretty.latest_requests[-3]
          self.assertEqual(patch_request.method, 'PATCH')
          self.assertTrue(self._zone_url(newdomain).endswith(patch_request.path))
          self.assertIn('10.2.2.2', patch_request.parsed_body)