greylisting.py 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107
  1. import arrow
  2. from app.alias_utils import try_auto_create
  3. from app.config import (
  4. MAX_ACTIVITY_DURING_MINUTE_PER_ALIAS,
  5. MAX_ACTIVITY_DURING_MINUTE_PER_MAILBOX,
  6. )
  7. from app.extensions import db
  8. from app.log import LOG
  9. from app.models import Alias, EmailLog, Contact
  10. def greylisting_needed_for_alias(alias: Alias) -> bool:
  11. min_time = arrow.now().shift(minutes=-1)
  12. # get the nb of activity on this alias
  13. nb_activity = (
  14. db.session.query(EmailLog)
  15. .join(Contact, EmailLog.contact_id == Contact.id)
  16. .filter(
  17. Contact.alias_id == alias.id,
  18. EmailLog.created_at > min_time,
  19. )
  20. .group_by(EmailLog.id)
  21. .count()
  22. )
  23. if nb_activity > MAX_ACTIVITY_DURING_MINUTE_PER_ALIAS:
  24. LOG.d(
  25. "Too much forward on alias %s. Nb Activity %s",
  26. alias,
  27. nb_activity,
  28. )
  29. return True
  30. return False
  31. def greylisting_needed_for_mailbox(alias: Alias) -> bool:
  32. min_time = arrow.now().shift(minutes=-1)
  33. # get nb of activity on this mailbox
  34. nb_activity = (
  35. db.session.query(EmailLog)
  36. .join(Contact, EmailLog.contact_id == Contact.id)
  37. .join(Alias, Contact.alias_id == Alias.id)
  38. .filter(
  39. Alias.mailbox_id == alias.mailbox_id,
  40. EmailLog.created_at > min_time,
  41. )
  42. .group_by(EmailLog.id)
  43. .count()
  44. )
  45. if nb_activity > MAX_ACTIVITY_DURING_MINUTE_PER_MAILBOX:
  46. LOG.d(
  47. "Too much forward on mailbox %s, alias %s. Nb Activity %s",
  48. alias.mailbox,
  49. alias,
  50. nb_activity,
  51. )
  52. return True
  53. return False
  54. def greylisting_needed_forward_phase(alias_address: str) -> bool:
  55. alias = Alias.get_by(email=alias_address)
  56. if alias:
  57. return greylisting_needed_for_alias(alias) or greylisting_needed_for_mailbox(
  58. alias
  59. )
  60. else:
  61. LOG.d(
  62. "alias %s not exist. Try to see if it can be created on the fly",
  63. alias_address,
  64. )
  65. alias = try_auto_create(alias_address)
  66. if alias:
  67. return greylisting_needed_for_mailbox(alias)
  68. return False
  69. def greylisting_needed_reply_phase(reply_email: str) -> bool:
  70. contact = Contact.get_by(reply_email=reply_email)
  71. if not contact:
  72. return False
  73. alias = contact.alias
  74. return greylisting_needed_for_alias(alias) or greylisting_needed_for_mailbox(alias)
  75. def greylisting_needed(mail_from: str, rcpt_tos: [str]) -> bool:
  76. for rcpt_to in rcpt_tos:
  77. if rcpt_to.startswith("reply+") or rcpt_to.startswith("ra+"):
  78. if greylisting_needed_reply_phase(rcpt_to):
  79. return True
  80. else:
  81. # Forward phase
  82. address = rcpt_to # alias@SL
  83. if greylisting_needed_forward_phase(address):
  84. return True
  85. return False