123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196 |
- import base64
- import datetime
- from ipaddress import ip_address
- from django.contrib.auth.hashers import PBKDF2PasswordHasher
- from django.utils import timezone
- from rest_framework import exceptions, HTTP_HEADER_ENCODING
- from rest_framework.authentication import (
- BaseAuthentication,
- get_authorization_header,
- TokenAuthentication as RestFrameworkTokenAuthentication,
- BasicAuthentication,
- )
- from desecapi.models import Domain, Token
- from desecapi.serializers import (
- AuthenticatedBasicUserActionSerializer,
- EmailPasswordSerializer,
- )
class DynAuthenticationMixin:
    """
    Mixin providing username-plus-token credential checking.

    The token key alone identifies the user; the username is only checked for
    plausibility so that a misleading username is rejected.
    """

    def authenticate_credentials(self, username, key):
        """
        Return `(user, token)` for the given token key if the username is acceptable.

        The username is accepted when it is empty, equals the user's email
        address, or matches (lower-cased) a domain lookup restricted to that
        user as owner. In every other case `AuthenticationFailed` is raised.
        """
        # NOTE(review): this calls TokenAuthentication.authenticate_credentials directly, so the
        # client-subnet check performed in TokenAuthentication.authenticate is not applied on
        # this path -- confirm that this is intended.
        user, token = TokenAuthentication().authenticate_credentials(key)

        # Make sure username is not misleading
        if username in ("", user.email):
            return user, token

        try:
            owns_domain = Domain.objects.filter_qname(
                username.lower(), owner=user
            ).exists()
        except ValueError:
            # A ValueError from the lookup is treated like "no matching domain".
            owns_domain = False

        if owns_domain:
            return user, token
        raise exceptions.AuthenticationFailed
class TokenAuthentication(RestFrameworkTokenAuthentication):
    """
    Token authentication that hashes the presented key before lookup and
    additionally enforces the token's validity and its allowed client subnets.
    """

    model = Token

    # Note: This method's runtime depends on in what way a credential is invalid (expired, wrong client IP).
    # It thus exposes the failure reason when under timing attack.
    def authenticate(self, request):
        """
        Authenticate via the parent class, then check the client address against the token's subnets.

        Returns `(user, token)` on success, or `None` when the parent class
        yields no credentials. Raises `AuthenticationFailed` when the client
        address lies in none of `token.allowed_subnets`.
        """
        try:
            # The parent may raise exceptions.AuthenticationFailed if the token is invalid.
            credentials = super().authenticate(request)
            # Unpacking fails with TypeError when the parent returned None.
            user, token = credentials
        except TypeError:  # no token given
            return None  # unauthenticated

        # REMOTE_ADDR is populated by the environment of the wsgi-request [1], which in turn is set up by nginx as per
        # uwsgi_params [2]. The value of $remote_addr finally is given by the network connection [3].
        # [1]: https://github.com/django/django/blob/stable/3.1.x/django/core/handlers/wsgi.py#L77
        # [2]: https://github.com/desec-io/desec-stack/blob/62820ad/www/conf/sites-available/90-desec.api.location.var#L11
        # [3]: https://nginx.org/en/docs/http/ngx_http_core_module.html#var_remote_addr
        # While the request.META dictionary contains a mixture of values from various sources, HTTP headers have keys
        # with the HTTP_ prefix. Client addresses can therefore not be spoofed through headers.
        # In case the stack is run behind an application proxy, the address will be the proxy's address. Extracting the
        # real client address is currently not supported. For further information on this case, see
        # https://www.django-rest-framework.org/api-guide/throttling/#how-clients-are-identified
        remote_address = ip_address(request.META.get("REMOTE_ADDR"))

        # This can likely be done within Postgres with django-postgres-extensions (client_ip <<= ANY allowed_subnets).
        # However, the django-postgres-extensions package is unmaintained, and the GitHub repo has been archived.
        for subnet in token.allowed_subnets:
            if remote_address in subnet:
                return user, token
        raise exceptions.AuthenticationFailed("Invalid token.")

    def authenticate_credentials(self, key):
        """
        Look up the token by the hash of `key` and record its use.

        Returns `(user, token)` after updating and saving `token.last_used`.
        Raises `AuthenticationFailed` when `token.is_valid` is false.
        """
        hashed_key = Token.make_hash(key)
        try:
            user, token = super().authenticate_credentials(hashed_key)
        except TypeError:  # no token given
            return None  # unauthenticated

        if token.is_valid:
            token.last_used = timezone.now()
            token.save()
            return user, token
        raise exceptions.AuthenticationFailed("Invalid token.")
