email_utils.py 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290
  1. import os
  2. from email.message import EmailMessage, Message
  3. from email.utils import make_msgid, formatdate
  4. from smtplib import SMTP
  5. import dkim
  6. from jinja2 import Environment, FileSystemLoader
  7. from app.config import (
  8. SUPPORT_EMAIL,
  9. ROOT_DIR,
  10. POSTFIX_SERVER,
  11. NOT_SEND_EMAIL,
  12. DKIM_SELECTOR,
  13. DKIM_PRIVATE_KEY,
  14. DKIM_HEADERS,
  15. ALIAS_DOMAINS,
  16. )
  17. from app.log import LOG
  18. def _render(template_name, **kwargs) -> str:
  19. templates_dir = os.path.join(ROOT_DIR, "templates", "emails")
  20. env = Environment(loader=FileSystemLoader(templates_dir))
  21. template = env.get_template(template_name)
  22. return template.render(**kwargs)
  23. def send_welcome_email(email, name):
  24. send_email(
  25. email,
  26. f"Welcome to SimpleLogin {name}!",
  27. _render("welcome.txt", name=name),
  28. _render("welcome.html", name=name),
  29. )
  30. def send_activation_email(email, name, activation_link):
  31. send_email(
  32. email,
  33. f"Just one more step to join SimpleLogin {name}",
  34. _render(
  35. "activation.txt", name=name, activation_link=activation_link, email=email
  36. ),
  37. _render(
  38. "activation.html", name=name, activation_link=activation_link, email=email
  39. ),
  40. )
  41. def send_reset_password_email(email, name, reset_password_link):
  42. send_email(
  43. email,
  44. f"Reset your password on SimpleLogin",
  45. _render(
  46. "reset-password.txt", name=name, reset_password_link=reset_password_link
  47. ),
  48. _render(
  49. "reset-password.html", name=name, reset_password_link=reset_password_link
  50. ),
  51. )
  52. def send_change_email(new_email, current_email, name, link):
  53. send_email(
  54. new_email,
  55. f"Confirm email update on SimpleLogin",
  56. _render(
  57. "change-email.txt",
  58. name=name,
  59. link=link,
  60. new_email=new_email,
  61. current_email=current_email,
  62. ),
  63. _render(
  64. "change-email.html",
  65. name=name,
  66. link=link,
  67. new_email=new_email,
  68. current_email=current_email,
  69. ),
  70. )
  71. def send_new_app_email(email, name):
  72. send_email(
  73. email,
  74. f"Any question/feedback for SimpleLogin {name}?",
  75. _render("new-app.txt", name=name),
  76. _render("new-app.html", name=name),
  77. )
  78. def send_test_email_alias(email, name):
  79. send_email(
  80. email,
  81. f"This email is sent to {email}",
  82. _render("test-email.txt", name=name, alias=email),
  83. _render("test-email.html", name=name, alias=email),
  84. )
  85. def send_cannot_create_directory_alias(user, alias, directory):
  86. """when user cancels their subscription, they cannot create alias on the fly.
  87. If this happens, send them an email to notify
  88. """
  89. send_email(
  90. user.email,
  91. f"Alias {alias} cannot be created",
  92. _render(
  93. "cannot-create-alias-directory.txt",
  94. name=user.name,
  95. alias=alias,
  96. directory=directory,
  97. ),
  98. _render(
  99. "cannot-create-alias-directory.html",
  100. name=user.name,
  101. alias=alias,
  102. directory=directory,
  103. ),
  104. )
  105. def send_cannot_create_domain_alias(user, alias, domain):
  106. """when user cancels their subscription, they cannot create alias on the fly with custom domain.
  107. If this happens, send them an email to notify
  108. """
  109. send_email(
  110. user.email,
  111. f"Alias {alias} cannot be created",
  112. _render(
  113. "cannot-create-alias-domain.txt", name=user.name, alias=alias, domain=domain
  114. ),
  115. _render(
  116. "cannot-create-alias-domain.html",
  117. name=user.name,
  118. alias=alias,
  119. domain=domain,
  120. ),
  121. )
  122. def send_reply_alias_must_use_personal_email(user, alias, sender):
  123. """
  124. The reply_email can be used only by user personal email.
  125. Notify user if it's used by someone else
  126. """
  127. send_email(
  128. user.email,
  129. f"Reply from your alias {alias} only works with your personal email",
  130. _render(
  131. "reply-must-use-personal-email.txt",
  132. name=user.name,
  133. alias=alias,
  134. sender=sender,
  135. user_email=user.email,
  136. ),
  137. _render(
  138. "reply-must-use-personal-email.html",
  139. name=user.name,
  140. alias=alias,
  141. sender=sender,
  142. user_email=user.email,
  143. ),
  144. )
  145. def send_email(to_email, subject, plaintext, html):
  146. if NOT_SEND_EMAIL:
  147. LOG.d(
  148. "send email with subject %s to %s, plaintext: %s",
  149. subject,
  150. to_email,
  151. plaintext,
  152. )
  153. return
  154. # host IP, setup via Docker network
  155. smtp = SMTP(POSTFIX_SERVER, 25)
  156. msg = EmailMessage()
  157. msg["Subject"] = subject
  158. msg["From"] = f"Son from SimpleLogin <{SUPPORT_EMAIL}>"
  159. msg["To"] = to_email
  160. msg.set_content(plaintext)
  161. if html is not None:
  162. msg.add_alternative(html, subtype="html")
  163. msg_id_header = make_msgid()
  164. LOG.d("message-id %s", msg_id_header)
  165. msg["Message-ID"] = msg_id_header
  166. date_header = formatdate()
  167. LOG.d("Date header: %s", date_header)
  168. msg["Date"] = date_header
  169. # add DKIM
  170. email_domain = SUPPORT_EMAIL[SUPPORT_EMAIL.find("@") + 1 :]
  171. add_dkim_signature(msg, email_domain)
  172. msg_raw = msg.as_string().encode()
  173. smtp.sendmail(SUPPORT_EMAIL, to_email, msg_raw)
  174. def get_email_name(email_from):
  175. """parse email from header and return the name part
  176. First Last <ab@cd.com> -> First Last
  177. ab@cd.com -> ""
  178. """
  179. if "<" in email_from:
  180. return email_from[: email_from.find("<")].strip()
  181. return ""
  182. def get_email_part(email_from):
  183. """parse email from header and return the email part
  184. First Last <ab@cd.com> -> ab@cd.com
  185. ab@cd.com -> ""
  186. """
  187. if "<" in email_from:
  188. return email_from[email_from.find("<") + 1 : email_from.find(">")].strip()
  189. return email_from
  190. def get_email_local_part(email):
  191. """
  192. Get the local part from email
  193. ab@cd.com -> ab
  194. """
  195. return email[: email.find("@")]
  196. def get_email_domain_part(email):
  197. """
  198. Get the domain part from email
  199. ab@cd.com -> cd.com
  200. """
  201. return email[email.find("@") + 1 :]
  202. def add_dkim_signature(msg: Message, email_domain: str):
  203. if msg["DKIM-Signature"]:
  204. LOG.d("Remove DKIM-Signature %s", msg["DKIM-Signature"])
  205. del msg["DKIM-Signature"]
  206. # Specify headers in "byte" form
  207. # Generate message signature
  208. sig = dkim.sign(
  209. msg.as_string().encode(),
  210. DKIM_SELECTOR,
  211. email_domain.encode(),
  212. DKIM_PRIVATE_KEY.encode(),
  213. include_headers=DKIM_HEADERS,
  214. )
  215. sig = sig.decode()
  216. # remove linebreaks from sig
  217. sig = sig.replace("\n", " ").replace("\r", "")
  218. msg.add_header("DKIM-Signature", sig[len("DKIM-Signature: ") :])
  219. def add_or_replace_header(msg: Message, header: str, value: str):
  220. try:
  221. msg.add_header(header, value)
  222. except ValueError:
  223. # the header exists already
  224. msg.replace_header(header, value)
  225. def delete_header(msg: Message, header: str):
  226. """a header can appear several times in message."""
  227. for h in msg._headers:
  228. if h[0].lower() == header.lower():
  229. msg._headers.remove(h)
  230. def email_belongs_to_alias_domains(email: str) -> bool:
  231. """return True if an emails ends with one of the alias domains provided by SimpleLogin"""
  232. for domain in ALIAS_DOMAINS:
  233. if email.endswith("@" + domain):
  234. return True
  235. return False