bounce.go 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152
  1. package bounce
  2. import (
  3. "errors"
  4. "log"
  5. "time"
  6. "github.com/jmoiron/sqlx"
  7. "github.com/knadh/listmonk/internal/bounce/mailbox"
  8. "github.com/knadh/listmonk/internal/bounce/webhooks"
  9. "github.com/knadh/listmonk/models"
  10. "github.com/lib/pq"
  11. )
const (
	// subID is the identifying subscriber ID header to look for in
	// bounced e-mails.
	subID = "X-Listmonk-Subscriber"

	// campID is presumably the campaign counterpart of subID, i.e. the
	// header carrying the campaign identifier in bounced e-mails.
	// TODO(review): confirm where this header is read.
	campID = "X-Listmonk-Campaign"
)
// Mailbox represents a POP/IMAP mailbox client that can scan messages and pass
// them to a given channel.
type Mailbox interface {
	// Scan scans the mailbox and sends the bounces it finds to ch, returning
	// an error if the scan fails. The manager calls it with a limit of 1000
	// and its own bounce queue as ch.
	// NOTE(review): limit presumably caps the number of messages handled per
	// scan; confirm against the mailbox implementation.
	Scan(limit int, ch chan models.Bounce) error
}
// Opt represents bounce processing options.
type Opt struct {
	// BounceCount and BounceAction are passed unchanged as the last two
	// parameters of the record query for every bounce.
	// NOTE(review): presumably the bounce threshold and the action applied
	// once it is reached; confirm against the SQL query.
	BounceCount  int    `json:"count"`
	BounceAction string `json:"action"`

	// MailboxEnabled makes New create a mailbox client and makes Run start
	// the periodic mailbox scanner.
	MailboxEnabled bool `json:"mailbox_enabled"`
	// MailboxType selects the mailbox client; "pop" is the only value New
	// accepts, anything else is an error when the mailbox is enabled.
	MailboxType string `json:"mailbox_type"`
	// Mailbox holds the options handed to the mailbox client. Its
	// ScanInterval is the pause between two mailbox scans.
	Mailbox mailbox.Opt `json:"mailbox"`

	// WebhooksEnabled gates the setup of all webhook handlers in New; the
	// per-provider flags below are ignored when it is false.
	WebhooksEnabled bool `json:"webhooks_enabled"`
	SESEnabled      bool `json:"ses_enabled"`
	SendgridEnabled bool `json:"sendgrid_enabled"`
	// SendgridKey is passed to webhooks.NewSendgrid.
	SendgridKey string `json:"sendgrid_key"`
}
// Manager handles e-mail bounces.
type Manager struct {
	// queue is the buffered channel of pending bounces. Record and the
	// mailbox scanner write to it, and Run drains it.
	queue chan models.Bounce
	// mailbox is the mailbox client; it is only set by New when the mailbox
	// is enabled.
	mailbox Mailbox
	// SES and Sendgrid are the webhook handlers. Each stays nil unless
	// webhooks and the respective provider are enabled; Sendgrid also stays
	// nil if its initialization fails.
	SES      *webhooks.SES
	Sendgrid *webhooks.Sendgrid
	// queries holds the DB handle and the statement used to record bounces.
	queries *Queries
	// opt is the configuration the manager was created with.
	opt Opt
	// log receives error and diagnostic messages.
	log *log.Logger
}
// Queries contains the queries.
type Queries struct {
	// DB is the database handle.
	DB *sqlx.DB
	// RecordQuery is the prepared statement Run executes for every bounce.
	// It is called with, in order: subscriber UUID, e-mail, campaign UUID,
	// bounce type, source, meta, date, bounce count and bounce action.
	RecordQuery *sqlx.Stmt
}
  50. // New returns a new instance of the bounce manager.
  51. func New(opt Opt, q *Queries, lo *log.Logger) (*Manager, error) {
  52. m := &Manager{
  53. opt: opt,
  54. queries: q,
  55. queue: make(chan models.Bounce, 1000),
  56. log: lo,
  57. }
  58. // Is there a mailbox?
  59. if opt.MailboxEnabled {
  60. switch opt.MailboxType {
  61. case "pop":
  62. m.mailbox = mailbox.NewPOP(opt.Mailbox)
  63. default:
  64. return nil, errors.New("unknown bounce mailbox type")
  65. }
  66. }
  67. if opt.WebhooksEnabled {
  68. if opt.SESEnabled {
  69. m.SES = webhooks.NewSES()
  70. }
  71. if opt.SendgridEnabled {
  72. sg, err := webhooks.NewSendgrid(opt.SendgridKey)
  73. if err != nil {
  74. lo.Printf("error initializing sendgrid webhooks: %v", err)
  75. } else {
  76. m.Sendgrid = sg
  77. }
  78. }
  79. }
  80. return m, nil
  81. }
  82. // Run is a blocking function that listens for bounce events from webhooks and or mailboxes
  83. // and executes them on the DB.
  84. func (m *Manager) Run() {
  85. if m.opt.MailboxEnabled {
  86. go m.runMailboxScanner()
  87. }
  88. for {
  89. select {
  90. case b, ok := <-m.queue:
  91. if !ok {
  92. return
  93. }
  94. date := b.CreatedAt
  95. if date.IsZero() {
  96. date = time.Now()
  97. }
  98. _, err := m.queries.RecordQuery.Exec(b.SubscriberUUID,
  99. b.Email,
  100. b.CampaignUUID,
  101. b.Type,
  102. b.Source,
  103. b.Meta,
  104. date,
  105. m.opt.BounceCount,
  106. m.opt.BounceAction)
  107. if err != nil {
  108. // Ignore the error if it complained of no subscriber.
  109. if pqErr, ok := err.(*pq.Error); ok && pqErr.Column == "subscriber_id" {
  110. m.log.Printf("bounced subscriber (%s / %s) not found", b.SubscriberUUID, b.Email)
  111. continue
  112. }
  113. m.log.Printf("error recording bounce: %v", err)
  114. }
  115. }
  116. }
  117. }
  118. // runMailboxScanner runs a blocking loop that scans the mailbox at given intervals.
  119. func (m *Manager) runMailboxScanner() {
  120. for {
  121. if err := m.mailbox.Scan(1000, m.queue); err != nil {
  122. m.log.Printf("error scanning bounce mailbox: %v", err)
  123. }
  124. time.Sleep(m.opt.Mailbox.ScanInterval)
  125. }
  126. }
  127. // Record records a new bounce event given the subscriber's email or UUID.
  128. func (m *Manager) Record(b models.Bounce) error {
  129. select {
  130. case m.queue <- b:
  131. }
  132. return nil
  133. }