|
@@ -21,27 +21,37 @@ const (
|
|
// BaseTPL is the name of the base template.
|
|
// BaseTPL is the name of the base template.
|
|
BaseTPL = "base"
|
|
BaseTPL = "base"
|
|
|
|
|
|
|
|
+ BounceTypeBlocklist = "blocklist"
|
|
|
|
+ BounceTypeDelete = "delete"
|
|
|
|
+
|
|
// ContentTpl is the name of the compiled message.
|
|
// ContentTpl is the name of the compiled message.
|
|
ContentTpl = "content"
|
|
ContentTpl = "content"
|
|
|
|
|
|
dummyUUID = "00000000-0000-0000-0000-000000000000"
|
|
dummyUUID = "00000000-0000-0000-0000-000000000000"
|
|
)
|
|
)
|
|
|
|
|
|
-// DataSource represents a data backend, such as a database,
|
|
|
|
|
|
+// Store represents a data backend, such as a database,
|
|
// that provides subscriber and campaign records.
|
|
// that provides subscriber and campaign records.
|
|
-type DataSource interface {
|
|
|
|
|
|
+type Store interface {
|
|
NextCampaigns(excludeIDs []int64) ([]*models.Campaign, error)
|
|
NextCampaigns(excludeIDs []int64) ([]*models.Campaign, error)
|
|
NextSubscribers(campID, limit int) ([]models.Subscriber, error)
|
|
NextSubscribers(campID, limit int) ([]models.Subscriber, error)
|
|
GetCampaign(campID int) (*models.Campaign, error)
|
|
GetCampaign(campID int) (*models.Campaign, error)
|
|
UpdateCampaignStatus(campID int, status string) error
|
|
UpdateCampaignStatus(campID int, status string) error
|
|
CreateLink(url string) (string, error)
|
|
CreateLink(url string) (string, error)
|
|
|
|
+
|
|
|
|
+ // RecordBounce records an external bounce event identified by
|
|
|
|
+ // a user's UUID/e-mail and a campaign UUID.
|
|
|
|
+ RecordBounce(b models.Bounce) (int64, int, error)
|
|
|
|
+
|
|
|
|
+ BlocklistSubscriber(id int64) error
|
|
|
|
+ DeleteSubscriber(id int64) error
|
|
}
|
|
}
|
|
|
|
|
|
// Manager handles the scheduling, processing, and queuing of campaigns
|
|
// Manager handles the scheduling, processing, and queuing of campaigns
|
|
// and message pushes.
|
|
// and message pushes.
|
|
type Manager struct {
|
|
type Manager struct {
|
|
cfg Config
|
|
cfg Config
|
|
- src DataSource
|
|
|
|
|
|
+ store Store
|
|
i18n *i18n.I18n
|
|
i18n *i18n.I18n
|
|
messengers map[string]messenger.Messenger
|
|
messengers map[string]messenger.Messenger
|
|
notifCB models.AdminNotifCallback
|
|
notifCB models.AdminNotifCallback
|
|
@@ -62,6 +72,7 @@ type Manager struct {
|
|
campMsgErrorQueue chan msgError
|
|
campMsgErrorQueue chan msgError
|
|
campMsgErrorCounts map[int]int
|
|
campMsgErrorCounts map[int]int
|
|
msgQueue chan Message
|
|
msgQueue chan Message
|
|
|
|
+ bounceQueue chan models.Bounce
|
|
|
|
|
|
// Sliding window keeps track of the total number of messages sent in a period
|
|
// Sliding window keeps track of the total number of messages sent in a period
|
|
// and on reaching the specified limit, waits until the window is over before
|
|
// and on reaching the specified limit, waits until the window is over before
|
|
@@ -113,6 +124,8 @@ type Config struct {
|
|
MessageURL string
|
|
MessageURL string
|
|
ViewTrackURL string
|
|
ViewTrackURL string
|
|
UnsubHeader bool
|
|
UnsubHeader bool
|
|
|
|
+ BounceCount int
|
|
|
|
+ BounceAction string
|
|
}
|
|
}
|
|
|
|
|
|
type msgError struct {
|
|
type msgError struct {
|
|
@@ -120,8 +133,10 @@ type msgError struct {
|
|
err error
|
|
err error
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+var pushTimeout = time.Second * 3
|
|
|
|
+
|
|
// New returns a new instance of Mailer.
|
|
// New returns a new instance of Mailer.
|
|
-func New(cfg Config, src DataSource, notifCB models.AdminNotifCallback, i *i18n.I18n, l *log.Logger) *Manager {
|
|
|
|
|
|
+func New(cfg Config, store Store, notifCB models.AdminNotifCallback, i *i18n.I18n, l *log.Logger) *Manager {
|
|
if cfg.BatchSize < 1 {
|
|
if cfg.BatchSize < 1 {
|
|
cfg.BatchSize = 1000
|
|
cfg.BatchSize = 1000
|
|
}
|
|
}
|
|
@@ -134,7 +149,7 @@ func New(cfg Config, src DataSource, notifCB models.AdminNotifCallback, i *i18n.
|
|
|
|
|
|
return &Manager{
|
|
return &Manager{
|
|
cfg: cfg,
|
|
cfg: cfg,
|
|
- src: src,
|
|
|
|
|
|
+ store: store,
|
|
i18n: i,
|
|
i18n: i,
|
|
notifCB: notifCB,
|
|
notifCB: notifCB,
|
|
logger: l,
|
|
logger: l,
|
|
@@ -144,6 +159,7 @@ func New(cfg Config, src DataSource, notifCB models.AdminNotifCallback, i *i18n.
|
|
subFetchQueue: make(chan *models.Campaign, cfg.Concurrency),
|
|
subFetchQueue: make(chan *models.Campaign, cfg.Concurrency),
|
|
campMsgQueue: make(chan CampaignMessage, cfg.Concurrency*2),
|
|
campMsgQueue: make(chan CampaignMessage, cfg.Concurrency*2),
|
|
msgQueue: make(chan Message, cfg.Concurrency),
|
|
msgQueue: make(chan Message, cfg.Concurrency),
|
|
|
|
+ bounceQueue: make(chan models.Bounce, cfg.Concurrency),
|
|
campMsgErrorQueue: make(chan msgError, cfg.MaxSendErrors),
|
|
campMsgErrorQueue: make(chan msgError, cfg.MaxSendErrors),
|
|
campMsgErrorCounts: make(map[int]int),
|
|
campMsgErrorCounts: make(map[int]int),
|
|
slidingWindowStart: time.Now(),
|
|
slidingWindowStart: time.Now(),
|
|
@@ -184,7 +200,7 @@ func (m *Manager) AddMessenger(msg messenger.Messenger) error {
|
|
// PushMessage pushes an arbitrary non-campaign Message to be sent out by the workers.
|
|
// PushMessage pushes an arbitrary non-campaign Message to be sent out by the workers.
|
|
// It times out if the queue is busy.
|
|
// It times out if the queue is busy.
|
|
func (m *Manager) PushMessage(msg Message) error {
|
|
func (m *Manager) PushMessage(msg Message) error {
|
|
- t := time.NewTicker(time.Second * 3)
|
|
|
|
|
|
+ t := time.NewTicker(pushTimeout)
|
|
defer t.Stop()
|
|
defer t.Stop()
|
|
|
|
|
|
select {
|
|
select {
|
|
@@ -199,7 +215,7 @@ func (m *Manager) PushMessage(msg Message) error {
|
|
// PushCampaignMessage pushes a campaign messages to be sent out by the workers.
|
|
// PushCampaignMessage pushes a campaign messages to be sent out by the workers.
|
|
// It times out if the queue is busy.
|
|
// It times out if the queue is busy.
|
|
func (m *Manager) PushCampaignMessage(msg CampaignMessage) error {
|
|
func (m *Manager) PushCampaignMessage(msg CampaignMessage) error {
|
|
- t := time.NewTicker(time.Second * 3)
|
|
|
|
|
|
+ t := time.NewTicker(pushTimeout)
|
|
defer t.Stop()
|
|
defer t.Stop()
|
|
|
|
|
|
select {
|
|
select {
|
|
@@ -224,6 +240,20 @@ func (m *Manager) HasRunningCampaigns() bool {
|
|
return len(m.camps) > 0
|
|
return len(m.camps) > 0
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+// PushBounce records a bounce event.
|
|
|
|
+func (m *Manager) PushBounce(b models.Bounce) error {
|
|
|
|
+ t := time.NewTicker(pushTimeout)
|
|
|
|
+ defer t.Stop()
|
|
|
|
+
|
|
|
|
+ select {
|
|
|
|
+ case m.bounceQueue <- b:
|
|
|
|
+ case <-t.C:
|
|
|
|
+ m.logger.Printf("bounce pushed timed out: %s / %s", b.SubscriberUUID, b.Email)
|
|
|
|
+ return errors.New("bounce push timed out")
|
|
|
|
+ }
|
|
|
|
+ return nil
|
|
|
|
+}
|
|
|
|
+
|
|
// Run is a blocking function (that should be invoked as a goroutine)
|
|
// Run is a blocking function (that should be invoked as a goroutine)
|
|
// that scans the data source at regular intervals for pending campaigns,
|
|
// that scans the data source at regular intervals for pending campaigns,
|
|
// and queues them for processing. The process queue fetches batches of
|
|
// and queues them for processing. The process queue fetches batches of
|
|
@@ -235,7 +265,7 @@ func (m *Manager) Run(tick time.Duration) {
|
|
|
|
|
|
// Spawn N message workers.
|
|
// Spawn N message workers.
|
|
for i := 0; i < m.cfg.Concurrency; i++ {
|
|
for i := 0; i < m.cfg.Concurrency; i++ {
|
|
- go m.messageWorker()
|
|
|
|
|
|
+ go m.worker()
|
|
}
|
|
}
|
|
|
|
|
|
// Fetch the next set of subscribers for a campaign and process them.
|
|
// Fetch the next set of subscribers for a campaign and process them.
|
|
@@ -262,9 +292,9 @@ func (m *Manager) Run(tick time.Duration) {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
-// messageWorker is a blocking function that listens to the message queue
|
|
|
|
-// and pushes out incoming messages on it to the messenger.
|
|
|
|
-func (m *Manager) messageWorker() {
|
|
|
|
|
|
+// worker is a blocking function that perpetually listents to events (message, bounce) on different
|
|
|
|
+// queues and processes them.
|
|
|
|
+func (m *Manager) worker() {
|
|
// Counter to keep track of the message / sec rate limit.
|
|
// Counter to keep track of the message / sec rate limit.
|
|
numMsg := 0
|
|
numMsg := 0
|
|
for {
|
|
for {
|
|
@@ -294,14 +324,18 @@ func (m *Manager) messageWorker() {
|
|
Campaign: msg.Campaign,
|
|
Campaign: msg.Campaign,
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ h := textproto.MIMEHeader{}
|
|
|
|
+ h.Set(models.EmailHeaderCampaignUUID, msg.Campaign.UUID)
|
|
|
|
+ h.Set(models.EmailHeaderSubscriberUUID, msg.Subscriber.UUID)
|
|
|
|
+
|
|
// Attach List-Unsubscribe headers?
|
|
// Attach List-Unsubscribe headers?
|
|
if m.cfg.UnsubHeader {
|
|
if m.cfg.UnsubHeader {
|
|
- h := textproto.MIMEHeader{}
|
|
|
|
h.Set("List-Unsubscribe-Post", "List-Unsubscribe=One-Click")
|
|
h.Set("List-Unsubscribe-Post", "List-Unsubscribe=One-Click")
|
|
h.Set("List-Unsubscribe", `<`+msg.unsubURL+`>`)
|
|
h.Set("List-Unsubscribe", `<`+msg.unsubURL+`>`)
|
|
- out.Headers = h
|
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ out.Headers = h
|
|
|
|
+
|
|
if err := m.messengers[msg.Campaign.Messenger].Push(out); err != nil {
|
|
if err := m.messengers[msg.Campaign.Messenger].Push(out); err != nil {
|
|
m.logger.Printf("error sending message in campaign %s: subscriber %s: %v",
|
|
m.logger.Printf("error sending message in campaign %s: subscriber %s: %v",
|
|
msg.Campaign.Name, msg.Subscriber.UUID, err)
|
|
msg.Campaign.Name, msg.Subscriber.UUID, err)
|
|
@@ -331,6 +365,30 @@ func (m *Manager) messageWorker() {
|
|
if err != nil {
|
|
if err != nil {
|
|
m.logger.Printf("error sending message '%s': %v", msg.Subject, err)
|
|
m.logger.Printf("error sending message '%s': %v", msg.Subject, err)
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+ // Bounce event.
|
|
|
|
+ case b, ok := <-m.bounceQueue:
|
|
|
|
+ if !ok {
|
|
|
|
+ return
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ subID, count, err := m.store.RecordBounce(b)
|
|
|
|
+ if err != nil {
|
|
|
|
+ m.logger.Printf("error recording bounce %s / %s", b.SubscriberUUID, b.Email)
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ if count >= m.cfg.BounceCount {
|
|
|
|
+ switch m.cfg.BounceAction {
|
|
|
|
+ case BounceTypeBlocklist:
|
|
|
|
+ err = m.store.BlocklistSubscriber(subID)
|
|
|
|
+ case BounceTypeDelete:
|
|
|
|
+ err = m.store.DeleteSubscriber(subID)
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ if err != nil {
|
|
|
|
+ m.logger.Printf("error executing bounce for subscriber: %s", b.SubscriberUUID)
|
|
|
|
+ }
|
|
|
|
+ }
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -403,7 +461,7 @@ func (m *Manager) scanCampaigns(tick time.Duration) {
|
|
select {
|
|
select {
|
|
// Periodically scan the data source for campaigns to process.
|
|
// Periodically scan the data source for campaigns to process.
|
|
case <-t.C:
|
|
case <-t.C:
|
|
- campaigns, err := m.src.NextCampaigns(m.getPendingCampaignIDs())
|
|
|
|
|
|
+ campaigns, err := m.store.NextCampaigns(m.getPendingCampaignIDs())
|
|
if err != nil {
|
|
if err != nil {
|
|
m.logger.Printf("error fetching campaigns: %v", err)
|
|
m.logger.Printf("error fetching campaigns: %v", err)
|
|
continue
|
|
continue
|
|
@@ -457,7 +515,7 @@ func (m *Manager) scanCampaigns(tick time.Duration) {
|
|
func (m *Manager) addCampaign(c *models.Campaign) error {
|
|
func (m *Manager) addCampaign(c *models.Campaign) error {
|
|
// Validate messenger.
|
|
// Validate messenger.
|
|
if _, ok := m.messengers[c.Messenger]; !ok {
|
|
if _, ok := m.messengers[c.Messenger]; !ok {
|
|
- m.src.UpdateCampaignStatus(c.ID, models.CampaignStatusCancelled)
|
|
|
|
|
|
+ m.store.UpdateCampaignStatus(c.ID, models.CampaignStatusCancelled)
|
|
return fmt.Errorf("unknown messenger %s on campaign %s", c.Messenger, c.Name)
|
|
return fmt.Errorf("unknown messenger %s on campaign %s", c.Messenger, c.Name)
|
|
}
|
|
}
|
|
|
|
|
|
@@ -491,7 +549,7 @@ func (m *Manager) getPendingCampaignIDs() []int64 {
|
|
// have been processed, or that a campaign has been paused or cancelled.
|
|
// have been processed, or that a campaign has been paused or cancelled.
|
|
func (m *Manager) nextSubscribers(c *models.Campaign, batchSize int) (bool, error) {
|
|
func (m *Manager) nextSubscribers(c *models.Campaign, batchSize int) (bool, error) {
|
|
// Fetch a batch of subscribers.
|
|
// Fetch a batch of subscribers.
|
|
- subs, err := m.src.NextSubscribers(c.ID, batchSize)
|
|
|
|
|
|
+ subs, err := m.store.NextSubscribers(c.ID, batchSize)
|
|
if err != nil {
|
|
if err != nil {
|
|
return false, fmt.Errorf("error fetching campaign subscribers (%s): %v", c.Name, err)
|
|
return false, fmt.Errorf("error fetching campaign subscribers (%s): %v", c.Name, err)
|
|
}
|
|
}
|
|
@@ -566,7 +624,7 @@ func (m *Manager) exhaustCampaign(c *models.Campaign, status string) (*models.Ca
|
|
// A status has been passed. Change the campaign's status
|
|
// A status has been passed. Change the campaign's status
|
|
// without further checks.
|
|
// without further checks.
|
|
if status != "" {
|
|
if status != "" {
|
|
- if err := m.src.UpdateCampaignStatus(c.ID, status); err != nil {
|
|
|
|
|
|
+ if err := m.store.UpdateCampaignStatus(c.ID, status); err != nil {
|
|
m.logger.Printf("error updating campaign (%s) status to %s: %v", c.Name, status, err)
|
|
m.logger.Printf("error updating campaign (%s) status to %s: %v", c.Name, status, err)
|
|
} else {
|
|
} else {
|
|
m.logger.Printf("set campaign (%s) to %s", c.Name, status)
|
|
m.logger.Printf("set campaign (%s) to %s", c.Name, status)
|
|
@@ -575,7 +633,7 @@ func (m *Manager) exhaustCampaign(c *models.Campaign, status string) (*models.Ca
|
|
}
|
|
}
|
|
|
|
|
|
// Fetch the up-to-date campaign status from the source.
|
|
// Fetch the up-to-date campaign status from the source.
|
|
- cm, err := m.src.GetCampaign(c.ID)
|
|
|
|
|
|
+ cm, err := m.store.GetCampaign(c.ID)
|
|
if err != nil {
|
|
if err != nil {
|
|
return nil, err
|
|
return nil, err
|
|
}
|
|
}
|
|
@@ -583,7 +641,7 @@ func (m *Manager) exhaustCampaign(c *models.Campaign, status string) (*models.Ca
|
|
// If a running campaign has exhausted subscribers, it's finished.
|
|
// If a running campaign has exhausted subscribers, it's finished.
|
|
if cm.Status == models.CampaignStatusRunning {
|
|
if cm.Status == models.CampaignStatusRunning {
|
|
cm.Status = models.CampaignStatusFinished
|
|
cm.Status = models.CampaignStatusFinished
|
|
- if err := m.src.UpdateCampaignStatus(c.ID, models.CampaignStatusFinished); err != nil {
|
|
|
|
|
|
+ if err := m.store.UpdateCampaignStatus(c.ID, models.CampaignStatusFinished); err != nil {
|
|
m.logger.Printf("error finishing campaign (%s): %v", c.Name, err)
|
|
m.logger.Printf("error finishing campaign (%s): %v", c.Name, err)
|
|
} else {
|
|
} else {
|
|
m.logger.Printf("campaign (%s) finished", c.Name)
|
|
m.logger.Printf("campaign (%s) finished", c.Name)
|
|
@@ -606,7 +664,7 @@ func (m *Manager) trackLink(url, campUUID, subUUID string) string {
|
|
m.linksMut.RUnlock()
|
|
m.linksMut.RUnlock()
|
|
|
|
|
|
// Register link.
|
|
// Register link.
|
|
- uu, err := m.src.CreateLink(url)
|
|
|
|
|
|
+ uu, err := m.store.CreateLink(url)
|
|
if err != nil {
|
|
if err != nil {
|
|
m.logger.Printf("error registering tracking for link '%s': %v", url, err)
|
|
m.logger.Printf("error registering tracking for link '%s': %v", url, err)
|
|
|
|
|