|
@@ -153,8 +153,8 @@ func New(cfg Config, src DataSource, notifCB models.AdminNotifCallback, i *i18n.
|
|
|
// NewCampaignMessage creates and returns a CampaignMessage that is made available
|
|
|
// to message templates while they're compiled. It represents a message from
|
|
|
// a campaign that's bound to a single Subscriber.
|
|
|
-func (m *Manager) NewCampaignMessage(c *models.Campaign, s models.Subscriber) CampaignMessage {
|
|
|
- return CampaignMessage{
|
|
|
+func (m *Manager) NewCampaignMessage(c *models.Campaign, s models.Subscriber) (CampaignMessage, error) {
|
|
|
+ msg := CampaignMessage{
|
|
|
Campaign: c,
|
|
|
Subscriber: s,
|
|
|
|
|
@@ -163,6 +163,12 @@ func (m *Manager) NewCampaignMessage(c *models.Campaign, s models.Subscriber) Ca
|
|
|
to: s.Email,
|
|
|
unsubURL: fmt.Sprintf(m.cfg.UnsubURL, c.UUID, s.UUID),
|
|
|
}
|
|
|
+
|
|
|
+ if err := msg.render(); err != nil {
|
|
|
+ return msg, err
|
|
|
+ }
|
|
|
+
|
|
|
+ return msg, nil
|
|
|
}
|
|
|
|
|
|
// AddMessenger adds a Messenger messaging backend to the manager.
|
|
@@ -175,7 +181,8 @@ func (m *Manager) AddMessenger(msg messenger.Messenger) error {
|
|
|
return nil
|
|
|
}
|
|
|
|
|
|
-// PushMessage pushes a Message to be sent out by the workers.
|
|
|
+// PushMessage pushes an arbitrary non-campaign Message to be sent out by the workers.
|
|
|
+// It times out if the queue is busy.
|
|
|
func (m *Manager) PushMessage(msg Message) error {
|
|
|
t := time.NewTicker(time.Second * 3)
|
|
|
defer t.Stop()
|
|
@@ -183,7 +190,22 @@ func (m *Manager) PushMessage(msg Message) error {
|
|
|
select {
|
|
|
case m.msgQueue <- msg:
|
|
|
case <-t.C:
|
|
|
- m.logger.Println("message push timed out: %'s'", msg.Subject)
|
|
|
+ m.logger.Printf("message push timed out: '%s'", msg.Subject)
|
|
|
+ return errors.New("message push timed out")
|
|
|
+ }
|
|
|
+ return nil
|
|
|
+}
|
|
|
+
|
|
|
+// PushCampaignMessage pushes a campaign messages to be sent out by the workers.
|
|
|
+// It times out if the queue is busy.
|
|
|
+func (m *Manager) PushCampaignMessage(msg CampaignMessage) error {
|
|
|
+ t := time.NewTicker(time.Second * 3)
|
|
|
+ defer t.Stop()
|
|
|
+
|
|
|
+ select {
|
|
|
+ case m.campMsgQueue <- msg:
|
|
|
+ case <-t.C:
|
|
|
+ m.logger.Printf("message push timed out: '%s'", msg.Subject())
|
|
|
return errors.New("message push timed out")
|
|
|
}
|
|
|
return nil
|
|
@@ -487,8 +509,8 @@ func (m *Manager) nextSubscribers(c *models.Campaign, batchSize int) (bool, erro
|
|
|
// Push messages.
|
|
|
for _, s := range subs {
|
|
|
// Send the message.
|
|
|
- msg := m.NewCampaignMessage(c, s)
|
|
|
- if err := msg.Render(); err != nil {
|
|
|
+ msg, err := m.NewCampaignMessage(c, s)
|
|
|
+ if err != nil {
|
|
|
m.logger.Printf("error rendering message (%s) (%s): %v", c.Name, s.Email, err)
|
|
|
continue
|
|
|
}
|
|
@@ -615,9 +637,9 @@ func (m *Manager) sendNotif(c *models.Campaign, status, reason string) error {
|
|
|
return m.notifCB(subject, data)
|
|
|
}
|
|
|
|
|
|
-// Render takes a Message, executes its pre-compiled Campaign.Tpl
|
|
|
+// render takes a Message, executes its pre-compiled Campaign.Tpl
|
|
|
// and applies the resultant bytes to Message.body to be used in messages.
|
|
|
-func (m *CampaignMessage) Render() error {
|
|
|
+func (m *CampaignMessage) render() error {
|
|
|
out := bytes.Buffer{}
|
|
|
|
|
|
// Render the subject if it's a template.
|