Преглед изворни кода

feat(api): improve domain name and PSL validation

Nils Wisiol пре 5 година
родитељ
комит
9d1b545d82

+ 68 - 10
api/desecapi/models.py

@@ -8,16 +8,18 @@ import uuid
 from base64 import b64encode
 from datetime import datetime, timedelta
 from os import urandom
+from typing import Union
 
+import psl_dns
 import rest_framework.authtoken.models
 from django.conf import settings
-from django.contrib.auth.models import BaseUserManager, AbstractBaseUser
+from django.contrib.auth.models import BaseUserManager, AbstractBaseUser, AnonymousUser
 from django.core.exceptions import ValidationError
 from django.core.mail import EmailMessage
 from django.core.signing import Signer
 from django.core.validators import RegexValidator
 from django.db import models
-from django.db.models import Manager
+from django.db.models import Manager, Q
 from django.template.loader import get_template
 from django.utils import timezone
 from django.utils.crypto import constant_time_compare
@@ -26,6 +28,7 @@ from rest_framework.exceptions import APIException
 from desecapi import pdns
 
 logger = logging.getLogger(__name__)
+psl = psl_dns.PSL(resolver=settings.PSL_RESOLVER)
 
 
 def validate_lower(value):
@@ -184,31 +187,86 @@ class Token(rest_framework.authtoken.models.Token):
         unique_together = (('user', 'user_specific_id'),)
 
 
+validate_domain_name = [
+    validate_lower,
+    RegexValidator(
+        regex=r'^[a-z0-9_.-]*[a-z]$',
+        message='Invalid value (not a DNS name).',
+        code='invalid_domain_name'
+    )
+]
+
+
 class Domain(models.Model):
     created = models.DateTimeField(auto_now_add=True)
     name = models.CharField(max_length=191,
                             unique=True,
-                            validators=[validate_lower,
-                                        RegexValidator(regex=r'^[a-z0-9_.-]*[a-z]$',
-                                                       message='Invalid value (not a DNS name).',
-                                                       code='invalid_domain_name')
-                                        ])
+                            validators=validate_domain_name)
     owner = models.ForeignKey(User, on_delete=models.PROTECT, related_name='domains')
     published = models.DateTimeField(null=True, blank=True)
     minimum_ttl = models.PositiveIntegerField(default=settings.MINIMUM_TTL_DEFAULT)
 
+    @classmethod
+    def is_registrable(cls, domain_name: str, user: User):
+        """
+        Returns False in any of the following cases:
+        (a) the domain_name appears on the public suffix list,
+        (b) the domain is descendant to a zone that belongs to any user different from the given one,
+            unless it's parent is a public suffix, either through the Internet PSL or local settings.
+        Otherwise, True is returned.
+        """
+        if domain_name != domain_name.lower():
+            raise ValueError
+
+        try:
+            public_suffix = psl.get_public_suffix(domain_name)
+            is_public_suffix = psl.is_public_suffix(domain_name)
+        except psl_dns.exceptions.UnsupportedRule as e:
+            # It would probably be fine to just return True (with the TLD acting as the
+            # public suffix and setting both public_suffix and is_public_suffix accordingly).
+            # However, in order to allow to investigate the situation, it's better not catch
+            # this exception. For web requests, our error handler turns it into a 503 error
+            # and makes sure admins are notified.
+            raise e
+
+        if not is_public_suffix:
+            # Take into account that any of the parent domains could be a local public suffix. To that
+            # end, identify the longest local public suffix that is actually a suffix of domain_name.
+            # Then, override the global PSL result.
+            for local_public_suffix in settings.LOCAL_PUBLIC_SUFFIXES:
+                has_local_public_suffix_parent = ('.' + domain_name).endswith('.' + local_public_suffix)
+                if has_local_public_suffix_parent and len(local_public_suffix) > len(public_suffix):
+                    public_suffix = local_public_suffix
+                    is_public_suffix = (public_suffix == domain_name)
+
+        if is_public_suffix and domain_name not in settings.LOCAL_PUBLIC_SUFFIXES:
+            return False
+
+        # Generate a list of all domains connecting this one and its public suffix.
+        # If another user owns a zone with one of these names, then the requested
+        # domain is unavailable because it is part of the other user's zone.
+        private_components = domain_name.rsplit(public_suffix, 1)[0].rstrip('.')
+        private_components = private_components.split('.') if private_components else []
+        private_components += [public_suffix]
+        private_domains = ['.'.join(private_components[i:]) for i in range(0, len(private_components) - 1)]
+        assert is_public_suffix or domain_name == private_domains[0]
+
+        # Deny registration for non-local public suffixes and for domains covered by other users' zones
+        user = user if not isinstance(user, AnonymousUser) else None
+        return not cls.objects.filter(Q(name__in=private_domains) & ~Q(owner=user)).exists()
+
     @property
     def keys(self):
         return pdns.get_keys(self)
 
