# pki_utils.py (3.3 KB)
  1. import datetime
  2. import uuid
  3. from collections import OrderedDict
  4. from cryptography import x509
  5. from cryptography.hazmat.backends import default_backend
  6. from cryptography.hazmat.primitives import asymmetric, hashes, serialization
  7. from cryptography.x509.oid import NameOID
  8. class PKI:
  9. key = None
  10. crt = None
  11. def __init__(self, ca_crt_pem, ca_key_pem, ca_key_password=None, key=None):
  12. self.ca_crt = x509.load_pem_x509_certificate(
  13. ca_crt_pem,
  14. default_backend(),
  15. )
  16. self.ca_pkey = serialization.load_pem_private_key(
  17. ca_key_pem,
  18. password=ca_key_password,
  19. backend=default_backend(),
  20. )
  21. def initialize_key(self, key=None):
  22. self.key = key or self._generate_key()
  23. @staticmethod
  24. def _generate_key():
  25. return asymmetric.rsa.generate_private_key(
  26. public_exponent=65537,
  27. key_size=4096,
  28. backend=default_backend(),
  29. )
  30. @property
  31. def key_pem(self):
  32. assert self.key is not None, 'You must call `initialize_key()` before accessing the key.'
  33. return self.key.private_bytes(
  34. encoding=serialization.Encoding.PEM,
  35. format=serialization.PrivateFormat.TraditionalOpenSSL,
  36. encryption_algorithm=serialization.NoEncryption(),
  37. )
  38. @property
  39. def crt_pem(self):
  40. assert self.crt is not None, 'You must call `create_certificate()` before accessing the certificate.'
  41. return self.crt.public_bytes(encoding=serialization.Encoding.PEM)
  42. @property
  43. def subject_attributes(self):
  44. assert self.crt is not None, 'You must call `create_certificate()` before accessing the certificate.'
  45. oids = OrderedDict()
  46. oids[NameOID.COMMON_NAME] = 'CN'
  47. oids[NameOID.X500_UNIQUE_IDENTIFIER] = 'x500UniqueIdentifier'
  48. attrs = OrderedDict()
  49. for oid, label in oids.items():
  50. assert label not in attrs
  51. attrs[label] = [attribute.value for attribute in self.crt.subject.get_attributes_for_oid(oid)]
  52. return attrs
  53. def _generate_csr(self, common_name):
  54. assert self.key is not None, 'You must call `initialize_key()` before requesting a certificate.'
  55. # Copy attributes from CA certificate, except for CN
  56. attributes = [
  57. x509.NameAttribute(NameOID.COMMON_NAME, common_name),
  58. x509.NameAttribute(NameOID.X500_UNIQUE_IDENTIFIER, str(uuid.uuid4())),
  59. ]
  60. # Initialize and set attributes
  61. csr = x509.CertificateSigningRequestBuilder().subject_name(x509.Name(attributes))
  62. # Sign and return
  63. return csr.sign(self.key, hashes.SHA256(), default_backend())
  64. def create_certificate(self, common_name, days):
  65. csr = self._generate_csr(common_name)
  66. self.crt = x509.CertificateBuilder().subject_name(
  67. csr.subject
  68. ).issuer_name(
  69. self.ca_crt.subject
  70. ).public_key(
  71. csr.public_key()
  72. ).serial_number(
  73. x509.random_serial_number()
  74. ).not_valid_before(
  75. datetime.datetime.utcnow()
  76. ).not_valid_after(
  77. datetime.datetime.utcnow() + datetime.timedelta(days=days)
  78. ).sign(
  79. private_key=self.ca_pkey,
  80. algorithm=hashes.SHA256(),
  81. backend=default_backend()
  82. )