123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114 |
- import os
- from io import BytesIO
- from typing import Union
- import gnupg
- import pgpy
- from memory_profiler import memory_usage
- from pgpy import PGPMessage
- from app.config import GNUPGHOME, PGP_SENDER_PRIVATE_KEY
- from app.log import LOG
- from app.models import Mailbox, Contact
# Module-level GnuPG wrapper used by the helpers in this file; it is created
# with GNUPGHOME (from app.config) as its gnupghome.
gpg = gnupg.GPG(gnupghome=GNUPGHOME)
# NOTE(review): presumably makes python-gnupg exchange text with the gpg
# process as UTF-8 -- confirm against the python-gnupg documentation.
gpg.encoding = "utf-8"
class PGPException(Exception):
    """Raised by the helpers in this module when a PGP operation fails."""
def load_public_key(public_key: str) -> str:
    """Import ``public_key`` into the keyring and return its fingerprint.

    Raises:
        PGPException: if the first fingerprint cannot be read from the
            import result (for example when nothing was imported).
    """
    imported = gpg.import_keys(public_key)
    try:
        # Keep the attribute access and the indexing inside the ``try`` so
        # any failure while reading the result is reported as PGPException.
        first_fingerprint = imported.fingerprints[0]
    except Exception as err:
        raise PGPException("Cannot load key") from err
    return first_fingerprint
def load_public_key_and_check(public_key: str) -> str:
    """Import ``public_key`` and verify it by encrypting a small test payload.

    Works like ``load_public_key`` but additionally runs a trial encryption
    with the freshly imported key. When that encryption fails, the key is
    deleted from the keyring again before the error is raised.

    Returns:
        The fingerprint of the imported key.

    Raises:
        PGPException: if the key cannot be imported or the trial encryption
            does not succeed.
    """
    imported = gpg.import_keys(public_key)
    try:
        key_fingerprint = imported.fingerprints[0]
    except Exception as err:
        raise PGPException("Cannot load key") from err

    # Trial run: encrypt a few bytes to prove the key is usable.
    probe = BytesIO(b"test")
    outcome = gpg.encrypt_file(probe, key_fingerprint)
    if outcome.ok:
        return key_fingerprint

    # The key was imported but cannot encrypt: drop it from the keyring.
    gpg.delete_keys([key_fingerprint])
    raise PGPException("Encryption fails with the key")
def hard_exit():
    """Log a warning and then send signal 9 to the current process."""
    own_pid = os.getpid()
    LOG.warning("kill pid %s", own_pid)
    # Signal number 9 is SIGKILL on POSIX systems.
    os.kill(own_pid, 9)
def encrypt_file(data: BytesIO, fingerprint: str) -> str:
    """Encrypt ``data`` for the key identified by ``fingerprint``.

    If the first attempt fails, the key may simply be missing from the local
    keyring. The function then looks for a Mailbox and a Contact carrying
    this fingerprint, re-imports their public keys, and retries once.

    Args:
        data: Stream holding the content to encrypt. It is rewound with
            ``seek(0)`` before a retry.
        fingerprint: Fingerprint of the recipient key.

    Returns:
        The encryption result converted with ``str``.

    Raises:
        PGPException: if encryption still fails, or if re-importing a key
            fails inside ``load_public_key``.
    """
    LOG.d("encrypt for %s", fingerprint)
    # Log a memory-usage sample from memory_profiler for debugging.
    current_mem = memory_usage(-1, interval=1, timeout=1)[0]
    LOG.d("mem_usage %s", current_mem)

    result = gpg.encrypt_file(data, fingerprint, always_trust=True)
    if result.ok:
        return str(result)

    # Maybe the fingerprint is not loaded on this host: look for its owner
    # (mailbox first, then contact) and re-import the stored public key.
    reloaded = False
    for model in (Mailbox, Contact):
        owner = model.get_by(pgp_finger_print=fingerprint)
        if owner:
            LOG.d("(re-)load public key for %s", owner)
            load_public_key(owner.pgp_public_key)
            reloaded = True

    if reloaded:
        LOG.d("retry to encrypt")
        # The first attempt consumed the stream; rewind before retrying.
        data.seek(0)
        result = gpg.encrypt_file(data, fingerprint, always_trust=True)

    if not result.ok:
        raise PGPException(f"Cannot encrypt, status: {result.status}")

    return str(result)
def encrypt_file_with_pgpy(data: bytes, public_key: str) -> PGPMessage:
    """Encrypt ``data`` with ``public_key`` using PGPy instead of gpg.

    Args:
        data: Content to wrap in a new PGP message.
        public_key: Key material parsed into a ``pgpy.PGPKey``.

    Returns:
        The encrypted message returned by ``PGPKey.encrypt``.
    """
    recipient_key = pgpy.PGPKey()
    recipient_key.parse(public_key)
    plain_message = pgpy.PGPMessage.new(data, encoding="utf-8")
    return recipient_key.encrypt(plain_message)
# Fingerprint of the sender's signing key. It stays None when
# PGP_SENDER_PRIVATE_KEY is not configured, so the name is always bound and
# sign_data can report the missing configuration explicitly.
_SIGN_KEY_ID = None
if PGP_SENDER_PRIVATE_KEY:
    _SIGN_KEY_ID = gpg.import_keys(PGP_SENDER_PRIVATE_KEY).fingerprints[0]


def sign_data(data: Union[str, bytes]) -> str:
    """Return a detached signature of ``data`` made with the sender key.

    Args:
        data: Content to sign.

    Returns:
        The result of ``gpg.sign(..., detach=True)`` converted with ``str``.

    Raises:
        PGPException: if no sender private key is configured. Previously
            this case surfaced as a ``NameError`` on ``_SIGN_KEY_ID``.
    """
    if _SIGN_KEY_ID is None:
        raise PGPException("Cannot sign: PGP_SENDER_PRIVATE_KEY is not set")

    return str(gpg.sign(data, keyid=_SIGN_KEY_ID, detach=True))
def sign_data_with_pgpy(data: Union[str, bytes]) -> str:
    """Sign ``data`` with PGP_SENDER_PRIVATE_KEY using PGPy instead of gpg.

    Args:
        data: Content to sign.

    Returns:
        The result of ``PGPKey.sign`` converted with ``str``.
    """
    sender_key = pgpy.PGPKey()
    sender_key.parse(PGP_SENDER_PRIVATE_KEY)
    return str(sender_key.sign(data))
|