Browse Source

feat(api): allow dynDNS updates on subdomains, closes #412

Peter Thomassen 4 years ago
parent
commit
ebe57ff1ee

+ 8 - 6
api/desecapi/authentication.py

@@ -10,18 +10,20 @@ from rest_framework.authentication import (
     TokenAuthentication as RestFrameworkTokenAuthentication,
     BasicAuthentication)
 
-from desecapi.models import Token
+from desecapi.models import Domain, Token
 from desecapi.serializers import AuthenticatedBasicUserActionSerializer, EmailPasswordSerializer
 
 
 class DynAuthenticationMixin:
     def authenticate_credentials(self, username, key):
         user, token = TokenAuthentication().authenticate_credentials(key)
-        if not user.is_active:
-            raise exceptions.AuthenticationFailed
-        if username not in ['', user.email] and not user.domains.filter(name=username.lower()).exists():
-            raise exceptions.AuthenticationFailed
-        return user, token
+        # Make sure username is not misleading
+        try:
+            if username in ['', user.email] or Domain.objects.filter_qname(username.lower(), owner=user).exists():
+                return user, token
+        except ValueError:
+            pass
+        raise exceptions.AuthenticationFailed
 
 
 class TokenAuthentication(RestFrameworkTokenAuthentication):

+ 17 - 1
api/desecapi/models.py

@@ -25,8 +25,9 @@ from django.core.exceptions import ValidationError
 from django.core.mail import EmailMessage, get_connection
 from django.core.validators import MinValueValidator, RegexValidator
 from django.db import models
-from django.db.models import Manager, Q
+from django.db.models import CharField, F, Manager, Q, Value
 from django.db.models.expressions import RawSQL
+from django.db.models.functions import Concat, Length
 from django.template.loader import get_template
 from django.utils import timezone
 from django_prometheus.models import ExportModelOperationsMixin
@@ -204,6 +205,19 @@ validate_domain_name = [
 ]
 
 
+class DomainManager(Manager):
+    def filter_qname(self, qname: str, **kwargs) -> models.query.QuerySet:
+        try:
+            Domain._meta.get_field('name').run_validators(qname)
+        except ValidationError:
+            raise ValueError
+        return self.annotate(
+            dotted_name=Concat(Value('.'), 'name', output_field=CharField()),
+            dotted_qname=Value(f'.{qname}', output_field=CharField()),
+            name_length=Length('name'),
+        ).filter(dotted_qname__endswith=F('dotted_name'), **kwargs)
+
+
 class Domain(ExportModelOperationsMixin('Domain'), models.Model):
     @staticmethod
     def _minimum_ttl_default():
@@ -226,7 +240,9 @@ class Domain(ExportModelOperationsMixin('Domain'), models.Model):
     minimum_ttl = models.PositiveIntegerField(default=_minimum_ttl_default.__func__)
     renewal_state = models.IntegerField(choices=RenewalState.choices, default=RenewalState.IMMORTAL)
     renewal_changed = models.DateTimeField(auto_now_add=True)
+
     _keys = None
+    objects = DomainManager()
 
     def __init__(self, *args, **kwargs):
         if isinstance(kwargs.get('owner'), AnonymousUser):

+ 47 - 0
api/desecapi/tests/test_domains.py

@@ -453,3 +453,50 @@ class AutoDelegationDomainOwnerTests(DomainOwnerTestCase):
             response = self.client.post(url, {'name': name})
         self.assertStatus(response, status.HTTP_201_CREATED)
         self.assertEqual(response.data['minimum_ttl'], 60)
