|
@@ -5,6 +5,7 @@ import (
|
|
|
"fmt"
|
|
|
"html/template"
|
|
|
"log"
|
|
|
+ "sync"
|
|
|
"time"
|
|
|
|
|
|
"github.com/knadh/listmonk/messenger"
|
|
@@ -30,6 +31,7 @@ type DataSource interface {
|
|
|
PauseCampaign(campID int) error
|
|
|
CancelCampaign(campID int) error
|
|
|
FinishCampaign(campID int) error
|
|
|
+ CreateLink(url string) (string, error)
|
|
|
}
|
|
|
|
|
|
// Runner handles the scheduling, processing, and queuing of campaigns
|
|
@@ -43,7 +45,13 @@ type Runner struct {
|
|
|
// Campaigns that are currently running.
|
|
|
camps map[int]*models.Campaign
|
|
|
|
|
|
- msgQueue chan Message
|
|
|
+ // Links generated using Track() are cached here so as to not query
|
|
|
+ // the database for the link UUID for every message sent. This has to
|
|
|
+ // be locked as it may be used externally when previewing campaigns.
|
|
|
+ links map[string]string
|
|
|
+ linksMutex sync.RWMutex
|
|
|
+
|
|
|
+ msgQueue chan *Message
|
|
|
subFetchQueue chan *models.Campaign
|
|
|
}
|
|
|
|
|
@@ -52,14 +60,14 @@ type Message struct {
|
|
|
Campaign *models.Campaign
|
|
|
Subscriber *models.Subscriber
|
|
|
UnsubscribeURL string
|
|
|
-
|
|
|
- body []byte
|
|
|
- to string
|
|
|
+ Body []byte
|
|
|
+ to string
|
|
|
}
|
|
|
|
|
|
// Config has parameters for configuring the runner.
|
|
|
type Config struct {
|
|
|
Concurrency int
|
|
|
+ LinkTrackURL string
|
|
|
UnsubscribeURL string
|
|
|
}
|
|
|
|
|
@@ -70,14 +78,26 @@ func New(cfg Config, src DataSource, l *log.Logger) *Runner {
|
|
|
messengers: make(map[string]messenger.Messenger),
|
|
|
src: src,
|
|
|
camps: make(map[int]*models.Campaign, 0),
|
|
|
+ links: make(map[string]string, 0),
|
|
|
logger: l,
|
|
|
subFetchQueue: make(chan *models.Campaign, 100),
|
|
|
- msgQueue: make(chan Message, cfg.Concurrency),
|
|
|
+ msgQueue: make(chan *Message, cfg.Concurrency),
|
|
|
}
|
|
|
|
|
|
return &r
|
|
|
}
|
|
|
|
|
|
+// NewMessage creates and returns a Message that is made available
|
|
|
+// to message templates while they're compiled.
|
|
|
+func (r *Runner) NewMessage(c *models.Campaign, s *models.Subscriber) *Message {
|
|
|
+ return &Message{
|
|
|
+ to: s.Email,
|
|
|
+ Campaign: c,
|
|
|
+ Subscriber: s,
|
|
|
+ UnsubscribeURL: fmt.Sprintf(r.cfg.UnsubscribeURL, c.UUID, s.UUID),
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
// AddMessenger adds a Messenger messaging backend to the runner process.
|
|
|
func (r *Runner) AddMessenger(msg messenger.Messenger) error {
|
|
|
id := msg.Name()
|
|
@@ -160,7 +180,7 @@ func (r *Runner) Run(tick time.Duration) {
|
|
|
// SpawnWorkers spawns workers goroutines that push out messages.
|
|
|
func (r *Runner) SpawnWorkers() {
|
|
|
for i := 0; i < r.cfg.Concurrency; i++ {
|
|
|
- go func(ch chan Message) {
|
|
|
+ go func(ch chan *Message) {
|
|
|
for {
|
|
|
select {
|
|
|
case m := <-ch:
|
|
@@ -168,21 +188,25 @@ func (r *Runner) SpawnWorkers() {
|
|
|
m.Campaign.FromEmail,
|
|
|
m.Subscriber.Email,
|
|
|
m.Campaign.Subject,
|
|
|
- m.body)
|
|
|
+ m.Body)
|
|
|
}
|
|
|
}
|
|
|
}(r.msgQueue)
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+// TemplateFuncs returns the template functions to be applied into
|
|
|
+// compiled campaign templates.
|
|
|
+func (r *Runner) TemplateFuncs(c *models.Campaign) template.FuncMap {
|
|
|
+ return template.FuncMap{
|
|
|
+ "Track": func(url, campUUID, subUUID string) string {
|
|
|
+ return r.trackLink(url, campUUID, subUUID)
|
|
|
+ },
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
// addCampaign adds a campaign to the process queue.
|
|
|
func (r *Runner) addCampaign(c *models.Campaign) error {
|
|
|
- var tplErr error
|
|
|
-
|
|
|
- c.Tpl, tplErr = CompileMessageTemplate(c.TemplateBody, c.Body)
|
|
|
- if tplErr != nil {
|
|
|
- return tplErr
|
|
|
- }
|
|
|
|
|
|
// Validate messenger.
|
|
|
if _, ok := r.messengers[c.MessengerID]; !ok {
|
|
@@ -190,9 +214,13 @@ func (r *Runner) addCampaign(c *models.Campaign) error {
|
|
|
return fmt.Errorf("unknown messenger %s on campaign %s", c.MessengerID, c.Name)
|
|
|
}
|
|
|
|
|
|
+ // Load the template.
|
|
|
+ if err := c.CompileTemplate(r.TemplateFuncs(c)); err != nil {
|
|
|
+ return err
|
|
|
+ }
|
|
|
+
|
|
|
// Add the campaign to the active map.
|
|
|
r.camps[c.ID] = c
|
|
|
-
|
|
|
return nil
|
|
|
}
|
|
|
|
|
@@ -225,17 +253,14 @@ func (r *Runner) nextSubscribers(c *models.Campaign, batchSize int) (bool, error
|
|
|
|
|
|
// Push messages.
|
|
|
for _, s := range subs {
|
|
|
- to, body, err := r.makeMessage(c, s)
|
|
|
- if err != nil {
|
|
|
- r.logger.Printf("error preparing message (%s) (%s): %v", c.Name, s.Email, err)
|
|
|
+ m := r.NewMessage(c, s)
|
|
|
+ if err := m.Render(); err != nil {
|
|
|
+ r.logger.Printf("error rendering message (%s) (%s): %v", c.Name, s.Email, err)
|
|
|
continue
|
|
|
}
|
|
|
|
|
|
// Send the message.
|
|
|
- r.msgQueue <- Message{Campaign: c,
|
|
|
- Subscriber: s,
|
|
|
- to: to,
|
|
|
- body: body}
|
|
|
+ r.msgQueue <- m
|
|
|
}
|
|
|
|
|
|
return true, nil
|
|
@@ -263,21 +288,40 @@ func (r *Runner) processExhaustedCampaign(c *models.Campaign) error {
|
|
|
return nil
|
|
|
}
|
|
|
|
|
|
-// makeMessage prepares a campaign message for a subscriber and returns
|
|
|
-// the 'to' address and the body.
|
|
|
-func (r *Runner) makeMessage(c *models.Campaign, s *models.Subscriber) (string, []byte, error) {
|
|
|
- // Render the message body.
|
|
|
- var (
|
|
|
- out = bytes.Buffer{}
|
|
|
- tplMsg = Message{Campaign: c,
|
|
|
- Subscriber: s,
|
|
|
- UnsubscribeURL: fmt.Sprintf(r.cfg.UnsubscribeURL, c.UUID, s.UUID)}
|
|
|
- )
|
|
|
- if err := c.Tpl.ExecuteTemplate(&out, BaseTPL, tplMsg); err != nil {
|
|
|
- return "", nil, err
|
|
|
+// Render takes a Message, executes its pre-compiled Campaign.Tpl
|
|
|
+// and applies the resultant bytes to Message.body to be used in messages.
|
|
|
+func (m *Message) Render() error {
|
|
|
+ out := bytes.Buffer{}
|
|
|
+ if err := m.Campaign.Tpl.ExecuteTemplate(&out, models.BaseTpl, m); err != nil {
|
|
|
+ return err
|
|
|
}
|
|
|
+ m.Body = out.Bytes()
|
|
|
+ return nil
|
|
|
+}
|
|
|
+
|
|
|
+// trackLink register a URL and return its UUID to be used in message templates
|
|
|
+// for tracking links.
|
|
|
+func (r *Runner) trackLink(url, campUUID, subUUID string) string {
|
|
|
+ r.linksMutex.RLock()
|
|
|
+ if uu, ok := r.links[url]; ok {
|
|
|
+ return uu
|
|
|
+ }
|
|
|
+ r.linksMutex.RUnlock()
|
|
|
+
|
|
|
+ // Register link.
|
|
|
+ uu, err := r.src.CreateLink(url)
|
|
|
+ if err != nil {
|
|
|
+ r.logger.Printf("error registering tracking for link '%s': %v", url, err)
|
|
|
+
|
|
|
+ // If the registration fails, fail over to the original URL.
|
|
|
+ return url
|
|
|
+ }
|
|
|
+
|
|
|
+ r.linksMutex.Lock()
|
|
|
+ r.links[url] = uu
|
|
|
+ r.linksMutex.Unlock()
|
|
|
|
|
|
- return s.Email, out.Bytes(), nil
|
|
|
+ return fmt.Sprintf(r.cfg.LinkTrackURL, uu, campUUID, subUUID)
|
|
|
}
|
|
|
|
|
|
// CompileMessageTemplate takes a base template body string and a child (message) template
|