Add support for pushing non-campaign message with workers.

- Refactor campaign.Message into campaign.Message and
  campaign.CampaignMessage
- Remove ad-hoc goroutines (flawed approach) that were used to push
  admin and optin notifications.
- Provision for largscale pushing of ad-hoc, non-campaign messages
  such as transactional messages (in the future).
This commit is contained in:
Kailash Nadh 2020-03-08 12:27:41 +05:30
parent 5f6a4af6b4
commit d4aea0a436
7 changed files with 113 additions and 69 deletions

View file

@ -180,7 +180,7 @@ func handlePreviewCampaign(c echo.Context) error {
} }
// Render the message body. // Render the message body.
m := app.manager.NewMessage(camp, sub) m := app.manager.NewCampaignMessage(camp, sub)
if err := m.Render(); err != nil { if err := m.Render(); err != nil {
app.log.Printf("error rendering message: %v", err) app.log.Printf("error rendering message: %v", err)
return echo.NewHTTPError(http.StatusBadRequest, return echo.NewHTTPError(http.StatusBadRequest,
@ -555,7 +555,7 @@ func sendTestMessage(sub models.Subscriber, camp *models.Campaign, app *App) err
} }
// Render the message body. // Render the message body.
m := app.manager.NewMessage(camp, sub) m := app.manager.NewCampaignMessage(camp, sub)
if err := m.Render(); err != nil { if err := m.Render(); err != nil {
app.log.Printf("error rendering message: %v", err) app.log.Printf("error rendering message: %v", err)
return echo.NewHTTPError(http.StatusBadRequest, return echo.NewHTTPError(http.StatusBadRequest,

View file

@ -159,7 +159,7 @@ func initConstants() *constants {
// initCampaignManager initializes the campaign manager. // initCampaignManager initializes the campaign manager.
func initCampaignManager(app *App) *manager.Manager { func initCampaignManager(app *App) *manager.Manager {
campNotifCB := func(subject string, data interface{}) error { campNotifCB := func(subject string, data interface{}) error {
return sendNotification(app.constants.NotifyEmails, subject, notifTplCampaign, data, app) return app.sendNotification(app.constants.NotifyEmails, subject, notifTplCampaign, data)
} }
return manager.New(manager.Config{ return manager.New(manager.Config{
Concurrency: ko.Int("app.concurrency"), Concurrency: ko.Int("app.concurrency"),
@ -180,7 +180,7 @@ func initImporter(app *App) *subimporter.Importer {
app.queries.UpdateListsDate.Stmt, app.queries.UpdateListsDate.Stmt,
app.db.DB, app.db.DB,
func(subject string, data interface{}) error { func(subject string, data interface{}) error {
go sendNotification(app.constants.NotifyEmails, subject, notifTplImport, data, app) app.sendNotification(app.constants.NotifyEmails, subject, notifTplImport, data)
return nil return nil
}) })
} }

View file

@ -2,6 +2,7 @@ package manager
import ( import (
"bytes" "bytes"
"errors"
"fmt" "fmt"
"html/template" "html/template"
"log" "log"
@ -43,7 +44,8 @@ type Manager struct {
logger *log.Logger logger *log.Logger
// Campaigns that are currently running. // Campaigns that are currently running.
camps map[int]*models.Campaign camps map[int]*models.Campaign
campsMutex sync.RWMutex
// Links generated using Track() are cached here so as to not query // Links generated using Track() are cached here so as to not query
// the database for the link UUID for every message sent. This has to // the database for the link UUID for every message sent. This has to
@ -51,14 +53,16 @@ type Manager struct {
links map[string]string links map[string]string
linksMutex sync.RWMutex linksMutex sync.RWMutex
subFetchQueue chan *models.Campaign subFetchQueue chan *models.Campaign
msgQueue chan Message campMsgQueue chan CampaignMessage
msgErrorQueue chan msgError campMsgErrorQueue chan msgError
msgErrorCounts map[int]int campMsgErrorCounts map[int]int
msgQueue chan Message
} }
// Message represents an active subscriber that's being processed. // CampaignMessage represents an instance of campaign message to be pushed out,
type Message struct { // specific to a subscriber, via the campaign's messenger.
type CampaignMessage struct {
Campaign *models.Campaign Campaign *models.Campaign
Subscriber models.Subscriber Subscriber models.Subscriber
Body []byte Body []byte
@ -68,6 +72,15 @@ type Message struct {
unsubURL string unsubURL string
} }
// Message represents a generic message to be pushed to a messenger.
type Message struct {
From string
To []string
Subject string
Body []byte
Messenger string
}
// Config has parameters for configuring the manager. // Config has parameters for configuring the manager.
type Config struct { type Config struct {
Concurrency int Concurrency int
@ -88,24 +101,26 @@ type msgError struct {
// New returns a new instance of Mailer. // New returns a new instance of Mailer.
func New(cfg Config, src DataSource, notifCB models.AdminNotifCallback, l *log.Logger) *Manager { func New(cfg Config, src DataSource, notifCB models.AdminNotifCallback, l *log.Logger) *Manager {
return &Manager{ return &Manager{
cfg: cfg, cfg: cfg,
src: src, src: src,
notifCB: notifCB, notifCB: notifCB,
logger: l, logger: l,
messengers: make(map[string]messenger.Messenger), messengers: make(map[string]messenger.Messenger),
camps: make(map[int]*models.Campaign), camps: make(map[int]*models.Campaign),
links: make(map[string]string), links: make(map[string]string),
subFetchQueue: make(chan *models.Campaign, cfg.Concurrency), subFetchQueue: make(chan *models.Campaign, cfg.Concurrency),
msgQueue: make(chan Message, cfg.Concurrency), campMsgQueue: make(chan CampaignMessage, cfg.Concurrency*2),
msgErrorQueue: make(chan msgError, cfg.MaxSendErrors), msgQueue: make(chan Message, cfg.Concurrency),
msgErrorCounts: make(map[int]int), campMsgErrorQueue: make(chan msgError, cfg.MaxSendErrors),
campMsgErrorCounts: make(map[int]int),
} }
} }
// NewMessage creates and returns a Message that is made available // NewCampaignMessage creates and returns a CampaignMessage that is made available
// to message templates while they're compiled. // to message templates while they're compiled. It represents a message from
func (m *Manager) NewMessage(c *models.Campaign, s models.Subscriber) Message { // a campaign that's bound to a single Subscriber.
return Message{ func (m *Manager) NewCampaignMessage(c *models.Campaign, s models.Subscriber) CampaignMessage {
return CampaignMessage{
Campaign: c, Campaign: c,
Subscriber: s, Subscriber: s,
@ -125,6 +140,17 @@ func (m *Manager) AddMessenger(msg messenger.Messenger) error {
return nil return nil
} }
// PushMessage pushes a Message to be sent out by the workers.
func (m *Manager) PushMessage(msg Message) error {
select {
case m.msgQueue <- msg:
case <-time.After(time.Second * 3):
m.logger.Println("message push timed out: %'s'", msg.Subject)
return errors.New("message push timed out")
}
return nil
}
// GetMessengerNames returns the list of registered messengers. // GetMessengerNames returns the list of registered messengers.
func (m *Manager) GetMessengerNames() []string { func (m *Manager) GetMessengerNames() []string {
names := make([]string, 0, len(m.messengers)) names := make([]string, 0, len(m.messengers))
@ -177,21 +203,21 @@ func (m *Manager) Run(tick time.Duration) {
// Aggregate errors from sending messages to check against the error threshold // Aggregate errors from sending messages to check against the error threshold
// after which a campaign is paused. // after which a campaign is paused.
case e := <-m.msgErrorQueue: case e := <-m.campMsgErrorQueue:
if m.cfg.MaxSendErrors < 1 { if m.cfg.MaxSendErrors < 1 {
continue continue
} }
// If the error threshold is met, pause the campaign. // If the error threshold is met, pause the campaign.
m.msgErrorCounts[e.camp.ID]++ m.campMsgErrorCounts[e.camp.ID]++
if m.msgErrorCounts[e.camp.ID] >= m.cfg.MaxSendErrors { if m.campMsgErrorCounts[e.camp.ID] >= m.cfg.MaxSendErrors {
m.logger.Printf("error counted exceeded %d. pausing campaign %s", m.logger.Printf("error counted exceeded %d. pausing campaign %s",
m.cfg.MaxSendErrors, e.camp.Name) m.cfg.MaxSendErrors, e.camp.Name)
if m.isCampaignProcessing(e.camp.ID) { if m.isCampaignProcessing(e.camp.ID) {
m.exhaustCampaign(e.camp, models.CampaignStatusPaused) m.exhaustCampaign(e.camp, models.CampaignStatusPaused)
} }
delete(m.msgErrorCounts, e.camp.ID) delete(m.campMsgErrorCounts, e.camp.ID)
// Notify admins. // Notify admins.
m.sendNotif(e.camp, models.CampaignStatusPaused, "Too many errors") m.sendNotif(e.camp, models.CampaignStatusPaused, "Too many errors")
@ -228,23 +254,31 @@ func (m *Manager) Run(tick time.Duration) {
func (m *Manager) SpawnWorkers() { func (m *Manager) SpawnWorkers() {
for i := 0; i < m.cfg.Concurrency; i++ { for i := 0; i < m.cfg.Concurrency; i++ {
go func() { go func() {
for msg := range m.msgQueue { for {
if !m.isCampaignProcessing(msg.Campaign.ID) { select {
continue // Campaign message.
} case msg := <-m.campMsgQueue:
if !m.isCampaignProcessing(msg.Campaign.ID) {
continue
}
err := m.messengers[msg.Campaign.MessengerID].Push( err := m.messengers[msg.Campaign.MessengerID].Push(
msg.from, msg.from, []string{msg.to}, msg.Campaign.Subject, msg.Body, nil)
[]string{msg.to}, if err != nil {
msg.Campaign.Subject, m.logger.Printf("error sending message in campaign %s: %v", msg.Campaign.Name, err)
msg.Body, nil)
if err != nil {
m.logger.Printf("error sending message in campaign %s: %v",
msg.Campaign.Name, err)
select { select {
case m.msgErrorQueue <- msgError{camp: msg.Campaign, err: err}: case m.campMsgErrorQueue <- msgError{camp: msg.Campaign, err: err}:
default: default:
}
}
// Arbitrary message.
case msg := <-m.msgQueue:
err := m.messengers[msg.Messenger].Push(
msg.From, msg.To, msg.Subject, msg.Body, nil)
if err != nil {
m.logger.Printf("error sending message '%s': %v", msg.Subject, err)
} }
} }
} }
@ -256,17 +290,17 @@ func (m *Manager) SpawnWorkers() {
// compiled campaign templates. // compiled campaign templates.
func (m *Manager) TemplateFuncs(c *models.Campaign) template.FuncMap { func (m *Manager) TemplateFuncs(c *models.Campaign) template.FuncMap {
return template.FuncMap{ return template.FuncMap{
"TrackLink": func(url string, msg *Message) string { "TrackLink": func(url string, msg *CampaignMessage) string {
return m.trackLink(url, msg.Campaign.UUID, msg.Subscriber.UUID) return m.trackLink(url, msg.Campaign.UUID, msg.Subscriber.UUID)
}, },
"TrackView": func(msg *Message) template.HTML { "TrackView": func(msg *CampaignMessage) template.HTML {
return template.HTML(fmt.Sprintf(`<img src="%s" alt="" />`, return template.HTML(fmt.Sprintf(`<img src="%s" alt="" />`,
fmt.Sprintf(m.cfg.ViewTrackURL, msg.Campaign.UUID, msg.Subscriber.UUID))) fmt.Sprintf(m.cfg.ViewTrackURL, msg.Campaign.UUID, msg.Subscriber.UUID)))
}, },
"UnsubscribeURL": func(msg *Message) string { "UnsubscribeURL": func(msg *CampaignMessage) string {
return msg.unsubURL return msg.unsubURL
}, },
"OptinURL": func(msg *Message) string { "OptinURL": func(msg *CampaignMessage) string {
// Add list IDs. // Add list IDs.
// TODO: Show private lists list on optin e-mail // TODO: Show private lists list on optin e-mail
return fmt.Sprintf(m.cfg.OptinURL, msg.Subscriber.UUID, "") return fmt.Sprintf(m.cfg.OptinURL, msg.Subscriber.UUID, "")
@ -294,17 +328,21 @@ func (m *Manager) addCampaign(c *models.Campaign) error {
} }
// Add the campaign to the active map. // Add the campaign to the active map.
m.campsMutex.Lock()
m.camps[c.ID] = c m.camps[c.ID] = c
m.campsMutex.Unlock()
return nil return nil
} }
// getPendingCampaignIDs returns the IDs of campaigns currently being processed. // getPendingCampaignIDs returns the IDs of campaigns currently being processed.
func (m *Manager) getPendingCampaignIDs() []int64 { func (m *Manager) getPendingCampaignIDs() []int64 {
// Needs to return an empty slice in case there are no campaigns. // Needs to return an empty slice in case there are no campaigns.
ids := make([]int64, 0) m.campsMutex.RLock()
ids := make([]int64, 0, len(m.camps))
for _, c := range m.camps { for _, c := range m.camps {
ids = append(ids, int64(c.ID)) ids = append(ids, int64(c.ID))
} }
m.campsMutex.RUnlock()
return ids return ids
} }
@ -326,7 +364,7 @@ func (m *Manager) nextSubscribers(c *models.Campaign, batchSize int) (bool, erro
// Push messages. // Push messages.
for _, s := range subs { for _, s := range subs {
msg := m.NewMessage(c, s) msg := m.NewCampaignMessage(c, s)
if err := msg.Render(); err != nil { if err := msg.Render(); err != nil {
m.logger.Printf("error rendering message (%s) (%s): %v", c.Name, s.Email, err) m.logger.Printf("error rendering message (%s) (%s): %v", c.Name, s.Email, err)
continue continue
@ -334,7 +372,7 @@ func (m *Manager) nextSubscribers(c *models.Campaign, batchSize int) (bool, erro
// Push the message to the queue while blocking and waiting until // Push the message to the queue while blocking and waiting until
// the queue is drained. // the queue is drained.
m.msgQueue <- msg m.campMsgQueue <- msg
} }
return true, nil return true, nil
@ -342,12 +380,16 @@ func (m *Manager) nextSubscribers(c *models.Campaign, batchSize int) (bool, erro
// isCampaignProcessing checks if the campaign is bing processed. // isCampaignProcessing checks if the campaign is bing processed.
func (m *Manager) isCampaignProcessing(id int) bool { func (m *Manager) isCampaignProcessing(id int) bool {
m.campsMutex.RLock()
_, ok := m.camps[id] _, ok := m.camps[id]
m.campsMutex.RUnlock()
return ok return ok
} }
func (m *Manager) exhaustCampaign(c *models.Campaign, status string) (*models.Campaign, error) { func (m *Manager) exhaustCampaign(c *models.Campaign, status string) (*models.Campaign, error) {
m.campsMutex.Lock()
delete(m.camps, c.ID) delete(m.camps, c.ID)
m.campsMutex.Unlock()
// A status has been passed. Change the campaign's status // A status has been passed. Change the campaign's status
// without further checks. // without further checks.
@ -420,13 +462,12 @@ func (m *Manager) sendNotif(c *models.Campaign, status, reason string) error {
"Reason": reason, "Reason": reason,
} }
) )
return m.notifCB(subject, data) return m.notifCB(subject, data)
} }
// Render takes a Message, executes its pre-compiled Campaign.Tpl // Render takes a Message, executes its pre-compiled Campaign.Tpl
// and applies the resultant bytes to Message.body to be used in messages. // and applies the resultant bytes to Message.body to be used in messages.
func (m *Message) Render() error { func (m *CampaignMessage) Render() error {
out := bytes.Buffer{} out := bytes.Buffer{}
if err := m.Campaign.Tpl.ExecuteTemplate(&out, models.BaseTpl, m); err != nil { if err := m.Campaign.Tpl.ExecuteTemplate(&out, models.BaseTpl, m); err != nil {
return err return err

View file

@ -138,7 +138,8 @@ func main() {
app.messenger = initMessengers(app.manager) app.messenger = initMessengers(app.manager)
app.notifTpls = initNotifTemplates("/email-templates/*.html", fs, app.constants) app.notifTpls = initNotifTemplates("/email-templates/*.html", fs, app.constants)
// Start the campaign workers. // Start the campaign workers. The campaign batches (fetch from DB, push out
// messages) get processed at the specified interval.
go app.manager.Run(time.Second * 5) go app.manager.Run(time.Second * 5)
app.manager.SpawnWorkers() app.manager.SpawnWorkers()

View file

@ -2,6 +2,8 @@ package main
import ( import (
"bytes" "bytes"
"github.com/knadh/listmonk/internal/manager"
) )
const ( const (
@ -19,18 +21,20 @@ type notifData struct {
} }
// sendNotification sends out an e-mail notification to admins. // sendNotification sends out an e-mail notification to admins.
func sendNotification(toEmails []string, subject, tplName string, data interface{}, app *App) error { func (app *App) sendNotification(toEmails []string, subject, tplName string, data interface{}) error {
var b bytes.Buffer var b bytes.Buffer
if err := app.notifTpls.ExecuteTemplate(&b, tplName, data); err != nil { if err := app.notifTpls.ExecuteTemplate(&b, tplName, data); err != nil {
app.log.Printf("error compiling notification template '%s': %v", tplName, err) app.log.Printf("error compiling notification template '%s': %v", tplName, err)
return err return err
} }
err := app.messenger.Push(app.constants.FromEmail, err := app.manager.PushMessage(manager.Message{
toEmails, From: app.constants.FromEmail,
subject, To: toEmails,
b.Bytes(), Subject: subject,
nil) Body: b.Bytes(),
Messenger: "email",
})
if err != nil { if err != nil {
app.log.Printf("error sending admin notification (%s): %v", subject, err) app.log.Printf("error sending admin notification (%s): %v", subject, err)
return err return err

View file

@ -12,8 +12,8 @@ import (
"github.com/asaskevich/govalidator" "github.com/asaskevich/govalidator"
"github.com/gofrs/uuid" "github.com/gofrs/uuid"
"github.com/knadh/listmonk/models"
"github.com/knadh/listmonk/internal/subimporter" "github.com/knadh/listmonk/internal/subimporter"
"github.com/knadh/listmonk/models"
"github.com/labstack/echo" "github.com/labstack/echo"
"github.com/lib/pq" "github.com/lib/pq"
) )
@ -181,7 +181,7 @@ func handleCreateSubscriber(c echo.Context) error {
// If the lists are double-optins, send confirmation e-mails. // If the lists are double-optins, send confirmation e-mails.
// Todo: This arbitrary goroutine should be moved to a centralised pool. // Todo: This arbitrary goroutine should be moved to a centralised pool.
go sendOptinConfirmation(req.Subscriber, []int64(req.Lists), app) _ = sendOptinConfirmation(req.Subscriber, []int64(req.Lists), app)
// Hand over to the GET handler to return the last insertion. // Hand over to the GET handler to return the last insertion.
c.SetParamNames("id") c.SetParamNames("id")
@ -536,7 +536,7 @@ func insertSubscriber(req subimporter.SubReq, app *App) (int, error) {
// If the lists are double-optins, send confirmation e-mails. // If the lists are double-optins, send confirmation e-mails.
// Todo: This arbitrary goroutine should be moved to a centralised pool. // Todo: This arbitrary goroutine should be moved to a centralised pool.
go sendOptinConfirmation(req.Subscriber, []int64(req.Lists), app) sendOptinConfirmation(req.Subscriber, []int64(req.Lists), app)
return req.ID, nil return req.ID, nil
} }
@ -613,13 +613,11 @@ func sendOptinConfirmation(sub models.Subscriber, listIDs []int64, app *App) err
out.OptinURL = fmt.Sprintf(app.constants.OptinURL, sub.UUID, qListIDs.Encode()) out.OptinURL = fmt.Sprintf(app.constants.OptinURL, sub.UUID, qListIDs.Encode())
// Send the e-mail. // Send the e-mail.
if err := sendNotification([]string{sub.Email}, if err := app.sendNotification([]string{sub.Email},
"Confirm subscription", "Confirm subscription", notifSubscriberOptin, out); err != nil {
notifSubscriberOptin, out, app); err != nil {
app.log.Printf("error e-mailing subscriber profile: %s", err) app.log.Printf("error e-mailing subscriber profile: %s", err)
return err return err
} }
return nil return nil
} }

View file

@ -114,7 +114,7 @@ func handlePreviewTemplate(c echo.Context) error {
} }
// Render the message body. // Render the message body.
m := app.manager.NewMessage(&camp, dummySubscriber) m := app.manager.NewCampaignMessage(&camp, dummySubscriber)
if err := m.Render(); err != nil { if err := m.Render(); err != nil {
return echo.NewHTTPError(http.StatusBadRequest, return echo.NewHTTPError(http.StatusBadRequest,
fmt.Sprintf("Error rendering message: %v", err)) fmt.Sprintf("Error rendering message: %v", err))