email_utils.py 9.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378
  1. import email
  2. import os
  3. from email.message import EmailMessage, Message
  4. from email.mime.application import MIMEApplication
  5. from email.mime.base import MIMEBase
  6. from email.mime.multipart import MIMEMultipart
  7. from email.mime.text import MIMEText
  8. from email.utils import make_msgid, formatdate
  9. from smtplib import SMTP
  10. from typing import Optional
  11. import dkim
  12. from jinja2 import Environment, FileSystemLoader
  13. from app.config import (
  14. SUPPORT_EMAIL,
  15. ROOT_DIR,
  16. POSTFIX_SERVER,
  17. NOT_SEND_EMAIL,
  18. DKIM_SELECTOR,
  19. DKIM_PRIVATE_KEY,
  20. DKIM_HEADERS,
  21. ALIAS_DOMAINS,
  22. SUPPORT_NAME,
  23. POSTFIX_SUBMISSION_TLS,
  24. )
  25. from app.log import LOG
  26. from app.models import Mailbox, User
  27. def render(template_name, **kwargs) -> str:
  28. templates_dir = os.path.join(ROOT_DIR, "templates", "emails")
  29. env = Environment(loader=FileSystemLoader(templates_dir))
  30. template = env.get_template(template_name)
  31. return template.render(**kwargs)
  32. def send_welcome_email(user):
  33. send_email(
  34. user.email,
  35. f"Welcome to SimpleLogin {user.name}",
  36. render("com/welcome.txt", name=user.name, user=user),
  37. render("com/welcome.html", name=user.name, user=user),
  38. )
  39. def send_trial_end_soon_email(user):
  40. send_email(
  41. user.email,
  42. f"Your trial will end soon {user.name}",
  43. render("transactional/trial-end.txt", name=user.name, user=user),
  44. render("transactional/trial-end.html", name=user.name, user=user),
  45. )
  46. def send_activation_email(email, name, activation_link):
  47. send_email(
  48. email,
  49. f"Just one more step to join SimpleLogin {name}",
  50. render(
  51. "transactional/activation.txt",
  52. name=name,
  53. activation_link=activation_link,
  54. email=email,
  55. ),
  56. render(
  57. "transactional/activation.html",
  58. name=name,
  59. activation_link=activation_link,
  60. email=email,
  61. ),
  62. )
  63. def send_reset_password_email(email, name, reset_password_link):
  64. send_email(
  65. email,
  66. f"Reset your password on SimpleLogin",
  67. render(
  68. "transactional/reset-password.txt",
  69. name=name,
  70. reset_password_link=reset_password_link,
  71. ),
  72. render(
  73. "transactional/reset-password.html",
  74. name=name,
  75. reset_password_link=reset_password_link,
  76. ),
  77. )
  78. def send_change_email(new_email, current_email, name, link):
  79. send_email(
  80. new_email,
  81. f"Confirm email update on SimpleLogin",
  82. render(
  83. "transactional/change-email.txt",
  84. name=name,
  85. link=link,
  86. new_email=new_email,
  87. current_email=current_email,
  88. ),
  89. render(
  90. "transactional/change-email.html",
  91. name=name,
  92. link=link,
  93. new_email=new_email,
  94. current_email=current_email,
  95. ),
  96. )
  97. def send_new_app_email(email, name):
  98. send_email(
  99. email,
  100. f"Any question/feedback for SimpleLogin {name}?",
  101. render("com/new-app.txt", name=name),
  102. render("com/new-app.html", name=name),
  103. )
  104. def send_test_email_alias(email, name):
  105. send_email(
  106. email,
  107. f"This email is sent to {email}",
  108. render("transactional/test-email.txt", name=name, alias=email),
  109. render("transactional/test-email.html", name=name, alias=email),
  110. )
  111. def send_cannot_create_directory_alias(user, alias, directory):
  112. """when user cancels their subscription, they cannot create alias on the fly.
  113. If this happens, send them an email to notify
  114. """
  115. send_email(
  116. user.email,
  117. f"Alias {alias} cannot be created",
  118. render(
  119. "transactional/cannot-create-alias-directory.txt",
  120. name=user.name,
  121. alias=alias,
  122. directory=directory,
  123. ),
  124. render(
  125. "transactional/cannot-create-alias-directory.html",
  126. name=user.name,
  127. alias=alias,
  128. directory=directory,
  129. ),
  130. )
  131. def send_cannot_create_domain_alias(user, alias, domain):
  132. """when user cancels their subscription, they cannot create alias on the fly with custom domain.
  133. If this happens, send them an email to notify
  134. """
  135. send_email(
  136. user.email,
  137. f"Alias {alias} cannot be created",
  138. render(
  139. "transactional/cannot-create-alias-domain.txt",
  140. name=user.name,
  141. alias=alias,
  142. domain=domain,
  143. ),
  144. render(
  145. "transactional/cannot-create-alias-domain.html",
  146. name=user.name,
  147. alias=alias,
  148. domain=domain,
  149. ),
  150. )
  151. def send_email(
  152. to_email, subject, plaintext, html=None, bounced_email: Optional[Message] = None
  153. ):
  154. if NOT_SEND_EMAIL:
  155. LOG.d(
  156. "send email with subject %s to %s, plaintext: %s",
  157. subject,
  158. to_email,
  159. plaintext,
  160. )
  161. return
  162. LOG.d("send email to %s, subject %s", to_email, subject)
  163. if POSTFIX_SUBMISSION_TLS:
  164. smtp = SMTP(POSTFIX_SERVER, 587)
  165. smtp.starttls()
  166. else:
  167. smtp = SMTP(POSTFIX_SERVER, 25)
  168. if bounced_email:
  169. msg = MIMEMultipart("mixed")
  170. # add email main body
  171. body = MIMEMultipart("alternative")
  172. body.attach(MIMEText(plaintext, "text"))
  173. if html:
  174. body.attach(MIMEText(html, "html"))
  175. msg.attach(body)
  176. # add attachment
  177. rfcmessage = MIMEBase("message", "rfc822")
  178. rfcmessage.attach(bounced_email)
  179. msg.attach(rfcmessage)
  180. else:
  181. msg = MIMEMultipart("alternative")
  182. msg.attach(MIMEText(plaintext, "text"))
  183. if html:
  184. msg.attach(MIMEText(html, "html"))
  185. msg["Subject"] = subject
  186. msg["From"] = f"{SUPPORT_NAME} <{SUPPORT_EMAIL}>"
  187. msg["To"] = to_email
  188. msg_id_header = make_msgid()
  189. msg["Message-ID"] = msg_id_header
  190. date_header = formatdate()
  191. msg["Date"] = date_header
  192. # add DKIM
  193. email_domain = SUPPORT_EMAIL[SUPPORT_EMAIL.find("@") + 1 :]
  194. add_dkim_signature(msg, email_domain)
  195. msg_raw = msg.as_string().encode()
  196. smtp.sendmail(SUPPORT_EMAIL, to_email, msg_raw)
  197. def get_email_part(email_from):
  198. """parse email from header and return the email part
  199. First Last <ab@cd.com> -> ab@cd.com
  200. ab@cd.com -> ""
  201. """
  202. if "<" in email_from:
  203. return email_from[email_from.find("<") + 1 : email_from.find(">")].strip()
  204. return email_from
  205. def get_email_local_part(email):
  206. """
  207. Get the local part from email
  208. ab@cd.com -> ab
  209. """
  210. return email[: email.find("@")]
  211. def get_email_domain_part(email):
  212. """
  213. Get the domain part from email
  214. ab@cd.com -> cd.com
  215. """
  216. return email[email.find("@") + 1 :]
  217. def add_dkim_signature(msg: Message, email_domain: str):
  218. delete_header(msg, "DKIM-Signature")
  219. # Specify headers in "byte" form
  220. # Generate message signature
  221. sig = dkim.sign(
  222. msg.as_string().encode(),
  223. DKIM_SELECTOR,
  224. email_domain.encode(),
  225. DKIM_PRIVATE_KEY.encode(),
  226. include_headers=DKIM_HEADERS,
  227. )
  228. sig = sig.decode()
  229. # remove linebreaks from sig
  230. sig = sig.replace("\n", " ").replace("\r", "")
  231. msg["DKIM-Signature"] = sig[len("DKIM-Signature: ") :]
  232. def add_or_replace_header(msg: Message, header: str, value: str):
  233. """
  234. Remove all occurrences of `header` and add `header` with `value`.
  235. """
  236. delete_header(msg, header)
  237. msg[header] = value
  238. def delete_header(msg: Message, header: str):
  239. """a header can appear several times in message."""
  240. # inspired from https://stackoverflow.com/a/47903323/1428034
  241. for i in reversed(range(len(msg._headers))):
  242. header_name = msg._headers[i][0].lower()
  243. if header_name == header.lower():
  244. del msg._headers[i]
  245. def delete_all_headers_except(msg: Message, headers: [str]):
  246. headers = [h.lower() for h in headers]
  247. for i in reversed(range(len(msg._headers))):
  248. header_name = msg._headers[i][0].lower()
  249. if header_name not in headers:
  250. del msg._headers[i]
  251. def email_belongs_to_alias_domains(email: str) -> bool:
  252. """return True if an email ends with one of the alias domains provided by SimpleLogin"""
  253. for domain in ALIAS_DOMAINS:
  254. if email.endswith("@" + domain):
  255. return True
  256. return False
  257. def can_be_used_as_personal_email(email: str) -> bool:
  258. """return True if an email can be used as a personal email. Currently the only condition is email domain is not
  259. - one of ALIAS_DOMAINS
  260. - one of custom domains
  261. """
  262. domain = get_email_domain_part(email)
  263. if not domain:
  264. return False
  265. if domain in ALIAS_DOMAINS:
  266. return False
  267. from app.models import CustomDomain
  268. if CustomDomain.get_by(domain=domain, verified=True):
  269. return False
  270. return True
  271. def email_already_used(email: str) -> bool:
  272. """test if an email can be used when:
  273. - user signs up
  274. - add a new mailbox
  275. """
  276. if User.get_by(email=email):
  277. return True
  278. if Mailbox.get_by(email=email):
  279. return True
  280. return False
  281. def mailbox_already_used(email: str, user) -> bool:
  282. if Mailbox.get_by(email=email):
  283. return True
  284. # support the case user wants to re-add their real email as mailbox
  285. # can happen when user changes their root email and wants to add this new email as mailbox
  286. if email == user.email:
  287. return False
  288. if User.get_by(email=email):
  289. return True
  290. return False
  291. def get_orig_message_from_bounce(msg: Message) -> Message:
  292. """parse the original email from Bounce"""
  293. i = 0
  294. for part in msg.walk():
  295. i += 1
  296. # the original message is the 4th part
  297. # 1st part is the root part, multipart/report
  298. # 2nd is text/plain, Postfix log
  299. # ...
  300. # 7th is original message
  301. if i == 7:
  302. return part