pgp_utils.py 2.0 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071
  1. import os
  2. from io import BytesIO
  3. import gnupg
  4. from memory_profiler import memory_usage
  5. from app.config import GNUPGHOME
  6. from app.log import LOG
  7. from app.models import Mailbox, Contact
# Module-wide GnuPG wrapper; its keyring lives in the GNUPGHOME directory.
gpg = gnupg.GPG(gnupghome=GNUPGHOME)
# Text exchanged through this wrapper is treated as UTF-8.
gpg.encoding = "utf-8"
  10. class PGPException(Exception):
  11. pass
  12. def load_public_key(public_key: str) -> str:
  13. """Load a public key into keyring and return the fingerprint. If error, raise Exception"""
  14. import_result = gpg.import_keys(public_key)
  15. try:
  16. return import_result.fingerprints[0]
  17. except Exception as e:
  18. raise PGPException("Cannot load key") from e
  19. def hard_exit():
  20. pid = os.getpid()
  21. LOG.warning("kill pid %s", pid)
  22. os.kill(pid, 9)
  23. def encrypt_file(data: BytesIO, fingerprint: str) -> str:
  24. LOG.d("encrypt for %s", fingerprint)
  25. mem_usage = memory_usage(-1, interval=1, timeout=1)[0]
  26. LOG.d("mem_usage %s", mem_usage)
  27. r = gpg.encrypt_file(data, fingerprint, always_trust=True)
  28. if not r.ok:
  29. # maybe the fingerprint is not loaded on this host, try to load it
  30. found = False
  31. # searching for the key in mailbox
  32. mailbox = Mailbox.get_by(pgp_finger_print=fingerprint)
  33. if mailbox:
  34. LOG.d("(re-)load public key for %s", mailbox)
  35. load_public_key(mailbox.pgp_public_key)
  36. found = True
  37. # searching for the key in contact
  38. contact = Contact.get_by(pgp_finger_print=fingerprint)
  39. if contact:
  40. LOG.d("(re-)load public key for %s", contact)
  41. load_public_key(contact.pgp_public_key)
  42. found = True
  43. if found:
  44. LOG.d("retry to encrypt")
  45. data.seek(0)
  46. r = gpg.encrypt_file(data, fingerprint, always_trust=True)
  47. if not r.ok:
  48. # save the data for debugging
  49. data.seek(0)
  50. file_path = f"/tmp/{random_string(10)}.eml"
  51. with open(file_path, "wb") as f:
  52. f.write(data.getbuffer())
  53. raise PGPException(f"Cannot encrypt, status: {r.status}, {file_path}")
  54. return str(r)