# test_dyndns12update.py
  1. import random
  2. from rest_framework import status
  3. from desecapi.tests.base import DynDomainOwnerTestCase
  4. class DynDNS12UpdateTest(DynDomainOwnerTestCase):
  5. def assertIP(self, ipv4=None, ipv6=None, name=None, subname=""):
  6. name = name or self.my_domain.name.lower()
  7. for type_, value in [("A", ipv4), ("AAAA", ipv6)]:
  8. url = self.reverse("v1:rrset", name=name, subname=subname, type=type_)
  9. response = self.client_token_authorized.get(url)
  10. if value:
  11. self.assertStatus(response, status.HTTP_200_OK)
  12. self.assertEqual(response.data["records"][0], value)
  13. self.assertEqual(response.data["ttl"], 60)
  14. else:
  15. self.assertStatus(response, status.HTTP_404_NOT_FOUND)
  16. def test_identification_by_domain_name(self):
  17. self.client.set_credentials_basic_auth(
  18. self.my_domain.name + ".invalid", self.token.plain
  19. )
  20. response = self.assertDynDNS12NoUpdate(mock_remote_addr="10.5.5.6")
  21. self.assertStatus(response, status.HTTP_401_UNAUTHORIZED)
  22. def test_identification_by_query_params(self):
  23. # /update?username=foobar.dedyn.io&password=secret
  24. self.client.set_credentials_basic_auth(None, None)
  25. response = self.assertDynDNS12Update(
  26. username=self.my_domain.name, password=self.token.plain
  27. )
  28. self.assertStatus(response, status.HTTP_200_OK)
  29. self.assertEqual(response.data, "good")
  30. self.assertEqual(response.content_type, "text/plain")
  31. self.assertIP(ipv4="127.0.0.1")
  32. def test_identification_by_query_params_with_subdomain(self):
  33. # /update?username=baz.foobar.dedyn.io&password=secret
  34. self.client.set_credentials_basic_auth(None, None)
  35. response = self.assertDynDNS12NoUpdate(
  36. username="baz", password=self.token.plain
  37. )
  38. self.assertStatus(response, status.HTTP_401_UNAUTHORIZED)
  39. self.assertEqual(response.content, b"badauth")
  40. for subname in ["baz", "*.baz"]:
  41. response = self.assertDynDNS12Update(
  42. username=f"{subname}.{self.my_domain.name}", password=self.token.plain
  43. )
  44. self.assertStatus(response, status.HTTP_200_OK)
  45. self.assertEqual(response.data, "good")
  46. self.assertIP(ipv4="127.0.0.1", subname=subname)
  47. def test_deviant_ttl(self):
  48. """
  49. The dynamic update will try to set the TTL to 60. Here, we create
  50. a record with a different TTL beforehand and then make sure that
  51. updates still work properly.
  52. """
  53. with self.assertPdnsRequests(
  54. self.request_pdns_zone_update(self.my_domain.name),
  55. self.request_pdns_zone_axfr(self.my_domain.name),
  56. ):
  57. response = self.client_token_authorized.patch_rr_set(
  58. self.my_domain.name.lower(), "", "A", {"ttl": 3600}
  59. )
  60. self.assertStatus(response, status.HTTP_200_OK)
  61. response = self.assertDynDNS12Update(self.my_domain.name)
  62. self.assertStatus(response, status.HTTP_200_OK)
  63. self.assertEqual(response.data, "good")
  64. self.assertIP(ipv4="127.0.0.1")
  65. def test_ddclient_dyndns1_v4_success(self):
  66. # /nic/dyndns?action=edit&started=1&hostname=YES&host_id=foobar.dedyn.io&myip=10.1.2.3
  67. with self.assertPdnsRequests(
  68. self.request_pdns_zone_update(self.my_domain.name),
  69. self.request_pdns_zone_axfr(self.my_domain.name),
  70. ):
  71. response = self.client.get(
  72. self.reverse("v1:dyndns12update"),
  73. {
  74. "action": "edit",
  75. "started": 1,
  76. "hostname": "YES",
  77. "host_id": self.my_domain.name,
  78. "myip": "10.1.2.3",
  79. },
  80. )
  81. self.assertStatus(response, status.HTTP_200_OK)
  82. self.assertEqual(response.data, "good")
  83. self.assertIP(ipv4="10.1.2.3")
  84. # Repeat and make sure that no pdns request is made (not even for the empty AAAA record)
  85. response = self.client.get(
  86. self.reverse("v1:dyndns12update"),
  87. {
  88. "action": "edit",
  89. "started": 1,
  90. "hostname": "YES",
  91. "host_id": self.my_domain.name,
  92. "myip": "10.1.2.3",
  93. },
  94. )
  95. self.assertStatus(response, status.HTTP_200_OK)
  96. def test_ddclient_dyndns1_v6_success(self):
  97. # /nic/dyndns?action=edit&started=1&hostname=YES&host_id=foobar.dedyn.io&myipv6=::1337
  98. response = self.assertDynDNS12Update(
  99. domain_name=self.my_domain.name,
  100. action="edit",
  101. started=1,
  102. hostname="YES",
  103. host_id=self.my_domain.name,
  104. myipv6="::1337",
  105. )
  106. self.assertStatus(response, status.HTTP_200_OK)
  107. self.assertEqual(response.data, "good")
  108. self.assertIP(ipv4="127.0.0.1", ipv6="::1337")
  109. # Repeat and make sure that no pdns request is made (not even for the empty A record)
  110. response = self.client.get(
  111. self.reverse("v1:dyndns12update"),
  112. {
  113. "domain_name": self.my_domain.name,
  114. "action": "edit",
  115. "started": 1,
  116. "hostname": "YES",
  117. "host_id": self.my_domain.name,
  118. "myipv6": "::1337",
  119. },
  120. )
  121. self.assertStatus(response, status.HTTP_200_OK)
  122. def test_ddclient_dyndns2_v4_success(self):
  123. # /nic/update?system=dyndns&hostname=foobar.dedyn.io&myip=10.2.3.4
  124. response = self.assertDynDNS12Update(
  125. domain_name=self.my_domain.name,
  126. system="dyndns",
  127. hostname=self.my_domain.name,
  128. myip="10.2.3.4",
  129. )
  130. self.assertStatus(response, status.HTTP_200_OK)
  131. self.assertEqual(response.data, "good")
  132. self.assertIP(ipv4="10.2.3.4")
  133. def test_ddclient_dyndns2_v4_invalid(self):
  134. # /nic/update?system=dyndns&hostname=foobar.dedyn.io&myip=10.2.3.4asdf
  135. params = {
  136. "domain_name": self.my_domain.name,
  137. "system": "dyndns",
  138. "hostname": self.my_domain.name,
  139. "myip": "10.2.3.4asdf",
  140. }
  141. response = self.client.get(self.reverse("v1:dyndns12update"), params)
  142. self.assertStatus(response, status.HTTP_400_BAD_REQUEST)
  143. self.assertIn("malformed", str(response.data))
  144. def test_ddclient_dyndns2_v4_invalid_or_foreign_domain(self):
  145. # /nic/update?system=dyndns&hostname=<...>&myip=10.2.3.4
  146. for name in [
  147. self.owner.email,
  148. self.other_domain.name,
  149. self.my_domain.parent_domain_name,
  150. ]:
  151. response = self.assertDynDNS12NoUpdate(
  152. system="dyndns",
  153. hostname=name,
  154. myip="10.2.3.4",
  155. )
  156. self.assertStatus(response, status.HTTP_404_NOT_FOUND)
  157. self.assertEqual(response.content, b"nohost")
  158. def test_ddclient_dyndns2_v6_success(self):
  159. # /nic/update?system=dyndns&hostname=foobar.dedyn.io&myipv6=::1338
  160. response = self.assertDynDNS12Update(
  161. domain_name=self.my_domain.name,
  162. system="dyndns",
  163. hostname=self.my_domain.name,
  164. myipv6="::666",
  165. )
  166. self.assertStatus(response, status.HTTP_200_OK)
  167. self.assertEqual(response.data, "good")
  168. self.assertIP(ipv4="127.0.0.1", ipv6="::666")
  169. def test_fritz_box(self):
  170. # /
  171. response = self.assertDynDNS12Update(self.my_domain.name)
  172. self.assertStatus(response, status.HTTP_200_OK)
  173. self.assertEqual(response.data, "good")
  174. self.assertIP(ipv4="127.0.0.1")
  175. def test_unset_ip(self):
  176. for (v4, v6) in [
  177. ("127.0.0.1", "::1"),
  178. ("127.0.0.1", ""),
  179. ("", "::1"),
  180. ("", ""),
  181. ]:
  182. response = self.assertDynDNS12Update(self.my_domain.name, ip=v4, ipv6=v6)
  183. self.assertStatus(response, status.HTTP_200_OK)
  184. self.assertEqual(response.data, "good")
  185. self.assertIP(ipv4=v4, ipv6=v6)
  186. class SingleDomainDynDNS12UpdateTest(DynDNS12UpdateTest):
  187. NUM_OWNED_DOMAINS = 1
  188. def test_identification_by_token(self):
  189. self.client.set_credentials_basic_auth("", self.token.plain)
  190. response = self.assertDynDNS12Update(
  191. self.my_domain.name, mock_remote_addr="10.5.5.6"
  192. )
  193. self.assertStatus(response, status.HTTP_200_OK)
  194. self.assertEqual(response.data, "good")
  195. self.assertIP(ipv4="10.5.5.6")
  196. def test_identification_by_email(self):
  197. self.client.set_credentials_basic_auth(self.owner.email, self.token.plain)
  198. response = self.assertDynDNS12Update(
  199. self.my_domain.name, mock_remote_addr="10.5.5.6"
  200. )
  201. self.assertStatus(response, status.HTTP_200_OK)
  202. self.assertEqual(response.data, "good")
  203. self.assertIP(ipv4="10.5.5.6")
  204. class MultipleDomainDynDNS12UpdateTest(DynDNS12UpdateTest):
  205. NUM_OWNED_DOMAINS = 4
  206. def test_ignore_minimum_ttl(self):
  207. self.my_domain.minimum_ttl = 61
  208. self.my_domain.save()
  209. # Test that dynDNS updates work both under a local public suffix (self.my_domain) and for a custom domains
  210. for domain in [self.my_domain, self.create_domain(owner=self.owner)]:
  211. self.assertGreater(domain.minimum_ttl, 60)
  212. self.client.set_credentials_basic_auth(
  213. domain.name.lower(), self.token.plain
  214. )
  215. response = self.assertDynDNS12Update(domain.name)
  216. self.assertStatus(response, status.HTTP_200_OK)
  217. self.assertEqual(domain.rrset_set.get(subname="", type="A").ttl, 60)
  218. def test_identification_by_token(self):
  219. """
  220. Test if the conflict of having multiple domains, but not specifying which to update is correctly recognized.
  221. """
  222. self.client.set_credentials_basic_auth("", self.token.plain)
  223. response = self.client.get(
  224. self.reverse("v1:dyndns12update"), REMOTE_ADDR="10.5.5.7"
  225. )
  226. self.assertStatus(response, status.HTTP_400_BAD_REQUEST)
  227. class MixedCaseDynDNS12UpdateTestCase(DynDNS12UpdateTest):
  228. @staticmethod
  229. def random_casing(s):
  230. return "".join(
  231. [c.lower() if random.choice([True, False]) else c.upper() for c in s]
  232. )
  233. def setUp(self):
  234. super().setUp()
  235. self.my_domain.name = self.random_casing(self.my_domain.name)
  236. class UppercaseDynDNS12UpdateTestCase(DynDNS12UpdateTest):
  237. def setUp(self):
  238. super().setUp()
  239. self.my_domain.name = self.my_domain.name.upper()