|
@@ -5,6 +5,7 @@ import (
|
|
"fmt"
|
|
"fmt"
|
|
"html/template"
|
|
"html/template"
|
|
"log"
|
|
"log"
|
|
|
|
+ "strings"
|
|
"sync"
|
|
"sync"
|
|
"time"
|
|
"time"
|
|
|
|
|
|
@@ -38,6 +39,7 @@ type Runner struct {
|
|
cfg Config
|
|
cfg Config
|
|
src DataSource
|
|
src DataSource
|
|
messengers map[string]messenger.Messenger
|
|
messengers map[string]messenger.Messenger
|
|
|
|
+ notifCB models.AdminNotifCallback
|
|
logger *log.Logger
|
|
logger *log.Logger
|
|
|
|
|
|
// Campaigns that are currently running.
|
|
// Campaigns that are currently running.
|
|
@@ -70,6 +72,7 @@ type Config struct {
|
|
Concurrency int
|
|
Concurrency int
|
|
MaxSendErrors int
|
|
MaxSendErrors int
|
|
RequeueOnError bool
|
|
RequeueOnError bool
|
|
|
|
+ FromEmail string
|
|
LinkTrackURL string
|
|
LinkTrackURL string
|
|
UnsubscribeURL string
|
|
UnsubscribeURL string
|
|
ViewTrackURL string
|
|
ViewTrackURL string
|
|
@@ -81,10 +84,11 @@ type msgError struct {
|
|
}
|
|
}
|
|
|
|
|
|
// New returns a new instance of Mailer.
|
|
// New returns a new instance of Mailer.
|
|
-func New(cfg Config, src DataSource, l *log.Logger) *Runner {
|
|
|
|
|
|
+func New(cfg Config, src DataSource, notifCB models.AdminNotifCallback, l *log.Logger) *Runner {
|
|
r := Runner{
|
|
r := Runner{
|
|
cfg: cfg,
|
|
cfg: cfg,
|
|
src: src,
|
|
src: src,
|
|
|
|
+ notifCB: notifCB,
|
|
logger: l,
|
|
logger: l,
|
|
messengers: make(map[string]messenger.Messenger),
|
|
messengers: make(map[string]messenger.Messenger),
|
|
camps: make(map[int]*models.Campaign, 0),
|
|
camps: make(map[int]*models.Campaign, 0),
|
|
@@ -189,6 +193,11 @@ func (r *Runner) Run(tick time.Duration) {
|
|
r.exhaustCampaign(e.camp, models.CampaignStatusPaused)
|
|
r.exhaustCampaign(e.camp, models.CampaignStatusPaused)
|
|
}
|
|
}
|
|
delete(r.msgErrorCounts, e.camp.ID)
|
|
delete(r.msgErrorCounts, e.camp.ID)
|
|
|
|
+
|
|
|
|
+ // Notify admins.
|
|
|
|
+ r.sendNotif(e.camp,
|
|
|
|
+ models.CampaignStatusPaused,
|
|
|
|
+ "Too many errors")
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -205,12 +214,15 @@ func (r *Runner) Run(tick time.Duration) {
|
|
if has {
|
|
if has {
|
|
// There are more subscribers to fetch.
|
|
// There are more subscribers to fetch.
|
|
r.subFetchQueue <- c
|
|
r.subFetchQueue <- c
|
|
- } else {
|
|
|
|
|
|
+ } else if r.isCampaignProcessing(c.ID) {
|
|
// There are no more subscribers. Either the campaign status
|
|
// There are no more subscribers. Either the campaign status
|
|
// has changed or all subscribers have been processed.
|
|
// has changed or all subscribers have been processed.
|
|
- if err := r.exhaustCampaign(c, ""); err != nil {
|
|
|
|
|
|
+ newC, err := r.exhaustCampaign(c, "")
|
|
|
|
+ if err != nil {
|
|
r.logger.Printf("error exhausting campaign (%s): %v", c.Name, err)
|
|
r.logger.Printf("error exhausting campaign (%s): %v", c.Name, err)
|
|
|
|
+ continue
|
|
}
|
|
}
|
|
|
|
+ r.sendNotif(newC, newC.Status, "")
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
@@ -227,7 +239,7 @@ func (r *Runner) SpawnWorkers() {
|
|
|
|
|
|
err := r.messengers[m.Campaign.MessengerID].Push(
|
|
err := r.messengers[m.Campaign.MessengerID].Push(
|
|
m.from,
|
|
m.from,
|
|
- m.to,
|
|
|
|
|
|
+ []string{m.to},
|
|
m.Campaign.Subject,
|
|
m.Campaign.Subject,
|
|
m.Body)
|
|
m.Body)
|
|
if err != nil {
|
|
if err != nil {
|
|
@@ -311,7 +323,7 @@ func (r *Runner) isCampaignProcessing(id int) bool {
|
|
return ok
|
|
return ok
|
|
}
|
|
}
|
|
|
|
|
|
-func (r *Runner) exhaustCampaign(c *models.Campaign, status string) error {
|
|
|
|
|
|
+func (r *Runner) exhaustCampaign(c *models.Campaign, status string) (*models.Campaign, error) {
|
|
delete(r.camps, c.ID)
|
|
delete(r.camps, c.ID)
|
|
|
|
|
|
// A status has been passed. Change the campaign's status
|
|
// A status has been passed. Change the campaign's status
|
|
@@ -322,18 +334,18 @@ func (r *Runner) exhaustCampaign(c *models.Campaign, status string) error {
|
|
} else {
|
|
} else {
|
|
r.logger.Printf("set campaign (%s) to %s", c.Name, status)
|
|
r.logger.Printf("set campaign (%s) to %s", c.Name, status)
|
|
}
|
|
}
|
|
-
|
|
|
|
- return nil
|
|
|
|
|
|
+ return c, nil
|
|
}
|
|
}
|
|
|
|
|
|
// Fetch the up-to-date campaign status from the source.
|
|
// Fetch the up-to-date campaign status from the source.
|
|
cm, err := r.src.GetCampaign(c.ID)
|
|
cm, err := r.src.GetCampaign(c.ID)
|
|
if err != nil {
|
|
if err != nil {
|
|
- return err
|
|
|
|
|
|
+ return nil, err
|
|
}
|
|
}
|
|
|
|
|
|
// If a running campaign has exhausted subscribers, it's finished.
|
|
// If a running campaign has exhausted subscribers, it's finished.
|
|
if cm.Status == models.CampaignStatusRunning {
|
|
if cm.Status == models.CampaignStatusRunning {
|
|
|
|
+ cm.Status = models.CampaignStatusFinished
|
|
if err := r.src.UpdateCampaignStatus(c.ID, models.CampaignStatusFinished); err != nil {
|
|
if err := r.src.UpdateCampaignStatus(c.ID, models.CampaignStatusFinished); err != nil {
|
|
r.logger.Printf("error finishing campaign (%s): %v", c.Name, err)
|
|
r.logger.Printf("error finishing campaign (%s): %v", c.Name, err)
|
|
} else {
|
|
} else {
|
|
@@ -343,7 +355,7 @@ func (r *Runner) exhaustCampaign(c *models.Campaign, status string) error {
|
|
r.logger.Printf("stop processing campaign (%s)", c.Name)
|
|
r.logger.Printf("stop processing campaign (%s)", c.Name)
|
|
}
|
|
}
|
|
|
|
|
|
- return nil
|
|
|
|
|
|
+ return cm, nil
|
|
}
|
|
}
|
|
|
|
|
|
// Render takes a Message, executes its pre-compiled Campaign.Tpl
|
|
// Render takes a Message, executes its pre-compiled Campaign.Tpl
|
|
@@ -383,6 +395,23 @@ func (r *Runner) trackLink(url, campUUID, subUUID string) string {
|
|
return fmt.Sprintf(r.cfg.LinkTrackURL, uu, campUUID, subUUID)
|
|
return fmt.Sprintf(r.cfg.LinkTrackURL, uu, campUUID, subUUID)
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+// sendNotif sends a notification to registered admin e-mails.
|
|
|
|
+func (r *Runner) sendNotif(c *models.Campaign, status, reason string) error {
|
|
|
|
+ var (
|
|
|
|
+ subject = fmt.Sprintf("%s: %s", strings.Title(status), c.Name)
|
|
|
|
+ data = map[string]interface{}{
|
|
|
|
+ "ID": c.ID,
|
|
|
|
+ "Name": c.Name,
|
|
|
|
+ "Status": status,
|
|
|
|
+ "Sent": c.Sent,
|
|
|
|
+ "ToSend": c.ToSend,
|
|
|
|
+ "Reason": reason,
|
|
|
|
+ }
|
|
|
|
+ )
|
|
|
|
+
|
|
|
|
+ return r.notifCB(subject, data)
|
|
|
|
+}
|
|
|
|
+
|
|
// TemplateFuncs returns the template functions to be applied into
|
|
// TemplateFuncs returns the template functions to be applied into
|
|
// compiled campaign templates.
|
|
// compiled campaign templates.
|
|
func (r *Runner) TemplateFuncs(c *models.Campaign) template.FuncMap {
|
|
func (r *Runner) TemplateFuncs(c *models.Campaign) template.FuncMap {
|