123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190 |
- from flask import render_template, redirect, url_for, flash, request
- from flask_login import login_required, current_user
- from itsdangerous import TimestampSigner, SignatureExpired
- from app.config import (
- DISABLE_ALIAS_SUFFIX,
- ALIAS_DOMAINS,
- CUSTOM_ALIAS_SECRET,
- )
- from app.dashboard.base import dashboard_bp
- from app.email_utils import email_belongs_to_alias_domains
- from app.extensions import db
- from app.log import LOG
- from app.models import Alias, CustomDomain, DeletedAlias, Mailbox, User, AliasMailbox
- from app.utils import convert_to_id, random_word, word_exist
- signer = TimestampSigner(CUSTOM_ALIAS_SECRET)
- def available_suffixes(user: User) -> [bool, str, str]:
- """Return (is_custom_domain, alias-suffix, time-signed alias-suffix)"""
- user_custom_domains = [cd.domain for cd in user.verified_custom_domains()]
- # List of (is_custom_domain, alias-suffix, time-signed alias-suffix)
- suffixes = []
- # put custom domain first
- for alias_domain in user_custom_domains:
- suffix = "@" + alias_domain
- suffixes.append((True, suffix, signer.sign(suffix).decode()))
- # then default domain
- for domain in ALIAS_DOMAINS:
- suffix = ("" if DISABLE_ALIAS_SUFFIX else "." + random_word()) + "@" + domain
- suffixes.append((False, suffix, signer.sign(suffix).decode()))
- return suffixes
- @dashboard_bp.route("/custom_alias", methods=["GET", "POST"])
- @login_required
- def custom_alias():
- # check if user has not exceeded the alias quota
- if not current_user.can_create_new_alias():
- # notify admin
- LOG.error("user %s tries to create custom alias", current_user)
- flash(
- "You have reached free plan limit, please upgrade to create new aliases",
- "warning",
- )
- return redirect(url_for("dashboard.index"))
- user_custom_domains = [cd.domain for cd in current_user.verified_custom_domains()]
- # List of (is_custom_domain, alias-suffix, time-signed alias-suffix)
- suffixes = available_suffixes(current_user)
- mailboxes = current_user.mailboxes()
- if request.method == "POST":
- alias_prefix = request.form.get("prefix")
- signed_suffix = request.form.get("suffix")
- mailbox_ids = request.form.getlist("mailboxes")
- alias_note = request.form.get("note")
- # check if mailbox is not tempered with
- mailboxes = []
- for mailbox_id in mailbox_ids:
- mailbox = Mailbox.get(mailbox_id)
- if (
- not mailbox
- or mailbox.user_id != current_user.id
- or not mailbox.verified
- ):
- flash("Something went wrong, please retry", "warning")
- return redirect(url_for("dashboard.custom_alias"))
- mailboxes.append(mailbox)
- if not mailboxes:
- flash("At least one mailbox must be selected", "error")
- return redirect(url_for("dashboard.custom_alias"))
- # hypothesis: user will click on the button in the 600 secs
- try:
- alias_suffix = signer.unsign(signed_suffix, max_age=600).decode()
- except SignatureExpired:
- LOG.error("Alias creation time expired for %s", current_user)
- flash("Alias creation time is expired, please retry", "warning")
- return redirect(url_for("dashboard.custom_alias"))
- except Exception:
- LOG.error("Alias suffix is tampered, user %s", current_user)
- flash("Unknown error, refresh the page", "error")
- return redirect(url_for("dashboard.custom_alias"))
- if verify_prefix_suffix(current_user, alias_prefix, alias_suffix):
- full_alias = alias_prefix + alias_suffix
- if Alias.get_by(email=full_alias) or DeletedAlias.get_by(email=full_alias):
- LOG.d("full alias already used %s", full_alias)
- flash(
- f"Alias {full_alias} already exists, please choose another one",
- "warning",
- )
- else:
- alias = Alias.create(
- user_id=current_user.id,
- email=full_alias,
- note=alias_note,
- mailbox_id=mailboxes[0].id,
- )
- db.session.flush()
- for i in range(1, len(mailboxes)):
- AliasMailbox.create(
- user_id=alias.user_id,
- alias_id=alias.id,
- mailbox_id=mailboxes[i].id,
- )
- # get the custom_domain_id if alias is created with a custom domain
- if alias_suffix.startswith("@"):
- alias_domain = alias_suffix[1:]
- domain = CustomDomain.get_by(domain=alias_domain)
- LOG.d("Set alias %s domain to %s", full_alias, domain)
- alias.custom_domain_id = domain.id
- db.session.commit()
- flash(f"Alias {full_alias} has been created", "success")
- return redirect(url_for("dashboard.index", highlight_alias_id=alias.id))
- # only happen if the request has been "hacked"
- else:
- flash("something went wrong", "warning")
- return render_template(
- "dashboard/custom_alias.html",
- user_custom_domains=user_custom_domains,
- suffixes=suffixes,
- mailboxes=mailboxes,
- )
- def verify_prefix_suffix(user, alias_prefix, alias_suffix) -> bool:
- """verify if user could create an alias with the given prefix and suffix"""
- if not alias_prefix or not alias_suffix: # should be caught on frontend
- return False
- user_custom_domains = [cd.domain for cd in user.verified_custom_domains()]
- alias_prefix = alias_prefix.strip()
- alias_prefix = convert_to_id(alias_prefix)
- # make sure alias_suffix is either .random_word@simplelogin.co or @my-domain.com
- alias_suffix = alias_suffix.strip()
- if alias_suffix.startswith("@"):
- alias_domain = alias_suffix[1:]
- # alias_domain can be either custom_domain or if DISABLE_ALIAS_SUFFIX, one of the default ALIAS_DOMAINS
- if DISABLE_ALIAS_SUFFIX:
- if (
- alias_domain not in user_custom_domains
- and alias_domain not in ALIAS_DOMAINS
- ):
- LOG.error("wrong alias suffix %s, user %s", alias_suffix, user)
- return False
- else:
- if alias_domain not in user_custom_domains:
- LOG.error("wrong alias suffix %s, user %s", alias_suffix, user)
- return False
- else:
- if not alias_suffix.startswith("."):
- LOG.error("User %s submits a wrong alias suffix %s", user, alias_suffix)
- return False
- full_alias = alias_prefix + alias_suffix
- if not email_belongs_to_alias_domains(full_alias):
- LOG.error(
- "Alias suffix should end with one of the alias domains %s",
- user,
- alias_suffix,
- )
- return False
- random_word_part = alias_suffix[1 : alias_suffix.find("@")]
- if not word_exist(random_word_part):
- LOG.error(
- "alias suffix %s needs to start with a random word, user %s",
- alias_suffix,
- user,
- )
- return False
- return True
|