Browse Source

refactor email-handler: extract try_auto_create()

Son NK 5 years ago
parent
commit
fc09f911a4
2 changed files with 90 additions and 104 deletions
  1. 1 1
      app/email_utils.py
  2. 89 103
      email_handler.py

+ 1 - 1
app/email_utils.py

@@ -280,7 +280,7 @@ def delete_header(msg: Message, header: str):
 
 
 def email_belongs_to_alias_domains(email: str) -> bool:
-    """return True if an emails ends with one of the alias domains provided by SimpleLogin"""
+    """return True if an email ends with one of the alias domains provided by SimpleLogin"""
     for domain in ALIAS_DOMAINS:
         if email.endswith("@" + domain):
             return True

+ 89 - 103
email_handler.py

@@ -35,6 +35,7 @@ from email.message import Message
 from email.parser import Parser
 from email.policy import SMTPUTF8
 from smtplib import SMTP
+from typing import Optional
 
 from aiosmtpd.controller import Controller
 
@@ -90,6 +91,91 @@ def new_app():
     return app
 
 
+def try_auto_create(alias: str) -> Optional[GenEmail]:
+    """Try to auto-create the alias using directory or catch-all domain
+    """
+    # check if alias belongs to a directory, ie having directory/anything@EMAIL_DOMAIN format
+    if email_belongs_to_alias_domains(alias):
+        # if there's no directory separator in the alias, no way to auto-create it
+        if "/" not in alias and "+" not in alias and "#" not in alias:
+            return None
+
+        # alias contains one of the 3 special directory separator: "/", "+" or "#"
+        if "/" in alias:
+            sep = "/"
+        elif "+" in alias:
+            sep = "+"
+        else:
+            sep = "#"
+
+        directory_name = alias[: alias.find(sep)]
+        LOG.d("directory_name %s", directory_name)
+
+        directory = Directory.get_by(name=directory_name)
+        if not directory:
+            return None
+
+        dir_user: User = directory.user
+
+        if not dir_user.can_create_new_alias():
+            send_cannot_create_directory_alias(dir_user, alias, directory_name)
+            return None
+
+        # if alias has been deleted before, do not auto-create it
+        if DeletedAlias.get_by(email=alias, user_id=directory.user_id):
+            LOG.error(
+                "Alias %s was deleted before, cannot auto-create using directory %s, user %s",
+                alias,
+                directory_name,
+                dir_user,
+            )
+            return None
+
+        LOG.d("create alias %s for directory %s", alias, directory)
+
+        gen_email = GenEmail.create(
+            email=alias, user_id=directory.user_id, directory_id=directory.id,
+        )
+        db.session.commit()
+        return gen_email
+
+    # try to create alias on-the-fly with custom-domain catch-all feature
+    # check if alias is custom-domain alias and if the custom-domain has catch-all enabled
+    alias_domain = get_email_domain_part(alias)
+    custom_domain = CustomDomain.get_by(domain=alias_domain)
+
+    if not custom_domain or custom_domain.catch_all:
+        return None
+
+    # custom_domain has catch-all enabled
+    domain_user: User = custom_domain.user
+
+    if not domain_user.can_create_new_alias():
+        send_cannot_create_domain_alias(domain_user, alias, alias_domain)
+        return None
+
+    # if alias has been deleted before, do not auto-create it
+    if DeletedAlias.get_by(email=alias, user_id=custom_domain.user_id):
+        LOG.error(
+            "Alias %s was deleted before, cannot auto-create using domain catch-all %s, user %s",
+            alias,
+            custom_domain,
+            domain_user,
+        )
+        return None
+
+    LOG.d("create alias %s for domain %s", alias, custom_domain)
+
+    gen_email = GenEmail.create(
+        email=alias,
+        user_id=custom_domain.user_id,
+        custom_domain_id=custom_domain.id,
+        automatic_creation=True,
+    )
+    db.session.commit()
+    return gen_email
+
+
 class MailHandler:
     async def handle_DATA(self, server, session, envelope):
         LOG.debug(">>> New message <<<")
@@ -98,17 +184,12 @@ class MailHandler:
         LOG.debug("Rcpt to %s", envelope.rcpt_tos)
         message_data = envelope.content.decode("utf8", errors="replace")
 
