email_utils.py 8.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346
  1. import email
  2. import os
  3. from email.message import EmailMessage, Message
  4. from email.mime.application import MIMEApplication
  5. from email.mime.base import MIMEBase
  6. from email.mime.multipart import MIMEMultipart
  7. from email.mime.text import MIMEText
  8. from email.utils import make_msgid, formatdate
  9. from smtplib import SMTP
  10. from typing import Optional
  11. import dkim
  12. from jinja2 import Environment, FileSystemLoader
  13. from app.config import (
  14. SUPPORT_EMAIL,
  15. ROOT_DIR,
  16. POSTFIX_SERVER,
  17. NOT_SEND_EMAIL,
  18. DKIM_SELECTOR,
  19. DKIM_PRIVATE_KEY,
  20. DKIM_HEADERS,
  21. ALIAS_DOMAINS,
  22. SUPPORT_NAME,
  23. )
  24. from app.log import LOG
  25. from app.models import Mailbox, User
  26. def render(template_name, **kwargs) -> str:
  27. templates_dir = os.path.join(ROOT_DIR, "templates", "emails")
  28. env = Environment(loader=FileSystemLoader(templates_dir))
  29. template = env.get_template(template_name)
  30. return template.render(**kwargs)
  31. def send_welcome_email(user):
  32. send_email(
  33. user.email,
  34. f"Welcome to SimpleLogin {user.name}",
  35. render("com/welcome.txt", name=user.name, user=user),
  36. render("com/welcome.html", name=user.name, user=user),
  37. )
  38. def send_trial_end_soon_email(user):
  39. send_email(
  40. user.email,
  41. f"Your trial will end soon {user.name}",
  42. render("transactional/trial-end.txt", name=user.name, user=user),
  43. render("transactional/trial-end.html", name=user.name, user=user),
  44. )
  45. def send_activation_email(email, name, activation_link):
  46. send_email(
  47. email,
  48. f"Just one more step to join SimpleLogin {name}",
  49. render(
  50. "transactional/activation.txt",
  51. name=name,
  52. activation_link=activation_link,
  53. email=email,
  54. ),
  55. render(
  56. "transactional/activation.html",
  57. name=name,
  58. activation_link=activation_link,
  59. email=email,
  60. ),
  61. )
  62. def send_reset_password_email(email, name, reset_password_link):
  63. send_email(
  64. email,
  65. f"Reset your password on SimpleLogin",
  66. render(
  67. "transactional/reset-password.txt",
  68. name=name,
  69. reset_password_link=reset_password_link,
  70. ),
  71. render(
  72. "transactional/reset-password.html",
  73. name=name,
  74. reset_password_link=reset_password_link,
  75. ),
  76. )
  77. def send_change_email(new_email, current_email, name, link):
  78. send_email(
  79. new_email,
  80. f"Confirm email update on SimpleLogin",
  81. render(
  82. "transactional/change-email.txt",
  83. name=name,
  84. link=link,
  85. new_email=new_email,
  86. current_email=current_email,
  87. ),
  88. render(
  89. "transactional/change-email.html",
  90. name=name,
  91. link=link,
  92. new_email=new_email,
  93. current_email=current_email,
  94. ),
  95. )
  96. def send_new_app_email(email, name):
  97. send_email(
  98. email,
  99. f"Any question/feedback for SimpleLogin {name}?",
  100. render("com/new-app.txt", name=name),
  101. render("com/new-app.html", name=name),
  102. )
  103. def send_test_email_alias(email, name):
  104. send_email(
  105. email,
  106. f"This email is sent to {email}",
  107. render("transactional/test-email.txt", name=name, alias=email),
  108. render("transactional/test-email.html", name=name, alias=email),
  109. )
  110. def send_cannot_create_directory_alias(user, alias, directory):
  111. """when user cancels their subscription, they cannot create alias on the fly.
  112. If this happens, send them an email to notify
  113. """
  114. send_email(
  115. user.email,
  116. f"Alias {alias} cannot be created",
  117. render(
  118. "transactional/cannot-create-alias-directory.txt",
  119. name=user.name,
  120. alias=alias,
  121. directory=directory,
  122. ),
  123. render(
  124. "transactional/cannot-create-alias-directory.html",
  125. name=user.name,
  126. alias=alias,
  127. directory=directory,
  128. ),
  129. )
  130. def send_cannot_create_domain_alias(user, alias, domain):
  131. """when user cancels their subscription, they cannot create alias on the fly with custom domain.
  132. If this happens, send them an email to notify
  133. """
  134. send_email(
  135. user.email,
  136. f"Alias {alias} cannot be created",
  137. render(
  138. "transactional/cannot-create-alias-domain.txt",
  139. name=user.name,
  140. alias=alias,
  141. domain=domain,
  142. ),
  143. render(
  144. "transactional/cannot-create-alias-domain.html",
  145. name=user.name,
  146. alias=alias,
  147. domain=domain,
  148. ),
  149. )
  150. def send_email(
  151. to_email, subject, plaintext, html, bounced_email: Optional[Message] = None
  152. ):
  153. if NOT_SEND_EMAIL:
  154. LOG.d(
  155. "send email with subject %s to %s, plaintext: %s",
  156. subject,
  157. to_email,
  158. plaintext,
  159. )
  160. return
  161. # host IP, setup via Docker network
  162. smtp = SMTP(POSTFIX_SERVER, 25)
  163. if bounced_email:
  164. msg = MIMEMultipart("mixed")
  165. # add email main body
  166. body = MIMEMultipart("alternative")
  167. body.attach(MIMEText(plaintext, "text"))
  168. if html:
  169. body.attach(MIMEText(html, "html"))
  170. msg.attach(body)
  171. # add attachment
  172. rfcmessage = MIMEBase("message", "rfc822")
  173. rfcmessage.attach(bounced_email)
  174. msg.attach(rfcmessage)
  175. else:
  176. msg = MIMEMultipart("alternative")
  177. msg.attach(MIMEText(plaintext, "text"))
  178. if html:
  179. msg.attach(MIMEText(html, "html"))
  180. msg["Subject"] = subject
  181. msg["From"] = f"{SUPPORT_NAME} <{SUPPORT_EMAIL}>"
  182. msg["To"] = to_email
  183. msg_id_header = make_msgid()
  184. LOG.d("message-id %s", msg_id_header)
  185. msg["Message-ID"] = msg_id_header
  186. date_header = formatdate()
  187. LOG.d("Date header: %s", date_header)
  188. msg["Date"] = date_header
  189. # add DKIM
  190. email_domain = SUPPORT_EMAIL[SUPPORT_EMAIL.find("@") + 1 :]
  191. add_dkim_signature(msg, email_domain)
  192. msg_raw = msg.as_string().encode()
  193. smtp.sendmail(SUPPORT_EMAIL, to_email, msg_raw)
  194. def get_email_name(email_from):
  195. """parse email from header and return the name part
  196. First Last <ab@cd.com> -> First Last
  197. ab@cd.com -> ""
  198. """
  199. if "<" in email_from:
  200. return email_from[: email_from.find("<")].strip()
  201. return ""
  202. def get_email_part(email_from):
  203. """parse email from header and return the email part
  204. First Last <ab@cd.com> -> ab@cd.com
  205. ab@cd.com -> ""
  206. """
  207. if "<" in email_from:
  208. return email_from[email_from.find("<") + 1 : email_from.find(">")].strip()
  209. return email_from
  210. def get_email_local_part(email):
  211. """
  212. Get the local part from email
  213. ab@cd.com -> ab
  214. """
  215. return email[: email.find("@")]
  216. def get_email_domain_part(email):
  217. """
  218. Get the domain part from email
  219. ab@cd.com -> cd.com
  220. """
  221. return email[email.find("@") + 1 :]
  222. def add_dkim_signature(msg: Message, email_domain: str):
  223. delete_header(msg, "DKIM-Signature")
  224. # Specify headers in "byte" form
  225. # Generate message signature
  226. sig = dkim.sign(
  227. msg.as_string().encode(),
  228. DKIM_SELECTOR,
  229. email_domain.encode(),
  230. DKIM_PRIVATE_KEY.encode(),
  231. include_headers=DKIM_HEADERS,
  232. )
  233. sig = sig.decode()
  234. # remove linebreaks from sig
  235. sig = sig.replace("\n", " ").replace("\r", "")
  236. msg["DKIM-Signature"] = sig[len("DKIM-Signature: ") :]
  237. def add_or_replace_header(msg: Message, header: str, value: str):
  238. """
  239. Remove all occurrences of `header` and add `header` with `value`.
  240. """
  241. delete_header(msg, header)
  242. msg[header] = value
  243. def delete_header(msg: Message, header: str):
  244. """a header can appear several times in message."""
  245. # inspired from https://stackoverflow.com/a/47903323/1428034
  246. for i in reversed(range(len(msg._headers))):
  247. header_name = msg._headers[i][0].lower()
  248. if header_name == header.lower():
  249. del msg._headers[i]
  250. def email_belongs_to_alias_domains(email: str) -> bool:
  251. """return True if an email ends with one of the alias domains provided by SimpleLogin"""
  252. for domain in ALIAS_DOMAINS:
  253. if email.endswith("@" + domain):
  254. return True
  255. return False
  256. def can_be_used_as_personal_email(email: str) -> bool:
  257. """return True if an email can be used as a personal email. Currently the only condition is email domain is not
  258. - one of ALIAS_DOMAINS
  259. - one of custom domains
  260. """
  261. domain = get_email_domain_part(email)
  262. if not domain:
  263. return False
  264. if domain in ALIAS_DOMAINS:
  265. return False
  266. from app.models import CustomDomain
  267. if CustomDomain.get_by(domain=domain, verified=True):
  268. return False
  269. return True
  270. def email_already_used(email: str) -> bool:
  271. """test if an email can be used when:
  272. - user signs up
  273. - add a new mailbox
  274. """
  275. if User.get_by(email=email):
  276. return True
  277. if Mailbox.get_by(email=email):
  278. return True
  279. return False