crypto.py 1.6 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243
  1. from base64 import urlsafe_b64encode
  2. from cryptography.fernet import Fernet, InvalidToken
  3. from cryptography.hazmat.primitives import hashes
  4. from cryptography.hazmat.primitives.kdf.kbkdf import CounterLocation, KBKDFHMAC, Mode
  5. from cryptography.hazmat.backends import default_backend
  6. from django.conf import settings
  7. from django.utils.encoding import force_bytes
  8. from desecapi import metrics
  def _derive_urlsafe_key(*, label, context):
      """
      Derive a key from the Django SECRET_KEY and return it URL-safe base64-encoded.

      The key material is settings.SECRET_KEY (encoded to bytes); `label` and `context` are handed to
      the HMAC-SHA256 counter-mode KBKDF, so different label/context pairs are derived separately
      from the same secret.

      :param label: Label value passed to the KDF (callers in this module pass bytes).
      :param context: Context value passed to the KDF.
      :return: URL-safe base64 encoding (bytes) of the 32 derived bytes.
      """
      kdf = KBKDFHMAC(
          algorithm=hashes.SHA256(),
          mode=Mode.CounterMode,
          length=32,  # number of bytes derived
          rlen=4,
          llen=4,
          location=CounterLocation.BeforeFixed,
          label=label,
          context=context,
          fixed=None,
          backend=default_backend(),
      )
      return urlsafe_b64encode(kdf.derive(settings.SECRET_KEY.encode()))
  def retrieve_key(*, label, context):
      """
      Return the URL-safe base64-encoded key for the given label and context.

      This is deliberately kept separate from the key derivation itself: it leaves room to implement
      look-ups later (e.g. from a cache) without touching callers.

      :param label: Key label; normalised with force_bytes before derivation.
      :param context: Key context; normalised with force_bytes before derivation.
      :return: Whatever _derive_urlsafe_key returns for the normalised label and context.
      """
      # NOTE(review): strings_only=True presumably leaves some non-string values (e.g. None)
      # unconverted, per Django's force_bytes -- confirm against the Django docs.
      return _derive_urlsafe_key(
          label=force_bytes(label, strings_only=True),
          context=force_bytes(context, strings_only=True),
      )
  def encrypt(data, *, context):
      """
      Encrypt `data` with a Fernet key bound to `context`.

      The key is obtained from retrieve_key with the fixed label b'crypt'. After the encryption
      call has returned, the 'desecapi_key_encryption_success' metric is incremented, labelled
      with the context.

      :param data: Payload handed to Fernet.encrypt.
      :param context: Key context; also used as the metric label value.
      :return: The token returned by Fernet.encrypt.
      """
      fernet = Fernet(key=retrieve_key(label=b'crypt', context=context))
      token = fernet.encrypt(data)
      # Only reached if encryption did not raise
      metrics.get('desecapi_key_encryption_success').labels(context).inc()
      return token
  def decrypt(token, *, context, ttl=None):
      """
      Decrypt a Fernet token using the key bound to `context`.

      The key is obtained from retrieve_key with the fixed label b'crypt'. On success, the
      'desecapi_key_decryption_success' metric is incremented, labelled with the context.

      :param token: Fernet token to decrypt.
      :param context: Key context; also used as the metric label value.
      :param ttl: Forwarded unchanged to Fernet.decrypt (token age limit; see the cryptography docs).
      :return: Tuple (timestamp, plaintext), i.e. the results of Fernet.extract_timestamp and
          Fernet.decrypt for the token.
      :raises ValueError: If Fernet rejects the token (InvalidToken); the original exception is
          attached as the cause.
      """
      key = retrieve_key(label=b'crypt', context=context)
      f = Fernet(key=key)
      try:
          # Keep the try body limited to the calls that can raise InvalidToken
          ret = f.extract_timestamp(token), f.decrypt(token, ttl=ttl)
      except InvalidToken as exc:
          # Chain explicitly so the traceback shows the token failure as the cause rather than
          # as an unrelated error raised "during handling" of another exception
          raise ValueError from exc
      metrics.get('desecapi_key_decryption_success').labels(context).inc()
      return ret