email_utils.py 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183
  1. import os
  2. from email.message import EmailMessage
  3. from email.utils import make_msgid, formatdate
  4. from smtplib import SMTP
  5. import dkim
  6. from jinja2 import Environment, FileSystemLoader
  7. from app.config import (
  8. SUPPORT_EMAIL,
  9. ROOT_DIR,
  10. POSTFIX_SERVER,
  11. NOT_SEND_EMAIL,
  12. DKIM_SELECTOR,
  13. DKIM_PRIVATE_KEY,
  14. DKIM_HEADERS,
  15. )
  16. from app.log import LOG
  17. def _render(template_name, **kwargs) -> str:
  18. templates_dir = os.path.join(ROOT_DIR, "templates", "emails")
  19. env = Environment(loader=FileSystemLoader(templates_dir))
  20. template = env.get_template(template_name)
  21. return template.render(**kwargs)
  22. def send_welcome_email(email, name):
  23. send_email(
  24. email,
  25. f"{name}, welcome to SimpleLogin!",
  26. _render("welcome.txt", name=name),
  27. _render("welcome.html", name=name),
  28. )
  29. def send_activation_email(email, name, activation_link):
  30. send_email(
  31. email,
  32. f"{name}, just one more step to join SimpleLogin",
  33. _render(
  34. "activation.txt", name=name, activation_link=activation_link, email=email
  35. ),
  36. _render(
  37. "activation.html", name=name, activation_link=activation_link, email=email
  38. ),
  39. )
  40. def send_reset_password_email(email, name, reset_password_link):
  41. send_email(
  42. email,
  43. f"{name}, reset your password on SimpleLogin",
  44. _render(
  45. "reset-password.txt", name=name, reset_password_link=reset_password_link
  46. ),
  47. _render(
  48. "reset-password.html", name=name, reset_password_link=reset_password_link
  49. ),
  50. )
  51. def send_change_email(new_email, current_email, name, link):
  52. send_email(
  53. new_email,
  54. f"{name}, confirm email update on SimpleLogin",
  55. _render(
  56. "change-email.txt",
  57. name=name,
  58. link=link,
  59. new_email=new_email,
  60. current_email=current_email,
  61. ),
  62. _render(
  63. "change-email.html",
  64. name=name,
  65. link=link,
  66. new_email=new_email,
  67. current_email=current_email,
  68. ),
  69. )
  70. def send_new_app_email(email, name):
  71. send_email(
  72. email,
  73. f"{name}, any question/feedback for SimpleLogin?",
  74. _render("new-app.txt", name=name),
  75. _render("new-app.html", name=name),
  76. )
  77. def send_test_email_alias(email, name):
  78. send_email(
  79. email,
  80. f"{name}, this email is sent to {email}",
  81. _render("test-email.txt", name=name, alias=email),
  82. _render("test-email.html", name=name, alias=email),
  83. )
  84. def send_email(to_email, subject, plaintext, html):
  85. if NOT_SEND_EMAIL:
  86. LOG.d(
  87. "send email with subject %s to %s, plaintext: %s",
  88. subject,
  89. to_email,
  90. plaintext,
  91. )
  92. return
  93. # host IP, setup via Docker network
  94. smtp = SMTP(POSTFIX_SERVER, 25)
  95. msg = EmailMessage()
  96. msg["Subject"] = subject
  97. msg["From"] = f"Son from SimpleLogin <{SUPPORT_EMAIL}>"
  98. msg["To"] = to_email
  99. msg.set_content(plaintext)
  100. if html is not None:
  101. msg.add_alternative(html, subtype="html")
  102. msg_id_header = make_msgid()
  103. LOG.d("message-id %s", msg_id_header)
  104. msg["Message-ID"] = msg_id_header
  105. date_header = formatdate()
  106. LOG.d("Date header: %s", date_header)
  107. msg["Date"] = date_header
  108. # add DKIM
  109. email_domain = SUPPORT_EMAIL[SUPPORT_EMAIL.find("@") + 1 :]
  110. add_dkim_signature(msg, email_domain)
  111. msg_raw = msg.as_string().encode()
  112. smtp.sendmail(SUPPORT_EMAIL, to_email, msg_raw)
  113. def get_email_name(email_from):
  114. """parse email from header and return the name part
  115. First Last <ab@cd.com> -> First Last
  116. ab@cd.com -> ""
  117. """
  118. if "<" in email_from:
  119. return email_from[: email_from.find("<")].strip()
  120. return ""
  121. def get_email_part(email_from):
  122. """parse email from header and return the email part
  123. First Last <ab@cd.com> -> ab@cd.com
  124. ab@cd.com -> ""
  125. """
  126. if "<" in email_from:
  127. return email_from[email_from.find("<") + 1 : email_from.find(">")].strip()
  128. return email_from
  129. def add_dkim_signature(msg: EmailMessage, email_domain: str):
  130. if msg["DKIM-Signature"]:
  131. LOG.d("Remove DKIM-Signature %s", msg["DKIM-Signature"])
  132. del msg["DKIM-Signature"]
  133. # Specify headers in "byte" form
  134. # Generate message signature
  135. sig = dkim.sign(
  136. msg.as_string().encode(),
  137. DKIM_SELECTOR,
  138. email_domain.encode(),
  139. DKIM_PRIVATE_KEY.encode(),
  140. include_headers=DKIM_HEADERS,
  141. )
  142. sig = sig.decode()
  143. # remove linebreaks from sig
  144. sig = sig.replace("\n", " ").replace("\r", "")
  145. msg.add_header("DKIM-Signature", sig[len("DKIM-Signature: ") :])