test_dyndns12update.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251
  1. import random
  2. from rest_framework import status
  3. from desecapi.tests.base import DynDomainOwnerTestCase
  4. class DynDNS12UpdateTest(DynDomainOwnerTestCase):
  5. def assertIP(self, ipv4=None, ipv6=None, name=None, subname=''):
  6. name = name or self.my_domain.name.lower()
  7. for type_, value in [('A', ipv4), ('AAAA', ipv6)]:
  8. url = self.reverse('v1:rrset', name=name, subname=subname, type=type_)
  9. response = self.client_token_authorized.get(url)
  10. if value:
  11. self.assertStatus(response, status.HTTP_200_OK)
  12. self.assertEqual(response.data['records'][0], value)
  13. self.assertEqual(response.data['ttl'], 60)
  14. else:
  15. self.assertStatus(response, status.HTTP_404_NOT_FOUND)
  16. def test_identification_by_domain_name(self):
  17. self.client.set_credentials_basic_auth(self.my_domain.name + '.invalid', self.token.plain)
  18. response = self.assertDynDNS12NoUpdate(mock_remote_addr='10.5.5.6')
  19. self.assertStatus(response, status.HTTP_401_UNAUTHORIZED)
  20. def test_identification_by_query_params(self):
  21. # /update?username=foobar.dedyn.io&password=secret
  22. self.client.set_credentials_basic_auth(None, None)
  23. response = self.assertDynDNS12Update(username=self.my_domain.name, password=self.token.plain)
  24. self.assertStatus(response, status.HTTP_200_OK)
  25. self.assertEqual(response.data, 'good')
  26. self.assertEqual(response.content_type, 'text/plain')
  27. self.assertIP(ipv4='127.0.0.1')
  28. def test_identification_by_query_params_with_subdomain(self):
  29. # /update?username=baz.foobar.dedyn.io&password=secret
  30. self.client.set_credentials_basic_auth(None, None)
  31. response = self.assertDynDNS12NoUpdate(username='baz', password=self.token.plain)
  32. self.assertStatus(response, status.HTTP_401_UNAUTHORIZED)
  33. self.assertEqual(response.content, b'badauth')
  34. response = self.assertDynDNS12Update(username=f'baz.{self.my_domain.name}', password=self.token.plain)
  35. self.assertStatus(response, status.HTTP_200_OK)
  36. self.assertEqual(response.data, 'good')
  37. self.assertIP(ipv4='127.0.0.1', subname='baz')
  38. def test_deviant_ttl(self):
  39. """
  40. The dynamic update will try to set the TTL to 60. Here, we create
  41. a record with a different TTL beforehand and then make sure that
  42. updates still work properly.
  43. """
  44. with self.assertPdnsRequests(
  45. self.request_pdns_zone_update(self.my_domain.name),
  46. self.request_pdns_zone_axfr(self.my_domain.name),
  47. ):
  48. response = self.client_token_authorized.patch_rr_set(self.my_domain.name.lower(), '', 'A', {'ttl': 3600})
  49. self.assertStatus(response, status.HTTP_200_OK)
  50. response = self.assertDynDNS12Update(self.my_domain.name)
  51. self.assertStatus(response, status.HTTP_200_OK)
  52. self.assertEqual(response.data, 'good')
  53. self.assertIP(ipv4='127.0.0.1')
  54. def test_ddclient_dyndns1_v4_success(self):
  55. # /nic/dyndns?action=edit&started=1&hostname=YES&host_id=foobar.dedyn.io&myip=10.1.2.3
  56. with self.assertPdnsRequests(
  57. self.request_pdns_zone_update(self.my_domain.name),
  58. self.request_pdns_zone_axfr(self.my_domain.name),
  59. ):
  60. response = self.client.get(
  61. self.reverse('v1:dyndns12update'),
  62. {
  63. 'action': 'edit',
  64. 'started': 1,
  65. 'hostname': 'YES',
  66. 'host_id': self.my_domain.name,
  67. 'myip': '10.1.2.3'
  68. }
  69. )
  70. self.assertStatus(response, status.HTTP_200_OK)
  71. self.assertEqual(response.data, 'good')
  72. self.assertIP(ipv4='10.1.2.3')
  73. # Repeat and make sure that no pdns request is made (not even for the empty AAAA record)
  74. response = self.client.get(
  75. self.reverse('v1:dyndns12update'),
  76. {
  77. 'action': 'edit',
  78. 'started': 1,
  79. 'hostname': 'YES',
  80. 'host_id': self.my_domain.name,
  81. 'myip': '10.1.2.3'
  82. }
  83. )
  84. self.assertStatus(response, status.HTTP_200_OK)
  85. def test_ddclient_dyndns1_v6_success(self):
  86. # /nic/dyndns?action=edit&started=1&hostname=YES&host_id=foobar.dedyn.io&myipv6=::1337
  87. response = self.assertDynDNS12Update(
  88. domain_name=self.my_domain.name,
  89. action='edit',
  90. started=1,
  91. hostname='YES',
  92. host_id=self.my_domain.name,
  93. myipv6='::1337'
  94. )
  95. self.assertStatus(response, status.HTTP_200_OK)
  96. self.assertEqual(response.data, 'good')
  97. self.assertIP(ipv4='127.0.0.1', ipv6='::1337')
  98. # Repeat and make sure that no pdns request is made (not even for the empty A record)
  99. response = self.client.get(
  100. self.reverse('v1:dyndns12update'),
  101. {
  102. 'domain_name': self.my_domain.name,
  103. 'action': 'edit',
  104. 'started': 1,
  105. 'hostname': 'YES',
  106. 'host_id': self.my_domain.name,
  107. 'myipv6': '::1337'
  108. }
  109. )
  110. self.assertStatus(response, status.HTTP_200_OK)
  111. def test_ddclient_dyndns2_v4_success(self):
  112. # /nic/update?system=dyndns&hostname=foobar.dedyn.io&myip=10.2.3.4
  113. response = self.assertDynDNS12Update(
  114. domain_name=self.my_domain.name,
  115. system='dyndns',
  116. hostname=self.my_domain.name,
  117. myip='10.2.3.4',
  118. )
  119. self.assertStatus(response, status.HTTP_200_OK)
  120. self.assertEqual(response.data, 'good')
  121. self.assertIP(ipv4='10.2.3.4')
  122. def test_ddclient_dyndns2_v4_invalid(self):
  123. # /nic/update?system=dyndns&hostname=foobar.dedyn.io&myip=10.2.3.4asdf
  124. params = {
  125. 'domain_name': self.my_domain.name,
  126. 'system': 'dyndns',
  127. 'hostname': self.my_domain.name,
  128. 'myip': '10.2.3.4asdf',
  129. }
  130. response = self.client.get(self.reverse('v1:dyndns12update'), params)
  131. self.assertStatus(response, status.HTTP_400_BAD_REQUEST)
  132. self.assertIn('malformed', str(response.data))
  133. def test_ddclient_dyndns2_v4_invalid_or_foreign_domain(self):
  134. # /nic/update?system=dyndns&hostname=<...>&myip=10.2.3.4
  135. for name in [self.owner.email, self.other_domain.name, self.my_domain.parent_domain_name]:
  136. response = self.assertDynDNS12NoUpdate(
  137. system='dyndns',
  138. hostname=name,
  139. myip='10.2.3.4',
  140. )
  141. self.assertStatus(response, status.HTTP_404_NOT_FOUND)
  142. self.assertEqual(response.content, b'nohost')
  143. def test_ddclient_dyndns2_v6_success(self):
  144. # /nic/update?system=dyndns&hostname=foobar.dedyn.io&myipv6=::1338
  145. response = self.assertDynDNS12Update(
  146. domain_name=self.my_domain.name,
  147. system='dyndns',
  148. hostname=self.my_domain.name,
  149. myipv6='::666',
  150. )
  151. self.assertStatus(response, status.HTTP_200_OK)
  152. self.assertEqual(response.data, 'good')
  153. self.assertIP(ipv4='127.0.0.1', ipv6='::666')
  154. def test_fritz_box(self):
  155. # /
  156. response = self.assertDynDNS12Update(self.my_domain.name)
  157. self.assertStatus(response, status.HTTP_200_OK)
  158. self.assertEqual(response.data, 'good')
  159. self.assertIP(ipv4='127.0.0.1')
  160. def test_unset_ip(self):
  161. for (v4, v6) in [
  162. ('127.0.0.1', '::1'),
  163. ('127.0.0.1', ''),
  164. ('', '::1'),
  165. ('', ''),
  166. ]:
  167. response = self.assertDynDNS12Update(self.my_domain.name, ip=v4, ipv6=v6)
  168. self.assertStatus(response, status.HTTP_200_OK)
  169. self.assertEqual(response.data, 'good')
  170. self.assertIP(ipv4=v4, ipv6=v6)
  171. class SingleDomainDynDNS12UpdateTest(DynDNS12UpdateTest):
  172. NUM_OWNED_DOMAINS = 1
  173. def test_identification_by_token(self):
  174. self.client.set_credentials_basic_auth('', self.token.plain)
  175. response = self.assertDynDNS12Update(self.my_domain.name, mock_remote_addr='10.5.5.6')
  176. self.assertStatus(response, status.HTTP_200_OK)
  177. self.assertEqual(response.data, 'good')
  178. self.assertIP(ipv4='10.5.5.6')
  179. def test_identification_by_email(self):
  180. self.client.set_credentials_basic_auth(self.owner.email, self.token.plain)
  181. response = self.assertDynDNS12Update(self.my_domain.name, mock_remote_addr='10.5.5.6')
  182. self.assertStatus(response, status.HTTP_200_OK)
  183. self.assertEqual(response.data, 'good')
  184. self.assertIP(ipv4='10.5.5.6')
  185. class MultipleDomainDynDNS12UpdateTest(DynDNS12UpdateTest):
  186. NUM_OWNED_DOMAINS = 4
  187. def test_ignore_minimum_ttl(self):
  188. self.my_domain.minimum_ttl = 61
  189. self.my_domain.save()
  190. # Test that dynDNS updates work both under a local public suffix (self.my_domain) and for a custom domains
  191. for domain in [self.my_domain, self.create_domain(owner=self.owner)]:
  192. self.assertGreater(domain.minimum_ttl, 60)
  193. self.client.set_credentials_basic_auth(domain.name.lower(), self.token.plain)
  194. response = self.assertDynDNS12Update(domain.name)
  195. self.assertStatus(response, status.HTTP_200_OK)
  196. self.assertEqual(domain.rrset_set.get(subname='', type='A').ttl, 60)
  197. def test_identification_by_token(self):
  198. """
  199. Test if the conflict of having multiple domains, but not specifying which to update is correctly recognized.
  200. """
  201. self.client.set_credentials_basic_auth('', self.token.plain)
  202. response = self.client.get(self.reverse('v1:dyndns12update'), REMOTE_ADDR='10.5.5.7')
  203. self.assertStatus(response, status.HTTP_400_BAD_REQUEST)
  204. class MixedCaseDynDNS12UpdateTestCase(DynDNS12UpdateTest):
  205. @staticmethod
  206. def random_casing(s):
  207. return ''.join([c.lower() if random.choice([True, False]) else c.upper() for c in s])
  208. def setUp(self):
  209. super().setUp()
  210. self.my_domain.name = self.random_casing(self.my_domain.name)
  211. class UppercaseDynDNS12UpdateTestCase(DynDNS12UpdateTest):
  212. def setUp(self):
  213. super().setUp()
  214. self.my_domain.name = self.my_domain.name.upper()