# test_dyndns12update.py (13 KB)
  1. import random
  2. from rest_framework import status
  3. from desecapi.models import BlockedSubnet
  4. from desecapi.tests.base import DynDomainOwnerTestCase
  5. class DynDNS12UpdateTest(DynDomainOwnerTestCase):
  6. def assertIP(self, ipv4=None, ipv6=None, name=None, subname=""):
  7. name = name or self.my_domain.name.lower()
  8. for type_, value in [("A", ipv4), ("AAAA", ipv6)]:
  9. url = self.reverse("v1:rrset", name=name, subname=subname, type=type_)
  10. response = self.client_token_authorized.get(url)
  11. if value:
  12. if not isinstance(value, set):
  13. value = {value}
  14. self.assertStatus(response, status.HTTP_200_OK)
  15. self.assertEqual(set(response.data["records"]), value)
  16. self.assertEqual(response.data["ttl"], 60)
  17. else:
  18. self.assertStatus(response, status.HTTP_404_NOT_FOUND)
  19. def test_identification_by_domain_name(self):
  20. self.client.set_credentials_basic_auth(
  21. self.my_domain.name + ".invalid", self.token.plain
  22. )
  23. response = self.assertDynDNS12NoUpdate(mock_remote_addr="10.5.5.6")
  24. self.assertStatus(response, status.HTTP_401_UNAUTHORIZED)
  25. def test_identification_by_query_params(self):
  26. # /update?username=foobar.dedyn.io&password=secret
  27. self.client.set_credentials_basic_auth(None, None)
  28. response = self.assertDynDNS12Update(
  29. username=self.my_domain.name, password=self.token.plain
  30. )
  31. self.assertStatus(response, status.HTTP_200_OK)
  32. self.assertEqual(response.data, "good")
  33. self.assertEqual(response.content_type, "text/plain")
  34. self.assertIP(ipv4="127.0.0.1")
  35. def test_identification_by_query_params_with_subdomain(self):
  36. # /update?username=baz.foobar.dedyn.io&password=secret
  37. self.client.set_credentials_basic_auth(None, None)
  38. response = self.assertDynDNS12NoUpdate(
  39. username="baz", password=self.token.plain
  40. )
  41. self.assertStatus(response, status.HTTP_401_UNAUTHORIZED)
  42. self.assertEqual(response.content, b"badauth")
  43. for subname in ["baz", "*.baz"]:
  44. response = self.assertDynDNS12Update(
  45. username=f"{subname}.{self.my_domain.name}", password=self.token.plain
  46. )
  47. self.assertStatus(response, status.HTTP_200_OK)
  48. self.assertEqual(response.data, "good")
  49. self.assertIP(ipv4="127.0.0.1", subname=subname)
  50. def test_deviant_ttl(self):
  51. """
  52. The dynamic update will try to set the TTL to 60. Here, we create
  53. a record with a different TTL beforehand and then make sure that
  54. updates still work properly.
  55. """
  56. with self.assertPdnsRequests(
  57. self.request_pdns_zone_update(self.my_domain.name),
  58. self.request_pdns_zone_axfr(self.my_domain.name),
  59. ):
  60. response = self.client_token_authorized.patch_rr_set(
  61. self.my_domain.name.lower(), "", "A", {"ttl": 3600}
  62. )
  63. self.assertStatus(response, status.HTTP_200_OK)
  64. response = self.assertDynDNS12Update(self.my_domain.name)
  65. self.assertStatus(response, status.HTTP_200_OK)
  66. self.assertEqual(response.data, "good")
  67. self.assertIP(ipv4="127.0.0.1")
  68. def test_ddclient_dyndns1_v4_success(self):
  69. # /nic/dyndns?action=edit&started=1&hostname=YES&host_id=foobar.dedyn.io&myip=10.1.2.3
  70. with self.assertPdnsRequests(
  71. self.request_pdns_zone_update(self.my_domain.name),
  72. self.request_pdns_zone_axfr(self.my_domain.name),
  73. ):
  74. response = self.client.get(
  75. self.reverse("v1:dyndns12update"),
  76. {
  77. "action": "edit",
  78. "started": 1,
  79. "hostname": "YES",
  80. "host_id": self.my_domain.name,
  81. "myip": "10.1.2.3",
  82. },
  83. )
  84. self.assertStatus(response, status.HTTP_200_OK)
  85. self.assertEqual(response.data, "good")
  86. self.assertIP(ipv4="10.1.2.3")
  87. # Repeat and make sure that no pdns request is made (not even for the empty AAAA record)
  88. response = self.client.get(
  89. self.reverse("v1:dyndns12update"),
  90. {
  91. "action": "edit",
  92. "started": 1,
  93. "hostname": "YES",
  94. "host_id": self.my_domain.name,
  95. "myip": "10.1.2.3",
  96. },
  97. )
  98. self.assertStatus(response, status.HTTP_200_OK)
  99. def test_ddclient_dyndns1_v6_success(self):
  100. # /nic/dyndns?action=edit&started=1&hostname=YES&host_id=foobar.dedyn.io&myipv6=::1337
  101. response = self.assertDynDNS12Update(
  102. domain_name=self.my_domain.name,
  103. action="edit",
  104. started=1,
  105. hostname="YES",
  106. host_id=self.my_domain.name,
  107. myipv6="::1337",
  108. )
  109. self.assertStatus(response, status.HTTP_200_OK)
  110. self.assertEqual(response.data, "good")
  111. self.assertIP(ipv4="127.0.0.1", ipv6="::1337")
  112. # Repeat and make sure that no pdns request is made (not even for the empty A record)
  113. response = self.client.get(
  114. self.reverse("v1:dyndns12update"),
  115. {
  116. "domain_name": self.my_domain.name,
  117. "action": "edit",
  118. "started": 1,
  119. "hostname": "YES",
  120. "host_id": self.my_domain.name,
  121. "myipv6": "::1337",
  122. },
  123. )
  124. self.assertStatus(response, status.HTTP_200_OK)
  125. def test_ddclient_dyndns2_v4_success(self):
  126. # /nic/update?system=dyndns&hostname=foobar.dedyn.io&myip=10.2.3.4
  127. response = self.assertDynDNS12Update(
  128. domain_name=self.my_domain.name,
  129. system="dyndns",
  130. hostname=self.my_domain.name,
  131. myip="10.2.3.4",
  132. )
  133. self.assertStatus(response, status.HTTP_200_OK)
  134. self.assertEqual(response.data, "good")
  135. self.assertIP(ipv4="10.2.3.4")
  136. def test_ddclient_dyndns2_v4_invalid(self):
  137. # /nic/update?system=dyndns&hostname=foobar.dedyn.io&myip=10.2.3.4asdf
  138. params = {
  139. "domain_name": self.my_domain.name,
  140. "system": "dyndns",
  141. "hostname": self.my_domain.name,
  142. "myip": "10.2.3.4asdf",
  143. }
  144. response = self.client.get(self.reverse("v1:dyndns12update"), params)
  145. self.assertStatus(response, status.HTTP_400_BAD_REQUEST)
  146. self.assertIn("malformed", str(response.data))
  147. def test_ddclient_dyndns2_v4_invalid_or_foreign_domain(self):
  148. # /nic/update?system=dyndns&hostname=<...>&myip=10.2.3.4
  149. for name in [
  150. self.owner.email,
  151. self.other_domain.name,
  152. self.my_domain.parent_domain_name,
  153. ]:
  154. response = self.assertDynDNS12NoUpdate(
  155. system="dyndns",
  156. hostname=name,
  157. myip="10.2.3.4",
  158. )
  159. self.assertStatus(response, status.HTTP_404_NOT_FOUND)
  160. self.assertEqual(response.content, b"nohost")
  161. def test_ddclient_dyndns2_v4_blocked(self):
  162. # /nic/update?system=dyndns&hostname=foobar.dedyn.io&myip=3.2.2.3
  163. BlockedSubnet.from_ip("3.2.2.3").save()
  164. params = {
  165. "domain_name": self.my_domain.name,
  166. "system": "dyndns",
  167. "hostname": self.my_domain.name,
  168. "myip": "3.2.2.5",
  169. }
  170. response = self.client.get(self.reverse("v1:dyndns12update"), params)
  171. self.assertStatus(response, status.HTTP_400_BAD_REQUEST)
  172. self.assertIn("IP address 3.2.2.5 not allowed.", str(response.data))
  173. def test_ddclient_dyndns2_v6_success(self):
  174. # /nic/update?system=dyndns&hostname=foobar.dedyn.io&myipv6=::1338
  175. response = self.assertDynDNS12Update(
  176. domain_name=self.my_domain.name,
  177. system="dyndns",
  178. hostname=self.my_domain.name,
  179. myipv6="::666",
  180. )
  181. self.assertStatus(response, status.HTTP_200_OK)
  182. self.assertEqual(response.data, "good")
  183. self.assertIP(ipv4="127.0.0.1", ipv6="::666")
  184. def test_ddclient_dyndns2_mixed_success(self):
  185. response = self.assertDynDNS12Update(
  186. domain_name=self.my_domain.name,
  187. system="dyndns",
  188. hostname=self.my_domain.name,
  189. myip="10.2.3.4, ::2 , 10.6.5.4 ,::4",
  190. )
  191. self.assertStatus(response, status.HTTP_200_OK)
  192. self.assertEqual(response.data, "good")
  193. self.assertIP(ipv4={"10.2.3.4", "10.6.5.4"}, ipv6={"::2", "::4"})
  194. def test_ddclient_dyndns2_mixed_invalid(self):
  195. for myip in ["10.2.3.4, ", "preserve,::2"]:
  196. response = self.assertDynDNS12NoUpdate(
  197. domain_name=self.my_domain.name,
  198. system="dyndns",
  199. hostname=self.my_domain.name,
  200. myip=myip,
  201. )
  202. self.assertStatus(response, status.HTTP_400_BAD_REQUEST)
  203. self.assertEqual(response.data["code"], "inconsistent-parameter")
  204. def test_fritz_box(self):
  205. # /
  206. response = self.assertDynDNS12Update(self.my_domain.name)
  207. self.assertStatus(response, status.HTTP_200_OK)
  208. self.assertEqual(response.data, "good")
  209. self.assertIP(ipv4="127.0.0.1")
  210. def test_unset_ip(self):
  211. for (v4, v6) in [
  212. ("127.0.0.1", "::1"),
  213. ("127.0.0.1", ""),
  214. ("", "::1"),
  215. ("", ""),
  216. ]:
  217. response = self.assertDynDNS12Update(self.my_domain.name, ip=v4, ipv6=v6)
  218. self.assertStatus(response, status.HTTP_200_OK)
  219. self.assertEqual(response.data, "good")
  220. self.assertIP(ipv4=v4, ipv6=v6)
  221. def test_preserve_ip(self):
  222. current_v4 = "127.0.0.1"
  223. current_v6 = "::1"
  224. self.assertDynDNS12Update(self.my_domain.name, ip=current_v4, ipv6=current_v6)
  225. for (v4, v6) in [
  226. ("preserve", "::3"),
  227. ("1.2.3.4", "preserve"),
  228. ("preserve", "preserve"),
  229. ]:
  230. self.assertDynDNS12Update(
  231. self.my_domain.name, ip=v4, ipv6=v6, expect_update=v4 != v6
  232. )
  233. current_v4 = current_v4 if v4 == "preserve" else v4
  234. current_v6 = current_v6 if v6 == "preserve" else v6
  235. self.assertIP(ipv4=current_v4, ipv6=current_v6)
  236. class SingleDomainDynDNS12UpdateTest(DynDNS12UpdateTest):
  237. NUM_OWNED_DOMAINS = 1
  238. def test_identification_by_token(self):
  239. self.client.set_credentials_basic_auth("", self.token.plain)
  240. response = self.assertDynDNS12Update(
  241. self.my_domain.name, mock_remote_addr="10.5.5.6"
  242. )
  243. self.assertStatus(response, status.HTTP_200_OK)
  244. self.assertEqual(response.data, "good")
  245. self.assertIP(ipv4="10.5.5.6")
  246. def test_identification_by_email(self):
  247. self.client.set_credentials_basic_auth(self.owner.email, self.token.plain)
  248. response = self.assertDynDNS12Update(
  249. self.my_domain.name, mock_remote_addr="10.5.5.6"
  250. )
  251. self.assertStatus(response, status.HTTP_200_OK)
  252. self.assertEqual(response.data, "good")
  253. self.assertIP(ipv4="10.5.5.6")
  254. class MultipleDomainDynDNS12UpdateTest(DynDNS12UpdateTest):
  255. NUM_OWNED_DOMAINS = 4
  256. def test_ignore_minimum_ttl(self):
  257. self.my_domain.minimum_ttl = 61
  258. self.my_domain.save()
  259. # Test that dynDNS updates work both under a local public suffix (self.my_domain) and for a custom domains
  260. for domain in [self.my_domain, self.create_domain(owner=self.owner)]:
  261. self.assertGreater(domain.minimum_ttl, 60)
  262. self.client.set_credentials_basic_auth(
  263. domain.name.lower(), self.token.plain
  264. )
  265. response = self.assertDynDNS12Update(domain.name)
  266. self.assertStatus(response, status.HTTP_200_OK)
  267. self.assertEqual(domain.rrset_set.get(subname="", type="A").ttl, 60)
  268. def test_identification_by_token(self):
  269. """
  270. Test if the conflict of having multiple domains, but not specifying which to update is correctly recognized.
  271. """
  272. self.client.set_credentials_basic_auth("", self.token.plain)
  273. response = self.client.get(
  274. self.reverse("v1:dyndns12update"), REMOTE_ADDR="10.5.5.7"
  275. )
  276. self.assertStatus(response, status.HTTP_400_BAD_REQUEST)
  277. class MixedCaseDynDNS12UpdateTestCase(DynDNS12UpdateTest):
  278. @staticmethod
  279. def random_casing(s):
  280. return "".join(
  281. [c.lower() if random.choice([True, False]) else c.upper() for c in s]
  282. )
  283. def setUp(self):
  284. super().setUp()
  285. self.my_domain.name = self.random_casing(self.my_domain.name)
  286. class UppercaseDynDNS12UpdateTestCase(DynDNS12UpdateTest):
  287. def setUp(self):
  288. super().setUp()
  289. self.my_domain.name = self.my_domain.name.upper()