+
+
+class DomainManagerTestCase(DesecTestCase):
+
+    def test_filter_qname(self):
+        user1, user2 = self.create_user(), self.create_user()
+        domains = {
+            user1: ['domain.dedyn.io', 'foobar.example'],
+            user2: ['dedyn.io', 'desec.io'],
+        }
+        for user, names in domains.items():
+            for name in names:
+                Domain(name=name, owner=user).save()
+
+        config = {
+            'domain.dedyn.io': {
+                None: ['domain.dedyn.io', 'dedyn.io'],
+                user1: ['domain.dedyn.io'],
+                user2: ['dedyn.io'],
+            },
+            'foo.bar.baz.foobar.example': {
+                None: ['foobar.example'],
+                user1: ['foobar.example'],
+                user2: [],
+            },
+            'dedyn.io': {
+                None: ['dedyn.io'],
+                user1: [],
+                user2: ['dedyn.io'],
+            },
+            'foobar.desec.io': {
+                None: ['desec.io'],
+                user1: [],
+                user2: ['desec.io'],
+            },
+        }
+        config['sub.domain.dedyn.io'] = config['domain.dedyn.io']
+
+        for qname, cases in config.items():
+            for owner, expected in cases.items():
+                filter_kwargs = dict(owner=owner) if owner is not None else {}
+                qs = Domain.objects.filter_qname(qname, **filter_kwargs).values_list('name', flat=True)
+                self.assertListEqual(list(qs), expected)
+
+    def test_filter_qname_invalid(self):
+        with self.assertRaises(ValueError):
+            Domain.objects.filter_qname('foo@bar.com')

+ 27 - 3
api/desecapi/tests/test_dyndns12update.py

@@ -7,10 +7,11 @@ from desecapi.tests.base import DynDomainOwnerTestCase
 
 class DynDNS12UpdateTest(DynDomainOwnerTestCase):
 
-    def assertIP(self, ipv4=None, ipv6=None, name=None):
+    def assertIP(self, ipv4=None, ipv6=None, name=None, subname=''):
         name = name or self.my_domain.name.lower()
         for type_, value in [('A', ipv4), ('AAAA', ipv6)]:
-            response = self.client_token_authorized.get(self.reverse('v1:rrset', name=name, subname='', type=type_))
+            url = self.reverse('v1:rrset', name=name, subname=subname, type=type_)
+            response = self.client_token_authorized.get(url)
             if value:
                 self.assertStatus(response, status.HTTP_200_OK)
                 self.assertEqual(response.data['records'][0], value)
@@ -31,6 +32,18 @@ class DynDNS12UpdateTest(DynDomainOwnerTestCase):
         self.assertEqual(response.data, 'good')
         self.assertIP(ipv4='127.0.0.1')
 
+    def test_identification_by_query_params_with_subdomain(self):
+        # /update?username=baz.foobar.dedyn.io&password=secret
+        self.client.set_credentials_basic_auth(None, None)
+        response = self.assertDynDNS12NoUpdate(username='baz', password=self.token.plain)
+        self.assertStatus(response, status.HTTP_401_UNAUTHORIZED)
+        self.assertEqual(response.content, b'badauth')
+
+        response = self.assertDynDNS12Update(username=f'baz.{self.my_domain.name}', password=self.token.plain)
+        self.assertStatus(response, status.HTTP_200_OK)
+        self.assertEqual(response.data, 'good')
+        self.assertIP(ipv4='127.0.0.1', subname='baz')
+
     def test_deviant_ttl(self):
         """
         The dynamic update will try to set the TTL to 60. Here, we create
@@ -134,6 +147,17 @@ class DynDNS12UpdateTest(DynDomainOwnerTestCase):
         self.assertStatus(response, status.HTTP_400_BAD_REQUEST)
         self.assertIn('malformed', str(response.data))
 
+    def test_ddclient_dyndns2_v4_invalid_or_foreign_domain(self):
+        # /nic/update?system=dyndns&hostname=<...>&myip=10.2.3.4
+        for name in [self.owner.email, self.other_domain.name, self.my_domain.parent_domain_name]:
+            response = self.assertDynDNS12NoUpdate(
+                system='dyndns',
+                hostname=name,
+                myip='10.2.3.4',
+            )
+            self.assertStatus(response, status.HTTP_404_NOT_FOUND)
+            self.assertEqual(response.content, b'nohost')
+
     def test_ddclient_dyndns2_v6_success(self):
         # /nic/update?system=dyndns&hostname=foobar.dedyn.io&myipv6=::1338
         response = self.assertDynDNS12Update(
@@ -205,7 +229,7 @@ class MultipleDomainDynDNS12UpdateTest(DynDNS12UpdateTest):
         """
         self.client.set_credentials_basic_auth('', self.token.plain)
         response = self.client.get(self.reverse('v1:dyndns12update'), REMOTE_ADDR='10.5.5.7')
