from datetime import timedelta import json from unittest import mock from django.utils import timezone from rest_framework.status import HTTP_200_OK, HTTP_401_UNAUTHORIZED from desecapi.models import Token from desecapi.tests.base import DynDomainOwnerTestCase class DynUpdateAuthenticationTestCase(DynDomainOwnerTestCase): NUM_OWNED_DOMAINS = 1 def _get_dyndns12(self): with self.assertPdnsNoRequestsBut(self.requests_desec_rr_sets_update()): return self.client.get(self.reverse("v1:dyndns12update")) def assertDynDNS12Status(self, code=HTTP_200_OK, authorization=None): if authorization: self.client.set_credentials_basic_auth(authorization) self.assertStatus(self._get_dyndns12(), code) def test_username_password(self): # noinspection PyPep8Naming def assertDynDNS12AuthenticationStatus(username, token, code): self.client.set_credentials_basic_auth(username, token) self.assertDynDNS12Status(code) assertDynDNS12AuthenticationStatus("", self.token.plain, HTTP_200_OK) assertDynDNS12AuthenticationStatus( self.owner.get_username(), self.token.plain, HTTP_200_OK ) assertDynDNS12AuthenticationStatus( self.my_domain.name, self.token.plain, HTTP_200_OK ) assertDynDNS12AuthenticationStatus( " " + self.my_domain.name, self.token.plain, HTTP_401_UNAUTHORIZED ) assertDynDNS12AuthenticationStatus( "wrong", self.token.plain, HTTP_401_UNAUTHORIZED ) assertDynDNS12AuthenticationStatus("", "wrong", HTTP_401_UNAUTHORIZED) assertDynDNS12AuthenticationStatus( self.user.get_username(), "wrong", HTTP_401_UNAUTHORIZED ) def test_malformed_basic_auth(self): for authorization in [ "asdf:asdf:sadf", "asdf", "bull[%]shit", "你好", "💩💩💩💩", "💩💩:💩💩", ]: self.assertDynDNS12Status( authorization=authorization, code=HTTP_401_UNAUTHORIZED ) class TokenAuthenticationTestCase(DynDomainOwnerTestCase): def setUp(self): super().setUp() # Refresh token from database, but keep plain value self.token, self.token.plain = ( Token.objects.get(pk=self.token.pk), self.token.plain, ) def assertAuthenticationStatus(self, code, plain=None, expired=False, **kwargs): plain = plain or self.token.plain self.client.set_credentials_token_auth(plain) # only forward REMOTE_ADDR if not None if kwargs.get("REMOTE_ADDR") is None: kwargs.pop("REMOTE_ADDR", None) response = self.client.get(self.reverse("v1:root"), **kwargs) body = ( json.dumps({"detail": "Invalid token."}) if code == HTTP_401_UNAUTHORIZED else None ) self.assertResponse(response, code, body) if expired: key = Token.make_hash(plain) self.assertFalse(Token.objects.get(key=key).is_valid) def test_token_case_sensitive(self): self.assertAuthenticationStatus(HTTP_200_OK) self.assertAuthenticationStatus(HTTP_401_UNAUTHORIZED, self.token.plain.upper()) self.assertAuthenticationStatus(HTTP_401_UNAUTHORIZED, self.token.plain.lower()) def test_token_subnets(self): datas = [ # Format: allowed_subnets, status, client_ip | None, [client_ip, ...] ([], HTTP_401_UNAUTHORIZED, None), (["127.0.0.1"], HTTP_200_OK, None), (["1.2.3.4"], HTTP_401_UNAUTHORIZED, None), (["1.2.3.4"], HTTP_200_OK, "1.2.3.4"), (["1.2.3.0/24"], HTTP_200_OK, "1.2.3.4"), (["1.2.3.0/24"], HTTP_401_UNAUTHORIZED, "bade::affe"), (["bade::/64"], HTTP_200_OK, "bade::affe"), (["bade::/64", "1.2.3.0/24"], HTTP_200_OK, "bade::affe", "1.2.3.66"), ] for allowed_subnets, status, client_ips in ( (*data[:2], data[2:]) for data in datas ): self.token.allowed_subnets = allowed_subnets self.token.save() for client_ip in client_ips: self.assertAuthenticationStatus(status, REMOTE_ADDR=client_ip) def test_token_max_age(self): # No maximum age: can use now and in ten years self.token.max_age = None self.token.save() self.assertAuthenticationStatus(HTTP_200_OK) with mock.patch( "desecapi.models.timezone.now", return_value=timezone.now() + timedelta(days=3650), ): self.assertAuthenticationStatus(HTTP_200_OK) # Maximum age zero: token cannot be used self.token.max_age = timedelta(0) self.token.save() self.assertAuthenticationStatus(HTTP_401_UNAUTHORIZED, expired=True) # Maximum age 10 10:10:10: can use one second before, but not once second after period = timedelta(days=10, hours=10, minutes=10, seconds=10) self.token.max_age = period self.token.save() second = timedelta(seconds=1) with mock.patch( "desecapi.models.timezone.now", return_value=self.token.created + period - second, ): self.assertAuthenticationStatus(HTTP_200_OK) with mock.patch( "desecapi.models.timezone.now", return_value=self.token.created + period + second, ): self.assertAuthenticationStatus(HTTP_401_UNAUTHORIZED, expired=True) def test_token_max_unused_period(self): plain = self.token.plain second = timedelta(seconds=1) # Maximum unused period zero: token cannot be used self.token.max_unused_period = timedelta(0) self.token.save() self.assertAuthenticationStatus(HTTP_401_UNAUTHORIZED, expired=True) # Maximum unused period period = timedelta(days=10, hours=10, minutes=10, seconds=10) self.token.max_unused_period = period self.token.save() # Can't use after period if token was never used (last_used is None) self.assertIsNone(self.token.last_used) with mock.patch( "desecapi.models.timezone.now", return_value=self.token.created + period + second, ): self.assertAuthenticationStatus( HTTP_401_UNAUTHORIZED, plain=plain, expired=True ) self.assertIsNone( Token.objects.get(pk=self.token.pk).last_used ) # unchanged # Can use after half the period with mock.patch( "desecapi.models.timezone.now", return_value=self.token.created + period / 2 ): self.assertAuthenticationStatus(HTTP_200_OK, plain=plain) self.token = Token.objects.get(pk=self.token.pk) # update last_used field # Can't use once another period is over with mock.patch( "desecapi.models.timezone.now", return_value=self.token.last_used + period + second, ): self.assertAuthenticationStatus( HTTP_401_UNAUTHORIZED, plain=plain, expired=True ) self.assertEqual( self.token.last_used, Token.objects.get(pk=self.token.pk).last_used ) # unchanged # ... but one second before, and also for one more period with mock.patch( "desecapi.models.timezone.now", return_value=self.token.last_used + period - second, ): self.assertAuthenticationStatus(HTTP_200_OK, plain=plain) with mock.patch( "desecapi.models.timezone.now", return_value=self.token.last_used + 2 * period - 2 * second, ): self.assertAuthenticationStatus(HTTP_200_OK, plain=plain) # No maximum age: can use now and in ten years self.token.max_unused_period = None self.token.save() self.assertAuthenticationStatus(HTTP_200_OK, plain=plain) with mock.patch( "desecapi.models.timezone.now", return_value=timezone.now() + timedelta(days=3650), ): self.assertAuthenticationStatus(HTTP_200_OK, plain=plain) def test_token_max_age_max_unused_period(self): hour = timedelta(hours=1) self.token.max_age = 3 * hour self.token.max_unused_period = hour self.token.save() # max_unused_period wins if tighter than max_age with mock.patch( "desecapi.models.timezone.now", return_value=self.token.created + 1.25 * hour, ): self.assertAuthenticationStatus(HTTP_401_UNAUTHORIZED, expired=True) # Can use immediately self.assertAuthenticationStatus(HTTP_200_OK) # Can use continuously within max_unused_period with mock.patch( "desecapi.models.timezone.now", return_value=self.token.created + 0.75 * hour, ): self.assertAuthenticationStatus(HTTP_200_OK) with mock.patch( "desecapi.models.timezone.now", return_value=self.token.created + 1.5 * hour ): self.assertAuthenticationStatus(HTTP_200_OK) # max_unused_period wins again if tighter than max_age with mock.patch( "desecapi.models.timezone.now", return_value=self.token.created + 2.75 * hour, ): self.assertAuthenticationStatus(HTTP_401_UNAUTHORIZED, expired=True) # Can use continuously within max_unused_period with mock.patch( "desecapi.models.timezone.now", return_value=self.token.created + 2.25 * hour, ): self.assertAuthenticationStatus(HTTP_200_OK) with mock.patch( "desecapi.models.timezone.now", return_value=self.token.created + 2.75 * hour, ): self.assertAuthenticationStatus(HTTP_200_OK) # max_age wins again if tighter than max_unused_period with mock.patch( "desecapi.models.timezone.now", return_value=self.token.created + 3.25 * hour, ): self.assertAuthenticationStatus(HTTP_401_UNAUTHORIZED, expired=True)