pgp_utils.py 1.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566
  1. import os
  2. from io import BytesIO
  3. import gnupg
  4. from memory_profiler import memory_usage
  5. from app.config import GNUPGHOME
  6. from app.log import LOG
  7. from app.models import Mailbox, Contact
  8. from app.utils import random_string
# Module-wide GPG wrapper shared by every helper in this file; its keyring
# home directory is the GNUPGHOME value imported from app.config.
gpg = gnupg.GPG(gnupghome=GNUPGHOME)
# Text exchanged with the gpg wrapper is handled as UTF-8.
gpg.encoding = "utf-8"
  11. class PGPException(Exception):
  12. pass
  13. def load_public_key(public_key: str) -> str:
  14. """Load a public key into keyring and return the fingerprint. If error, raise Exception"""
  15. import_result = gpg.import_keys(public_key)
  16. try:
  17. return import_result.fingerprints[0]
  18. except Exception as e:
  19. raise PGPException("Cannot load key") from e
  20. def hard_exit():
  21. pid = os.getpid()
  22. LOG.warning("kill pid %s", pid)
  23. os.kill(pid, 9)
  24. def encrypt_file(data: BytesIO, fingerprint: str) -> str:
  25. LOG.d("encrypt for %s", fingerprint)
  26. mem_usage = memory_usage(-1, interval=1, timeout=1)[0]
  27. LOG.d("mem_usage %s", mem_usage)
  28. r = gpg.encrypt_file(data, fingerprint, always_trust=True)
  29. if not r.ok:
  30. # maybe the fingerprint is not loaded on this host, try to load it
  31. found = False
  32. # searching for the key in mailbox
  33. mailbox = Mailbox.get_by(pgp_finger_print=fingerprint)
  34. if mailbox:
  35. LOG.d("(re-)load public key for %s", mailbox)
  36. load_public_key(mailbox.pgp_public_key)
  37. found = True
  38. # searching for the key in contact
  39. contact = Contact.get_by(pgp_finger_print=fingerprint)
  40. if contact:
  41. LOG.d("(re-)load public key for %s", contact)
  42. load_public_key(contact.pgp_public_key)
  43. found = True
  44. if found:
  45. LOG.d("retry to encrypt")
  46. data.seek(0)
  47. r = gpg.encrypt_file(data, fingerprint, always_trust=True)
  48. if not r.ok:
  49. raise PGPException(f"Cannot encrypt, status: {r.status}")
  50. return str(r)