فهرست منبع

Move alias auto-creation to alias_utils

Son NK 5 سال پیش
والد
کامیت
c1f5c07d86
2فایلهای تغییر یافته به همراه127 افزوده شده و 115 حذف شده
  1. 126 0
      app/alias_utils.py
  2. 1 115
      email_handler.py

+ 126 - 0
app/alias_utils.py

@@ -0,0 +1,126 @@
+from typing import Optional
+
+from app.email_utils import (
+    get_email_domain_part,
+    send_cannot_create_directory_alias,
+    send_cannot_create_domain_alias,
+    email_belongs_to_alias_domains,
+)
+from app.extensions import db
+from app.log import LOG
+from app.models import (
+    Alias,
+    CustomDomain,
+    Directory,
+    User,
+    DeletedAlias,
+)
+
+
+def try_auto_create(address: str) -> Optional[Alias]:
+    """Try to auto-create the alias using directory or catch-all domain
+    """
+    alias = try_auto_create_catch_all_domain(address)
+    if not alias:
+        alias = try_auto_create_directory(address)
+
+    return alias
+
+
+def try_auto_create_directory(address: str) -> Optional[Alias]:
+    """
+    Try to create an alias with directory
+    """
+    # check if alias belongs to a directory, ie having directory/anything@EMAIL_DOMAIN format
+    if email_belongs_to_alias_domains(address):
+        # if there's no directory separator in the alias, no way to auto-create it
+        if "/" not in address and "+" not in address and "#" not in address:
+            return None
+
+        # alias contains one of the 3 special directory separator: "/", "+" or "#"
+        if "/" in address:
+            sep = "/"
+        elif "+" in address:
+            sep = "+"
+        else:
+            sep = "#"
+
+        directory_name = address[: address.find(sep)]
+        LOG.d("directory_name %s", directory_name)
+
+        directory = Directory.get_by(name=directory_name)
+        if not directory:
+            return None
+
+        dir_user: User = directory.user
+
+        if not dir_user.can_create_new_alias():
+            send_cannot_create_directory_alias(dir_user, address, directory_name)
+            return None
+
+        # if alias has been deleted before, do not auto-create it
+        if DeletedAlias.get_by(email=address, user_id=directory.user_id):
+            LOG.warning(
+                "Alias %s was deleted before, cannot auto-create using directory %s, user %s",
+                address,
+                directory_name,
+                dir_user,
+            )
+            return None
+
+        LOG.d("create alias %s for directory %s", address, directory)
+
+        alias = Alias.create(
+            email=address,
+            user_id=directory.user_id,
+            directory_id=directory.id,
+            mailbox_id=dir_user.default_mailbox_id,
+        )
+        db.session.commit()
+        return alias
+
+
+def try_auto_create_catch_all_domain(address: str) -> Optional[Alias]:
+    """Try to create an alias with catch-all domain"""
+
+    # try to create alias on-the-fly with custom-domain catch-all feature
+    # check if alias is custom-domain alias and if the custom-domain has catch-all enabled
+    alias_domain = get_email_domain_part(address)
+    custom_domain = CustomDomain.get_by(domain=alias_domain)
+
+    if not custom_domain:
+        return None
+
+    # custom_domain exists
+    if not custom_domain.catch_all:
+        return None
+
+    # custom_domain has catch-all enabled
+    domain_user: User = custom_domain.user
+
+    if not domain_user.can_create_new_alias():
+        send_cannot_create_domain_alias(domain_user, address, alias_domain)
+        return None
+
+    # if alias has been deleted before, do not auto-create it
+    if DeletedAlias.get_by(email=address, user_id=custom_domain.user_id):
+        LOG.warning(
+            "Alias %s was deleted before, cannot auto-create using domain catch-all %s, user %s",
+            address,
+            custom_domain,
+            domain_user,
+        )
+        return None
+
+    LOG.d("create alias %s for domain %s", address, custom_domain)
+
+    alias = Alias.create(
+        email=address,
+        user_id=custom_domain.user_id,
+        custom_domain_id=custom_domain.id,
+        automatic_creation=True,
+        mailbox_id=domain_user.default_mailbox_id,
+    )
+
+    db.session.commit()
+    return alias

+ 1 - 115
email_handler.py

@@ -40,12 +40,12 @@ from email.mime.multipart import MIMEMultipart
 from email.utils import parseaddr
 from io import BytesIO
 from smtplib import SMTP
