123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220 |
"""
Run scheduled jobs.

Not meant for running jobs at a precise time: a job may run up to one hour
before or after its scheduled time.
"""
- import csv
- import time
- import arrow
- import requests
- from app import s3
- from app.config import (
- JOB_ONBOARDING_1,
- JOB_ONBOARDING_2,
- JOB_ONBOARDING_4,
- JOB_BATCH_IMPORT,
- )
- from app.email_utils import (
- send_email,
- render,
- get_email_domain_part,
- )
- from app.extensions import db
- from app.log import LOG
- from app.models import (
- User,
- Job,
- BatchImport,
- Alias,
- DeletedAlias,
- DomainDeletedAlias,
- CustomDomain,
- )
- from server import create_app
# Works around a database connection leak: build apps with new_app()
# instead of calling create_app() directly.
def new_app():
    """Build the app and register a teardown hook that releases DB resources.

    Returns:
        The object returned by ``create_app()``, with an app-context teardown
        callback attached.
    """
    application = create_app()

    def shutdown_session(response_or_exc):
        # Removing the session is what flask-sqlalchemy's own
        # shutdown_session() does, but that alone is not enough here ...
        db.session.remove()
        # ... so the engine is disposed as well.
        db.engine.dispose()

    application.teardown_appcontext(shutdown_session)
    return application
def onboarding_send_from_alias(user):
    """Send the "send emails from your alias" onboarding tip to ``user``.

    Nothing is sent when ``user.get_communication_email()`` yields no
    destination address.

    Args:
        user: object whose ``get_communication_email()`` returns a
            ``(to_email, unsubscribe_link, via_email)`` tuple.
    """
    to_email, unsubscribe_link, via_email = user.get_communication_email()
    if not to_email:
        return

    send_email(
        to_email,
        "SimpleLogin Tip: Send emails from your alias",
        render("com/onboarding/send-from-alias.txt", user=user, to_email=to_email),
        render("com/onboarding/send-from-alias.html", user=user, to_email=to_email),
        unsubscribe_link,
        via_email,
    )
def onboarding_pgp(user):
    """Send the "secure your emails with PGP" onboarding tip to ``user``.

    Nothing is sent when ``user.get_communication_email()`` yields no
    destination address.

    Args:
        user: object whose ``get_communication_email()`` returns a
            ``(to_email, unsubscribe_link, via_email)`` tuple.
    """
    to_email, unsubscribe_link, via_email = user.get_communication_email()
    if not to_email:
        return

    send_email(
        to_email,
        "SimpleLogin Tip: Secure your emails with PGP",
        render("com/onboarding/pgp.txt", user=user, to_email=to_email),
        render("com/onboarding/pgp.html", user=user, to_email=to_email),
        unsubscribe_link,
        via_email,
    )
def onboarding_browser_extension(user):
    """Send the browser-extension / mobile-app onboarding tip to ``user``.

    Nothing is sent when ``user.get_communication_email()`` yields no
    destination address.

    Args:
        user: object whose ``get_communication_email()`` returns a
            ``(to_email, unsubscribe_link, via_email)`` tuple.
    """
    to_email, unsubscribe_link, via_email = user.get_communication_email()
    if not to_email:
        return

    send_email(
        to_email,
        "SimpleLogin Tip: Chrome/Firefox/Safari extensions and Android/iOS apps",
        render("com/onboarding/browser-extension.txt", user=user, to_email=to_email),
        render("com/onboarding/browser-extension.html", user=user, to_email=to_email),
        unsubscribe_link,
        via_email,
    )
def onboarding_mailbox(user):
    """Send the "multiple mailboxes" onboarding tip to ``user``.

    Nothing is sent when ``user.get_communication_email()`` yields no
    destination address.

    Args:
        user: object whose ``get_communication_email()`` returns a
            ``(to_email, unsubscribe_link, via_email)`` tuple.
    """
    to_email, unsubscribe_link, via_email = user.get_communication_email()
    if not to_email:
        return

    send_email(
        to_email,
        "SimpleLogin Tip: Multiple mailboxes",
        render("com/onboarding/mailbox.txt", user=user, to_email=to_email),
        render("com/onboarding/mailbox.html", user=user, to_email=to_email),
        unsubscribe_link,
        via_email,
    )
# Seconds to wait for the file storage to answer before giving up on a download.
_BATCH_IMPORT_DOWNLOAD_TIMEOUT = 30