-        self.assertStatus(response, status.HTTP_409_CONFLICT)
+        self.assertStatus(response, status.HTTP_400_BAD_REQUEST)
 
 
 class MixedCaseDynDNS12UpdateTestCase(DynDNS12UpdateTest):

+ 67 - 69
api/desecapi/views.py

@@ -1,6 +1,7 @@
 import base64
 import binascii
 from datetime import timedelta
+from functools import cached_property
 
 from django.conf import settings
 from django.contrib.auth import user_logged_in
@@ -265,60 +266,12 @@ class Root(APIView):
         return Response(routes)
 
 
-class DynDNS12Update(APIView):
+class DynDNS12Update(generics.GenericAPIView):
     authentication_classes = (auth.TokenAuthentication, auth.BasicTokenAuthentication, auth.URLParamAuthentication,)
     renderer_classes = [PlainTextRenderer]
     throttle_scope = 'dyndns'
 
-    def _find_domain(self, request):
-        def find_domain_name(r):
-            # 1. hostname parameter
-            if 'hostname' in r.query_params and r.query_params['hostname'] != 'YES':
-                return r.query_params['hostname']
-
-            # 2. host_id parameter
-            if 'host_id' in r.query_params:
-                return r.query_params['host_id']
-
-            # 3. http basic auth username
-            try:
-                domain_name = base64.b64decode(
-                    get_authorization_header(r).decode().split(' ')[1].encode()).decode().split(':')[0]
-                if domain_name and '@' not in domain_name:
-                    return domain_name
-            except IndexError:
-                pass
-            except UnicodeDecodeError:
-                pass
-            except binascii.Error:
-                pass
-
-            # 4. username parameter
-            if 'username' in r.query_params:
-                return r.query_params['username']
-
-            # 5. only domain associated with this user account
-            if len(r.user.domains.all()) == 1:
-                return r.user.domains.all()[0].name
-            if len(r.user.domains.all()) > 1:
-                ex = ValidationError(detail={
-                    "detail": "Request does not specify domain unambiguously.",
-                    "code": "domain-ambiguous"
-                })
-                ex.status_code = status.HTTP_409_CONFLICT
-                raise ex
-
-            return None
-
-        name = find_domain_name(request).lower()
-
-        try:
-            return self.request.user.domains.get(name=name)
-        except models.Domain.DoesNotExist:
-            return None
-
-    @staticmethod
-    def find_ip(request, params, version=4):
+    def _find_ip(self, params, version):
         if version == 4:
             look_for = '.'
         elif version == 6:
@@ -328,43 +281,88 @@ class DynDNS12Update(APIView):
 
         # Check URL parameters
         for p in params:
-            if p in request.query_params:
-                if not len(request.query_params[p]):
+            if p in self.request.query_params:
+                if not len(self.request.query_params[p]):
                     return None
-                if look_for in request.query_params[p]:
-                    return request.query_params[p]
+                if look_for in self.request.query_params[p]:
+                    return self.request.query_params[p]
 
         # Check remote IP address
-        client_ip = request.META.get('REMOTE_ADDR')
+        client_ip = self.request.META.get('REMOTE_ADDR')
         if look_for in client_ip:
             return client_ip
 
         # give up
         return None
 
-    def _find_ip_v4(self, request):
-        return self.find_ip(request, ['myip', 'myipv4', 'ip'])
+    @cached_property
+    def qname(self):
+        # hostname parameter
+        try:
+            if self.request.query_params['hostname'] != 'YES':
+                return self.request.query_params['hostname'].lower()
+        except KeyError:
+            pass
 
-    def _find_ip_v6(self, request):
-        return self.find_ip(request, ['myipv6', 'ipv6', 'myip', 'ip'], version=6)
+        # host_id parameter
+        try:
+            return self.request.query_params['host_id'].lower()
+        except KeyError:
+            pass
 
