email_utils.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469
  1. import os
  2. from email.header import decode_header
  3. from email.message import Message
  4. from email.mime.base import MIMEBase
  5. from email.mime.multipart import MIMEMultipart
  6. from email.mime.text import MIMEText
  7. from email.utils import make_msgid, formatdate, parseaddr
  8. from smtplib import SMTP
  9. from typing import Optional
  10. import dkim
  11. from jinja2 import Environment, FileSystemLoader
  12. from app.config import (
  13. SUPPORT_EMAIL,
  14. ROOT_DIR,
  15. POSTFIX_SERVER,
  16. NOT_SEND_EMAIL,
  17. DKIM_SELECTOR,
  18. DKIM_PRIVATE_KEY,
  19. DKIM_HEADERS,
  20. ALIAS_DOMAINS,
  21. SUPPORT_NAME,
  22. POSTFIX_SUBMISSION_TLS,
  23. MAX_NB_EMAIL_FREE_PLAN,
  24. DISPOSABLE_EMAIL_DOMAINS,
  25. )
  26. from app.dns_utils import get_mx_domains
  27. from app.log import LOG
  28. from app.models import Mailbox, User
  29. def render(template_name, **kwargs) -> str:
  30. templates_dir = os.path.join(ROOT_DIR, "templates", "emails")
  31. env = Environment(loader=FileSystemLoader(templates_dir))
  32. template = env.get_template(template_name)
  33. return template.render(MAX_NB_EMAIL_FREE_PLAN=MAX_NB_EMAIL_FREE_PLAN, **kwargs)
  34. def send_welcome_email(user):
  35. send_email(
  36. user.email,
  37. f"Welcome to SimpleLogin {user.name}",
  38. render("com/welcome.txt", name=user.name, user=user),
  39. render("com/welcome.html", name=user.name, user=user),
  40. )
  41. def send_trial_end_soon_email(user):
  42. send_email(
  43. user.email,
  44. f"Your trial will end soon {user.name}",
  45. render("transactional/trial-end.txt", name=user.name, user=user),
  46. render("transactional/trial-end.html", name=user.name, user=user),
  47. )
  48. def send_activation_email(email, name, activation_link):
  49. send_email(
  50. email,
  51. f"Just one more step to join SimpleLogin {name}",
  52. render(
  53. "transactional/activation.txt",
  54. name=name,
  55. activation_link=activation_link,
  56. email=email,
  57. ),
  58. render(
  59. "transactional/activation.html",
  60. name=name,
  61. activation_link=activation_link,
  62. email=email,
  63. ),
  64. )
  65. def send_reset_password_email(email, name, reset_password_link):
  66. send_email(
  67. email,
  68. f"Reset your password on SimpleLogin",
  69. render(
  70. "transactional/reset-password.txt",
  71. name=name,
  72. reset_password_link=reset_password_link,
  73. ),
  74. render(
  75. "transactional/reset-password.html",
  76. name=name,
  77. reset_password_link=reset_password_link,
  78. ),
  79. )
  80. def send_change_email(new_email, current_email, name, link):
  81. send_email(
  82. new_email,
  83. f"Confirm email update on SimpleLogin",
  84. render(
  85. "transactional/change-email.txt",
  86. name=name,
  87. link=link,
  88. new_email=new_email,
  89. current_email=current_email,
  90. ),
  91. render(
  92. "transactional/change-email.html",
  93. name=name,
  94. link=link,
  95. new_email=new_email,
  96. current_email=current_email,
  97. ),
  98. )
  99. def send_new_app_email(email, name):
  100. send_email(
  101. email,
  102. f"Any question/feedback for SimpleLogin {name}?",
  103. render("com/new-app.txt", name=name),
  104. render("com/new-app.html", name=name),
  105. )
  106. def send_test_email_alias(email, name):
  107. send_email(
  108. email,
  109. f"This email is sent to {email}",
  110. render("transactional/test-email.txt", name=name, alias=email),
  111. render("transactional/test-email.html", name=name, alias=email),
  112. )
  113. def send_cannot_create_directory_alias(user, alias, directory):
  114. """when user cancels their subscription, they cannot create alias on the fly.
  115. If this happens, send them an email to notify
  116. """
  117. send_email(
  118. user.email,
  119. f"Alias {alias} cannot be created",
  120. render(
  121. "transactional/cannot-create-alias-directory.txt",
  122. name=user.name,
  123. alias=alias,
  124. directory=directory,
  125. ),
  126. render(
  127. "transactional/cannot-create-alias-directory.html",
  128. name=user.name,
  129. alias=alias,
  130. directory=directory,
  131. ),
  132. )
  133. def send_cannot_create_domain_alias(user, alias, domain):
  134. """when user cancels their subscription, they cannot create alias on the fly with custom domain.
  135. If this happens, send them an email to notify
  136. """
  137. send_email(
  138. user.email,
  139. f"Alias {alias} cannot be created",
  140. render(
  141. "transactional/cannot-create-alias-domain.txt",
  142. name=user.name,
  143. alias=alias,
  144. domain=domain,
  145. ),
  146. render(
  147. "transactional/cannot-create-alias-domain.html",
  148. name=user.name,
  149. alias=alias,
  150. domain=domain,
  151. ),
  152. )
  153. def send_email(
  154. to_email, subject, plaintext, html=None, bounced_email: Optional[Message] = None
  155. ):
  156. if NOT_SEND_EMAIL:
  157. LOG.d(
  158. "send email with subject %s to %s, plaintext: %s",
  159. subject,
  160. to_email,
  161. plaintext,
  162. )
  163. return
  164. LOG.d("send email to %s, subject %s", to_email, subject)
  165. if POSTFIX_SUBMISSION_TLS:
  166. smtp = SMTP(POSTFIX_SERVER, 587)
  167. smtp.starttls()
  168. else:
  169. smtp = SMTP(POSTFIX_SERVER, 25)
  170. if bounced_email:
  171. msg = MIMEMultipart("mixed")
  172. # add email main body
  173. body = MIMEMultipart("alternative")
  174. body.attach(MIMEText(plaintext, "text"))
  175. if html:
  176. body.attach(MIMEText(html, "html"))
  177. msg.attach(body)
  178. # add attachment
  179. rfcmessage = MIMEBase("message", "rfc822")
  180. rfcmessage.attach(bounced_email)
  181. msg.attach(rfcmessage)
  182. else:
  183. msg = MIMEMultipart("alternative")
  184. msg.attach(MIMEText(plaintext, "text"))
  185. if html:
  186. msg.attach(MIMEText(html, "html"))
  187. msg["Subject"] = subject
  188. msg["From"] = f"{SUPPORT_NAME} <{SUPPORT_EMAIL}>"
  189. msg["To"] = to_email
  190. msg_id_header = make_msgid()
  191. msg["Message-ID"] = msg_id_header
  192. date_header = formatdate()
  193. msg["Date"] = date_header
  194. # add DKIM
  195. email_domain = SUPPORT_EMAIL[SUPPORT_EMAIL.find("@") + 1 :]
  196. add_dkim_signature(msg, email_domain)
  197. msg_raw = msg.as_bytes()
  198. smtp.sendmail(SUPPORT_EMAIL, to_email, msg_raw)
  199. def get_email_local_part(address):
  200. """
  201. Get the local part from email
  202. ab@cd.com -> ab
  203. """
  204. return address[: address.find("@")]
  205. def get_email_domain_part(address):
  206. """
  207. Get the domain part from email
  208. ab@cd.com -> cd.com
  209. """
  210. return address[address.find("@") + 1 :].strip().lower()
  211. def add_dkim_signature(msg: Message, email_domain: str):
  212. delete_header(msg, "DKIM-Signature")
  213. # Specify headers in "byte" form
  214. # Generate message signature
  215. sig = dkim.sign(
  216. msg.as_bytes(),
  217. DKIM_SELECTOR,
  218. email_domain.encode(),
  219. DKIM_PRIVATE_KEY.encode(),
  220. include_headers=DKIM_HEADERS,
  221. )
  222. sig = sig.decode()
  223. # remove linebreaks from sig
  224. sig = sig.replace("\n", " ").replace("\r", "")
  225. msg["DKIM-Signature"] = sig[len("DKIM-Signature: ") :]
  226. def add_or_replace_header(msg: Message, header: str, value: str):
  227. """
  228. Remove all occurrences of `header` and add `header` with `value`.
  229. """
  230. delete_header(msg, header)
  231. msg[header] = value
  232. def delete_header(msg: Message, header: str):
  233. """a header can appear several times in message."""
  234. # inspired from https://stackoverflow.com/a/47903323/1428034
  235. for i in reversed(range(len(msg._headers))):
  236. header_name = msg._headers[i][0].lower()
  237. if header_name == header.lower():
  238. del msg._headers[i]
  239. def delete_all_headers_except(msg: Message, headers: [str]):
  240. headers = [h.lower() for h in headers]
  241. for i in reversed(range(len(msg._headers))):
  242. header_name = msg._headers[i][0].lower()
  243. if header_name not in headers:
  244. del msg._headers[i]
  245. def email_belongs_to_alias_domains(address: str) -> bool:
  246. """return True if an email ends with one of the alias domains provided by SimpleLogin"""
  247. for domain in ALIAS_DOMAINS:
  248. if address.endswith("@" + domain):
  249. return True
  250. return False
  251. def can_be_used_as_personal_email(email: str) -> bool:
  252. """return True if an email can be used as a personal email. Currently the only condition is email domain is not
  253. - one of ALIAS_DOMAINS
  254. - one of custom domains
  255. """
  256. domain = get_email_domain_part(email)
  257. if not domain:
  258. return False
  259. if domain in ALIAS_DOMAINS:
  260. return False
  261. from app.models import CustomDomain
  262. if CustomDomain.get_by(domain=domain, verified=True):
  263. return False
  264. if is_disposable_domain(domain):
  265. LOG.d("Domain %s is disposable", domain)
  266. return False
  267. # check if email MX domain is disposable
  268. mx_domains = get_mx_domain_list(domain)
  269. # if no MX record, email is not valid
  270. if not mx_domains:
  271. return False
  272. for mx_domain in mx_domains:
  273. if is_disposable_domain(mx_domain):
  274. LOG.d("MX Domain %s %s is disposable", mx_domain, domain)
  275. return False
  276. return True
  277. def is_disposable_domain(domain):
  278. for d in DISPOSABLE_EMAIL_DOMAINS:
  279. if domain == d:
  280. return True
  281. # subdomain
  282. if domain.endswith("." + d):
  283. return True
  284. return False
  285. def get_mx_domain_list(domain) -> [str]:
  286. """return list of MX domains for a given email.
  287. domain name ends *without* a dot (".") at the end.
  288. """
  289. priority_domains = get_mx_domains(domain)
  290. return [d[:-1] for _, d in priority_domains]
  291. def email_already_used(email: str) -> bool:
  292. """test if an email can be used when:
  293. - user signs up
  294. - add a new mailbox
  295. """
  296. if User.get_by(email=email):
  297. return True
  298. if Mailbox.get_by(email=email):
  299. return True
  300. return False
  301. def mailbox_already_used(email: str, user) -> bool:
  302. if Mailbox.get_by(email=email, user_id=user.id):
  303. return True
  304. # support the case user wants to re-add their real email as mailbox
  305. # can happen when user changes their root email and wants to add this new email as mailbox
  306. if email == user.email:
  307. return False
  308. return False
  309. def get_orig_message_from_bounce(msg: Message) -> Message:
  310. """parse the original email from Bounce"""
  311. i = 0
  312. for part in msg.walk():
  313. i += 1
  314. # the original message is the 4th part
  315. # 1st part is the root part, multipart/report
  316. # 2nd is text/plain, Postfix log
  317. # ...
  318. # 7th is original message
  319. if i == 7:
  320. return part
  321. def get_orig_message_from_spamassassin_report(msg: Message) -> Message:
  322. """parse the original email from Spamassassin report"""
  323. i = 0
  324. for part in msg.walk():
  325. i += 1
  326. # the original message is the 4th part
  327. # 1st part is the root part, multipart/report
  328. # 2nd is text/plain, SpamAssassin part
  329. # 3rd is the original message in message/rfc822 content type
  330. # 4th is original message
  331. if i == 4:
  332. return part
  333. def get_addrs_from_header(msg: Message, header) -> [str]:
  334. """Get all addresses contained in `header`
  335. Used for To or CC header.
  336. """
  337. ret = []
  338. header_content = msg.get_all(header)
  339. if not header_content:
  340. return ret
  341. for addrs in header_content:
  342. for addr in addrs.split(","):
  343. ret.append(addr.strip())
  344. # do not return empty string
  345. return [r for r in ret if r]
  346. def get_spam_info(msg: Message) -> (bool, str):
  347. """parse SpamAssassin header to detect whether a message is classified as spam.
  348. Return (is spam, spam status detail)
  349. The header format is
  350. ```X-Spam-Status: No, score=-0.1 required=5.0 tests=DKIM_SIGNED,DKIM_VALID,
  351. DKIM_VALID_AU,RCVD_IN_DNSWL_BLOCKED,RCVD_IN_MSPIKE_H2,SPF_PASS,
  352. URIBL_BLOCKED autolearn=unavailable autolearn_force=no version=3.4.2```
  353. """
  354. spamassassin_status = msg["X-Spam-Status"]
  355. if not spamassassin_status:
  356. return False, ""
  357. # yes or no
  358. spamassassin_answer = spamassassin_status[: spamassassin_status.find(",")]
  359. return spamassassin_answer.lower() == "yes", spamassassin_status
  360. def parseaddr_unicode(addr) -> (str, str):
  361. """Like parseaddr but return name in unicode instead of in RFC 2047 format
  362. '=?UTF-8?B?TmjGoW4gTmd1eeG7hW4=?= <abcd@gmail.com>' -> ('Nhơn Nguyễn', "abcd@gmail.com")
  363. """
  364. name, email = parseaddr(addr)
  365. email = email.strip().lower()
  366. if name:
  367. name = name.strip()
  368. decoded_string, charset = decode_header(name)[0]
  369. if charset is not None:
  370. name = decoded_string.decode(charset)
  371. else:
  372. name = decoded_string
  373. return name, email