email_utils.py 8.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320
  1. import os
  2. from email.message import EmailMessage, Message
  3. from email.utils import make_msgid, formatdate
  4. from smtplib import SMTP
  5. import dkim
  6. from jinja2 import Environment, FileSystemLoader
  7. from app.config import (
  8. SUPPORT_EMAIL,
  9. ROOT_DIR,
  10. POSTFIX_SERVER,
  11. NOT_SEND_EMAIL,
  12. DKIM_SELECTOR,
  13. DKIM_PRIVATE_KEY,
  14. DKIM_HEADERS,
  15. ALIAS_DOMAINS,
  16. SUPPORT_NAME,
  17. )
  18. from app.log import LOG
  19. def _render(template_name, **kwargs) -> str:
  20. templates_dir = os.path.join(ROOT_DIR, "templates", "emails")
  21. env = Environment(loader=FileSystemLoader(templates_dir))
  22. template = env.get_template(template_name)
  23. return template.render(**kwargs)
  24. def send_welcome_email(user):
  25. send_email(
  26. user.email,
  27. f"Welcome to SimpleLogin {user.name}",
  28. _render("welcome.txt", name=user.name, user=user),
  29. _render("welcome.html", name=user.name, user=user),
  30. )
  31. def send_trial_end_soon_email(user):
  32. send_email(
  33. user.email,
  34. f"Your trial will end soon {user.name}",
  35. _render("trial-end.txt", name=user.name, user=user),
  36. _render("trial-end.html", name=user.name, user=user),
  37. )
  38. def send_activation_email(email, name, activation_link):
  39. send_email(
  40. email,
  41. f"Just one more step to join SimpleLogin {name}",
  42. _render(
  43. "activation.txt", name=name, activation_link=activation_link, email=email
  44. ),
  45. _render(
  46. "activation.html", name=name, activation_link=activation_link, email=email
  47. ),
  48. )
  49. def send_reset_password_email(email, name, reset_password_link):
  50. send_email(
  51. email,
  52. f"Reset your password on SimpleLogin",
  53. _render(
  54. "reset-password.txt", name=name, reset_password_link=reset_password_link
  55. ),
  56. _render(
  57. "reset-password.html", name=name, reset_password_link=reset_password_link
  58. ),
  59. )
  60. def send_change_email(new_email, current_email, name, link):
  61. send_email(
  62. new_email,
  63. f"Confirm email update on SimpleLogin",
  64. _render(
  65. "change-email.txt",
  66. name=name,
  67. link=link,
  68. new_email=new_email,
  69. current_email=current_email,
  70. ),
  71. _render(
  72. "change-email.html",
  73. name=name,
  74. link=link,
  75. new_email=new_email,
  76. current_email=current_email,
  77. ),
  78. )
  79. def send_new_app_email(email, name):
  80. send_email(
  81. email,
  82. f"Any question/feedback for SimpleLogin {name}?",
  83. _render("new-app.txt", name=name),
  84. _render("new-app.html", name=name),
  85. )
  86. def send_test_email_alias(email, name):
  87. send_email(
  88. email,
  89. f"This email is sent to {email}",
  90. _render("test-email.txt", name=name, alias=email),
  91. _render("test-email.html", name=name, alias=email),
  92. )
  93. def send_cannot_create_directory_alias(user, alias, directory):
  94. """when user cancels their subscription, they cannot create alias on the fly.
  95. If this happens, send them an email to notify
  96. """
  97. send_email(
  98. user.email,
  99. f"Alias {alias} cannot be created",
  100. _render(
  101. "cannot-create-alias-directory.txt",
  102. name=user.name,
  103. alias=alias,
  104. directory=directory,
  105. ),
  106. _render(
  107. "cannot-create-alias-directory.html",
  108. name=user.name,
  109. alias=alias,
  110. directory=directory,
  111. ),
  112. )
  113. def send_cannot_create_domain_alias(user, alias, domain):
  114. """when user cancels their subscription, they cannot create alias on the fly with custom domain.
  115. If this happens, send them an email to notify
  116. """
  117. send_email(
  118. user.email,
  119. f"Alias {alias} cannot be created",
  120. _render(
  121. "cannot-create-alias-domain.txt", name=user.name, alias=alias, domain=domain
  122. ),
  123. _render(
  124. "cannot-create-alias-domain.html",
  125. name=user.name,
  126. alias=alias,
  127. domain=domain,
  128. ),
  129. )
  130. def send_reply_alias_must_use_personal_email(user, alias, sender):
  131. """
  132. The reply_email can be used only by user personal email.
  133. Notify user if it's used by someone else
  134. """
  135. send_email(
  136. user.email,
  137. f"Reply from your alias {alias} only works with your personal email",
  138. _render(
  139. "reply-must-use-personal-email.txt",
  140. name=user.name,
  141. alias=alias,
  142. sender=sender,
  143. user_email=user.email,
  144. ),
  145. _render(
  146. "reply-must-use-personal-email.html",
  147. name=user.name,
  148. alias=alias,
  149. sender=sender,
  150. user_email=user.email,
  151. ),
  152. )
  153. def send_email(to_email, subject, plaintext, html):
  154. if NOT_SEND_EMAIL:
  155. LOG.d(
  156. "send email with subject %s to %s, plaintext: %s",
  157. subject,
  158. to_email,
  159. plaintext,
  160. )
  161. return
  162. # host IP, setup via Docker network
  163. smtp = SMTP(POSTFIX_SERVER, 25)
  164. msg = EmailMessage()
  165. msg["Subject"] = subject
  166. msg["From"] = f"{SUPPORT_NAME} <{SUPPORT_EMAIL}>"
  167. msg["To"] = to_email
  168. msg.set_content(plaintext)
  169. if html is not None:
  170. msg.add_alternative(html, subtype="html")
  171. msg_id_header = make_msgid()
  172. LOG.d("message-id %s", msg_id_header)
  173. msg["Message-ID"] = msg_id_header
  174. date_header = formatdate()
  175. LOG.d("Date header: %s", date_header)
  176. msg["Date"] = date_header
  177. # add DKIM
  178. email_domain = SUPPORT_EMAIL[SUPPORT_EMAIL.find("@") + 1 :]
  179. add_dkim_signature(msg, email_domain)
  180. msg_raw = msg.as_string().encode()
  181. smtp.sendmail(SUPPORT_EMAIL, to_email, msg_raw)
  182. def get_email_name(email_from):
  183. """parse email from header and return the name part
  184. First Last <ab@cd.com> -> First Last
  185. ab@cd.com -> ""
  186. """
  187. if "<" in email_from:
  188. return email_from[: email_from.find("<")].strip()
  189. return ""
  190. def get_email_part(email_from):
  191. """parse email from header and return the email part
  192. First Last <ab@cd.com> -> ab@cd.com
  193. ab@cd.com -> ""
  194. """
  195. if "<" in email_from:
  196. return email_from[email_from.find("<") + 1 : email_from.find(">")].strip()
  197. return email_from
  198. def get_email_local_part(email):
  199. """
  200. Get the local part from email
  201. ab@cd.com -> ab
  202. """
  203. return email[: email.find("@")]
  204. def get_email_domain_part(email):
  205. """
  206. Get the domain part from email
  207. ab@cd.com -> cd.com
  208. """
  209. return email[email.find("@") + 1 :]
  210. def add_dkim_signature(msg: Message, email_domain: str):
  211. if msg["DKIM-Signature"]:
  212. LOG.d("Remove DKIM-Signature %s", msg["DKIM-Signature"])
  213. del msg["DKIM-Signature"]
  214. # Specify headers in "byte" form
  215. # Generate message signature
  216. sig = dkim.sign(
  217. msg.as_string().encode(),
  218. DKIM_SELECTOR,
  219. email_domain.encode(),
  220. DKIM_PRIVATE_KEY.encode(),
  221. include_headers=DKIM_HEADERS,
  222. )
  223. sig = sig.decode()
  224. # remove linebreaks from sig
  225. sig = sig.replace("\n", " ").replace("\r", "")
  226. msg.add_header("DKIM-Signature", sig[len("DKIM-Signature: ") :])
  227. def add_or_replace_header(msg: Message, header: str, value: str):
  228. try:
  229. msg.add_header(header, value)
  230. except ValueError:
  231. # the header exists already
  232. msg.replace_header(header, value)
  233. def delete_header(msg: Message, header: str):
  234. """a header can appear several times in message."""
  235. for h in msg._headers:
  236. if h[0].lower() == header.lower():
  237. msg._headers.remove(h)
  238. def email_belongs_to_alias_domains(email: str) -> bool:
  239. """return True if an emails ends with one of the alias domains provided by SimpleLogin"""
  240. for domain in ALIAS_DOMAINS:
  241. if email.endswith("@" + domain):
  242. return True
  243. return False
  244. def can_be_used_as_personal_email(email: str) -> bool:
  245. """return True if an email can be used as a personal email. Currently the only condition is email domain is not
  246. - one of ALIAS_DOMAINS
  247. - one of custom domains
  248. """
  249. domain = get_email_domain_part(email)
  250. if not domain:
  251. return False
  252. if domain in ALIAS_DOMAINS:
  253. return False
  254. from app.models import CustomDomain
  255. if CustomDomain.get_by(domain=domain, verified=True):
  256. return False
  257. return True