email_utils.py 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252
  1. import os
  2. from email.message import EmailMessage, Message
  3. from email.utils import make_msgid, formatdate
  4. from smtplib import SMTP
  5. import dkim
  6. from jinja2 import Environment, FileSystemLoader
  7. from app.config import (
  8. SUPPORT_EMAIL,
  9. ROOT_DIR,
  10. POSTFIX_SERVER,
  11. NOT_SEND_EMAIL,
  12. DKIM_SELECTOR,
  13. DKIM_PRIVATE_KEY,
  14. DKIM_HEADERS,
  15. )
  16. from app.log import LOG
  17. def _render(template_name, **kwargs) -> str:
  18. templates_dir = os.path.join(ROOT_DIR, "templates", "emails")
  19. env = Environment(loader=FileSystemLoader(templates_dir))
  20. template = env.get_template(template_name)
  21. return template.render(**kwargs)
  22. def send_welcome_email(email, name):
  23. send_email(
  24. email,
  25. f"Welcome to SimpleLogin {name}!",
  26. _render("welcome.txt", name=name),
  27. _render("welcome.html", name=name),
  28. )
  29. def send_activation_email(email, name, activation_link):
  30. send_email(
  31. email,
  32. f"Just one more step to join SimpleLogin {name}",
  33. _render(
  34. "activation.txt", name=name, activation_link=activation_link, email=email
  35. ),
  36. _render(
  37. "activation.html", name=name, activation_link=activation_link, email=email
  38. ),
  39. )
  40. def send_reset_password_email(email, name, reset_password_link):
  41. send_email(
  42. email,
  43. f"Reset your password on SimpleLogin",
  44. _render(
  45. "reset-password.txt", name=name, reset_password_link=reset_password_link
  46. ),
  47. _render(
  48. "reset-password.html", name=name, reset_password_link=reset_password_link
  49. ),
  50. )
  51. def send_change_email(new_email, current_email, name, link):
  52. send_email(
  53. new_email,
  54. f"Confirm email update on SimpleLogin",
  55. _render(
  56. "change-email.txt",
  57. name=name,
  58. link=link,
  59. new_email=new_email,
  60. current_email=current_email,
  61. ),
  62. _render(
  63. "change-email.html",
  64. name=name,
  65. link=link,
  66. new_email=new_email,
  67. current_email=current_email,
  68. ),
  69. )
  70. def send_new_app_email(email, name):
  71. send_email(
  72. email,
  73. f"Any question/feedback for SimpleLogin {name}?",
  74. _render("new-app.txt", name=name),
  75. _render("new-app.html", name=name),
  76. )
  77. def send_test_email_alias(email, name):
  78. send_email(
  79. email,
  80. f"This email is sent to {email}",
  81. _render("test-email.txt", name=name, alias=email),
  82. _render("test-email.html", name=name, alias=email),
  83. )
  84. def send_cannot_create_directory_alias(user, alias, directory):
  85. """when user cancels their subscription, they cannot create alias on the fly.
  86. If this happens, send them an email to notify
  87. """
  88. send_email(
  89. user.email,
  90. f"Alias {alias} cannot be created",
  91. _render(
  92. "cannot-create-alias-directory.txt",
  93. name=user.name,
  94. alias=alias,
  95. directory=directory,
  96. ),
  97. _render(
  98. "cannot-create-alias-directory.txt",
  99. name=user.name,
  100. alias=alias,
  101. directory=directory,
  102. ),
  103. )
  104. def send_cannot_create_domain_alias(user, alias, domain):
  105. """when user cancels their subscription, they cannot create alias on the fly with custom domain.
  106. If this happens, send them an email to notify
  107. """
  108. send_email(
  109. user.email,
  110. f"Alias {alias} cannot be created",
  111. _render(
  112. "cannot-create-alias-domain.txt", name=user.name, alias=alias, domain=domain
  113. ),
  114. _render(
  115. "cannot-create-alias-domain.txt", name=user.name, alias=alias, domain=domain
  116. ),
  117. )
  118. def send_email(to_email, subject, plaintext, html):
  119. if NOT_SEND_EMAIL:
  120. LOG.d(
  121. "send email with subject %s to %s, plaintext: %s",
  122. subject,
  123. to_email,
  124. plaintext,
  125. )
  126. return
  127. # host IP, setup via Docker network
  128. smtp = SMTP(POSTFIX_SERVER, 25)
  129. msg = EmailMessage()
  130. msg["Subject"] = subject
  131. msg["From"] = f"Son from SimpleLogin <{SUPPORT_EMAIL}>"
  132. msg["To"] = to_email
  133. msg.set_content(plaintext)
  134. if html is not None:
  135. msg.add_alternative(html, subtype="html")
  136. msg_id_header = make_msgid()
  137. LOG.d("message-id %s", msg_id_header)
  138. msg["Message-ID"] = msg_id_header
  139. date_header = formatdate()
  140. LOG.d("Date header: %s", date_header)
  141. msg["Date"] = date_header
  142. # add DKIM
  143. email_domain = SUPPORT_EMAIL[SUPPORT_EMAIL.find("@") + 1 :]
  144. add_dkim_signature(msg, email_domain)
  145. msg_raw = msg.as_string().encode()
  146. smtp.sendmail(SUPPORT_EMAIL, to_email, msg_raw)
  147. def get_email_name(email_from):
  148. """parse email from header and return the name part
  149. First Last <ab@cd.com> -> First Last
  150. ab@cd.com -> ""
  151. """
  152. if "<" in email_from:
  153. return email_from[: email_from.find("<")].strip()
  154. return ""
  155. def get_email_part(email_from):
  156. """parse email from header and return the email part
  157. First Last <ab@cd.com> -> ab@cd.com
  158. ab@cd.com -> ""
  159. """
  160. if "<" in email_from:
  161. return email_from[email_from.find("<") + 1 : email_from.find(">")].strip()
  162. return email_from
  163. def get_email_local_part(email):
  164. """
  165. Get the local part from email
  166. ab@cd.com -> ab
  167. """
  168. return email[: email.find("@")]
  169. def get_email_domain_part(email):
  170. """
  171. Get the domain part from email
  172. ab@cd.com -> cd.com
  173. """
  174. return email[email.find("@") + 1 :]
  175. def add_dkim_signature(msg: Message, email_domain: str):
  176. if msg["DKIM-Signature"]:
  177. LOG.d("Remove DKIM-Signature %s", msg["DKIM-Signature"])
  178. del msg["DKIM-Signature"]
  179. # Specify headers in "byte" form
  180. # Generate message signature
  181. sig = dkim.sign(
  182. msg.as_string().encode(),
  183. DKIM_SELECTOR,
  184. email_domain.encode(),
  185. DKIM_PRIVATE_KEY.encode(),
  186. include_headers=DKIM_HEADERS,
  187. )
  188. sig = sig.decode()
  189. # remove linebreaks from sig
  190. sig = sig.replace("\n", " ").replace("\r", "")
  191. msg.add_header("DKIM-Signature", sig[len("DKIM-Signature: ") :])
  192. def add_or_replace_header(msg: Message, header: str, value: str):
  193. try:
  194. msg.add_header(header, value)
  195. except ValueError:
  196. # the header exists already
  197. msg.replace_header(header, value)
  198. def delete_header(msg: Message, header: str):
  199. """a header can appear several times in message."""
  200. for h in msg._headers:
  201. if h[0].lower() == header.lower():
  202. msg._headers.remove(h)