浏览代码

Add state management to job (#1113)

* Add state management to job

* Add migration

Co-authored-by: Adrià Casajús <adria.casajus@proton.ch>
Adrià Casajús 3 年之前
父节点
当前提交
5fa41d6ccf
共有 3 个文件被更改,包括 171 次插入112 次删除
  1. 15 0
      app/models.py
  2. 123 112
      job_runner.py
  3. 33 0
      migrations/versions/2022_062714_bfebc2d5c719_add_state_to_job.py

+ 15 - 0
app/models.py

@@ -262,6 +262,13 @@ class VerpType(EnumE):
     transactional = 2
 
 
+class JobState(EnumE):
+    ready = 0
+    taken = 1
+    done = 2
+    error = 3
+
+
 class Hibp(Base, ModelMixin):
     __tablename__ = "hibp"
     name = sa.Column(sa.String(), nullable=False, unique=True, index=True)
@@ -2370,6 +2377,14 @@ class Job(Base, ModelMixin):
     # whether the job has been taken by the job runner
     taken = sa.Column(sa.Boolean, default=False, nullable=False)
     run_at = sa.Column(ArrowType)
+    state = sa.Column(
+        sa.Integer,
+        nullable=False,
+        server_default=str(JobState.ready.value),
+        default=JobState.ready.value,
+    )
+    attempts = sa.Column(sa.Integer, nullable=False, server_default="0", default=0)
+    taken_at = sa.Column(ArrowType, nullable=True)
 
     def __repr__(self):
         return f"<Job {self.id} {self.name} {self.payload}>"

+ 123 - 112
job_runner.py

@@ -15,7 +15,7 @@ from app.email_utils import (
 from app.import_utils import handle_batch_import
 from app.jobs.export_user_data_job import ExportUserDataJob
 from app.log import LOG
-from app.models import User, Job, BatchImport, Mailbox, CustomDomain
+from app.models import User, Job, BatchImport, Mailbox, CustomDomain, JobState
 from server import create_light_app
 
 
@@ -106,6 +106,120 @@ def welcome_proton(user):
     )
 
 
+def process_job(job: Job):
+    if job.name == config.JOB_ONBOARDING_1:
+        user_id = job.payload.get("user_id")
+        user = User.get(user_id)
+
+        # user might delete their account in the meantime
+        # or disable the notification
+        if user and user.notification and user.activated:
+            LOG.d("send onboarding send-from-alias email to user %s", user)
+            onboarding_send_from_alias(user)
+    elif job.name == config.JOB_ONBOARDING_2:
+        user_id = job.payload.get("user_id")
+        user = User.get(user_id)
+
+        # user might delete their account in the meantime
+        # or disable the notification
+        if user and user.notification and user.activated:
+            LOG.d("send onboarding mailbox email to user %s", user)
+            onboarding_mailbox(user)
+    elif job.name == config.JOB_ONBOARDING_4:
+        user_id = job.payload.get("user_id")
+        user = User.get(user_id)
+
+        # user might delete their account in the meantime
+        # or disable the notification
+        if user and user.notification and user.activated:
+            LOG.d("send onboarding pgp email to user %s", user)
+            onboarding_pgp(user)
+
+    elif job.name == config.JOB_BATCH_IMPORT:
+        batch_import_id = job.payload.get("batch_import_id")
+        batch_import = BatchImport.get(batch_import_id)
+        handle_batch_import(batch_import)
+    elif job.name == config.JOB_DELETE_ACCOUNT:
+        user_id = job.payload.get("user_id")
+        user = User.get(user_id)
+
+        if not user:
+            LOG.i("No user found for %s", user_id)
+            return
+
+        user_email = user.email
+        LOG.w("Delete user %s", user)
+        User.delete(user.id)
+        Session.commit()
+
+        send_email(
+            user_email,
+            "Your SimpleLogin account has been deleted",
+            render("transactional/account-delete.txt"),
+            render("transactional/account-delete.html"),
+            retries=3,
+        )
+    elif job.name == config.JOB_DELETE_MAILBOX:
+        mailbox_id = job.payload.get("mailbox_id")
+        mailbox = Mailbox.get(mailbox_id)
+        if not mailbox:
+            return
+
+        mailbox_email = mailbox.email
+        user = mailbox.user
+
+        Mailbox.delete(mailbox_id)
+        Session.commit()
+        LOG.d("Mailbox %s %s deleted", mailbox_id, mailbox_email)
+
+        send_email(
+            user.email,
+            f"Your mailbox {mailbox_email} has been deleted",
+            f"""Mailbox {mailbox_email} along with its aliases are deleted successfully.
+Regards,
+SimpleLogin team.
+""",
+            retries=3,
+        )
+
+    elif job.name == config.JOB_DELETE_DOMAIN:
+        custom_domain_id = job.payload.get("custom_domain_id")
+        custom_domain = CustomDomain.get(custom_domain_id)
+        if not custom_domain:
+            return
+
+        domain_name = custom_domain.domain
+        user = custom_domain.user
+
+        CustomDomain.delete(custom_domain.id)
+        Session.commit()
+
+        LOG.d("Domain %s deleted", domain_name)
+
+        send_email(
+            user.email,
+            f"Your domain {domain_name} has been deleted",
+            f"""Domain {domain_name} along with its aliases are deleted successfully.
+
+Regards,
+SimpleLogin team.
+""",
+            retries=3,
+        )
+    elif job.name == config.JOB_SEND_USER_REPORT:
+        export_job = ExportUserDataJob.create_from_job(job)
+        if export_job:
+            export_job.run()
+    elif job.name == config.JOB_SEND_PROTON_WELCOME_1:
+        user_id = job.payload.get("user_id")
+        user = User.get(user_id)
+        if user and user.activated:
+            LOG.d("send proton welcome email to user %s", user)
+            welcome_proton(user)
+    else:
+        LOG.e("Unknown job name %s", job.name)
+
+
 if __name__ == "__main__":
     while True:
         # wrap in an app context to benefit from app setup like database cleanup, sentry integration, etc
@@ -114,6 +228,8 @@ if __name__ == "__main__":
             min_dt = arrow.now().shift(hours=-1)
             max_dt = arrow.now().shift(hours=1)
 
+            # TODO: Change This condition after deploying this MR
+            # to Job.state == ready or (Job.state == taken and job.taken_at < arrow.now.shift(minutes=-10))
             for job in Job.filter(
                 Job.taken.is_(False), Job.run_at > min_dt, Job.run_at <= max_dt
             ).all():
@@ -121,118 +237,13 @@ if __name__ == "__main__":
 
                 # mark the job as taken, whether it will be executed successfully or not
                 job.taken = True
+                job.taken_at = arrow.now()
+                job.state = JobState.taken.value
+                job.attempts += 1
                 Session.commit()
+                process_job(job)
 
-                if job.name == config.JOB_ONBOARDING_1:
-                    user_id = job.payload.get("user_id")
-                    user = User.get(user_id)
-
-                    # user might delete their account in the meantime
-                    # or disable the notification
-                    if user and user.notification and user.activated:
-                        LOG.d("send onboarding send-from-alias email to user %s", user)
-                        onboarding_send_from_alias(user)
-                elif job.name == config.JOB_ONBOARDING_2:
-                    user_id = job.payload.get("user_id")
-                    user = User.get(user_id)
-
-                    # user might delete their account in the meantime
-                    # or disable the notification
-                    if user and user.notification and user.activated:
-                        LOG.d("send onboarding mailbox email to user %s", user)
-                        onboarding_mailbox(user)
-                elif job.name == config.JOB_ONBOARDING_4:
-                    user_id = job.payload.get("user_id")
-                    user = User.get(user_id)
-
-                    # user might delete their account in the meantime
-                    # or disable the notification
-                    if user and user.notification and user.activated:
-                        LOG.d("send onboarding pgp email to user %s", user)
-                        onboarding_pgp(user)
-
-                elif job.name == config.JOB_BATCH_IMPORT:
-                    batch_import_id = job.payload.get("batch_import_id")
-                    batch_import = BatchImport.get(batch_import_id)
-                    handle_batch_import(batch_import)
-                elif job.name == config.JOB_DELETE_ACCOUNT:
-                    user_id = job.payload.get("user_id")
-                    user = User.get(user_id)
-
-                    if not user:
-                        LOG.i("No user found for %s", user_id)
-                        continue
-
-                    user_email = user.email
-                    LOG.w("Delete user %s", user)
-                    User.delete(user.id)
-                    Session.commit()
-
-                    send_email(
-                        user_email,
-                        "Your SimpleLogin account has been deleted",
-                        render("transactional/account-delete.txt"),
-                        render("transactional/account-delete.html"),
-                        retries=3,
-                    )
-                elif job.name == config.JOB_DELETE_MAILBOX:
-                    mailbox_id = job.payload.get("mailbox_id")
-                    mailbox = Mailbox.get(mailbox_id)
-                    if not mailbox:
-                        continue
-
-                    mailbox_email = mailbox.email
-                    user = mailbox.user
-
-                    Mailbox.delete(mailbox_id)
-                    Session.commit()
-                    LOG.d("Mailbox %s %s deleted", mailbox_id, mailbox_email)
-
-                    send_email(
-                        user.email,
-                        f"Your mailbox {mailbox_email} has been deleted",
-                        f"""Mailbox {mailbox_email} along with its aliases are deleted successfully.
-Regards,
-SimpleLogin team.
-""",
-                        retries=3,
-                    )
-
-                elif job.name == config.JOB_DELETE_DOMAIN:
-                    custom_domain_id = job.payload.get("custom_domain_id")
-                    custom_domain = CustomDomain.get(custom_domain_id)
-                    if not custom_domain:
-                        continue
-
-                    domain_name = custom_domain.domain
-                    user = custom_domain.user
-
-                    CustomDomain.delete(custom_domain.id)
-                    Session.commit()
-
-                    LOG.d("Domain %s deleted", domain_name)
-
-                    send_email(
-                        user.email,
-                        f"Your domain {domain_name} has been deleted",
-                        f"""Domain {domain_name} along with its aliases are deleted successfully.
-
-Regards,
-SimpleLogin team.
-""",
-                        retries=3,
-                    )
-                elif job.name == config.JOB_SEND_USER_REPORT:
-                    export_job = ExportUserDataJob.create_from_job(job)
-                    if export_job:
-                        export_job.run()
-                elif job.name == config.JOB_SEND_PROTON_WELCOME_1:
-                    user_id = job.payload.get("user_id")
-                    user = User.get(user_id)
-                    if user and user.activated:
-                        LOG.d("send proton welcome email to user %s", user)
-                        welcome_proton(user)
-                else:
-                    LOG.e("Unknown job name %s", job.name)
+                job.state = JobState.done.value
+                Session.commit()
 
             time.sleep(10)

+ 33 - 0
migrations/versions/2022_062714_bfebc2d5c719_add_state_to_job.py

@@ -0,0 +1,33 @@
+"""Add state to job
+
+Revision ID: bfebc2d5c719
+Revises: d1fb679f7eec
+Create Date: 2022-06-27 14:56:58.797121
+
+"""
+import sqlalchemy_utils
+from alembic import op
+import sqlalchemy as sa
+
+
+# revision identifiers, used by Alembic.
+revision = 'bfebc2d5c719'
+down_revision = 'd1fb679f7eec'
+branch_labels = None
+depends_on = None
+
+
+def upgrade():
+    # ### commands auto generated by Alembic - please adjust! ###
+    op.add_column('job', sa.Column('attempts', sa.Integer(), server_default='0', nullable=False))
+    op.add_column('job', sa.Column('state', sa.Integer(), server_default='0', nullable=False))
+    op.add_column('job', sa.Column('taken_at', sqlalchemy_utils.types.arrow.ArrowType(), nullable=True))
+    # ### end Alembic commands ###
+
+
+def downgrade():
+    # ### commands auto generated by Alembic - please adjust! ###
+    op.drop_column('job', 'taken_at')
+    op.drop_column('job', 'state')
+    op.drop_column('job', 'attempts')
+    # ### end Alembic commands ###