pgp_utils.py 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114
  1. import os
  2. from io import BytesIO
  3. from typing import Union
  4. import gnupg
  5. import pgpy
  6. from memory_profiler import memory_usage
  7. from pgpy import PGPMessage
  8. from app.config import GNUPGHOME, PGP_SENDER_PRIVATE_KEY
  9. from app.log import LOG
  10. from app.models import Mailbox, Contact
  11. gpg = gnupg.GPG(gnupghome=GNUPGHOME)
  12. gpg.encoding = "utf-8"
  13. class PGPException(Exception):
  14. pass
  15. def load_public_key(public_key: str) -> str:
  16. """Load a public key into keyring and return the fingerprint. If error, raise Exception"""
  17. import_result = gpg.import_keys(public_key)
  18. try:
  19. return import_result.fingerprints[0]
  20. except Exception as e:
  21. raise PGPException("Cannot load key") from e
  22. def load_public_key_and_check(public_key: str) -> str:
  23. """Same as load_public_key but will try an encryption using the new key.
  24. If the encryption fails, remove the newly created fingerprint.
  25. Return the fingerprint
  26. """
  27. import_result = gpg.import_keys(public_key)
  28. try:
  29. fingerprint = import_result.fingerprints[0]
  30. except Exception as e:
  31. raise PGPException("Cannot load key") from e
  32. else:
  33. dummy_data = BytesIO(b"test")
  34. r = gpg.encrypt_file(dummy_data, fingerprint)
  35. if not r.ok:
  36. # remove the fingerprint
  37. gpg.delete_keys([fingerprint])
  38. raise PGPException("Encryption fails with the key")
  39. return fingerprint
  40. def hard_exit():
  41. pid = os.getpid()
  42. LOG.warning("kill pid %s", pid)
  43. os.kill(pid, 9)
  44. def encrypt_file(data: BytesIO, fingerprint: str) -> str:
  45. LOG.d("encrypt for %s", fingerprint)
  46. mem_usage = memory_usage(-1, interval=1, timeout=1)[0]
  47. LOG.d("mem_usage %s", mem_usage)
  48. r = gpg.encrypt_file(data, fingerprint, always_trust=True)
  49. if not r.ok:
  50. # maybe the fingerprint is not loaded on this host, try to load it
  51. found = False
  52. # searching for the key in mailbox
  53. mailbox = Mailbox.get_by(pgp_finger_print=fingerprint)
  54. if mailbox:
  55. LOG.d("(re-)load public key for %s", mailbox)
  56. load_public_key(mailbox.pgp_public_key)
  57. found = True
  58. # searching for the key in contact
  59. contact = Contact.get_by(pgp_finger_print=fingerprint)
  60. if contact:
  61. LOG.d("(re-)load public key for %s", contact)
  62. load_public_key(contact.pgp_public_key)
  63. found = True
  64. if found:
  65. LOG.d("retry to encrypt")
  66. data.seek(0)
  67. r = gpg.encrypt_file(data, fingerprint, always_trust=True)
  68. if not r.ok:
  69. raise PGPException(f"Cannot encrypt, status: {r.status}")
  70. return str(r)
  71. def encrypt_file_with_pgpy(data: bytes, public_key: str) -> PGPMessage:
  72. key = pgpy.PGPKey()
  73. key.parse(public_key)
  74. msg = pgpy.PGPMessage.new(data, encoding="utf-8")
  75. r = key.encrypt(msg)
  76. return r
  77. if PGP_SENDER_PRIVATE_KEY:
  78. _SIGN_KEY_ID = gpg.import_keys(PGP_SENDER_PRIVATE_KEY).fingerprints[0]
  79. def sign_data(data: Union[str, bytes]) -> str:
  80. signature = str(gpg.sign(data, keyid=_SIGN_KEY_ID, detach=True))
  81. return signature
  82. def sign_data_with_pgpy(data: Union[str, bytes]) -> str:
  83. key = pgpy.PGPKey()
  84. key.parse(PGP_SENDER_PRIVATE_KEY)
  85. signature = str(key.sign(data))
  86. return signature