test_dyndns12update.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289
  1. import random
  2. from rest_framework import status
  3. from desecapi.tests.base import DynDomainOwnerTestCase
  4. class DynDNS12UpdateTest(DynDomainOwnerTestCase):
  5. def assertIP(self, ipv4=None, ipv6=None, name=None, subname=""):
  6. name = name or self.my_domain.name.lower()
  7. for type_, value in [("A", ipv4), ("AAAA", ipv6)]:
  8. url = self.reverse("v1:rrset", name=name, subname=subname, type=type_)
  9. response = self.client_token_authorized.get(url)
  10. if value:
  11. self.assertStatus(response, status.HTTP_200_OK)
  12. self.assertEqual(response.data["records"][0], value)
  13. self.assertEqual(response.data["ttl"], 60)
  14. else:
  15. self.assertStatus(response, status.HTTP_404_NOT_FOUND)
  16. def test_identification_by_domain_name(self):
  17. self.client.set_credentials_basic_auth(
  18. self.my_domain.name + ".invalid", self.token.plain
  19. )
  20. response = self.assertDynDNS12NoUpdate(mock_remote_addr="10.5.5.6")
  21. self.assertStatus(response, status.HTTP_401_UNAUTHORIZED)
  22. def test_identification_by_query_params(self):
  23. # /update?username=foobar.dedyn.io&password=secret
  24. self.client.set_credentials_basic_auth(None, None)
  25. response = self.assertDynDNS12Update(
  26. username=self.my_domain.name, password=self.token.plain
  27. )
  28. self.assertStatus(response, status.HTTP_200_OK)
  29. self.assertEqual(response.data, "good")
  30. self.assertEqual(response.content_type, "text/plain")
  31. self.assertIP(ipv4="127.0.0.1")
  32. def test_identification_by_query_params_with_subdomain(self):
  33. # /update?username=baz.foobar.dedyn.io&password=secret
  34. self.client.set_credentials_basic_auth(None, None)
  35. response = self.assertDynDNS12NoUpdate(
  36. username="baz", password=self.token.plain
  37. )
  38. self.assertStatus(response, status.HTTP_401_UNAUTHORIZED)
  39. self.assertEqual(response.content, b"badauth")
  40. for subname in ["baz", "*.baz"]:
  41. response = self.assertDynDNS12Update(
  42. username=f"{subname}.{self.my_domain.name}", password=self.token.plain
  43. )
  44. self.assertStatus(response, status.HTTP_200_OK)
  45. self.assertEqual(response.data, "good")
  46. self.assertIP(ipv4="127.0.0.1", subname=subname)
  47. def test_deviant_ttl(self):
  48. """
  49. The dynamic update will try to set the TTL to 60. Here, we create
  50. a record with a different TTL beforehand and then make sure that
  51. updates still work properly.
  52. """
  53. with self.assertPdnsRequests(
  54. self.request_pdns_zone_update(self.my_domain.name),
  55. self.request_pdns_zone_axfr(self.my_domain.name),
  56. ):
  57. response = self.client_token_authorized.patch_rr_set(
  58. self.my_domain.name.lower(), "", "A", {"ttl": 3600}
  59. )
  60. self.assertStatus(response, status.HTTP_200_OK)
  61. response = self.assertDynDNS12Update(self.my_domain.name)
  62. self.assertStatus(response, status.HTTP_200_OK)
  63. self.assertEqual(response.data, "good")
  64. self.assertIP(ipv4="127.0.0.1")
  65. def test_ddclient_dyndns1_v4_success(self):
  66. # /nic/dyndns?action=edit&started=1&hostname=YES&host_id=foobar.dedyn.io&myip=10.1.2.3
  67. with self.assertPdnsRequests(
  68. self.request_pdns_zone_update(self.my_domain.name),
  69. self.request_pdns_zone_axfr(self.my_domain.name),
  70. ):
  71. response = self.client.get(
  72. self.reverse("v1:dyndns12update"),
  73. {
  74. "action": "edit",
  75. "started": 1,
  76. "hostname": "YES",
  77. "host_id": self.my_domain.name,
  78. "myip": "10.1.2.3",
  79. },
  80. )
  81. self.assertStatus(response, status.HTTP_200_OK)
  82. self.assertEqual(response.data, "good")
  83. self.assertIP(ipv4="10.1.2.3")
  84. # Repeat and make sure that no pdns request is made (not even for the empty AAAA record)
  85. response = self.client.get(
  86. self.reverse("v1:dyndns12update"),
  87. {
  88. "action": "edit",
  89. "started": 1,
  90. "hostname": "YES",
  91. "host_id": self.my_domain.name,
  92. "myip": "10.1.2.3",
  93. },
  94. )
  95. self.assertStatus(response, status.HTTP_200_OK)
  96. def test_ddclient_dyndns1_v6_success(self):
  97. # /nic/dyndns?action=edit&started=1&hostname=YES&host_id=foobar.dedyn.io&myipv6=::1337
  98. response = self.assertDynDNS12Update(
  99. domain_name=self.my_domain.name,
  100. action="edit",
  101. started=1,
  102. hostname="YES",
  103. host_id=self.my_domain.name,
  104. myipv6="::1337",
  105. )
  106. self.assertStatus(response, status.HTTP_200_OK)
  107. self.assertEqual(response.data, "good")
  108. self.assertIP(ipv4="127.0.0.1", ipv6="::1337")
  109. # Repeat and make sure that no pdns request is made (not even for the empty A record)
  110. response = self.client.get(
  111. self.reverse("v1:dyndns12update"),
  112. {
  113. "domain_name": self.my_domain.name,
  114. "action": "edit",
  115. "started": 1,
  116. "hostname": "YES",
  117. "host_id": self.my_domain.name,
  118. "myipv6": "::1337",
  119. },
  120. )
  121. self.assertStatus(response, status.HTTP_200_OK)
  122. def test_ddclient_dyndns2_v4_success(self):
  123. # /nic/update?system=dyndns&hostname=foobar.dedyn.io&myip=10.2.3.4
  124. response = self.assertDynDNS12Update(
  125. domain_name=self.my_domain.name,
  126. system="dyndns",
  127. hostname=self.my_domain.name,
  128. myip="10.2.3.4",
  129. )
  130. self.assertStatus(response, status.HTTP_200_OK)
  131. self.assertEqual(response.data, "good")
  132. self.assertIP(ipv4="10.2.3.4")
  133. def test_ddclient_dyndns2_v4_invalid(self):
  134. # /nic/update?system=dyndns&hostname=foobar.dedyn.io&myip=10.2.3.4asdf
  135. params = {
  136. "domain_name": self.my_domain.name,
  137. "system": "dyndns",
  138. "hostname": self.my_domain.name,
  139. "myip": "10.2.3.4asdf",
  140. }
  141. response = self.client.get(self.reverse("v1:dyndns12update"), params)
  142. self.assertStatus(response, status.HTTP_400_BAD_REQUEST)
  143. self.assertIn("malformed", str(response.data))
  144. def test_ddclient_dyndns2_v4_invalid_or_foreign_domain(self):
  145. # /nic/update?system=dyndns&hostname=<...>&myip=10.2.3.4
  146. for name in [
  147. self.owner.email,
  148. self.other_domain.name,
  149. self.my_domain.parent_domain_name,
  150. ]:
  151. response = self.assertDynDNS12NoUpdate(
  152. system="dyndns",
  153. hostname=name,
  154. myip="10.2.3.4",
  155. )
  156. self.assertStatus(response, status.HTTP_404_NOT_FOUND)
  157. self.assertEqual(response.content, b"nohost")
  158. def test_ddclient_dyndns2_v6_success(self):
  159. # /nic/update?system=dyndns&hostname=foobar.dedyn.io&myipv6=::1338
  160. response = self.assertDynDNS12Update(
  161. domain_name=self.my_domain.name,
  162. system="dyndns",
  163. hostname=self.my_domain.name,
  164. myipv6="::666",
  165. )
  166. self.assertStatus(response, status.HTTP_200_OK)
  167. self.assertEqual(response.data, "good")
  168. self.assertIP(ipv4="127.0.0.1", ipv6="::666")
  169. def test_fritz_box(self):
  170. # /
  171. response = self.assertDynDNS12Update(self.my_domain.name)
  172. self.assertStatus(response, status.HTTP_200_OK)
  173. self.assertEqual(response.data, "good")
  174. self.assertIP(ipv4="127.0.0.1")
  175. def test_unset_ip(self):
  176. for (v4, v6) in [
  177. ("127.0.0.1", "::1"),
  178. ("127.0.0.1", ""),
  179. ("", "::1"),
  180. ("", ""),
  181. ]:
  182. response = self.assertDynDNS12Update(self.my_domain.name, ip=v4, ipv6=v6)
  183. self.assertStatus(response, status.HTTP_200_OK)
  184. self.assertEqual(response.data, "good")
  185. self.assertIP(ipv4=v4, ipv6=v6)
  186. def test_preserve_ip(self):
  187. current_v4 = "127.0.0.1"
  188. current_v6 = "::1"
  189. self.assertDynDNS12Update(self.my_domain.name, ip=current_v4, ipv6=current_v6)
  190. for (v4, v6) in [
  191. ("preserve", "::3"),
  192. ("1.2.3.4", "preserve"),
  193. ("preserve", "preserve"),
  194. ]:
  195. self.assertDynDNS12Update(
  196. self.my_domain.name, ip=v4, ipv6=v6, expect_update=v4 != v6
  197. )
  198. current_v4 = current_v4 if v4 == "preserve" else v4
  199. current_v6 = current_v6 if v6 == "preserve" else v6
  200. self.assertIP(ipv4=current_v4, ipv6=current_v6)
  201. class SingleDomainDynDNS12UpdateTest(DynDNS12UpdateTest):
  202. NUM_OWNED_DOMAINS = 1
  203. def test_identification_by_token(self):
  204. self.client.set_credentials_basic_auth("", self.token.plain)
  205. response = self.assertDynDNS12Update(
  206. self.my_domain.name, mock_remote_addr="10.5.5.6"
  207. )
  208. self.assertStatus(response, status.HTTP_200_OK)
  209. self.assertEqual(response.data, "good")
  210. self.assertIP(ipv4="10.5.5.6")
  211. def test_identification_by_email(self):
  212. self.client.set_credentials_basic_auth(self.owner.email, self.token.plain)
  213. response = self.assertDynDNS12Update(
  214. self.my_domain.name, mock_remote_addr="10.5.5.6"
  215. )
  216. self.assertStatus(response, status.HTTP_200_OK)
  217. self.assertEqual(response.data, "good")
  218. self.assertIP(ipv4="10.5.5.6")
  219. class MultipleDomainDynDNS12UpdateTest(DynDNS12UpdateTest):
  220. NUM_OWNED_DOMAINS = 4
  221. def test_ignore_minimum_ttl(self):
  222. self.my_domain.minimum_ttl = 61
  223. self.my_domain.save()
  224. # Test that dynDNS updates work both under a local public suffix (self.my_domain) and for a custom domains
  225. for domain in [self.my_domain, self.create_domain(owner=self.owner)]:
  226. self.assertGreater(domain.minimum_ttl, 60)
  227. self.client.set_credentials_basic_auth(
  228. domain.name.lower(), self.token.plain
  229. )
  230. response = self.assertDynDNS12Update(domain.name)
  231. self.assertStatus(response, status.HTTP_200_OK)
  232. self.assertEqual(domain.rrset_set.get(subname="", type="A").ttl, 60)
  233. def test_identification_by_token(self):
  234. """
  235. Test if the conflict of having multiple domains, but not specifying which to update is correctly recognized.
  236. """
  237. self.client.set_credentials_basic_auth("", self.token.plain)
  238. response = self.client.get(
  239. self.reverse("v1:dyndns12update"), REMOTE_ADDR="10.5.5.7"
  240. )
  241. self.assertStatus(response, status.HTTP_400_BAD_REQUEST)
  242. class MixedCaseDynDNS12UpdateTestCase(DynDNS12UpdateTest):
  243. @staticmethod
  244. def random_casing(s):
  245. return "".join(
  246. [c.lower() if random.choice([True, False]) else c.upper() for c in s]
  247. )
  248. def setUp(self):
  249. super().setUp()
  250. self.my_domain.name = self.random_casing(self.my_domain.name)
  251. class UppercaseDynDNS12UpdateTestCase(DynDNS12UpdateTest):
  252. def setUp(self):
  253. super().setUp()
  254. self.my_domain.name = self.my_domain.name.upper()