custom_alias.py 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190
  1. from flask import render_template, redirect, url_for, flash, request
  2. from flask_login import login_required, current_user
  3. from itsdangerous import TimestampSigner, SignatureExpired
  4. from app.config import (
  5. DISABLE_ALIAS_SUFFIX,
  6. ALIAS_DOMAINS,
  7. CUSTOM_ALIAS_SECRET,
  8. )
  9. from app.dashboard.base import dashboard_bp
  10. from app.email_utils import email_belongs_to_alias_domains
  11. from app.extensions import db
  12. from app.log import LOG
  13. from app.models import Alias, CustomDomain, DeletedAlias, Mailbox, User, AliasMailbox
  14. from app.utils import convert_to_id, random_word, word_exist
  # Module-level timestamp signer keyed with CUSTOM_ALIAS_SECRET.
  # available_suffixes() signs every suffix it offers with it, and custom_alias()
  # unsigns the submitted suffix (max_age=600) to detect tampering or expiry.
  15. signer = TimestampSigner(CUSTOM_ALIAS_SECRET)
  16. def available_suffixes(user: User) -> [bool, str, str]:
  17. """Return (is_custom_domain, alias-suffix, time-signed alias-suffix)"""
  18. user_custom_domains = [cd.domain for cd in user.verified_custom_domains()]
  19. # List of (is_custom_domain, alias-suffix, time-signed alias-suffix)
  20. suffixes = []
  21. # put custom domain first
  22. for alias_domain in user_custom_domains:
  23. suffix = "@" + alias_domain
  24. suffixes.append((True, suffix, signer.sign(suffix).decode()))
  25. # then default domain
  26. for domain in ALIAS_DOMAINS:
  27. suffix = ("" if DISABLE_ALIAS_SUFFIX else "." + random_word()) + "@" + domain
  28. suffixes.append((False, suffix, signer.sign(suffix).decode()))
  29. return suffixes
  30. @dashboard_bp.route("/custom_alias", methods=["GET", "POST"])
  31. @login_required
  32. def custom_alias():
  33. # check if user has not exceeded the alias quota
  34. if not current_user.can_create_new_alias():
  35. # notify admin
  36. LOG.error("user %s tries to create custom alias", current_user)
  37. flash(
  38. "You have reached free plan limit, please upgrade to create new aliases",
  39. "warning",
  40. )
  41. return redirect(url_for("dashboard.index"))
  42. user_custom_domains = [cd.domain for cd in current_user.verified_custom_domains()]
  43. # List of (is_custom_domain, alias-suffix, time-signed alias-suffix)
  44. suffixes = available_suffixes(current_user)
  45. mailboxes = current_user.mailboxes()
  46. if request.method == "POST":
  47. alias_prefix = request.form.get("prefix")
  48. signed_suffix = request.form.get("suffix")
  49. mailbox_ids = request.form.getlist("mailboxes")
  50. alias_note = request.form.get("note")
  51. # check if mailbox is not tempered with
  52. mailboxes = []
  53. for mailbox_id in mailbox_ids:
  54. mailbox = Mailbox.get(mailbox_id)
  55. if (
  56. not mailbox
  57. or mailbox.user_id != current_user.id
  58. or not mailbox.verified
  59. ):
  60. flash("Something went wrong, please retry", "warning")
  61. return redirect(url_for("dashboard.custom_alias"))
  62. mailboxes.append(mailbox)
  63. if not mailboxes:
  64. flash("At least one mailbox must be selected", "error")
  65. return redirect(url_for("dashboard.custom_alias"))
  66. # hypothesis: user will click on the button in the 600 secs
  67. try:
  68. alias_suffix = signer.unsign(signed_suffix, max_age=600).decode()
  69. except SignatureExpired:
  70. LOG.error("Alias creation time expired for %s", current_user)
  71. flash("Alias creation time is expired, please retry", "warning")
  72. return redirect(url_for("dashboard.custom_alias"))
  73. except Exception:
  74. LOG.error("Alias suffix is tampered, user %s", current_user)
  75. flash("Unknown error, refresh the page", "error")
  76. return redirect(url_for("dashboard.custom_alias"))
  77. if verify_prefix_suffix(current_user, alias_prefix, alias_suffix):
  78. full_alias = alias_prefix + alias_suffix
  79. if Alias.get_by(email=full_alias) or DeletedAlias.get_by(email=full_alias):
  80. LOG.d("full alias already used %s", full_alias)
  81. flash(
  82. f"Alias {full_alias} already exists, please choose another one",
  83. "warning",
  84. )
  85. else:
  86. alias = Alias.create(
  87. user_id=current_user.id,
  88. email=full_alias,
  89. note=alias_note,
  90. mailbox_id=mailboxes[0].id,
  91. )
  92. db.session.flush()
  93. for i in range(1, len(mailboxes)):
  94. AliasMailbox.create(
  95. user_id=alias.user_id,
  96. alias_id=alias.id,
  97. mailbox_id=mailboxes[i].id,
  98. )
  99. # get the custom_domain_id if alias is created with a custom domain
  100. if alias_suffix.startswith("@"):
  101. alias_domain = alias_suffix[1:]
  102. domain = CustomDomain.get_by(domain=alias_domain)
  103. LOG.d("Set alias %s domain to %s", full_alias, domain)
  104. alias.custom_domain_id = domain.id
  105. db.session.commit()
  106. flash(f"Alias {full_alias} has been created", "success")
  107. return redirect(url_for("dashboard.index", highlight_alias_id=alias.id))
  108. # only happen if the request has been "hacked"
  109. else:
  110. flash("something went wrong", "warning")
  111. return render_template(
  112. "dashboard/custom_alias.html",
  113. user_custom_domains=user_custom_domains,
  114. suffixes=suffixes,
  115. mailboxes=mailboxes,
  116. )
  117. def verify_prefix_suffix(user, alias_prefix, alias_suffix) -> bool:
  118. """verify if user could create an alias with the given prefix and suffix"""
  119. if not alias_prefix or not alias_suffix: # should be caught on frontend
  120. return False
  121. user_custom_domains = [cd.domain for cd in user.verified_custom_domains()]
  122. alias_prefix = alias_prefix.strip()
  123. alias_prefix = convert_to_id(alias_prefix)
  124. # make sure alias_suffix is either .random_word@simplelogin.co or @my-domain.com
  125. alias_suffix = alias_suffix.strip()
  126. if alias_suffix.startswith("@"):
  127. alias_domain = alias_suffix[1:]
  128. # alias_domain can be either custom_domain or if DISABLE_ALIAS_SUFFIX, one of the default ALIAS_DOMAINS
  129. if DISABLE_ALIAS_SUFFIX:
  130. if (
  131. alias_domain not in user_custom_domains
  132. and alias_domain not in ALIAS_DOMAINS
  133. ):
  134. LOG.error("wrong alias suffix %s, user %s", alias_suffix, user)
  135. return False
  136. else:
  137. if alias_domain not in user_custom_domains:
  138. LOG.error("wrong alias suffix %s, user %s", alias_suffix, user)
  139. return False
  140. else:
  141. if not alias_suffix.startswith("."):
  142. LOG.error("User %s submits a wrong alias suffix %s", user, alias_suffix)
  143. return False
  144. full_alias = alias_prefix + alias_suffix
  145. if not email_belongs_to_alias_domains(full_alias):
  146. LOG.error(
  147. "Alias suffix should end with one of the alias domains %s",
  148. user,
  149. alias_suffix,
  150. )
  151. return False
  152. random_word_part = alias_suffix[1 : alias_suffix.find("@")]
  153. if not word_exist(random_word_part):
  154. LOG.error(
  155. "alias suffix %s needs to start with a random word, user %s",
  156. alias_suffix,
  157. user,
  158. )
  159. return False
  160. return True