testdyndns12update.py 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350
  1. from django.urls import reverse
  2. from rest_framework import status
  3. from rest_framework.test import APITestCase
  4. from .utils import utils
  5. import base64
  6. import httpretty
  7. from django.conf import settings
  8. import json
  9. from django.utils import timezone
  10. from desecapi.exceptions import PdnsException
  11. class DynDNS12UpdateTest(APITestCase):
  12. owner = None
  13. token = None
  14. username = None
  15. password = None
  16. def setUp(self):
  17. self.owner = utils.createUser()
  18. self.token = utils.createToken(user=self.owner)
  19. self.domain = utils.generateDynDomainname()
  20. self.client.credentials(HTTP_AUTHORIZATION='Token ' + self.token)
  21. url = reverse('domain-list')
  22. data = {'name': self.domain}
  23. response = self.client.post(url, data)
  24. self.assertEqual(response.status_code, status.HTTP_201_CREATED)
  25. self.username = response.data['name']
  26. self.password = self.token
  27. self.client.credentials(HTTP_AUTHORIZATION='Basic ' + base64.b64encode((self.username + ':' + self.password).encode()).decode())
  28. httpretty.enable()
  29. httpretty.HTTPretty.allow_net_connect = False
  30. self.httpretty_reset_uris()
  31. def httpretty_reset_uris(self):
  32. httpretty.reset()
  33. httpretty.register_uri(httpretty.POST, settings.NSLORD_PDNS_API + '/zones')
  34. httpretty.register_uri(httpretty.PATCH, settings.NSLORD_PDNS_API + '/zones/' + self.domain + '.')
  35. httpretty.register_uri(httpretty.GET,
  36. settings.NSLORD_PDNS_API + '/zones/' + self.domain + '.',
  37. body='{"rrsets": []}',
  38. content_type="application/json")
  39. httpretty.register_uri(httpretty.GET,
  40. settings.NSLORD_PDNS_API + '/zones/' + self.domain + './cryptokeys',
  41. body='[]',
  42. content_type="application/json")
  43. httpretty.register_uri(httpretty.PUT, settings.NSLORD_PDNS_API + '/zones/' + self.domain + './notify')
  44. def tearDown(self):
  45. httpretty.reset()
  46. httpretty.disable()
  47. def assertIP(self, ipv4=None, ipv6=None, name=None):
  48. old_credentials = self.client._credentials['HTTP_AUTHORIZATION']
  49. self.client.credentials(HTTP_AUTHORIZATION='Token ' + self.password)
  50. name = name or self.username
  51. def verify_response(type_, ip):
  52. url = reverse('rrset', args=(name, '', type_,))
  53. response = self.client.get(url)
  54. if ip is not None:
  55. self.assertEqual(response.data['records'][0], ip)
  56. self.assertEqual(response.status_code, status.HTTP_200_OK)
  57. self.assertEqual(response.data['ttl'], 60)
  58. else:
  59. self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND)
  60. verify_response('A', ipv4)
  61. verify_response('AAAA', ipv6)
  62. self.client.credentials(HTTP_AUTHORIZATION=old_credentials)
  63. def testDynDNS1UpdateDDClientSuccess(self):
  64. # /nic/dyndns?action=edit&started=1&hostname=YES&host_id=foobar.dedyn.io&myip=10.1.2.3
  65. url = reverse('dyndns12update')
  66. response = self.client.get(url,
  67. {
  68. 'action': 'edit',
  69. 'started': 1,
  70. 'hostname': 'YES',
  71. 'host_id': self.username,
  72. 'myip': '10.1.2.3'
  73. })
  74. self.assertEqual(response.status_code, status.HTTP_200_OK)
  75. self.assertEqual(response.data, 'good')
  76. self.assertIP(ipv4='10.1.2.3')
  77. def testDynDNS1UpdateDDClientIPv6Success(self):
  78. # /nic/dyndns?action=edit&started=1&hostname=YES&host_id=foobar.dedyn.io&myipv6=::1337
  79. url = reverse('dyndns12update')
  80. response = self.client.get(url,
  81. {
  82. 'action': 'edit',
  83. 'started': 1,
  84. 'hostname': 'YES',
  85. 'host_id': self.username,
  86. 'myipv6': '::1337'
  87. })
  88. self.assertEqual(response.status_code, status.HTTP_200_OK)
  89. self.assertEqual(response.data, 'good')
  90. self.assertIP(ipv4='127.0.0.1', ipv6='::1337')
  91. def testDynDNS2UpdateDDClientIPv4Success(self):
  92. #/nic/update?system=dyndns&hostname=foobar.dedyn.io&myip=10.2.3.4
  93. url = reverse('dyndns12update')
  94. response = self.client.get(url,
  95. {
  96. 'system': 'dyndns',
  97. 'hostname': self.username,
  98. 'myip': '10.2.3.4'
  99. })
  100. self.assertEqual(response.status_code, status.HTTP_200_OK)
  101. self.assertEqual(response.data, 'good')
  102. self.assertIP(ipv4='10.2.3.4')
  103. def testDynDNS2UpdateDDClientIPv6Success(self):
  104. #/nic/update?system=dyndns&hostname=foobar.dedyn.io&myipv6=::1338
  105. url = reverse('dyndns12update')
  106. response = self.client.get(url,
  107. {
  108. 'system': 'dyndns',
  109. 'hostname': self.username,
  110. 'myipv6': '::1338'
  111. })
  112. self.assertEqual(response.status_code, status.HTTP_200_OK)
  113. self.assertEqual(response.data, 'good')
  114. self.assertIP(ipv4='127.0.0.1', ipv6='::1338')
  115. def testFritzBox(self):
  116. #/
  117. url = reverse('dyndns12update')
  118. response = self.client.get(url)
  119. self.assertEqual(response.status_code, status.HTTP_200_OK)
  120. self.assertEqual(response.data, 'good')
  121. self.assertIP(ipv4='127.0.0.1')
  122. def testUnsetIP(self):
  123. url = reverse('dyndns12update')
  124. def testVariant(params, **kwargs):
  125. response = self.client.get(url, params)
  126. self.assertEqual(response.status_code, status.HTTP_200_OK)
  127. self.assertEqual(response.data, 'good')
  128. self.assertIP(**kwargs)
  129. testVariant({'ipv6': '::1337'}, ipv4='127.0.0.1', ipv6='::1337')
  130. testVariant({'ipv6': '::1337', 'myip': ''}, ipv4=None, ipv6='::1337')
  131. testVariant({'ipv6': '', 'ip': '1.2.3.4'}, ipv4='1.2.3.4', ipv6=None)
  132. testVariant({'ipv6': '', 'myipv4': ''}, ipv4=None, ipv6=None)
  133. def testIdentificationByUsernameDomainname(self):
  134. # To force identification by the provided username (which is the domain name)
  135. # we add a second domain for the current user.
  136. name = 'second-' + self.domain
  137. httpretty.register_uri(httpretty.GET,
  138. settings.NSLORD_PDNS_API + '/zones/' + name + '.',
  139. body='{"rrsets": []}',
  140. content_type="application/json")
  141. httpretty.register_uri(httpretty.GET,
  142. settings.NSLORD_PDNS_API + '/zones/' + name + './cryptokeys',
  143. body='[]',
  144. content_type="application/json")
  145. httpretty.register_uri(httpretty.PUT, settings.NSLORD_PDNS_API + '/zones/' + name + './notify', status=200)
  146. self.client.credentials(HTTP_AUTHORIZATION='Token ' + self.token)
  147. url = reverse('domain-list')
  148. response = self.client.post(url, {'name': name})
  149. self.assertEqual(response.status_code, status.HTTP_201_CREATED)
  150. self.client.credentials(HTTP_AUTHORIZATION='Basic ' + base64.b64encode((self.username + ':' + self.password).encode()).decode())
  151. url = reverse('dyndns12update')
  152. response = self.client.get(url, REMOTE_ADDR='10.5.5.5')
  153. self.assertEqual(response.status_code, status.HTTP_200_OK)
  154. self.assertEqual(response.data, 'good')
  155. self.assertIP(ipv4='10.5.5.5')
  156. self.client.credentials(HTTP_AUTHORIZATION='Basic ' + base64.b64encode((self.username + '.invalid:' + self.password).encode()).decode())
  157. url = reverse('dyndns12update')
  158. response = self.client.get(url, REMOTE_ADDR='10.5.5.5')
  159. self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND)
  160. def testIdentificationByTokenWithEmptyUser(self):
  161. self.client.credentials(HTTP_AUTHORIZATION='Basic ' + base64.b64encode((':' + self.password).encode()).decode())
  162. url = reverse('dyndns12update')
  163. response = self.client.get(url, REMOTE_ADDR='10.5.5.6')
  164. self.assertEqual(response.status_code, status.HTTP_200_OK)
  165. self.assertEqual(response.data, 'good')
  166. self.assertIP(ipv4='10.5.5.6')
  167. # Now make sure we get a conflict when the user has multiple domains. Thus,
  168. # we add a second domain for the current user.
  169. name = 'second-' + self.domain
  170. httpretty.register_uri(httpretty.GET,
  171. settings.NSLORD_PDNS_API + '/zones/' + name + '.',
  172. body='{"rrsets": []}',
  173. content_type="application/json")
  174. httpretty.register_uri(httpretty.GET,
  175. settings.NSLORD_PDNS_API + '/zones/' + name + './cryptokeys',
  176. body='[]',
  177. content_type="application/json")
  178. httpretty.register_uri(httpretty.PUT, settings.NSLORD_PDNS_API + '/zones/' + name + './notify', status=200)
  179. self.client.credentials(HTTP_AUTHORIZATION='Token ' + self.token)
  180. url = reverse('domain-list')
  181. response = self.client.post(url, {'name': name})
  182. self.assertEqual(response.status_code, status.HTTP_201_CREATED)
  183. url = reverse('dyndns12update')
  184. response = self.client.get(url, REMOTE_ADDR='10.5.5.7')
  185. self.assertEqual(response.status_code, status.HTTP_409_CONFLICT)
  186. def testManual(self):
  187. #/update?username=foobar.dedyn.io&password=secret
  188. self.client.credentials(HTTP_AUTHORIZATION='')
  189. url = reverse('dyndns12update')
  190. response = self.client.get(url,
  191. {
  192. 'username': self.username,
  193. 'password': self.token,
  194. })
  195. self.assertEqual(response.status_code, status.HTTP_200_OK)
  196. self.assertEqual(response.data, 'good')
  197. self.assertIP(ipv4='127.0.0.1')
  198. def testDeviantTTL(self):
  199. # The dynamic update will try to set the TTL to 60. Here, we create
  200. # a record with a different TTL beforehand and then make sure that
  201. # updates still work properly.
  202. url = reverse('rrsets', args=(self.domain,))
  203. data = {'records': ['127.0.0.1'], 'ttl': 3600, 'type': 'A'}
  204. self.client.credentials(HTTP_AUTHORIZATION='Token ' + self.token)
  205. response = self.client.post(url, json.dumps(data),
  206. content_type='application/json')
  207. self.assertEqual(response.status_code, status.HTTP_201_CREATED)
  208. self.httpretty_reset_uris()
  209. url = reverse('dyndns12update')
  210. self.client.credentials(HTTP_AUTHORIZATION='Basic ' + base64.b64encode((self.username + ':' + self.password).encode()).decode())
  211. response = self.client.get(url)
  212. self.assertEqual(httpretty.httpretty.latest_requests[-2].method, 'PATCH')
  213. self.assertTrue((settings.NSLORD_PDNS_API + '/zones/' + self.domain + '.').endswith(httpretty.httpretty.latest_requests[-2].path))
  214. self.assertEqual(response.status_code, status.HTTP_200_OK)
  215. self.assertEqual(response.data, 'good')
  216. self.assertIP(ipv4='127.0.0.1')
  217. def testSuspendedUpdates(self):
  218. self.owner.locked = timezone.now()
  219. self.owner.save()
  220. url = reverse('dyndns12update')
  221. response = self.client.get(url,
  222. {
  223. 'system': 'dyndns',
  224. 'hostname': self.domain,
  225. 'myip': '10.1.1.1'
  226. })
  227. self.assertEqual(response.status_code, status.HTTP_200_OK)
  228. self.assertIP(ipv4='10.1.1.1')
  229. httpretty.register_uri(httpretty.POST, settings.NSLORD_PDNS_API + '/zones')
  230. httpretty.register_uri(httpretty.POST,
  231. settings.NSLORD_PDNS_API + '/zones',
  232. body='{"error": "Domain \'%s.\' already exists"}' % self.domain,
  233. content_type="application/json", status=422)
  234. httpretty.register_uri(httpretty.PATCH, settings.NSLORD_PDNS_API + '/zones/' + self.domain + '.')
  235. httpretty.register_uri(httpretty.GET,
  236. settings.NSLORD_PDNS_API + '/zones/' + self.domain + '.',
  237. body='{"rrsets": []}',
  238. content_type="application/json")
  239. httpretty.register_uri(httpretty.PUT, settings.NSLORD_PDNS_API + '/zones/' + self.domain + './notify', status=200)
  240. self.owner.unlock()
  241. self.assertEqual(httpretty.httpretty.latest_requests[-2].method, 'PATCH')
  242. self.assertTrue((settings.NSLORD_PDNS_API + '/zones/' + self.domain + '.').endswith(httpretty.httpretty.latest_requests[-2].path))
  243. self.assertTrue(self.domain in httpretty.httpretty.latest_requests[-2].parsed_body)
  244. self.assertTrue('10.1.1.1' in httpretty.httpretty.latest_requests[-2].parsed_body)
  245. def testSuspendedUpdatesDomainCreation(self):
  246. # Lock user
  247. self.owner.locked = timezone.now()
  248. self.owner.save()
  249. # While in locked state, create a domain and set some records
  250. url = reverse('domain-list')
  251. newdomain = utils.generateDynDomainname()
  252. data = {'name': newdomain}
  253. self.client.credentials(HTTP_AUTHORIZATION='Token ' + self.token)
  254. httpretty.register_uri(httpretty.GET,
  255. settings.NSLORD_PDNS_API + '/zones/' + newdomain + './cryptokeys',
  256. body='[]',
  257. content_type="application/json")
  258. response = self.client.post(url, data)
  259. self.assertEqual(response.status_code, status.HTTP_201_CREATED)
  260. url = reverse('dyndns12update')
  261. response = self.client.get(url,
  262. {
  263. 'system': 'dyndns',
  264. 'hostname': newdomain,
  265. 'myip': '10.2.2.2'
  266. })
  267. self.assertEqual(response.status_code, status.HTTP_200_OK)
  268. self.assertIP(name=newdomain, ipv4='10.2.2.2')
  269. # See what happens upon unlock if pdns knows this domain already
  270. httpretty.register_uri(httpretty.POST,
  271. settings.NSLORD_PDNS_API + '/zones',
  272. body='{"error": "Domain \'' + newdomain + '.\' already exists"}',
  273. status=422)
  274. with self.assertRaises(PdnsException) as cm:
  275. self.owner.unlock()
  276. self.assertEqual(str(cm.exception),
  277. "Domain '" + newdomain + ".' already exists")
  278. # See what happens upon unlock if this domain is new to pdns
  279. httpretty.register_uri(httpretty.POST,
  280. settings.NSLORD_PDNS_API + '/zones')
  281. httpretty.register_uri(httpretty.PATCH, settings.NSLORD_PDNS_API + '/zones/' + newdomain + '.')
  282. httpretty.register_uri(httpretty.GET,
  283. settings.NSLORD_PDNS_API + '/zones/' + newdomain + '.',
  284. body='{"rrsets": [{"comments": [], "name": "%s.", "records": [ { "content": "ns1.desec.io.", "disabled": false }, { "content": "ns2.desec.io.", "disabled": false } ], "ttl": 60, "type": "NS"}]}' % self.domain,
  285. content_type="application/json")
  286. httpretty.register_uri(httpretty.PUT, settings.NSLORD_PDNS_API + '/zones/' + newdomain + './notify', status=200)
  287. httpretty.register_uri(httpretty.PATCH, settings.NSLORD_PDNS_API + '/zones/' + self.domain + '.')
  288. httpretty.register_uri(httpretty.GET,
  289. settings.NSLORD_PDNS_API + '/zones/' + self.domain + '.',
  290. body='{"rrsets": [{"comments": [], "name": "%s.", "records": [ { "content": "ns1.desec.io.", "disabled": false }, { "content": "ns2.desec.io.", "disabled": false } ], "ttl": 60, "type": "NS"}]}' % self.domain,
  291. content_type="application/json")
  292. httpretty.register_uri(httpretty.PUT, settings.NSLORD_PDNS_API + '/zones/' + self.domain + './notify', status=200)
  293. self.owner.unlock()
  294. self.assertEqual(httpretty.httpretty.latest_requests[-5].method, 'POST')
  295. self.assertTrue((settings.NSLORD_PDNS_API + '/zones').endswith(httpretty.httpretty.latest_requests[-5].path))
  296. self.assertEqual(httpretty.httpretty.latest_requests[-3].method, 'PATCH')
  297. self.assertTrue((settings.NSLORD_PDNS_API + '/zones/' + newdomain + '.').endswith(httpretty.httpretty.latest_requests[-3].path))
  298. self.assertTrue('10.2.2.2' in httpretty.httpretty.latest_requests[-3].parsed_body)