-        # Only when debug
-        # LOG.debug("Message data:\n")
-        # LOG.debug(message_data)
-
-        # host IP, setup via Docker network
         smtp = SMTP(POSTFIX_SERVER, 25)
         msg = Parser(policy=SMTPUTF8).parsestr(message_data)
 
         for rcpt_to in envelope.rcpt_tos:
             # Reply case
-            # reply+ or ra+ (reverse-alias) prefix
+            # recipient starts with "reply+" or "ra+" (ra=reverse-alias) prefix
             if rcpt_to.startswith("reply+") or rcpt_to.startswith("ra+"):
                 LOG.debug("Reply phase")
                 app = new_app()
@@ -131,103 +212,8 @@ class MailHandler:
             LOG.d(
                 "alias %s not exist. Try to see if it can be created on the fly", alias
             )
-
-            # try to see if alias could be created on-the-fly
-            on_the_fly = False
-
-            # check if alias belongs to a directory, ie having directory/anything@EMAIL_DOMAIN format
-            if email_belongs_to_alias_domains(alias):
-                if "/" in alias or "+" in alias or "#" in alias:
-                    if "/" in alias:
-                        sep = "/"
-                    elif "+" in alias:
-                        sep = "+"
-                    else:
-                        sep = "#"
-
-                    directory_name = alias[: alias.find(sep)]
-                    LOG.d("directory_name %s", directory_name)
-
-                    directory = Directory.get_by(name=directory_name)
-
-                    # Only premium user can use the directory feature
-                    if directory:
-                        dir_user: User = directory.user
-                        if dir_user.can_create_new_alias():
-                            # if alias has been deleted before, do not auto-create it
-                            if DeletedAlias.get_by(
-                                email=alias, user_id=directory.user_id
-                            ):
-                                LOG.error(
-                                    "Alias %s has been deleted before, cannot auto-create using directory %s, user %s",
-                                    alias,
-                                    directory_name,
-                                    dir_user,
-                                )
-                            else:
-
-                                LOG.d(
-                                    "create alias %s for directory %s", alias, directory
-                                )
-                                on_the_fly = True
-
-                                gen_email = GenEmail.create(
-                                    email=alias,
-                                    user_id=directory.user_id,
-                                    directory_id=directory.id,
-                                )
-                                db.session.commit()
-                        else:
-                            LOG.error(
-                                "User %s is not premium anymore and cannot create alias with directory",
-                                dir_user,
-                            )
-                            send_cannot_create_directory_alias(
-                                dir_user, alias, directory_name
-                            )
-
-            # try to create alias on-the-fly with custom-domain catch-all feature
-            # check if alias is custom-domain alias and if the custom-domain has catch-all enabled
-            if not on_the_fly:
-                alias_domain = get_email_domain_part(alias)
-                custom_domain = CustomDomain.get_by(domain=alias_domain)
-
-                # Only premium user can continue using the catch-all feature
-                if custom_domain and custom_domain.catch_all:
-                    domain_user: User = custom_domain.user
-                    if domain_user.can_create_new_alias():
-                        # if alias has been deleted before, do not auto-create it
-                        if DeletedAlias.get_by(
-                            email=alias, user_id=custom_domain.user_id
-                        ):
-                            LOG.error(
-                                "Alias %s has been deleted before, cannot auto-create using domain catch-all %s, user %s",
-                                alias,
-                                custom_domain,
-                                domain_user,
-                            )
-                        else:
-                            LOG.d("create alias %s for domain %s", alias, custom_domain)
-                            on_the_fly = True
-
-                            gen_email = GenEmail.create(
-                                email=alias,
-                                user_id=custom_domain.user_id,
-                                custom_domain_id=custom_domain.id,
-                                automatic_creation=True,
-                            )
-                            db.session.commit()
-                    else:
-                        LOG.error(
-                            "User %s is not premium anymore and cannot create alias with domain %s",
-                            domain_user,
-                            alias_domain,
-                        )
-                        send_cannot_create_domain_alias(
-                            domain_user, alias, alias_domain
-                        )
-
-            if not on_the_fly:
+            gen_email = try_auto_create(alias)
+            if not gen_email:
                 LOG.d("alias %s cannot be created on-the-fly, return 510", alias)
                 return "510 Email not exist"