123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495 |
- import os
- from email.header import decode_header
- from email.message import Message
- from email.mime.base import MIMEBase
- from email.mime.multipart import MIMEMultipart
- from email.mime.text import MIMEText
- from email.utils import make_msgid, formatdate, parseaddr
- from smtplib import SMTP
- from typing import Optional
- import arrow
- import dkim
- from jinja2 import Environment, FileSystemLoader
- from app.config import (
- SUPPORT_EMAIL,
- ROOT_DIR,
- POSTFIX_SERVER,
- NOT_SEND_EMAIL,
- DKIM_SELECTOR,
- DKIM_PRIVATE_KEY,
- DKIM_HEADERS,
- ALIAS_DOMAINS,
- SUPPORT_NAME,
- POSTFIX_SUBMISSION_TLS,
- MAX_NB_EMAIL_FREE_PLAN,
- DISPOSABLE_EMAIL_DOMAINS,
- MAX_ALERT_24H,
- POSTFIX_PORT,
- )
- from app.dns_utils import get_mx_domains
- from app.extensions import db
- from app.log import LOG
- from app.models import Mailbox, User, SentAlert
- def render(template_name, **kwargs) -> str:
- templates_dir = os.path.join(ROOT_DIR, "templates", "emails")
- env = Environment(loader=FileSystemLoader(templates_dir))
- template = env.get_template(template_name)
- return template.render(MAX_NB_EMAIL_FREE_PLAN=MAX_NB_EMAIL_FREE_PLAN, **kwargs)
- def send_welcome_email(user):
- send_email(
- user.email,
- f"Welcome to SimpleLogin {user.name}",
- render("com/welcome.txt", name=user.name, user=user),
- render("com/welcome.html", name=user.name, user=user),
- )
- def send_trial_end_soon_email(user):
- send_email(
- user.email,
- f"Your trial will end soon {user.name}",
- render("transactional/trial-end.txt", name=user.name, user=user),
- render("transactional/trial-end.html", name=user.name, user=user),
- )
- def send_activation_email(email, name, activation_link):
- send_email(
- email,
- f"Just one more step to join SimpleLogin {name}",
- render(
- "transactional/activation.txt",
- name=name,
- activation_link=activation_link,
- email=email,
- ),
- render(
- "transactional/activation.html",
- name=name,
- activation_link=activation_link,
- email=email,
- ),
- )
- def send_reset_password_email(email, name, reset_password_link):
- send_email(
- email,
- f"Reset your password on SimpleLogin",
- render(
- "transactional/reset-password.txt",
- name=name,
- reset_password_link=reset_password_link,
- ),
- render(
- "transactional/reset-password.html",
- name=name,
- reset_password_link=reset_password_link,
- ),
- )
- def send_change_email(new_email, current_email, name, link):
- send_email(
- new_email,
- f"Confirm email update on SimpleLogin",
- render(
- "transactional/change-email.txt",
- name=name,
- link=link,
- new_email=new_email,
- current_email=current_email,
- ),
- render(
- "transactional/change-email.html",
- name=name,
- link=link,
- new_email=new_email,
- current_email=current_email,
- ),
- )
- def send_new_app_email(email, name):
- send_email(
- email,
- f"Any question/feedback for SimpleLogin {name}?",
- render("com/new-app.txt", name=name),
- render("com/new-app.html", name=name),
- )
- def send_test_email_alias(email, name):
- send_email(
- email,
- f"This email is sent to {email}",
- render("transactional/test-email.txt", name=name, alias=email),
- render("transactional/test-email.html", name=name, alias=email),
- )
- def send_cannot_create_directory_alias(user, alias, directory):
- """when user cancels their subscription, they cannot create alias on the fly.
- If this happens, send them an email to notify
- """
- send_email(
- user.email,
- f"Alias {alias} cannot be created",
- render(
- "transactional/cannot-create-alias-directory.txt",
- name=user.name,
- alias=alias,
- directory=directory,
- ),
- render(
- "transactional/cannot-create-alias-directory.html",
- name=user.name,
- alias=alias,
- directory=directory,
- ),
- )
- def send_cannot_create_domain_alias(user, alias, domain):
- """when user cancels their subscription, they cannot create alias on the fly with custom domain.
- If this happens, send them an email to notify
- """
- send_email(
- user.email,
- f"Alias {alias} cannot be created",
- render(
- "transactional/cannot-create-alias-domain.txt",
- name=user.name,
- alias=alias,
- domain=domain,
- ),
- render(
- "transactional/cannot-create-alias-domain.html",
- name=user.name,
- alias=alias,
- domain=domain,
- ),
- )
- def send_email(to_email, subject, plaintext, html=None):
- if NOT_SEND_EMAIL:
- LOG.d(
- "send email with subject %s to %s, plaintext: %s",
- subject,
- to_email,
- plaintext,
- )
- return
- LOG.d("send email to %s, subject %s", to_email, subject)
- if POSTFIX_SUBMISSION_TLS:
- smtp = SMTP(POSTFIX_SERVER, 587)
- smtp.starttls()
- else:
- smtp = SMTP(POSTFIX_SERVER, POSTFIX_PORT or 25)
- msg = MIMEMultipart("alternative")
- msg.attach(MIMEText(plaintext, "text"))
- if html:
- msg.attach(MIMEText(html, "html"))
- msg["Subject"] = subject
- msg["From"] = f"{SUPPORT_NAME} <{SUPPORT_EMAIL}>"
- msg["To"] = to_email
- msg_id_header = make_msgid()
- msg["Message-ID"] = msg_id_header
- date_header = formatdate()
- msg["Date"] = date_header
- # add DKIM
- email_domain = SUPPORT_EMAIL[SUPPORT_EMAIL.find("@") + 1 :]
- add_dkim_signature(msg, email_domain)
- msg_raw = msg.as_bytes()
- smtp.sendmail(SUPPORT_EMAIL, to_email, msg_raw)
- def send_email_with_rate_control(
- user: User,
- alert_type: str,
- to_email: str,
- subject,
- plaintext,
- html=None,
- bounced_email: Optional[Message] = None,
- max_alert_24h=MAX_ALERT_24H,
- ) -> bool:
- """Same as send_email with rate control over alert_type.
- For now no more than _MAX_ALERT_24h alert can be sent in the last 24h
- Return true if the email is sent, otherwise False
- """
- to_email = to_email.lower().strip()
- one_day_ago = arrow.now().shift(days=-1)
- nb_alert = (
- SentAlert.query.filter_by(alert_type=alert_type, to_email=to_email)
- .filter(SentAlert.created_at > one_day_ago)
- .count()
- )
- if nb_alert >= max_alert_24h:
- LOG.warning(
- "%s emails were sent to %s in the last 24h, alert type %s",
- nb_alert,
- to_email,
- alert_type,
- )
- return False
- SentAlert.create(user_id=user.id, alert_type=alert_type, to_email=to_email)
- db.session.commit()
- send_email(to_email, subject, plaintext, html)
- return True
- def get_email_local_part(address):
- """
- Get the local part from email
- ab@cd.com -> ab
- """
- return address[: address.find("@")]
- def get_email_domain_part(address):
- """
- Get the domain part from email
- ab@cd.com -> cd.com
- """
- return address[address.find("@") + 1 :].strip().lower()
- def add_dkim_signature(msg: Message, email_domain: str):
- delete_header(msg, "DKIM-Signature")
- # Specify headers in "byte" form
- # Generate message signature
- sig = dkim.sign(
- msg.as_bytes(),
- DKIM_SELECTOR,
- email_domain.encode(),
- DKIM_PRIVATE_KEY.encode(),
- include_headers=DKIM_HEADERS,
- )
- sig = sig.decode()
- # remove linebreaks from sig
- sig = sig.replace("\n", " ").replace("\r", "")
- msg["DKIM-Signature"] = sig[len("DKIM-Signature: ") :]
- def add_or_replace_header(msg: Message, header: str, value: str):
- """
- Remove all occurrences of `header` and add `header` with `value`.
- """
- delete_header(msg, header)
- msg[header] = value
- def delete_header(msg: Message, header: str):
- """a header can appear several times in message."""
- # inspired from https://stackoverflow.com/a/47903323/1428034
- for i in reversed(range(len(msg._headers))):
- header_name = msg._headers[i][0].lower()
- if header_name == header.lower():
- del msg._headers[i]
- def delete_all_headers_except(msg: Message, headers: [str]):
- headers = [h.lower() for h in headers]
- for i in reversed(range(len(msg._headers))):
- header_name = msg._headers[i][0].lower()
- if header_name not in headers:
- del msg._headers[i]
- def email_belongs_to_alias_domains(address: str) -> bool:
- """return True if an email ends with one of the alias domains provided by SimpleLogin"""
- for domain in ALIAS_DOMAINS:
- if address.endswith("@" + domain):
- return True
- return False
- def email_domain_can_be_used_as_mailbox(email: str) -> bool:
- """return True if an email can be used as a personal email. An email domain can be used if it is not
- - one of ALIAS_DOMAINS
- - one of custom domains
- - disposable domain
- """
- domain = get_email_domain_part(email)
- if not domain:
- return False
- if domain in ALIAS_DOMAINS:
- return False
- from app.models import CustomDomain
- if CustomDomain.get_by(domain=domain, verified=True):
- return False
- if is_disposable_domain(domain):
- LOG.d("Domain %s is disposable", domain)
- return False
- # check if email MX domain is disposable
- mx_domains = get_mx_domain_list(domain)
- # if no MX record, email is not valid
- if not mx_domains:
- return False
- for mx_domain in mx_domains:
- if is_disposable_domain(mx_domain):
- LOG.d("MX Domain %s %s is disposable", mx_domain, domain)
- return False
- return True
- def is_disposable_domain(domain):
- for d in DISPOSABLE_EMAIL_DOMAINS:
- if domain == d:
- return True
- # subdomain
- if domain.endswith("." + d):
- return True
- return False
- def get_mx_domain_list(domain) -> [str]:
- """return list of MX domains for a given email.
- domain name ends *without* a dot (".") at the end.
- """
- priority_domains = get_mx_domains(domain)
- return [d[:-1] for _, d in priority_domains]
- def personal_email_already_used(email: str) -> bool:
- """test if an email can be used as user email
- """
- if User.get_by(email=email):
- return True
- return False
- def mailbox_already_used(email: str, user) -> bool:
- if Mailbox.get_by(email=email, user_id=user.id):
- return True
- # support the case user wants to re-add their real email as mailbox
- # can happen when user changes their root email and wants to add this new email as mailbox
- if email == user.email:
- return False
- return False
- def get_orig_message_from_bounce(msg: Message) -> Message:
- """parse the original email from Bounce"""
- i = 0
- for part in msg.walk():
- i += 1
- # the original message is the 4th part
- # 1st part is the root part, multipart/report
- # 2nd is text/plain, Postfix log
- # ...
- # 7th is original message
- if i == 7:
- return part
- def get_orig_message_from_spamassassin_report(msg: Message) -> Message:
- """parse the original email from Spamassassin report"""
- i = 0
- for part in msg.walk():
- i += 1
- # the original message is the 4th part
- # 1st part is the root part, multipart/report
- # 2nd is text/plain, SpamAssassin part
- # 3rd is the original message in message/rfc822 content type
- # 4th is original message
- if i == 4:
- return part
- def get_addrs_from_header(msg: Message, header) -> [str]:
- """Get all addresses contained in `header`
- Used for To or CC header.
- """
- ret = []
- header_content = msg.get_all(header)
- if not header_content:
- return ret
- for addrs in header_content:
- # force convert header to string, sometimes addrs is Header object
- addrs = str(addrs)
- for addr in addrs.split(","):
- ret.append(addr.strip())
- # do not return empty string
- return [r for r in ret if r]
- def get_spam_info(msg: Message) -> (bool, str):
- """parse SpamAssassin header to detect whether a message is classified as spam.
- Return (is spam, spam status detail)
- The header format is
- ```X-Spam-Status: No, score=-0.1 required=5.0 tests=DKIM_SIGNED,DKIM_VALID,
- DKIM_VALID_AU,RCVD_IN_DNSWL_BLOCKED,RCVD_IN_MSPIKE_H2,SPF_PASS,
- URIBL_BLOCKED autolearn=unavailable autolearn_force=no version=3.4.2```
- """
- spamassassin_status = msg["X-Spam-Status"]
- if not spamassassin_status:
- return False, ""
- # yes or no
- spamassassin_answer = spamassassin_status[: spamassassin_status.find(",")]
- return spamassassin_answer.lower() == "yes", spamassassin_status
- def parseaddr_unicode(addr) -> (str, str):
- """Like parseaddr but return name in unicode instead of in RFC 2047 format
- '=?UTF-8?B?TmjGoW4gTmd1eeG7hW4=?= <abcd@gmail.com>' -> ('Nhơn Nguyễn', "abcd@gmail.com")
- """
- name, email = parseaddr(addr)
- email = email.strip().lower()
- if name:
- name = name.strip()
- decoded_string, charset = decode_header(name)[0]
- if charset is not None:
- try:
- name = decoded_string.decode(charset)
- except UnicodeDecodeError:
- LOG.warning("Cannot decode addr name %s", name)
- name = ""
- else:
- name = decoded_string
- return name, email
|