Browse Source

feat(api): step-up tokens

Peter Thomassen 2 years ago
parent
commit
c3c6e82df6

+ 21 - 0
api/desecapi/migrations/0029_token_mfa.py

@@ -0,0 +1,21 @@
+# Generated by Django 4.1 on 2022-08-25 03:00
+
+from django.db import migrations, models
+
+
+class Migration(migrations.Migration):
+
+    dependencies = [
+        (
+            "desecapi",
+            "0028_authenticatedcreatetotpfactoruseraction_basefactor_and_more",
+        ),
+    ]
+
+    operations = [
+        migrations.AddField(
+            model_name="token",
+            name="mfa",
+            field=models.BooleanField(default=None, null=True),
+        ),
+    ]

+ 1 - 0
api/desecapi/models/tokens.py

@@ -33,6 +33,7 @@ class Token(ExportModelOperationsMixin("Token"), rest_framework.authtoken.models
     user = models.ForeignKey("User", on_delete=models.CASCADE)
     name = models.CharField("Name", blank=True, max_length=64)
     last_used = models.DateTimeField(null=True, blank=True)
+    mfa = models.BooleanField(default=None, null=True)
     perm_manage_tokens = models.BooleanField(default=False)
     allowed_subnets = ArrayField(
         CidrAddressField(), default=_allowed_subnets_default.__func__

+ 43 - 0
api/desecapi/permissions.py

@@ -13,6 +13,49 @@ class IsActiveUser(permissions.BasePermission):
         return request.user and request.user.is_active
 
 
+class IsAPIToken(permissions.BasePermission):
+    """
+    Allows access only with API token (.mfa is None).
+    """
+
+    message = "API token required."
+    code = "api_token_required"
+
+    def has_permission(self, request, view):
+        return request.auth.mfa is None
+
+
+class IsLoginToken(permissions.BasePermission):
+    """
+    Allows access only with login token (.mfa is not None).
+
+    DRF permission negation is flawed, so ~IsAPIToken does not give the correct behavior:
+    https://github.com/encode/django-rest-framework/issues/6598#issuecomment-484824743
+    """
+
+    message = "Login token required."
+    code = "login_token_required"
+
+    def has_permission(self, request, view):
+        return request.auth.mfa is not None
+
+
+class MFARequiredIfEnabled(permissions.BasePermission):
+    """
+    Allows access only to when
+        - the token is a human token that has passed MFA, or
+        - the token is a human token that has not passed MFA, but the user has not enabled MFA at all.
+    """
+
+    message = "Multi-factor authentication required."
+    code = "mfa_required"
+
+    def has_permission(self, request, view):
+        return request.auth.mfa or (
+            request.auth.mfa is False and not request.user.mfa_enabled
+        )
+
+
 class IsOwner(permissions.BasePermission):
     """
     Custom permission to only allow owners of an object to view or edit it.

+ 1 - 0
api/desecapi/tests/test_tokens.py

@@ -142,6 +142,7 @@ class TokenPermittedTestCase(DomainOwnerTestCase):
                 data.get("perm_manage_tokens", False),
             )
             self.assertIsNone(response.data["last_used"])
+            self.assertIsNone(Token.objects.get(pk=response.data["id"]).mfa)
 
         self.assertEqual(
             len(Token.objects.filter(user=self.owner).all()), n + len(datas)

+ 106 - 15
api/desecapi/tests/test_totp.py

@@ -11,8 +11,54 @@ class TOTPFactorTestCase(DomainOwnerTestCase):
         super().setUp()
         # Make the token a log-in token
         self.token.perm_manage_tokens = True
+        self.token.mfa = False
         self.token.save()
 
+    def _assertTOTP(self, totp, name):
+        self.assertEqual(
+            totp.keys(), {"id", "created", "last_used", "name", "secret", "uri"}
+        )
+        self.assertEqual(totp["name"], name)
+        self.assertIsNone(totp["last_used"])
+        self.assertRegex(totp["secret"], r"^[A-Z0-9]{52}$")  # 32 bytes make 52 chars
+        self.assertRegex(
+            totp["uri"], r"^otpauth://totp/.*:.*[?]secret=[A-Z0-9]{52}&issuer=.*$"
+        )
+
+    def _decrement_timestep(self, offset):
+        factor = self.owner.basefactor_set.get().totpfactor
+        factor.last_verified_timestep -= offset
+        factor.save()
+
+    def _test_MFA_permission_status(self, assertion):
+        for method, view_names in {
+            self.client.get: [
+                "v1:account",
+                "v1:domain-list",
+                "v1:token-list",
+            ],
+            self.client.post: [
+                "v1:domain-list",
+                "v1:token-list",
+                "v1:totp-list",
+            ],
+        }.items():
+            for view_name in view_names:
+                response = method(self.reverse(view_name))
+                assertion(response.status_code, status.HTTP_403_FORBIDDEN)
+        for view_name in [
+            "v1:domain-detail",
+            "v1:rrsets",
+        ]:
+            for method in [self.client.get, self.client.post]:
+                response = method(self.reverse(view_name, name=self.my_domain))
+                assertion(response.status_code, status.HTTP_403_FORBIDDEN)
+        for method in [self.client.get, self.client.post]:
+            response = method(
+                self.reverse("v1:rrset@", name=self.my_domain, subname="", type="NS")
+            )
+            assertion(response.status_code, status.HTTP_403_FORBIDDEN)
+
     def test_workflow(self):
         # Request setting up TOTP factor
         self.client.post(self.reverse("v1:totp-list"))
@@ -33,18 +79,7 @@ class TOTPFactorTestCase(DomainOwnerTestCase):
         response = self.client.post(confirmation_link)
         self.assertResponse(response, status.HTTP_200_OK)
         totp = response.data
-        self.assertEqual(
-            totp.keys(), {"id", "created", "last_used", "name", "secret", "uri"}
-        )
-        self.assertEqual(totp["name"], "")
-        self.assertIsNone(totp["last_used"])
-        self.assertRegex(totp["secret"], r"^[A-Z0-9]{52}$")  # 32 bytes make 52 chars
-        self.assertResponse(
-            self.assertRegex(
-                totp["uri"],
-                r"^otpauth://totp/.*:Secret[?]secret=[A-Z0-9]{52}&issuer=.*$",
-            )
-        )
+        self._assertTOTP(totp, "")
         self.assertEqual(
             self.owner.basefactor_set.get().totpfactor.last_verified_timestep, 0
         )
@@ -105,12 +140,28 @@ class TOTPFactorTestCase(DomainOwnerTestCase):
         # Anonymous verification only allowed for activation
         response = self.client.post(url, {"code": authenticator.at(now)})
         self.assertResponse(response, status.HTTP_401_UNAUTHORIZED)
+
+        # Token has not yet passed MFA, but GET'ing TOTP list is allowed
+        self.assertTrue(self.token.mfa == False)  # assertFalse also allows None
         self.client.credentials(HTTP_AUTHORIZATION="Token " + self.token.plain)
+        response = self.client.get(self.reverse("v1:totp-list"))
+        self.assertResponse(response, status.HTTP_200_OK)
+
+        # MFA required
+        self._test_MFA_permission_status(self.assertEqual)
+
+        # Verification sets token step-up
+        self._decrement_timestep(1)
+        response = self.client.post(url, {"code": authenticator.at(now)})
+        self.assertResponse(response, status.HTTP_200_OK)
+        self.token.refresh_from_db()
+        self.assertTrue(self.token.mfa)
+
+        # MFA passed
+        self._test_MFA_permission_status(self.assertNotEqual)
 
         # Graceful validation window
-        factor = self.owner.basefactor_set.get().totpfactor
-        factor.last_verified_timestep -= 2
-        factor.save()
+        self._decrement_timestep(2)
         window_codes = [authenticator.at(now + i * step) for i in (-1, 0, 1)]
         for code in window_codes:
             response = self.client.post(url, {"code": code})
@@ -124,3 +175,43 @@ class TOTPFactorTestCase(DomainOwnerTestCase):
             self.assertResponse(
                 response, status.HTTP_400_BAD_REQUEST, {"code": ["Invalid code."]}
             )
+
+        # Additional token needs unique name
+        response = self.client.post(self.reverse("v1:totp-list"))
+        self.assertResponse(
+            response,
+            status.HTTP_400_BAD_REQUEST,
+            {
+                "non_field_errors": [
+                    "An authentication factor with this name already exists."
+                ]
+            },
+        )
+
+        # When MFA is enabled, new TOTP factors are returned directly
+        response = self.client.post(self.reverse("v1:totp-list"), data={"name": "test"})
+        self.assertResponse(response, status.HTTP_201_CREATED)
+        totp2 = response.data
+        self._assertTOTP(totp2, "test")
+
+        # Activation of additional factor doesn't change user state
+        credentials_changed = self.owner.credentials_changed
+        authenticator = TOTP(totp2["secret"], digits=6)
+        url = self.reverse("v1:totp-detail", pk=totp2["id"]) + "verify/"
+        response = self.client.post(url, {"code": authenticator.at(now)})
+        self.assertResponse(
+            response,
+            status.HTTP_200_OK,
+            {"detail": "Your TOTP token has been activated!"},
+        )
+        self.owner.refresh_from_db()
+        self.assertEqual(self.owner.credentials_changed, credentials_changed)
+
+        # Removal disables MFA
+        self.assertTrue(self.owner.mfa_enabled)
+        for pk in (totp["id"], totp2["id"]):
+            url = self.reverse("v1:totp-detail", pk=pk)
+            self.client.delete(url)
+        self.owner.refresh_from_db()
+        self.assertFalse(self.owner.mfa_enabled)
+        self.assertGreater(self.owner.credentials_changed, credentials_changed)

+ 5 - 1
api/desecapi/views/domains.py

@@ -33,7 +33,11 @@ class DomainViewSet(
 
     @property
     def permission_classes(self):
-        ret = [IsAuthenticated, permissions.IsOwner]
+        ret = [
+            IsAuthenticated,
+            permissions.IsAPIToken | permissions.MFARequiredIfEnabled,
+            permissions.IsOwner,
+        ]
         if self.action == "create":
             ret.append(permissions.WithinDomainLimit)
         if self.action == "zonefile":

+ 25 - 10
api/desecapi/views/mfa.py

@@ -1,7 +1,7 @@
 from rest_framework import status, viewsets
 from rest_framework.decorators import action
 from rest_framework.exceptions import NotAuthenticated
-from rest_framework.permissions import AllowAny, IsAuthenticated
+from rest_framework.permissions import AllowAny, IsAuthenticated, SAFE_METHODS
 from rest_framework.response import Response
 
 from desecapi import permissions
@@ -22,7 +22,12 @@ class TOTPViewSet(IdempotentDestroyMixin, viewsets.ModelViewSet):
     def permission_classes(self):
         if self.action == "verify":
             return [AllowAny]  # temporary for anonymous activation
-        return [IsAuthenticated, permissions.HasManageTokensPermission]
+        ret = [IsAuthenticated, permissions.HasManageTokensPermission]
+        if self.request.method not in SAFE_METHODS and self.action != "verify":
+            ret.append(permissions.MFARequiredIfEnabled)
+        else:
+            ret.append(permissions.IsLoginToken)
+        return ret
 
     def get_queryset(self):
         qs = self.serializer_class.Meta.model.objects
@@ -32,14 +37,20 @@ class TOTPViewSet(IdempotentDestroyMixin, viewsets.ModelViewSet):
             return qs.filter(user=self.request.user)
 
     def create(self, request, *args, **kwargs):
-        super().create(request, *args, **kwargs)
-        message = "This operation requires manual confirmation. Please check your mailbox for instructions!"
-        return Response(data={"detail": message}, status=status.HTTP_202_ACCEPTED)
-
-    def perform_create(self, serializer):
-        AuthenticatedCreateTOTPFactorUserActionSerializer.build_and_save(
-            user=self.request.user, name=serializer.validated_data.get("name", "")
-        )
+        serializer = self.get_serializer(data=request.data, include_secret=True)
+        serializer.is_valid(raise_exception=True)
+        if request.user.mfa_enabled:
+            serializer.save(user=request.user)
+            headers = self.get_success_headers(serializer.data)
+            return Response(
+                serializer.data, status=status.HTTP_201_CREATED, headers=headers
+            )
+        else:
+            AuthenticatedCreateTOTPFactorUserActionSerializer.build_and_save(
+                user=request.user, name=serializer.validated_data.get("name", "")
+            )
+            message = "This operation requires confirmation. Please check your mailbox for instructions!"
+            return Response(data={"detail": message}, status=status.HTTP_202_ACCEPTED)
 
     @action(detail=True, methods=["post"])
     def verify(self, request, pk=None):
@@ -53,6 +64,10 @@ class TOTPViewSet(IdempotentDestroyMixin, viewsets.ModelViewSet):
         )
         serializer.is_valid(raise_exception=True)
 
+        if authenticated and not new:  # don't step up during activation
+            request.auth.mfa = True  # Step-up token
+            request.auth.save()
+
         message = (
             "Your TOTP token has been activated!" if new else "The code was correct."
         )

+ 1 - 0
api/desecapi/views/records.py

@@ -38,6 +38,7 @@ class RRsetView(DomainViewMixin):
     serializer_class = RRsetSerializer
     permission_classes = (
         IsAuthenticated,
+        permissions.IsAPIToken | permissions.MFARequiredIfEnabled,
         permissions.IsDomainOwner,
         permissions.TokenHasDomainRRsetsPermission,
     )

+ 6 - 1
api/desecapi/views/tokens.py

@@ -18,6 +18,7 @@ class TokenViewSet(IdempotentDestroyMixin, viewsets.ModelViewSet):
     serializer_class = TokenSerializer
     permission_classes = (
         IsAuthenticated,
+        permissions.IsAPIToken | permissions.MFARequiredIfEnabled,
         permissions.HasManageTokensPermission,
     )
     throttle_scope = "account_management_passive"
@@ -38,6 +39,7 @@ class TokenViewSet(IdempotentDestroyMixin, viewsets.ModelViewSet):
 class TokenPoliciesRoot(APIView):
     permission_classes = [
         IsAuthenticated,
+        permissions.IsAPIToken | permissions.MFARequiredIfEnabled,
         permissions.HasManageTokensPermission
         | permissions.AuthTokenCorrespondsToViewToken,
     ]
@@ -61,7 +63,10 @@ class TokenDomainPolicyViewSet(IdempotentDestroyMixin, viewsets.ModelViewSet):
 
     @property
     def permission_classes(self):
-        ret = [IsAuthenticated]
+        ret = [
+            IsAuthenticated,
+            permissions.IsAPIToken | permissions.MFARequiredIfEnabled,
+        ]
         if self.request.method in SAFE_METHODS:
             ret.append(
                 permissions.HasManageTokensPermission

+ 2 - 1
api/desecapi/views/users.py

@@ -55,6 +55,7 @@ class AccountCreateView(generics.CreateAPIView):
 class AccountView(generics.RetrieveUpdateAPIView):
     permission_classes = (
         IsAuthenticated,
+        permissions.IsAPIToken | permissions.MFARequiredIfEnabled,
         permissions.TokenNoDomainPolicy,
     )
     serializer_class = serializers.UserSerializer
@@ -100,10 +101,10 @@ class AccountLoginView(generics.GenericAPIView):
         user = self.request.user
         token = Token.objects.create(
             user=user,
-            name="login",
             perm_manage_tokens=True,
             max_age=timedelta(days=7),
             max_unused_period=timedelta(hours=1),
+            mfa=False,
         )
         user_logged_in.send(sender=user.__class__, request=self.request, user=user)