class BasicTokenAuthentication(BaseAuthentication, DynAuthenticationMixin):
    """
    HTTP Basic authentication that uses username and token.

    Clients should authenticate by passing the username and the token as a
    password in the "Authorization" HTTP header, according to the HTTP
    Basic Authentication Scheme

        Authorization: Basic dXNlcm5hbWU6dG9rZW4=

    For username "username" and password "token".
    """

    # A custom token model may be used, but must have the following properties.
    #
    # * key -- The string identifying the token
    # * user -- The user to which the token belongs
    model = Token

    def authenticate(self, request):
        """
        Authenticate a request carrying a `Basic` Authorization header.

        Returns `None` when the header is absent or uses a different scheme,
        so that other authenticators may be tried. Returns `(user, token)` on
        success. Raises `AuthenticationFailed` for a malformed header, and
        `AuthenticationFailed("badauth")` for any failure while decoding or
        checking the credentials.
        """
        auth = get_authorization_header(request).split()

        if not auth or auth[0].lower() != b"basic":
            return None

        if len(auth) == 1:
            msg = "Invalid basic auth token header. No credentials provided."
            raise exceptions.AuthenticationFailed(msg)
        elif len(auth) > 2:
            msg = "Invalid basic auth token header. Basic authentication string should not contain spaces."
            raise exceptions.AuthenticationFailed(msg)

        try:
            decoded = base64.b64decode(auth[1]).decode(HTTP_HEADER_ENCODING)
            # Split on the FIRST colon only: the user-id cannot contain a colon, but the
            # password part may (RFC 7617). A value without any colon still fails to
            # unpack and is reported as "badauth" below.
            username, key = decoded.split(":", 1)
            return self.authenticate_credentials(username, key)
        except Exception as exc:
            # Every failure (bad base64, bad encoding, unknown token, misleading username)
            # is deliberately collapsed into the same "badauth" response.
            raise exceptions.AuthenticationFailed("badauth") from exc

    def authenticate_header(self, request):
        """Return the scheme name used for the `WWW-Authenticate` response header."""
        return "Basic"
class URLParamAuthentication(BaseAuthentication, DynAuthenticationMixin):
    """
    Authentication against username/password as provided in URL parameters.
    """

    model = Token

    def authenticate(self, request):
        """
        Returns `(User, Token)` if a correct username and token have been supplied
        using URL parameters. Otherwise raises `AuthenticationFailed`.

        A missing `username` or `password` parameter is reported with its own
        message; any failure while checking the credentials is reported as
        "badauth".
        """
        params = request.query_params

        if "username" not in params:
            raise exceptions.AuthenticationFailed("No username URL parameter provided.")
        if "password" not in params:
            raise exceptions.AuthenticationFailed("No password URL parameter provided.")

        try:
            username = params["username"]
            password = params["password"]
            return self.authenticate_credentials(username, password)
        except Exception:
            # All credential failures are collapsed into a single "badauth" response.
            raise exceptions.AuthenticationFailed("badauth")
class EmailPasswordPayloadAuthentication(BaseAuthentication):
    """
    Authenticates with the `email` and `password` fields of the request payload.
    """

    # Reuse the credential check of DRF's BasicAuthentication as this class's own method.
    authenticate_credentials = BasicAuthentication.authenticate_credentials

    def authenticate(self, request):
        """
        Validate the request payload and check the contained credentials.

        Serializer validation errors are raised as-is (`raise_exception=True`);
        otherwise the result of `authenticate_credentials` is returned.
        """
        serializer = EmailPasswordSerializer(data=request.data)
        serializer.is_valid(raise_exception=True)

        payload = serializer.data
        email, password = payload["email"], payload["password"]
        return self.authenticate_credentials(email, password, request)
class AuthenticatedBasicUserActionAuthentication(BaseAuthentication):
    """
    Authenticates a request based on whether the serializer determines the validity of the given verification code
    based on the view's 'code' kwarg and the view serializer's code validity period.
    """

    def authenticate(self, request):
        """Authenticate using the serializer context of the view handling `request`."""
        view = request.parser_context["view"]
        return self.authenticate_credentials(view.get_serializer_context())

    def authenticate_credentials(self, context):
        """
        Validate the user action described by `context` and return `(user, None)`.

        Side effect: `user.email_verified` is set to the later of its current
        value and the serializer's timestamp (interpreted as UTC), and the user
        is saved. Serializer validation errors are raised as-is. Raises
        `AuthenticationFailed` when `user.is_active` is exactly `False`.
        """
        serializer = AuthenticatedBasicUserActionSerializer(data={}, context=context)
        serializer.is_valid(raise_exception=True)
        user = serializer.validated_data["user"]

        email_verified = datetime.datetime.fromtimestamp(
            serializer.timestamp, datetime.timezone.utc
        )
        # Never move the verification time backwards; an unset value is replaced by the new timestamp.
        user.email_verified = max(user.email_verified or email_verified, email_verified)
        user.save()

        # When user.is_active is None, activation is pending. We need to admit them to finish activation, so only
        # reject strictly False. There are permissions to make sure that such accounts can't do anything else.
        if user.is_active is False:
            raise exceptions.AuthenticationFailed("User inactive.")

        return user, None
class TokenHasher(PBKDF2PasswordHasher):
    """
    PBKDF2 password hasher variant configured with a single iteration and its
    own algorithm identifier.
    """

    # NOTE(review): presumably one iteration is acceptable because token keys are
    # high-entropy random values rather than user-chosen passwords -- confirm.
    iterations = 1
    algorithm = "pbkdf2_sha256_iter1"
|