email_utils.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395
  1. import os
  2. from email.message import Message
  3. from email.mime.base import MIMEBase
  4. from email.mime.multipart import MIMEMultipart
  5. from email.mime.text import MIMEText
  6. from email.utils import make_msgid, formatdate, parseaddr, formataddr
  7. from smtplib import SMTP
  8. from typing import Optional
  9. import dkim
  10. from jinja2 import Environment, FileSystemLoader
  11. from app.config import (
  12. SUPPORT_EMAIL,
  13. ROOT_DIR,
  14. POSTFIX_SERVER,
  15. NOT_SEND_EMAIL,
  16. DKIM_SELECTOR,
  17. DKIM_PRIVATE_KEY,
  18. DKIM_HEADERS,
  19. ALIAS_DOMAINS,
  20. SUPPORT_NAME,
  21. POSTFIX_SUBMISSION_TLS,
  22. )
  23. from app.log import LOG
  24. from app.models import Mailbox, User
  25. def render(template_name, **kwargs) -> str:
  26. templates_dir = os.path.join(ROOT_DIR, "templates", "emails")
  27. env = Environment(loader=FileSystemLoader(templates_dir))
  28. template = env.get_template(template_name)
  29. return template.render(**kwargs)
  30. def send_welcome_email(user):
  31. send_email(
  32. user.email,
  33. f"Welcome to SimpleLogin {user.name}",
  34. render("com/welcome.txt", name=user.name, user=user),
  35. render("com/welcome.html", name=user.name, user=user),
  36. )
  37. def send_trial_end_soon_email(user):
  38. send_email(
  39. user.email,
  40. f"Your trial will end soon {user.name}",
  41. render("transactional/trial-end.txt", name=user.name, user=user),
  42. render("transactional/trial-end.html", name=user.name, user=user),
  43. )
  44. def send_activation_email(email, name, activation_link):
  45. send_email(
  46. email,
  47. f"Just one more step to join SimpleLogin {name}",
  48. render(
  49. "transactional/activation.txt",
  50. name=name,
  51. activation_link=activation_link,
  52. email=email,
  53. ),
  54. render(
  55. "transactional/activation.html",
  56. name=name,
  57. activation_link=activation_link,
  58. email=email,
  59. ),
  60. )
  61. def send_reset_password_email(email, name, reset_password_link):
  62. send_email(
  63. email,
  64. f"Reset your password on SimpleLogin",
  65. render(
  66. "transactional/reset-password.txt",
  67. name=name,
  68. reset_password_link=reset_password_link,
  69. ),
  70. render(
  71. "transactional/reset-password.html",
  72. name=name,
  73. reset_password_link=reset_password_link,
  74. ),
  75. )
  76. def send_change_email(new_email, current_email, name, link):
  77. send_email(
  78. new_email,
  79. f"Confirm email update on SimpleLogin",
  80. render(
  81. "transactional/change-email.txt",
  82. name=name,
  83. link=link,
  84. new_email=new_email,
  85. current_email=current_email,
  86. ),
  87. render(
  88. "transactional/change-email.html",
  89. name=name,
  90. link=link,
  91. new_email=new_email,
  92. current_email=current_email,
  93. ),
  94. )
  95. def send_new_app_email(email, name):
  96. send_email(
  97. email,
  98. f"Any question/feedback for SimpleLogin {name}?",
  99. render("com/new-app.txt", name=name),
  100. render("com/new-app.html", name=name),
  101. )
  102. def send_test_email_alias(email, name):
  103. send_email(
  104. email,
  105. f"This email is sent to {email}",
  106. render("transactional/test-email.txt", name=name, alias=email),
  107. render("transactional/test-email.html", name=name, alias=email),
  108. )
  109. def send_cannot_create_directory_alias(user, alias, directory):
  110. """when user cancels their subscription, they cannot create alias on the fly.
  111. If this happens, send them an email to notify
  112. """
  113. send_email(
  114. user.email,
  115. f"Alias {alias} cannot be created",
  116. render(
  117. "transactional/cannot-create-alias-directory.txt",
  118. name=user.name,
  119. alias=alias,
  120. directory=directory,
  121. ),
  122. render(
  123. "transactional/cannot-create-alias-directory.html",
  124. name=user.name,
  125. alias=alias,
  126. directory=directory,
  127. ),
  128. )
  129. def send_cannot_create_domain_alias(user, alias, domain):
  130. """when user cancels their subscription, they cannot create alias on the fly with custom domain.
  131. If this happens, send them an email to notify
  132. """
  133. send_email(
  134. user.email,
  135. f"Alias {alias} cannot be created",
  136. render(
  137. "transactional/cannot-create-alias-domain.txt",
  138. name=user.name,
  139. alias=alias,
  140. domain=domain,
  141. ),
  142. render(
  143. "transactional/cannot-create-alias-domain.html",
  144. name=user.name,
  145. alias=alias,
  146. domain=domain,
  147. ),
  148. )
  149. def send_email(
  150. to_email, subject, plaintext, html=None, bounced_email: Optional[Message] = None
  151. ):
  152. if NOT_SEND_EMAIL:
  153. LOG.d(
  154. "send email with subject %s to %s, plaintext: %s",
  155. subject,
  156. to_email,
  157. plaintext,
  158. )
  159. return
  160. LOG.d("send email to %s, subject %s", to_email, subject)
  161. if POSTFIX_SUBMISSION_TLS:
  162. smtp = SMTP(POSTFIX_SERVER, 587)
  163. smtp.starttls()
  164. else:
  165. smtp = SMTP(POSTFIX_SERVER, 25)
  166. if bounced_email:
  167. msg = MIMEMultipart("mixed")
  168. # add email main body
  169. body = MIMEMultipart("alternative")
  170. body.attach(MIMEText(plaintext, "text"))
  171. if html:
  172. body.attach(MIMEText(html, "html"))
  173. msg.attach(body)
  174. # add attachment
  175. rfcmessage = MIMEBase("message", "rfc822")
  176. rfcmessage.attach(bounced_email)
  177. msg.attach(rfcmessage)
  178. else:
  179. msg = MIMEMultipart("alternative")
  180. msg.attach(MIMEText(plaintext, "text"))
  181. if html:
  182. msg.attach(MIMEText(html, "html"))
  183. msg["Subject"] = subject
  184. msg["From"] = f"{SUPPORT_NAME} <{SUPPORT_EMAIL}>"
  185. msg["To"] = to_email
  186. msg_id_header = make_msgid()
  187. msg["Message-ID"] = msg_id_header
  188. date_header = formatdate()
  189. msg["Date"] = date_header
  190. # add DKIM
  191. email_domain = SUPPORT_EMAIL[SUPPORT_EMAIL.find("@") + 1 :]
  192. add_dkim_signature(msg, email_domain)
  193. msg_raw = msg.as_string().encode()
  194. smtp.sendmail(SUPPORT_EMAIL, to_email, msg_raw)
  195. def get_email_local_part(address):
  196. """
  197. Get the local part from email
  198. ab@cd.com -> ab
  199. """
  200. return address[: address.find("@")]
  201. def get_email_domain_part(address):
  202. """
  203. Get the domain part from email
  204. ab@cd.com -> cd.com
  205. """
  206. return address[address.find("@") + 1 :]
  207. def add_dkim_signature(msg: Message, email_domain: str):
  208. delete_header(msg, "DKIM-Signature")
  209. # Specify headers in "byte" form
  210. # Generate message signature
  211. sig = dkim.sign(
  212. msg.as_string().encode(),
  213. DKIM_SELECTOR,
  214. email_domain.encode(),
  215. DKIM_PRIVATE_KEY.encode(),
  216. include_headers=DKIM_HEADERS,
  217. )
  218. sig = sig.decode()
  219. # remove linebreaks from sig
  220. sig = sig.replace("\n", " ").replace("\r", "")
  221. msg["DKIM-Signature"] = sig[len("DKIM-Signature: ") :]
  222. def add_or_replace_header(msg: Message, header: str, value: str):
  223. """
  224. Remove all occurrences of `header` and add `header` with `value`.
  225. """
  226. delete_header(msg, header)
  227. msg[header] = value
  228. def delete_header(msg: Message, header: str):
  229. """a header can appear several times in message."""
  230. # inspired from https://stackoverflow.com/a/47903323/1428034
  231. for i in reversed(range(len(msg._headers))):
  232. header_name = msg._headers[i][0].lower()
  233. if header_name == header.lower():
  234. del msg._headers[i]
  235. def delete_all_headers_except(msg: Message, headers: [str]):
  236. headers = [h.lower() for h in headers]
  237. for i in reversed(range(len(msg._headers))):
  238. header_name = msg._headers[i][0].lower()
  239. if header_name not in headers:
  240. del msg._headers[i]
  241. def email_belongs_to_alias_domains(address: str) -> bool:
  242. """return True if an email ends with one of the alias domains provided by SimpleLogin"""
  243. for domain in ALIAS_DOMAINS:
  244. if address.endswith("@" + domain):
  245. return True
  246. return False
  247. def can_be_used_as_personal_email(email: str) -> bool:
  248. """return True if an email can be used as a personal email. Currently the only condition is email domain is not
  249. - one of ALIAS_DOMAINS
  250. - one of custom domains
  251. """
  252. domain = get_email_domain_part(email)
  253. if not domain:
  254. return False
  255. if domain in ALIAS_DOMAINS:
  256. return False
  257. from app.models import CustomDomain
  258. if CustomDomain.get_by(domain=domain, verified=True):
  259. return False
  260. return True
  261. def email_already_used(email: str) -> bool:
  262. """test if an email can be used when:
  263. - user signs up
  264. - add a new mailbox
  265. """
  266. if User.get_by(email=email):
  267. return True
  268. if Mailbox.get_by(email=email):
  269. return True
  270. return False
  271. def mailbox_already_used(email: str, user) -> bool:
  272. if Mailbox.get_by(email=email):
  273. return True
  274. # support the case user wants to re-add their real email as mailbox
  275. # can happen when user changes their root email and wants to add this new email as mailbox
  276. if email == user.email:
  277. return False
  278. if User.get_by(email=email):
  279. return True
  280. return False
  281. def get_orig_message_from_bounce(msg: Message) -> Message:
  282. """parse the original email from Bounce"""
  283. i = 0
  284. for part in msg.walk():
  285. i += 1
  286. # the original message is the 4th part
  287. # 1st part is the root part, multipart/report
  288. # 2nd is text/plain, Postfix log
  289. # ...
  290. # 7th is original message
  291. if i == 7:
  292. return part
  293. def new_addr(old_addr, new_email) -> str:
  294. """replace First Last <first@example.com> by
  295. first@example.com by SimpleLogin <new_email>
  296. `new_email` is a special reply address
  297. """
  298. name, old_email = parseaddr(old_addr)
  299. new_name = f"{old_email} via SimpleLogin"
  300. new_addr = formataddr((new_name, new_email)).strip()
  301. return new_addr.strip()
  302. def get_addrs_from_header(msg: Message, header) -> [str]:
  303. """Get all addresses contained in `header`
  304. Used for To or CC header.
  305. """
  306. ret = []
  307. header_content = msg.get_all(header)
  308. if not header_content:
  309. return ret
  310. for addrs in header_content:
  311. for addr in addrs.split(","):
  312. ret.append(addr.strip())
  313. # do not return empty string
  314. return [r for r in ret if r]