job_runner.py 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220
  1. """
  2. Run scheduled jobs.
  3. Not meant for running job at precise time (+- 1h)
  4. """
  5. import csv
  6. import time
  7. import arrow
  8. import requests
  9. from app import s3
  10. from app.config import (
  11. JOB_ONBOARDING_1,
  12. JOB_ONBOARDING_2,
  13. JOB_ONBOARDING_4,
  14. JOB_BATCH_IMPORT,
  15. )
  16. from app.email_utils import (
  17. send_email,
  18. render,
  19. get_email_domain_part,
  20. )
  21. from app.extensions import db
  22. from app.log import LOG
  23. from app.models import (
  24. User,
  25. Job,
  26. BatchImport,
  27. Alias,
  28. DeletedAlias,
  29. DomainDeletedAlias,
  30. CustomDomain,
  31. )
  32. from server import create_app
  33. # fix the database connection leak issue
  34. # use this method instead of create_app
  35. def new_app():
  36. app = create_app()
  37. @app.teardown_appcontext
  38. def shutdown_session(response_or_exc):
  39. # same as shutdown_session() in flask-sqlalchemy but this is not enough
  40. db.session.remove()
  41. # dispose the engine too
  42. db.engine.dispose()
  43. return app
  44. def onboarding_send_from_alias(user):
  45. to_email, unsubscribe_link, via_email = user.get_communication_email()
  46. if not to_email:
  47. return
  48. send_email(
  49. to_email,
  50. f"SimpleLogin Tip: Send emails from your alias",
  51. render("com/onboarding/send-from-alias.txt", user=user, to_email=to_email),
  52. render("com/onboarding/send-from-alias.html", user=user, to_email=to_email),
  53. unsubscribe_link,
  54. via_email,
  55. )
  56. def onboarding_pgp(user):
  57. to_email, unsubscribe_link, via_email = user.get_communication_email()
  58. if not to_email:
  59. return
  60. send_email(
  61. to_email,
  62. f"SimpleLogin Tip: Secure your emails with PGP",
  63. render("com/onboarding/pgp.txt", user=user, to_email=to_email),
  64. render("com/onboarding/pgp.html", user=user, to_email=to_email),
  65. unsubscribe_link,
  66. via_email,
  67. )
  68. def onboarding_browser_extension(user):
  69. to_email, unsubscribe_link, via_email = user.get_communication_email()
  70. if not to_email:
  71. return
  72. send_email(
  73. to_email,
  74. f"SimpleLogin Tip: Chrome/Firefox/Safari extensions and Android/iOS apps",
  75. render("com/onboarding/browser-extension.txt", user=user, to_email=to_email),
  76. render("com/onboarding/browser-extension.html", user=user, to_email=to_email),
  77. unsubscribe_link,
  78. via_email,
  79. )
  80. def onboarding_mailbox(user):
  81. to_email, unsubscribe_link, via_email = user.get_communication_email()
  82. if not to_email:
  83. return
  84. send_email(
  85. to_email,
  86. f"SimpleLogin Tip: Multiple mailboxes",
  87. render("com/onboarding/mailbox.txt", user=user, to_email=to_email),
  88. render("com/onboarding/mailbox.html", user=user, to_email=to_email),
  89. unsubscribe_link,
  90. via_email,
  91. )
  92. def handle_batch_import(batch_import: BatchImport):
  93. user = batch_import.user
  94. batch_import.processed = True
  95. db.session.commit()
  96. LOG.debug("Start batch import for %s %s", batch_import, user)
  97. file_url = s3.get_url(batch_import.file.path)
  98. LOG.d("Download file %s from %s", batch_import.file, file_url)
  99. r = requests.get(file_url)
  100. lines = [l.decode() for l in r.iter_lines()]
  101. reader = csv.DictReader(lines)
  102. for row in reader:
  103. try:
  104. full_alias = row["alias"].lower().strip().replace(" ", "")
  105. note = row["note"]
  106. except KeyError:
  107. LOG.warning("Cannot parse row %s", row)
  108. continue
  109. alias_domain = get_email_domain_part(full_alias)
  110. custom_domain = CustomDomain.get_by(domain=alias_domain)
  111. if (
  112. not custom_domain
  113. or not custom_domain.verified
  114. or custom_domain.user_id != user.id
  115. ):
  116. LOG.debug("domain %s can't be used %s", alias_domain, user)
  117. continue
  118. if (
  119. Alias.get_by(email=full_alias)
  120. or DeletedAlias.get_by(email=full_alias)
  121. or DomainDeletedAlias.get_by(email=full_alias)
  122. ):
  123. LOG.d("alias already used %s", full_alias)
  124. continue
  125. alias = Alias.create(
  126. user_id=user.id,
  127. email=full_alias,
  128. note=note,
  129. mailbox_id=user.default_mailbox_id,
  130. custom_domain_id=custom_domain.id,
  131. batch_import_id=batch_import.id,
  132. )
  133. db.session.commit()
  134. LOG.d("Create %s", alias)
  135. if __name__ == "__main__":
  136. while True:
  137. # run a job 1h earlier or later is not a big deal ...
  138. min_dt = arrow.now().shift(hours=-1)
  139. max_dt = arrow.now().shift(hours=1)
  140. app = new_app()
  141. with app.app_context():
  142. for job in Job.query.filter(
  143. Job.taken == False, Job.run_at > min_dt, Job.run_at <= max_dt
  144. ).all():
  145. LOG.d("Take job %s", job)
  146. # mark the job as taken, whether it will be executed successfully or not
  147. job.taken = True
  148. db.session.commit()
  149. if job.name == JOB_ONBOARDING_1:
  150. user_id = job.payload.get("user_id")
  151. user = User.get(user_id)
  152. # user might delete their account in the meantime
  153. # or disable the notification
  154. if user and user.notification and user.activated:
  155. LOG.d("send onboarding send-from-alias email to user %s", user)
  156. onboarding_send_from_alias(user)
  157. elif job.name == JOB_ONBOARDING_2:
  158. user_id = job.payload.get("user_id")
  159. user = User.get(user_id)
  160. # user might delete their account in the meantime
  161. # or disable the notification
  162. if user and user.notification and user.activated:
  163. LOG.d("send onboarding mailbox email to user %s", user)
  164. onboarding_mailbox(user)
  165. elif job.name == JOB_ONBOARDING_4:
  166. user_id = job.payload.get("user_id")
  167. user = User.get(user_id)
  168. # user might delete their account in the meantime
  169. # or disable the notification
  170. if user and user.notification and user.activated:
  171. LOG.d("send onboarding pgp email to user %s", user)
  172. onboarding_pgp(user)
  173. elif job.name == JOB_BATCH_IMPORT:
  174. batch_import_id = job.payload.get("batch_import_id")
  175. batch_import = BatchImport.get(batch_import_id)
  176. handle_batch_import(batch_import)
  177. else:
  178. LOG.exception("Unknown job name %s", job.name)
  179. time.sleep(10)