-from typing import Optional
 
 from aiosmtpd.controller import Controller
 from aiosmtpd.smtp import Envelope
 
 from app import pgp_utils, s3
+from app.alias_utils import try_auto_create
 from app.config import (
     EMAIL_DOMAIN,
     POSTFIX_SERVER,
@@ -57,11 +57,8 @@ from app.config import (
 from app.email_utils import (
     send_email,
     add_dkim_signature,
-    get_email_domain_part,
     add_or_replace_header,
     delete_header,
-    send_cannot_create_directory_alias,
-    send_cannot_create_domain_alias,
     email_belongs_to_alias_domains,
     render,
     get_orig_message_from_bounce,
@@ -78,9 +75,7 @@ from app.models import (
     Contact,
     EmailLog,
     CustomDomain,
-    Directory,
     User,
-    DeletedAlias,
     RefusedEmail,
 )
 from app.utils import random_string
@@ -106,115 +101,6 @@ def new_app():
     return app
 
 
-def try_auto_create(address: str) -> Optional[Alias]:
-    """Try to auto-create the alias using directory or catch-all domain
-    """
-    alias = try_auto_create_catch_all_domain(address)
-    if not alias:
-        alias = try_auto_create_directory(address)
-
-    return alias
-
-
-def try_auto_create_directory(address: str) -> Optional[Alias]:
-    """
-    Try to create an alias with directory
-    """
-    # check if alias belongs to a directory, ie having directory/anything@EMAIL_DOMAIN format
-    if email_belongs_to_alias_domains(address):
-        # if there's no directory separator in the alias, no way to auto-create it
-        if "/" not in address and "+" not in address and "#" not in address:
-            return None
-
-        # alias contains one of the 3 special directory separator: "/", "+" or "#"
-        if "/" in address:
-            sep = "/"
-        elif "+" in address:
-            sep = "+"
-        else:
-            sep = "#"
-
-        directory_name = address[: address.find(sep)]
-        LOG.d("directory_name %s", directory_name)
-
-        directory = Directory.get_by(name=directory_name)
-        if not directory:
-            return None
-
-        dir_user: User = directory.user
-
-        if not dir_user.can_create_new_alias():
-            send_cannot_create_directory_alias(dir_user, address, directory_name)
-            return None
-
-        # if alias has been deleted before, do not auto-create it
-        if DeletedAlias.get_by(email=address, user_id=directory.user_id):
-            LOG.warning(
-                "Alias %s was deleted before, cannot auto-create using directory %s, user %s",
-                address,
-                directory_name,
-                dir_user,
-            )
-            return None
-
-        LOG.d("create alias %s for directory %s", address, directory)
-
-        alias = Alias.create(
-            email=address,
-            user_id=directory.user_id,
-            directory_id=directory.id,
-            mailbox_id=dir_user.default_mailbox_id,
-        )
-        db.session.commit()
-        return alias
-
-
-def try_auto_create_catch_all_domain(address: str) -> Optional[Alias]:
-    """Try to create an alias with catch-all domain"""
-
-    # try to create alias on-the-fly with custom-domain catch-all feature
-    # check if alias is custom-domain alias and if the custom-domain has catch-all enabled
-    alias_domain = get_email_domain_part(address)
-    custom_domain = CustomDomain.get_by(domain=alias_domain)
-
-    if not custom_domain:
-        return None
-
-    # custom_domain exists
-    if not custom_domain.catch_all:
-        return None
-
-    # custom_domain has catch-all enabled
-    domain_user: User = custom_domain.user
-
-    if not domain_user.can_create_new_alias():
-        send_cannot_create_domain_alias(domain_user, address, alias_domain)
-        return None
-
-    # if alias has been deleted before, do not auto-create it
-    if DeletedAlias.get_by(email=address, user_id=custom_domain.user_id):
-        LOG.warning(
-            "Alias %s was deleted before, cannot auto-create using domain catch-all %s, user %s",
-            address,
-            custom_domain,
-            domain_user,
-        )
-        return None
-
-    LOG.d("create alias %s for domain %s", address, custom_domain)
-
-    alias = Alias.create(
-        email=address,
-        user_id=custom_domain.user_id,
-        custom_domain_id=custom_domain.id,
-        automatic_creation=True,
-        mailbox_id=domain_user.default_mailbox_id,
-    )
-
-    db.session.commit()
-    return alias
-
-
 def get_or_create_contact(website_from_header: str, alias: Alias) -> Contact:
     """
     website_from_header can be the full-form email, i.e. "First Last <email@example.com>"