-    def has_local_public_suffix(self):
+    def is_locally_registrable(self):
         return self.partition_name()[1] in settings.LOCAL_PUBLIC_SUFFIXES
 
     def parent_domain_name(self):
         return self.partition_name()[1]
 
-    def partition_name(domain):
-        name = domain.name if isinstance(domain, Domain) else domain
+    def partition_name(self: Union[Domain, str]):
+        name = self.name if isinstance(self, Domain) else self
         subname, _, parent_name = name.partition('.')
         return subname, parent_name or None
 

+ 11 - 33
api/desecapi/serializers.py

@@ -3,7 +3,6 @@ import json
 import re
 from base64 import urlsafe_b64decode, urlsafe_b64encode
 
-import psl_dns
 from django.core.validators import MinValueValidator
 from django.db.models import Model, Q
 from rest_framework import serializers
@@ -429,7 +428,6 @@ class RRsetListSerializer(ListSerializer):
 
 
 class DomainSerializer(serializers.ModelSerializer):
-    psl = psl_dns.PSL(resolver=settings.PSL_RESOLVER)
 
     class Meta:
         model = models.Domain
@@ -446,38 +444,14 @@ class DomainSerializer(serializers.ModelSerializer):
         return fields
 
     def validate_name(self, value):
-        # Check if domain is a public suffix
-        try:
-            public_suffix = self.psl.get_public_suffix(value)
-            is_public_suffix = self.psl.is_public_suffix(value)
-        except psl_dns.exceptions.UnsupportedRule as e:
-            # It would probably be fine to just create the domain (with the TLD acting as the
-            # public suffix and setting both public_suffix and is_public_suffix accordingly).
-            # However, in order to allow to investigate the situation, it's better not catch
-            # this exception. Our error handler turns it into a 503 error and makes sure
-            # admins are notified.
-            raise e
-
-        is_restricted_suffix = is_public_suffix and value not in settings.LOCAL_PUBLIC_SUFFIXES
-
-        # Generate a list of all domains connecting this one and its public suffix.
-        # If another user owns a zone with one of these names, then the requested
-        # domain is unavailable because it is part of the other user's zone.
-        private_components = value.rsplit(public_suffix, 1)[0].rstrip('.')
-        private_components = private_components.split('.') if private_components else []
-        private_components += [public_suffix]
-        private_domains = ['.'.join(private_components[i:]) for i in range(0, len(private_components) - 1)]
-        assert is_public_suffix or value == private_domains[0]
-
-        # Deny registration for non-local public suffixes and for domains covered by other users' zones
-        owner = self.context['request'].user
-        queryset = models.Domain.objects.filter(Q(name__in=private_domains) & ~Q(owner=owner))
-        if is_restricted_suffix or queryset.exists():
-            msg = 'This domain name is unavailable.'
-            raise serializers.ValidationError(msg, code='name_unavailable')
-
+        self.raise_if_domain_unavailable(value, self.context['request'].user)
         return value
 
+    @staticmethod
+    def raise_if_domain_unavailable(domain_name: str, user: models.User):
+        if not models.Domain.is_registrable(domain_name, user):
+            raise serializers.ValidationError('This domain name is unavailable.', code='name_unavailable')
+
 
 class DonationSerializer(serializers.ModelSerializer):
 
@@ -510,13 +484,17 @@ class UserSerializer(serializers.ModelSerializer):
 
 
 class RegisterAccountSerializer(UserSerializer):
-    domain = serializers.CharField(required=False)  # TODO Needs more validation
+    domain = serializers.CharField(required=False, validators=models.validate_domain_name)
 
     class Meta:
         model = UserSerializer.Meta.model
         fields = ('email', 'password', 'domain')
         extra_kwargs = UserSerializer.Meta.extra_kwargs
 
+    def validate_domain(self, value):
+        DomainSerializer.raise_if_domain_unavailable(value, self.context['request'].user)
+        return value
+
     def create(self, validated_data):
         validated_data.pop('domain', None)
         return super().create(validated_data)

+ 10 - 8
api/desecapi/tests/base.py

@@ -17,8 +17,7 @@ from rest_framework.reverse import reverse
 from rest_framework.test import APITestCase, APIClient
 from rest_framework.utils import json
 
-from desecapi.models import User, Domain, Token, RRset, RR
-from desecapi.serializers import DomainSerializer
+from desecapi.models import User, Domain, Token, RRset, RR, psl
 
 
 class DesecAPIClient(APIClient):
@@ -829,7 +828,7 @@ class PublicSuffixMockMixin():
 
     @staticmethod
     def _mock_is_public_suffix(name):
-        return name == DomainSerializer.psl.get_public_suffix(name)
+        return name == psl.get_public_suffix(name)
 
     def get_psl_context_manager(self, side_effect_parameter):
         if side_effect_parameter is None:
@@ -838,13 +837,16 @@ class PublicSuffixMockMixin():
         if callable(side_effect_parameter):
             side_effect = side_effect_parameter
         else:
-            side_effect = partial(self._mock_get_public_suffix, public_suffixes=[side_effect_parameter])
+            side_effect = partial(
+                self._mock_get_public_suffix,
+                public_suffixes=[side_effect_parameter] if not isinstance(side_effect_parameter, list) else list(side_effect_parameter)
+            )
 
-        return mock.patch.object(DomainSerializer.psl, 'get_public_suffix', side_effect=side_effect)
+        return mock.patch.object(psl, 'get_public_suffix', side_effect=side_effect)
 
     def setUpMockPatch(self):
-        mock.patch.object(DomainSerializer.psl, 'get_public_suffix', side_effect=self._mock_get_public_suffix).start()
-        mock.patch.object(DomainSerializer.psl, 'is_public_suffix', side_effect=self._mock_is_public_suffix).start()
+        mock.patch.object(psl, 'get_public_suffix', side_effect=self._mock_get_public_suffix).start()
+        mock.patch.object(psl, 'is_public_suffix', side_effect=self._mock_is_public_suffix).start()
         self.addCleanup(mock.patch.stopall)
 
 
@@ -899,7 +901,7 @@ class DomainOwnerTestCase(DesecTestCase, PublicSuffixMockMixin):
     def setUp(self):
         super().setUp()
         self.client.credentials(HTTP_AUTHORIZATION='Token ' + self.token.key)
-        PublicSuffixMockMixin.setUpMockPatch(self)
+        self.setUpMockPatch()
 
 
 class LockedDomainOwnerTestCase(DomainOwnerTestCase):

+ 98 - 1
api/desecapi/tests/test_domains.py

@@ -6,7 +6,104 @@ from rest_framework import status
 
 from desecapi.models import Domain
 from desecapi.pdns_change_tracker import PDNSChangeTracker
-from desecapi.tests.base import DesecTestCase, DomainOwnerTestCase
+from desecapi.tests.base import DesecTestCase, DomainOwnerTestCase, PublicSuffixMockMixin
+
+
+class IsRegistrableTestCase(DesecTestCase, PublicSuffixMockMixin):
+    """ Tests which domains can be registered by whom, depending on domain ownership and public suffix
+    configuration. Note that we use "global public suffix" to refer to public suffixes which appear on the
+    Internet-wide Public Suffix List (accessible, e.g., via psl_dns), and "local public suffix" to public
+    suffixes which are configured in the local Django settings.LOCAL_PUBLIC_SUFFIXES. Consequently, a
+    public suffix can be just local, just global, or both. """
+
+    def mock(self, global_public_suffixes, local_public_suffixes):
+        self.setUpMockPatch()
+        test_case = self
+
+        class _MockSuffixLists:
+
+            settings_mocker = None
+            psl_mocker = None
+
+            def __enter__(self):
+                self.settings_mocker = test_case.settings(LOCAL_PUBLIC_SUFFIXES=local_public_suffixes)
+                self.settings_mocker.__enter__()
+                self.psl_mocker = test_case.get_psl_context_manager(global_public_suffixes)
+                self.psl_mocker.__enter__()
+
+            def __exit__(self, exc_type, exc_val, exc_tb):
+                if exc_type or exc_val or exc_tb:
+                    raise exc_val
+                self.settings_mocker.__exit__(None, None, None)
+                self.psl_mocker.__exit__(None, None, None)
+
+        return _MockSuffixLists()
+
+    def assertRegistrable(self, domain_name, user=None):
+        """ Raises if the given user (fresh if None) cannot register the given domain name. """
+        self.assertTrue(Domain.is_registrable(domain_name, user or self.create_user()),
+                        f'{domain_name} was expected to be registrable for {user or "a new user"}, but wasn\'t.')
+
+    def assertNotRegistrable(self, domain_name, user=None):
+        """ Raises if the given user (fresh if None) can register the given domain name. """
+        self.assertFalse(Domain.is_registrable(domain_name, user or self.create_user()),
+                         f'{domain_name} was expected to be not registrable for {user or "a new user"}, but was.')
+
+    def test_cant_register_global_non_local_public_suffix(self):
+        with self.mock(
+            global_public_suffixes=['com', 'de', 'xxx', 'com.uk'],
+            local_public_suffixes=['something.else'],
+        ):
+            self.assertNotRegistrable('xxx')
+            self.assertNotRegistrable('com.uk')
+            self.assertRegistrable('something.else')
+
+    def test_can_register_local_public_suffix(self):
+        with self.mock(
+            global_public_suffixes=['com', 'de', 'xxx', 'com.uk'],
+            local_public_suffixes=['something.else', 'our.public.suffix', 'com', 'com.uk'],
+        ):
+            self.assertRegistrable('something.else')
+            self.assertRegistrable('out.public.suffix')
+            self.assertRegistrable('com')
+            self.assertRegistrable('com.uk')
+            self.assertRegistrable('foo.bar.com')
+
+    def test_cant_register_descendants_of_children_of_public_suffixes(self):
+        with self.mock(
+            global_public_suffixes={'public.suffix'},
+            local_public_suffixes={'public.suffix'},
+        ):
+            # let A own a.public.suffix
+            user_a = self.create_user()
+            self.assertRegistrable('a.public.suffix', user_a)
+            self.create_domain(owner=user_a, name='a.public.suffix')
+            # user B shall not register b.a.public.suffix, but A may
+            user_b = self.create_user()
+            self.assertNotRegistrable('b.a.public.suffix', user_b)
+            self.assertRegistrable('b.a.public.suffix', user_a)
+
+    def test_can_register_public_suffixes_under_private_domains(self):
+        with self.mock(
+            global_public_suffixes={'public.suffix'},
+            local_public_suffixes={'another.public.suffix.private.public.suffix', 'public.suffix'},
+        ):
+            # let A own public.suffix
+            user_a = self.create_user()
+            self.assertRegistrable('public.suffix', user_a)
+            self.create_domain(owner=user_a, name='public.suffix')
+            # user B may register private.public.suffix
+            user_b = self.create_user()
+            self.assertRegistrable('private.public.suffix', user_b)
+            self.create_domain(owner=user_b, name='private.public.suffix')
+            # user C may register b.another.public.suffix.private.public.suffix,
+            # or long.silly.prefix.another.public.suffix.private.public.suffix,
+            # but not b.private.public.suffix.
+            user_c = self.create_user()
+            self.assertRegistrable('b.another.public.suffix.private.public.suffix', user_c)
+            self.assertRegistrable('long.silly.prefix.another.public.suffix.private.public.suffix', user_c)
+            self.assertNotRegistrable('b.private.public.suffix', user_c)
+            self.assertRegistrable('b.private.public.suffix', user_b)
 
 
 class UnauthenticatedDomainTests(DesecTestCase):

+ 31 - 25
api/desecapi/tests/test_user_management.py

@@ -12,6 +12,7 @@ This involves testing five separate endpoints:
 (4) delete user endpoint, and
 (5) verify endpoint.
 """
+import random
 import re
 
 from django.core import mail
@@ -201,11 +202,18 @@ class UserManagementTestCase(DesecTestCase, PublicSuffixMockMixin):
         )
         self.assertEqual(response.data['password'][0].code, 'blank')
 
-    def assertRegistrationFailureDomainUnavailableResponse(self, response, domain, reason):
+    def assertRegistrationFailureDomainUnavailableResponse(self, response, domain):
         self.assertContains(
             response=response,
-            text=("The requested domain {} could not be registered (reason: {}). "
-                  "Please start over and sign up again.".format(domain, reason)),
+            text='This domain name is unavailable',
+            status_code=status.HTTP_400_BAD_REQUEST,
+            msg_prefix=str(response.data)
+        )
+
+    def assertRegistrationFailureDomainInvalidResponse(self, response, domain):
+        self.assertContains(
+            response=response,
+            text="Invalid value (not a DNS name)",
             status_code=status.HTTP_400_BAD_REQUEST,
             msg_prefix=str(response.data)
         )
@@ -324,34 +332,31 @@ class UserManagementTestCase(DesecTestCase, PublicSuffixMockMixin):
         self.assertPassword(email, password)
         return email, password
 
-    def _test_registration_with_domain(self, email=None, password=None, domain=None, expect_failure_reason=None):
+    def _test_registration_with_domain(self, email=None, password=None, domain=None, expect_failure_response=None):
         domain = domain or self.random_domain_name()
 
         email, password, response = self.register_user(email, password, domain=domain)
+        if expect_failure_response:
+            expect_failure_response(response, domain)
+            self.assertUserDoesNotExist(email)
+            return
         self.assertRegistrationSuccessResponse(response)
         self.assertUserExists(email)
         self.assertFalse(User.objects.get(email=email).is_active)
         self.assertPassword(email, password)
 
         confirmation_link = self.assertRegistrationEmail(email)
-        if expect_failure_reason is None:
-            if domain.endswith('.dedyn.io'):
-                cm = self.requests_desec_domain_creation_auto_delegation(domain)
-            else:
-                cm = self.requests_desec_domain_creation(domain)
-            with self.assertPdnsRequests(cm[:-1]):
-                response = self.client.verify(confirmation_link)
-            self.assertRegistrationWithDomainVerificationSuccessResponse(response, domain)
-            self.assertTrue(User.objects.get(email=email).is_active)
-            self.assertPassword(email, password)
-            self.assertTrue(Domain.objects.filter(name=domain, owner__email=email).exists())
-            return email, password, domain
+        if domain.endswith('.dedyn.io'):
+            cm = self.requests_desec_domain_creation_auto_delegation(domain)
         else:
-            domain_exists = Domain.objects.filter(name=domain).exists()
+            cm = self.requests_desec_domain_creation(domain)
+        with self.assertPdnsRequests(cm[:-1]):
             response = self.client.verify(confirmation_link)
-            self.assertRegistrationFailureDomainUnavailableResponse(response, domain, expect_failure_reason)
-            self.assertUserDoesNotExist(email)
-            self.assertEqual(Domain.objects.filter(name=domain).exists(), domain_exists)
+        self.assertRegistrationWithDomainVerificationSuccessResponse(response, domain)
+        self.assertTrue(User.objects.get(email=email).is_active)
+        self.assertPassword(email, password)
+        self.assertTrue(Domain.objects.filter(name=domain, owner__email=email).exists())
+        return email, password, domain
 
     def _test_login(self):
         token, response = self.login_user(self.email, self.password)
@@ -415,13 +420,14 @@ class NoUserAccountTestCase(UserLifeCycleTestCase):
         PublicSuffixMockMixin.setUpMockPatch(self)
         with self.get_psl_context_manager('.'):
             _, _, domain = self._test_registration_with_domain()
-            self._test_registration_with_domain(domain=domain, expect_failure_reason='unique')
-            self._test_registration_with_domain(domain='töö--', expect_failure_reason='invalid_domain_name')
+            self._test_registration_with_domain(domain=domain, expect_failure_response=self.assertRegistrationFailureDomainUnavailableResponse)
+            self._test_registration_with_domain(domain='töö--', expect_failure_response=self.assertRegistrationFailureDomainInvalidResponse)
 
         with self.get_psl_context_manager('co.uk'):
-            self._test_registration_with_domain(domain='co.uk', expect_failure_reason='name_unavailable')
-        with self.get_psl_context_manager('dedyn.io'):
-            self._test_registration_with_domain(domain=self.random_domain_name(suffix='dedyn.io'))
+            self._test_registration_with_domain(domain='co.uk', expect_failure_response=self.assertRegistrationFailureDomainUnavailableResponse)
+        local_public_suffix = random.sample(self.AUTO_DELEGATION_DOMAINS, 1)[0]
+        with self.get_psl_context_manager(local_public_suffix):
+            self._test_registration_with_domain(domain=self.random_domain_name(suffix=local_public_suffix))
 
     def test_registration_known_account(self):
         email, _ = self._test_registration()

+ 5 - 3
api/desecapi/views.py

@@ -80,16 +80,17 @@ class DomainList(generics.ListCreateAPIView):
         with PDNSChangeTracker():
             domain = serializer.save(**domain_kwargs)
 
+        # TODO this line raises if the local public suffix is not in our database!
         PDNSChangeTracker.track(lambda: self.auto_delegate(domain))
 
         # Send dyn email
-        if domain.name.endswith('.dedyn.io'):
+        if domain_is_local:
             content_tmpl = get_template('emails/domain-dyndns/content.txt')
             subject_tmpl = get_template('emails/domain-dyndns/subject.txt')
             from_tmpl = get_template('emails/from.txt')
             context = {
                 'domain': domain.name,
-                'url': 'https://update.dedyn.io/',
+                'url': f'https://update.{parent_domain_name}/',
                 'username': domain.name,
                 'password': self.request.auth.key
             }
@@ -115,7 +116,7 @@ class DomainDetail(IdempotentDestroy, generics.RetrieveUpdateDestroyAPIView):
     def perform_destroy(self, instance: models.Domain):
         with PDNSChangeTracker():
             instance.delete()
-        if instance.has_local_public_suffix():
+        if instance.is_locally_registrable():
             parent_domain = models.Domain.objects.get(name=instance.parent_domain_name())
             with PDNSChangeTracker():
                 parent_domain.update_delegation(instance)
@@ -591,6 +592,7 @@ class AuthenticatedActivateUserActionView(AuthenticatedActionView):
         domain = PDNSChangeTracker.track(lambda: serializer.save(owner=action.user))
 
         if domain.parent_domain_name() in settings.LOCAL_PUBLIC_SUFFIXES:
+            # TODO the following line raises Domain.DoesNotExist under unknown conditions
             PDNSChangeTracker.track(lambda: DomainList.auto_delegate(domain))
             token = models.Token.objects.create(user=action.user, name='dyndns')
             return Response({