alias_utils.py 6.7 KB


  1. import re
  2. from typing import Optional
  3. from sqlalchemy.exc import IntegrityError
  4. from app.email_utils import (
  5. get_email_domain_part,
  6. send_cannot_create_directory_alias,
  7. send_cannot_create_domain_alias,
  8. can_create_directory_for_address,
  9. )
  10. from app.errors import AliasInTrashError
  11. from app.extensions import db
  12. from app.log import LOG
  13. from app.models import (
  14. Alias,
  15. CustomDomain,
  16. Directory,
  17. User,
  18. DeletedAlias,
  19. DomainDeletedAlias,
  20. AliasMailbox,
  21. Mailbox,
  22. EmailLog,
  23. Contact,
  24. )
  25. def try_auto_create(address: str) -> Optional[Alias]:
  26. """Try to auto-create the alias using directory or catch-all domain"""
  27. alias = try_auto_create_catch_all_domain(address)
  28. if not alias:
  29. alias = try_auto_create_directory(address)
  30. return alias
  31. def try_auto_create_directory(address: str) -> Optional[Alias]:
  32. """
  33. Try to create an alias with directory
  34. """
  35. # check if alias belongs to a directory, ie having directory/anything@EMAIL_DOMAIN format
  36. if can_create_directory_for_address(address):
  37. # if there's no directory separator in the alias, no way to auto-create it
  38. if "/" not in address and "+" not in address and "#" not in address:
  39. return None
  40. # alias contains one of the 3 special directory separator: "/", "+" or "#"
  41. if "/" in address:
  42. sep = "/"
  43. elif "+" in address:
  44. sep = "+"
  45. else:
  46. sep = "#"
  47. directory_name = address[: address.find(sep)]
  48. LOG.d("directory_name %s", directory_name)
  49. directory = Directory.get_by(name=directory_name)
  50. if not directory:
  51. return None
  52. dir_user: User = directory.user
  53. if not dir_user.can_create_new_alias():
  54. send_cannot_create_directory_alias(dir_user, address, directory_name)
  55. return None
  56. try:
  57. LOG.d("create alias %s for directory %s", address, directory)
  58. mailboxes = directory.mailboxes
  59. alias = Alias.create(
  60. email=address,
  61. user_id=directory.user_id,
  62. directory_id=directory.id,
  63. mailbox_id=mailboxes[0].id,
  64. )
  65. db.session.flush()
  66. for i in range(1, len(mailboxes)):
  67. AliasMailbox.create(
  68. alias_id=alias.id,
  69. mailbox_id=mailboxes[i].id,
  70. )
  71. db.session.commit()
  72. return alias
  73. except AliasInTrashError:
  74. LOG.warning(
  75. "Alias %s was deleted before, cannot auto-create using directory %s, user %s",
  76. address,
  77. directory_name,
  78. dir_user,
  79. )
  80. return None
  81. except IntegrityError:
  82. LOG.warning("Alias %s already exists", address)
  83. db.session.rollback()
  84. alias = Alias.get_by(email=address)
  85. return alias
  86. def try_auto_create_catch_all_domain(address: str) -> Optional[Alias]:
  87. """Try to create an alias with catch-all domain"""
  88. # try to create alias on-the-fly with custom-domain catch-all feature
  89. # check if alias is custom-domain alias and if the custom-domain has catch-all enabled
  90. alias_domain = get_email_domain_part(address)
  91. custom_domain: CustomDomain = CustomDomain.get_by(domain=alias_domain)
  92. if not custom_domain:
  93. return None
  94. # custom_domain exists
  95. if not custom_domain.catch_all:
  96. return None
  97. # custom_domain has catch-all enabled
  98. domain_user: User = custom_domain.user
  99. if not domain_user.can_create_new_alias():
  100. send_cannot_create_domain_alias(domain_user, address, alias_domain)
  101. return None
  102. try:
  103. LOG.d("create alias %s for domain %s", address, custom_domain)
  104. mailboxes = custom_domain.mailboxes
  105. alias = Alias.create(
  106. email=address,
  107. user_id=custom_domain.user_id,
  108. custom_domain_id=custom_domain.id,
  109. automatic_creation=True,
  110. mailbox_id=mailboxes[0].id,
  111. )
  112. db.session.flush()
  113. for i in range(1, len(mailboxes)):
  114. AliasMailbox.create(
  115. alias_id=alias.id,
  116. mailbox_id=mailboxes[i].id,
  117. )
  118. db.session.commit()
  119. return alias
  120. except AliasInTrashError:
  121. LOG.warning(
  122. "Alias %s was deleted before, cannot auto-create using domain catch-all %s, user %s",
  123. address,
  124. custom_domain,
  125. domain_user,
  126. )
  127. return None
  128. except IntegrityError:
  129. LOG.warning("Alias %s already exists", address)
  130. db.session.rollback()
  131. alias = Alias.get_by(email=address)
  132. return alias
  133. def delete_alias(alias: Alias, user: User):
  134. """
  135. Delete an alias and add it to either global or domain trash
  136. Should be used instead of Alias.delete, DomainDeletedAlias.create, DeletedAlias.create
  137. """
  138. # save deleted alias to either global or domain trash
  139. if alias.custom_domain_id:
  140. if not DomainDeletedAlias.get_by(
  141. email=alias.email, domain_id=alias.custom_domain_id
  142. ):
  143. LOG.debug("add %s to domain %s trash", alias, alias.custom_domain_id)
  144. db.session.add(
  145. DomainDeletedAlias(
  146. user_id=user.id, email=alias.email, domain_id=alias.custom_domain_id
  147. )
  148. )
  149. db.session.commit()
  150. else:
  151. if not DeletedAlias.get_by(email=alias.email):
  152. LOG.d("add %s to global trash", alias)
  153. db.session.add(DeletedAlias(email=alias.email))
  154. db.session.commit()
  155. Alias.query.filter(Alias.id == alias.id).delete()
  156. db.session.commit()
  157. def aliases_for_mailbox(mailbox: Mailbox) -> [Alias]:
  158. """
  159. get list of aliases for a given mailbox
  160. """
  161. ret = set(Alias.query.filter(Alias.mailbox_id == mailbox.id).all())
  162. for alias in (
  163. db.session.query(Alias)
  164. .join(AliasMailbox, Alias.id == AliasMailbox.alias_id)
  165. .filter(AliasMailbox.mailbox_id == mailbox.id)
  166. ):
  167. ret.add(alias)
  168. return list(ret)
  169. def nb_email_log_for_mailbox(mailbox: Mailbox):
  170. aliases = aliases_for_mailbox(mailbox)
  171. alias_ids = [alias.id for alias in aliases]
  172. return (
  173. db.session.query(EmailLog)
  174. .join(Contact, EmailLog.contact_id == Contact.id)
  175. .filter(Contact.alias_id.in_(alias_ids))
  176. .count()
  177. )
  178. # Only lowercase letters, numbers, dashes (-) and underscores (_) are currently supported
  179. _ALIAS_PREFIX_PATTERN = r"[0-9a-z-_]{1,}"
  180. def check_alias_prefix(alias_prefix) -> bool:
  181. if re.fullmatch(_ALIAS_PREFIX_PATTERN, alias_prefix) is None:
  182. return False
  183. return True