pgp_utils.py 1.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657
  1. import os
  2. from io import BytesIO
  3. import gnupg
  4. from memory_profiler import memory_usage
  5. from app.config import GNUPGHOME
  6. from app.log import LOG
  7. from app.utils import random_string
  8. gpg = gnupg.GPG(gnupghome=GNUPGHOME)
  9. gpg.encoding = "utf-8"
  10. class PGPException(Exception):
  11. pass
  12. def load_public_key(public_key: str) -> str:
  13. """Load a public key into keyring and return the fingerprint. If error, raise Exception"""
  14. import_result = gpg.import_keys(public_key)
  15. try:
  16. return import_result.fingerprints[0]
  17. except Exception as e:
  18. raise PGPException("Cannot load key") from e
  19. def hard_exit():
  20. pid = os.getpid()
  21. LOG.warning("kill pid %s", pid)
  22. os.kill(pid, 9)
  23. def encrypt_file(data: BytesIO, fingerprint: str) -> str:
  24. LOG.d("encrypt for %s", fingerprint)
  25. mem_usage = memory_usage(-1, interval=1, timeout=1)[0]
  26. LOG.d("mem_usage %s", mem_usage)
  27. # todo
  28. if mem_usage > 300:
  29. LOG.error("Force exit")
  30. hard_exit()
  31. r = gpg.encrypt_file(data, fingerprint, always_trust=True)
  32. if not r.ok:
  33. LOG.error("Try encrypt again %s", fingerprint)
  34. r = gpg.encrypt_file(data, fingerprint, always_trust=True)
  35. if not r.ok:
  36. # save the content for debugging
  37. random_file_name = random_string(20) + ".eml"
  38. full_path = f"/tmp/{random_file_name}"
  39. with open(full_path, "wb") as f:
  40. f.write(data.getbuffer())
  41. LOG.error("PGP fail - log to %s", full_path)
  42. raise PGPException("Cannot encrypt")
  43. return str(r)