def handle_batch_import(batch_import: BatchImport):
    """Create the aliases listed in the CSV file attached to ``batch_import``.

    The file is downloaded from the URL given by ``s3.get_url`` and parsed
    with ``csv.DictReader``; each row must provide an ``alias`` value and a
    ``note`` column. A row is skipped (and logged) when:

    * it cannot be parsed (missing column or missing alias value),
    * the alias domain is not a verified custom domain owned by the user,
    * the alias already exists or was previously deleted.

    If the file cannot be downloaded, the failure is logged and no alias is
    created.

    Args:
        batch_import: the import to process; it is flagged ``processed``
            before any work is done.
    """
    user = batch_import.user

    # Flag the import as processed up front, whatever happens afterwards.
    batch_import.processed = True
    db.session.commit()

    LOG.d("Start batch import for %s %s", batch_import, user)
    file_url = s3.get_url(batch_import.file.path)

    LOG.d("Download file %s from %s", batch_import.file, file_url)
    try:
        # A timeout keeps a stalled download from blocking the runner forever;
        # raise_for_status() avoids parsing an HTTP error page as CSV.
        r = requests.get(file_url, timeout=_BATCH_IMPORT_DOWNLOAD_TIMEOUT)
        r.raise_for_status()
    except requests.RequestException:
        LOG.exception(
            "Cannot download file %s for %s", batch_import.file, batch_import
        )
        return

    lines = [line.decode() for line in r.iter_lines()]
    reader = csv.DictReader(lines)

    for row in reader:
        # DictReader fills the missing fields of a short row with None, so a
        # present "alias" key does not guarantee a usable value.
        raw_alias = row.get("alias")
        if raw_alias is None or "note" not in row:
            LOG.warning("Cannot parse row %s", row)
            continue

        full_alias = raw_alias.lower().strip().replace(" ", "")
        note = row["note"]

        alias_domain = get_email_domain_part(full_alias)
        custom_domain = CustomDomain.get_by(domain=alias_domain)

        if (
            not custom_domain
            or not custom_domain.verified
            or custom_domain.user_id != user.id
        ):
            LOG.d("domain %s can't be used %s", alias_domain, user)
            continue

        if (
            Alias.get_by(email=full_alias)
            or DeletedAlias.get_by(email=full_alias)
            or DomainDeletedAlias.get_by(email=full_alias)
        ):
            LOG.d("alias already used %s", full_alias)
            continue

        alias = Alias.create(
            user_id=user.id,
            email=full_alias,
            note=note,
            mailbox_id=user.default_mailbox_id,
            custom_domain_id=custom_domain.id,
            batch_import_id=batch_import.id,
        )
        # Commit row by row so each created alias is persisted immediately.
        db.session.commit()
        LOG.d("Create %s", alias)
def _notifiable_user(job):
    """Return the user targeted by ``job`` if an onboarding email may be sent.

    Returns None when the user no longer exists (they might have deleted
    their account in the meantime), has disabled notifications or is not
    activated.
    """
    user = User.get(job.payload.get("user_id"))
    if user and user.notification and user.activated:
        return user
    return None


def _process_job(job):
    """Run the action matching ``job.name``; unknown names are only logged."""
    if job.name == JOB_ONBOARDING_1:
        user = _notifiable_user(job)
        if user:
            LOG.d("send onboarding send-from-alias email to user %s", user)
            onboarding_send_from_alias(user)
    elif job.name == JOB_ONBOARDING_2:
        user = _notifiable_user(job)
        if user:
            LOG.d("send onboarding mailbox email to user %s", user)
            onboarding_mailbox(user)
    elif job.name == JOB_ONBOARDING_4:
        user = _notifiable_user(job)
        if user:
            LOG.d("send onboarding pgp email to user %s", user)
            onboarding_pgp(user)
    elif job.name == JOB_BATCH_IMPORT:
        batch_import_id = job.payload.get("batch_import_id")
        batch_import = BatchImport.get(batch_import_id)
        # The batch import might have been removed since the job was created.
        if batch_import is None:
            LOG.warning("Batch import %s not found, skip %s", batch_import_id, job)
        else:
            handle_batch_import(batch_import)
    else:
        # NOTE(review): LOG.exception is called outside an except block, so no
        # traceback is attached; kept as-is in case alerting relies on it.
        LOG.exception("Unknown job name %s", job.name)


if __name__ == "__main__":
    while True:
        # run a job 1h earlier or later is not a big deal ...
        now = arrow.now()
        min_dt = now.shift(hours=-1)
        max_dt = now.shift(hours=1)

        # A fresh app per iteration, so its teardown hook runs when the
        # context below exits.
        app = new_app()

        with app.app_context():
            pending_jobs = Job.query.filter(
                Job.taken == False,  # noqa: E712 (SQL expression, not a Python bool test)
                Job.run_at > min_dt,
                Job.run_at <= max_dt,
            ).all()

            for job in pending_jobs:
                LOG.d("Take job %s", job)

                # mark the job as taken, whether it will be executed successfully or not
                job.taken = True
                db.session.commit()

                _process_job(job)

        time.sleep(10)
|