-    def get(self, request, *_):
-        domain = self._find_domain(request)
+        # http basic auth username
+        try:
+            domain_name = base64.b64decode(
+                get_authorization_header(self.request).decode().split(' ')[1].encode()).decode().split(':')[0]
+            if domain_name and '@' not in domain_name:
+                return domain_name.lower()
+        except (binascii.Error, IndexError, UnicodeDecodeError):
+            pass
 
-        if domain is None:
+        # username parameter
+        try:
+            return self.request.query_params['username'].lower()
+        except KeyError:
+            pass
+
+        # only domain associated with this user account
+        try:
+            return self.request.user.domains.get().name
+        except models.Domain.MultipleObjectsReturned:
+            raise ValidationError(detail={
+                "detail": "Request does not properly specify domain for update.",
+                "code": "domain-unspecified"
+            })
+        except models.Domain.DoesNotExist:
             metrics.get('desecapi_dynDNS12_domain_not_found').inc()
             raise NotFound('nohost')
 
-        ipv4 = self._find_ip_v4(request)
-        ipv6 = self._find_ip_v6(request)
+    @cached_property
+    def domain(self):
+        try:
+            return models.Domain.objects.filter_qname(self.qname, owner=self.request.user).order_by('-name_length')[0]
+        except (IndexError, ValueError):
+            raise NotFound('nohost')
+
+    @property
+    def subname(self):
+        return self.qname.rpartition(f'.{self.domain.name}')[0]
+
+    def get_queryset(self):
+        return self.domain.rrset_set.filter(subname=self.subname, type__in=['A', 'AAAA'])
+
+    def get(self, request, *_):
+        instances = self.get_queryset().all()
+
+        ipv4 = self._find_ip(['myip', 'myipv4', 'ip'], version=4)
+        ipv6 = self._find_ip(['myipv6', 'ipv6', 'myip', 'ip'], version=6)
 
         data = [
-            {'type': 'A', 'subname': '', 'ttl': 60, 'records': [ipv4] if ipv4 else []},
-            {'type': 'AAAA', 'subname': '', 'ttl': 60, 'records': [ipv6] if ipv6 else []},
+            {'type': 'A', 'subname': self.subname, 'ttl': 60, 'records': [ipv4] if ipv4 else []},
+            {'type': 'AAAA', 'subname': self.subname, 'ttl': 60, 'records': [ipv6] if ipv6 else []},
         ]
 
-        instances = domain.rrset_set.filter(subname='', type__in=['A', 'AAAA']).all()
-        context = {'domain': domain, 'minimum_ttl': 60}
+        context = {'domain': self.domain, 'minimum_ttl': 60}
         serializer = serializers.RRsetSerializer(instances, data=data, many=True, partial=True, context=context)
         try:
             serializer.is_valid(raise_exception=True)
@@ -383,7 +381,7 @@ class DynDNS12Update(APIView):
         with PDNSChangeTracker():
             serializer.save()
 
-        return Response('good', content_type='text/plain')
+        return Response('good')
 
 
 class DonationList(generics.CreateAPIView):

+ 14 - 2
docs/dyndns/update-api.rst

@@ -83,8 +83,20 @@ determine the hostname, we try the following steps until there is a match:
 - After successful authentication (no matter how), the only hostname that is
   associated with your user account (if not ambiguous).
 
-If we cannot determine a hostname to update, the API will return a ``404 Not
-Found`` status code.
+If we cannot determine a hostname to update, the API returns a status code of
+``400 Bad Request`` (if no hostname was given but multiple domains exist in
+the account) or ``404 Not Found`` (if the specified domain was not found).
+
+Subdomains
+----------
+The dynDNS update API can also be used to update IP records for subdomains.
+To do so, make sure that in the above list of steps, the first value
+provided contains the full domain name (including the subdomain).
+
+Example: Your domain is ``yourdomain.dedyn.io``, and you're using HTTP Basic
+Authentication.  In this case, replace your authentication username with
+``sub.yourdomain.dedyn.io``.  Similarly, if you use the ``hostname`` query
+parameter, it needs to be set to the full domain name (including subdomain).
 
 .. _determine-ip-addresses: