Browse Source

feat(api): require locally generated captcha for user registration

Nils Wisiol 5 years ago
parent
commit
a7d54148a9

+ 3 - 0
api/api/settings.py

@@ -155,6 +155,9 @@ LIMIT_USER_DOMAIN_COUNT_DEFAULT = 5
 USER_ACTIVATION_REQUIRED = True
 VALIDITY_PERIOD_VERIFICATION_SIGNATURE = timedelta(hours=12)
 
+# CAPTCHA
+CAPTCHA_VALIDITY_PERIOD = timedelta(hours=24)
+
 if DEBUG and not EMAIL_HOST:
     EMAIL_BACKEND = 'django.core.mail.backends.dummy.EmailBackend'
 

+ 1 - 1
api/cronhook/crontab

@@ -1 +1 @@
-55 2 * * * /usr/local/bin/python3 /usr/src/app/manage.py privacy-chores >> /var/log/cron.log 2>&1
+55 2 * * * /usr/local/bin/python3 /usr/src/app/manage.py chores >> /var/log/cron.log 2>&1

+ 12 - 0
api/desecapi/management/commands/chores.py

@@ -0,0 +1,12 @@
+from django.conf import settings
+from django.core.management import BaseCommand
+from django.utils import timezone
+
+from desecapi.models import Captcha
+
+
+class Command(BaseCommand):
+
+    def handle(self, *args, **kwargs):
+        # delete expired captchas
+        Captcha.objects.filter(created__lt=timezone.now() - settings.CAPTCHA_VALIDITY_PERIOD).delete()

+ 0 - 7
api/desecapi/management/commands/privacy-chores.py

@@ -1,7 +0,0 @@
-from django.core.management import BaseCommand
-
-
-class Command(BaseCommand):
-
-    def handle(self, *args, **kwargs):
-        pass

+ 24 - 0
api/desecapi/migrations/0008_captcha.py

@@ -0,0 +1,24 @@
+# Generated by Django 2.2.1 on 2019-09-22 10:26
+
+import desecapi.models
+from django.db import migrations, models
+import django.db.models.deletion
+import uuid
+
+
+class Migration(migrations.Migration):
+
+    dependencies = [
+        ('desecapi', '0007_remove_user_registration_remote_ip'),
+    ]
+
+    operations = [
+        migrations.CreateModel(
+            name='Captcha',
+            fields=[
+                ('id', models.UUIDField(default=uuid.uuid4, editable=False, primary_key=True, serialize=False)),
+                ('created', models.DateTimeField(auto_now_add=True)),
+                ('content', models.CharField(default=desecapi.models.captcha_default_content, max_length=24)),
+            ],
+        ),
+    ]

+ 25 - 4
api/desecapi/models.py

@@ -3,6 +3,8 @@ from __future__ import annotations
 import json
 import logging
 import random
+import secrets
+import string
 import time
 import uuid
 from base64 import b64encode
@@ -421,10 +423,6 @@ class RR(models.Model):
         return '<RR %s>' % self.content
 
 
-def authenticated_action_default_timestamp():
-    return int(datetime.timestamp(datetime.now()))
-
-
 class AuthenticatedAction(models.Model):
     """
     Represents a procedure call on a defined set of arguments.
@@ -604,3 +602,26 @@ class AuthenticatedDeleteUserAction(AuthenticatedUserAction):
 
     def act(self):
         self.user.delete()
+
+
+def captcha_default_content():
+    alphabet = (string.ascii_uppercase + string.digits).translate({ord(c): None for c in 'IO0'})
+    return ''.join([secrets.choice(alphabet) for _ in range(5)])
+
+
+class Captcha(models.Model):
+    id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
+    created = models.DateTimeField(auto_now_add=True)
+    content = models.CharField(
+        max_length=24,
+        default=captcha_default_content,
+    )
+
+    def verify(self, solution: str):
+        age = timezone.now() - self.created
+        self.delete()
+        return (
+            str(solution).upper().strip() == self.content  # solution correct
+            and
+            age <= settings.CAPTCHA_VALIDITY_PERIOD  # not expired
+        )

+ 33 - 2
api/desecapi/serializers.py

@@ -1,8 +1,9 @@
 import binascii
 import json
 import re
-from base64 import urlsafe_b64decode, urlsafe_b64encode
+from base64 import urlsafe_b64decode, urlsafe_b64encode, b64encode
 
+from captcha.image import ImageCaptcha
 from django.core.validators import MinValueValidator
 from django.db.models import Model, Q
 from rest_framework import serializers
@@ -15,6 +16,34 @@ from api import settings
 from desecapi import models
 
 
+class CaptchaSerializer(serializers.ModelSerializer):
+    challenge = serializers.SerializerMethodField()
+
+    class Meta:
+        model = models.Captcha
+        fields = ('id', 'challenge') if not settings.DEBUG else ('id', 'challenge', 'content')
+
+    def get_challenge(self, obj: models.Captcha):
+        # TODO Does this need to be stored in the object instance, in case this method gets called twice?
+        challenge = ImageCaptcha().generate(obj.content).getvalue()
+        return b64encode(challenge)
+
+
+class CaptchaSolutionSerializer(serializers.Serializer):
+    id = serializers.PrimaryKeyRelatedField(
+        queryset=models.Captcha.objects.all(),
+        error_messages={'does_not_exist': 'CAPTCHA does not exist.'}
+    )
+    solution = serializers.CharField(write_only=True, required=True)
+
+    def validate(self, attrs):
+        captcha = attrs['id']  # Note that this already is the Captcha object
+        if not captcha.verify(attrs['solution']):
+            raise serializers.ValidationError('CAPTCHA could not be validated. Please obtain a new one and try again.')
+
+        return attrs
+
+
 class TokenSerializer(serializers.ModelSerializer):
     auth_token = serializers.ReadOnlyField(source='key')
     # note this overrides the original "id" field, which is the db primary key
@@ -485,10 +514,11 @@ class UserSerializer(serializers.ModelSerializer):
 
 class RegisterAccountSerializer(UserSerializer):
     domain = serializers.CharField(required=False, validators=models.validate_domain_name)
+    captcha = CaptchaSolutionSerializer(required=True)
 
     class Meta:
         model = UserSerializer.Meta.model
-        fields = ('email', 'password', 'domain')
+        fields = ('email', 'password', 'domain', 'captcha')
         extra_kwargs = UserSerializer.Meta.extra_kwargs
 
     def validate_domain(self, value):
@@ -497,6 +527,7 @@ class RegisterAccountSerializer(UserSerializer):
 
     def create(self, validated_data):
         validated_data.pop('domain', None)
+        validated_data.pop('captcha', None)
         return super().create(validated_data)
 
 

+ 0 - 73
api/desecapi/tests/test_authentication.py

@@ -46,79 +46,6 @@ class DynUpdateAuthenticationTestCase(DynDomainOwnerTestCase):
             self.assertDynDNS12Status(authorization=authorization, code=HTTP_401_UNAUTHORIZED)
 
 
-class SignUpLoginTestCase(DesecTestCase):
-
-    EMAIL = None
-    PASSWORD = None
-
-    REGISTRATION_ENDPOINT = None
-    LOGIN_ENDPOINT = None
-
-    REGISTRATION_STATUS = status.HTTP_202_ACCEPTED
-    LOGIN_STATUS = status.HTTP_200_OK
-
-    def __init__(self, *args, **kwargs):
-        super().__init__(*args, **kwargs)
-        self.EMAIL = self.random_username()
-        self.PASSWORD = self.random_password()
-        if not self.REGISTRATION_ENDPOINT:
-            self.REGISTRATION_ENDPOINT = self.reverse('v1:register')
-        if not self.LOGIN_ENDPOINT:
-            self.LOGIN_ENDPOINT = self.reverse('v1:login')
-
-    def sign_up(self):
-        self.assertStatus(
-            self.client.post(self.REGISTRATION_ENDPOINT, {
-                'email': self.EMAIL,
-                'password': self.PASSWORD,
-            }),
-            self.REGISTRATION_STATUS
-        )
-
-    def activate(self):
-        total = 1
-        self.assertEqual(len(mail.outbox), total, "Expected %i message in the outbox, but found %i." %
-                         (total, len(mail.outbox)))
-        email = mail.outbox[-1]
-        self.assertTrue('Welcome' in email.subject)
-        confirmation_link = re.search(r'following link:\s+([^\s]*)', email.body).group(1)
-        self.client.get(confirmation_link)
-
-    def log_in(self):
-        response = self.client.post(self.LOGIN_ENDPOINT, {
-            'email': self.EMAIL,
-            'password': self.PASSWORD,
-        })
-        self.assertContains(response, "auth_token", status_code=self.LOGIN_STATUS)
-
-    def test_sign_up(self):
-        self.sign_up()
-        self.assertFalse(User.objects.get(email=self.EMAIL).is_active)
-
-    def test_activate(self):
-        self.sign_up()
-        self.activate()
-        self.assertTrue(User.objects.get(email=self.EMAIL).is_active)
-
-    def test_log_in(self):
-        self.sign_up()
-        self.activate()
-        self.log_in()
-
-    def test_log_in_twice(self):
-        self.sign_up()
-        self.activate()
-        self.log_in()
-        self.log_in()
-
-    def test_log_in_two_tokens(self):
-        self.sign_up()
-        self.activate()
-        for _ in range(2):
-            Token.objects.create(user=User.objects.get(email=self.EMAIL))
-        self.log_in()
-
-
 class TokenAuthenticationTestCase(DynDomainOwnerTestCase):
 
     def _get_domains(self):

+ 94 - 0
api/desecapi/tests/test_captcha.py

@@ -0,0 +1,94 @@
+from base64 import b64decode
+from io import BytesIO
+from unittest import mock
+
+from PIL import Image
+from django.utils import timezone
+from django.test import TestCase
+from rest_framework import status
+from rest_framework.reverse import reverse
+from rest_framework.test import APIClient
+
+from api import settings
+from desecapi.models import Captcha
+from desecapi.serializers import CaptchaSerializer, CaptchaSolutionSerializer
+from desecapi.tests.base import DesecTestCase
+
+
+class CaptchaClient(APIClient):
+
+    def obtain(self, **kwargs):
+        return self.post(reverse('v1:captcha'))
+
+
+class CaptchaModelTestCase(TestCase):
+    captcha_class = Captcha
+
+    def test_random_initialization(self):
+        captcha = [self.captcha_class() for _ in range(2)]
+        self.assertNotEqual(captcha[0].content, None)
+        self.assertNotEqual(captcha[0].content, '')
+        self.assertNotEqual(captcha[0].content, captcha[1].content)
+
+
+class CaptchaModelTestCase(TestCase):
+    captcha_class = Captcha
+
+    def test_valid_png(self):
+        for _ in range(10):
+            # use the show method on the Image object to see the actual image during test run
+            # This also allows an impression of how the CAPTCHAs will look like.
+            serializer = CaptchaSerializer(instance=Captcha())
+            challenge = b64decode(serializer.data['challenge'])
+            Image.open(BytesIO(challenge))  # .show()
+
+    def test_verify_solution(self):
+        for _ in range(10):
+            c = self.captcha_class.objects.create()
+            self.assertFalse(c.verify('likely the wrong solution!'))
+            c = self.captcha_class.objects.create()
+            self.assertTrue(c.verify(c.content))
+
+
+class CaptchaWorkflowTestCase(DesecTestCase):
+    client_class = CaptchaClient
+    captcha_class = Captcha
+    serializer_class = CaptchaSolutionSerializer
+
+    def verify(self, id, solution):
+        """
+        Given unsafe (user-input) id and solution, this is how the CAPTCHA module expects you to
+        verify that id and solution are valid.
+        :param id: unsafe ID
+        :param solution: unsafe proposed solution
+        :return: whether the id/solution pair is correct
+        """
+        # use the serializer to validate the solution; id is validated implicitly on DB lookup
+        return self.serializer_class(data={'id': id, 'solution': solution}).is_valid()
+
+    def test_obtain(self):
+        response = self.client.obtain()
+        self.assertContains(response, 'id', status_code=status.HTTP_201_CREATED)
+        self.assertContains(response, 'challenge', status_code=status.HTTP_201_CREATED)
+        self.assertTrue('content' not in response.data)
+        self.assertTrue(len(response.data) == 2)
+        self.assertEqual(self.captcha_class.objects.all().count(), 1)
+        # use the value of f'<img src="data:image/png;base64,{response.data["challenge"].decode()}" />'
+        # to display the CAPTCHA in a browser
+
+    def test_verify_correct(self):
+        id = self.client.obtain().data['id']
+        correct_solution = Captcha.objects.get(id=id).content
+        self.assertTrue(self.verify(id, correct_solution))
+
+    def test_verify_incorrect(self):
+        id = self.client.obtain().data['id']
+        wrong_solution = 'most certainly wrong!'
+        self.assertFalse(self.verify(id, wrong_solution))
+
+    def test_expired(self):
+        id = self.client.obtain().data['id']
+        correct_solution = Captcha.objects.get(id=id).content
+
+        with mock.patch('desecapi.models.timezone.now', return_value=timezone.now() + settings.CAPTCHA_VALIDITY_PERIOD):
+            self.assertFalse(self.verify(id, correct_solution))

+ 30 - 3
api/desecapi/tests/test_user_management.py

@@ -21,16 +21,17 @@ from rest_framework.reverse import reverse
 from rest_framework.test import APIClient
 
 from api import settings
-from desecapi.models import Domain, User
+from desecapi.models import Domain, User, Captcha
 from desecapi.tests.base import DesecTestCase, PublicSuffixMockMixin
 
 
 class UserManagementClient(APIClient):
 
-    def register(self, email, password, **kwargs):
+    def register(self, email, password, captcha_id, captcha_solution, **kwargs):
         return self.post(reverse('v1:register'), {
             'email': email,
             'password': password,
+            'captcha': {'id': captcha_id, 'solution': captcha_solution},
             **kwargs
         })
 
@@ -65,6 +66,9 @@ class UserManagementClient(APIClient):
     def verify(self, url, **kwargs):
         return self.post(url, kwargs) if kwargs else self.get(url)
 
+    def obtain_captcha(self, **kwargs):
+        return self.post(reverse('v1:captcha'))
+
 
 class UserManagementTestCase(DesecTestCase, PublicSuffixMockMixin):
 
@@ -72,10 +76,17 @@ class UserManagementTestCase(DesecTestCase, PublicSuffixMockMixin):
     password = None
     token = None
 
+    def get_captcha(self):
+        data = self.client.obtain_captcha().data
+        id = data['id']
+        solution = Captcha.objects.get(id=id).content
+        return id, solution
+
     def register_user(self, email=None, password=None, **kwargs):
         email = email if email is not None else self.random_username()
         password = password if password is not None else self.random_password()
-        return email.strip(), password, self.client.register(email, password, **kwargs)
+        captcha_id, captcha_solution = self.get_captcha()
+        return email.strip(), password, self.client.register(email, password, captcha_id, captcha_solution, **kwargs)
 
     def login_user(self, email, password):
         response = self.client.login_user(email, password)
@@ -218,6 +229,14 @@ class UserManagementTestCase(DesecTestCase, PublicSuffixMockMixin):
             msg_prefix=str(response.data)
         )
 
+    def assertRegistrationFailureCaptchaInvalidResponse(self, response):
+        self.assertContains(
+            response=response,
+            text='CAPTCHA could not be validated. Please obtain a new one and try again.',
+            status_code=status.HTTP_400_BAD_REQUEST,
+            msg_prefix=str(response.data)
+        )
+
     def assertRegistrationVerificationSuccessResponse(self, response):
         return self.assertContains(
             response=response,
@@ -454,6 +473,14 @@ class NoUserAccountTestCase(UserLifeCycleTestCase):
             )
             self.assertNoEmailSent()
 
+    def test_registration_wrong_captcha(self):
+        email = self.random_username()
+        password = self.random_password()
+        captcha_id, _ = self.get_captcha()
+        self.assertRegistrationFailureCaptchaInvalidResponse(
+            self.client.register(email, password, captcha_id, 'this is most definitely not a correct CAPTCHA solution')
+        )
+
 
 class OtherUserAccountTestCase(UserManagementTestCase):
 

+ 3 - 0
api/desecapi/urls/version_1.py

@@ -46,6 +46,9 @@ api_urls = [
     path('v/change-email/<code>/', views.AuthenticatedChangeEmailUserActionView.as_view(), name='confirm-change-email'),
     path('v/reset-password/<code>/', views.AuthenticatedResetPasswordUserActionView.as_view(), name='confirm-reset-password'),
     path('v/delete-account/<code>/', views.AuthenticatedDeleteUserActionView.as_view(), name='confirm-delete-account'),
+
+    # CAPTCHA
+    path('captcha/', views.CaptchaView.as_view(), name='captcha'),
 ]
 
 app_name = 'desecapi'

+ 6 - 0
api/desecapi/views.py

@@ -409,8 +409,10 @@ class AccountCreateView(generics.CreateAPIView):
             if e.detail:
                 raise e
         else:
+            # create user
             user = serializer.save(is_active=(not activation_required))
 
+            # send email if needed
             domain = serializer.validated_data.get('domain')
             if domain or activation_required:
                 action = models.AuthenticatedActivateUserAction(user=user, domain=domain)
@@ -630,3 +632,7 @@ class AuthenticatedDeleteUserActionView(AuthenticatedActionView):
 
     def finalize(self):
         return Response({'detail': 'All your data has been deleted. Bye bye, see you soon! <3'})
+
+
+class CaptchaView(generics.CreateAPIView):
+    serializer_class = serializers.CaptchaSerializer

+ 1 - 0
api/requirements.txt

@@ -7,3 +7,4 @@ mysqlclient~=1.4.0
 psl-dns~=1.0rc2
 requests~=2.21.0
 uwsgi~=2.0.0
+captcha~=0.3.0

+ 8 - 1
test/e2e/spec/api_spec.js

@@ -46,7 +46,10 @@ describe("API v1", function () {
             baseUrl: 'https://www/api/v1',
         })
 
-        let credentials = {"email":"admin@e2etest.local", "password": "password"};
+        let credentials = {
+            "email":"admin@e2etest.local", "password": "password",
+            "captcha": {"id": "d7b5739e-9e14-40df-ac4a-1973777def5e", "solution": "no need for a solution when Django's DEBUG=True"},
+        };
         return chakram.post('/auth/', credentials).then(function() {
             chakram.post('/auth/login/', credentials).then(function (response) {
                 let config = {headers: {'Authorization': 'Token ' + response.body.auth_token}}
@@ -75,6 +78,7 @@ describe("API v1", function () {
             var response = chakram.post('/auth/', {
                 "email": email,
                 "password": password,
+                "captcha": {"id": "d7b5739e-9e14-40df-ac4a-1973777def5e", "solution": "no need for a solution when Django's DEBUG=True"},
             });
 
             return expect(response).to.have.status(202);
@@ -94,6 +98,7 @@ describe("API v1", function () {
             var response = chakram.post('/auth/', {
                 "email": email,
                 "password": password,
+                "captcha": {"id": "d7b5739e-9e14-40df-ac4a-1973777def5e", "solution": "no need for a solution when Django's DEBUG=True"},
             });
 
             return expect(response).to.have.status(202);
@@ -119,6 +124,7 @@ describe("API v1", function () {
                 return chakram.post('/auth/', {
                     "email": email2,
                     "password": password2,
+                    "captcha": {"id": "d7b5739e-9e14-40df-ac4a-1973777def5e", "solution": "no need for a solution when Django's DEBUG=True"},
                 }).then(function () {
                     return chakram.post('/auth/login/', {
                         "email": email2,
@@ -179,6 +185,7 @@ describe("API v1", function () {
             return chakram.post('/auth/', {
                 "email": email,
                 "password": password,
+                "captcha": {"id": "d7b5739e-9e14-40df-ac4a-1973777def5e", "solution": "no need for a solution when Django's DEBUG=True"},
             }).then(function () {
                 return chakram.post('/auth/login/', {
                     "email": email,

+ 1 - 0
test/e2e/spec/dyndns_spec.js

@@ -27,6 +27,7 @@ describe("dyndns service", function () {
             return chakram.post('/auth/', {
                 "email": email,
                 "password": password,
+                "captcha": {"id": "d7b5739e-9e14-40df-ac4a-1973777def5e", "solution": "no need for a solution when Django's DEBUG=True"},
             }).then(function () {
                 return chakram.post('/auth/login/', {
                     "email": email,