manager.go 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751
  1. package manager
  2. import (
  3. "bytes"
  4. "errors"
  5. "fmt"
  6. "html/template"
  7. "log"
  8. "net/textproto"
  9. "strings"
  10. "sync"
  11. "time"
  12. "github.com/Masterminds/sprig/v3"
  13. "github.com/knadh/listmonk/internal/i18n"
  14. "github.com/knadh/listmonk/internal/messenger"
  15. "github.com/knadh/listmonk/models"
  16. )
  17. const (
  18. // BaseTPL is the name of the base template.
  19. BaseTPL = "base"
  20. BounceTypeBlocklist = "blocklist"
  21. BounceTypeDelete = "delete"
  22. // ContentTpl is the name of the compiled message.
  23. ContentTpl = "content"
  24. dummyUUID = "00000000-0000-0000-0000-000000000000"
  25. )
  26. // Store represents a data backend, such as a database,
  27. // that provides subscriber and campaign records.
  28. type Store interface {
  29. NextCampaigns(excludeIDs []int64) ([]*models.Campaign, error)
  30. NextSubscribers(campID, limit int) ([]models.Subscriber, error)
  31. GetCampaign(campID int) (*models.Campaign, error)
  32. UpdateCampaignStatus(campID int, status string) error
  33. CreateLink(url string) (string, error)
  34. // RecordBounce records an external bounce event identified by
  35. // a user's UUID/e-mail and a campaign UUID.
  36. RecordBounce(b models.Bounce) (int64, int, error)
  37. BlocklistSubscriber(id int64) error
  38. DeleteSubscriber(id int64) error
  39. }
  40. // Manager handles the scheduling, processing, and queuing of campaigns
  41. // and message pushes.
  42. type Manager struct {
  43. cfg Config
  44. store Store
  45. i18n *i18n.I18n
  46. messengers map[string]messenger.Messenger
  47. notifCB models.AdminNotifCallback
  48. logger *log.Logger
  49. // Campaigns that are currently running.
  50. camps map[int]*models.Campaign
  51. campsMut sync.RWMutex
  52. // Links generated using Track() are cached here so as to not query
  53. // the database for the link UUID for every message sent. This has to
  54. // be locked as it may be used externally when previewing campaigns.
  55. links map[string]string
  56. linksMut sync.RWMutex
  57. subFetchQueue chan *models.Campaign
  58. campMsgQueue chan CampaignMessage
  59. campMsgErrorQueue chan msgError
  60. campMsgErrorCounts map[int]int
  61. msgQueue chan Message
  62. bounceQueue chan models.Bounce
  63. // Sliding window keeps track of the total number of messages sent in a period
  64. // and on reaching the specified limit, waits until the window is over before
  65. // sending further messages.
  66. slidingWindowNumMsg int
  67. slidingWindowStart time.Time
  68. }
  69. // CampaignMessage represents an instance of campaign message to be pushed out,
  70. // specific to a subscriber, via the campaign's messenger.
  71. type CampaignMessage struct {
  72. Campaign *models.Campaign
  73. Subscriber models.Subscriber
  74. from string
  75. to string
  76. subject string
  77. body []byte
  78. altBody []byte
  79. unsubURL string
  80. }
  81. // Message represents a generic message to be pushed to a messenger.
  82. type Message struct {
  83. messenger.Message
  84. Subscriber models.Subscriber
  85. // Messenger is the messenger backend to use: email|postback.
  86. Messenger string
  87. }
  88. // Config has parameters for configuring the manager.
  89. type Config struct {
  90. // Number of subscribers to pull from the DB in a single iteration.
  91. BatchSize int
  92. Concurrency int
  93. MessageRate int
  94. MaxSendErrors int
  95. SlidingWindow bool
  96. SlidingWindowDuration time.Duration
  97. SlidingWindowRate int
  98. RequeueOnError bool
  99. FromEmail string
  100. IndividualTracking bool
  101. LinkTrackURL string
  102. UnsubURL string
  103. OptinURL string
  104. MessageURL string
  105. ViewTrackURL string
  106. UnsubHeader bool
  107. BounceCount int
  108. BounceAction string
  109. }
  110. type msgError struct {
  111. camp *models.Campaign
  112. err error
  113. }
  114. var pushTimeout = time.Second * 3
  115. // New returns a new instance of Mailer.
  116. func New(cfg Config, store Store, notifCB models.AdminNotifCallback, i *i18n.I18n, l *log.Logger) *Manager {
  117. if cfg.BatchSize < 1 {
  118. cfg.BatchSize = 1000
  119. }
  120. if cfg.Concurrency < 1 {
  121. cfg.Concurrency = 1
  122. }
  123. if cfg.MessageRate < 1 {
  124. cfg.MessageRate = 1
  125. }
  126. return &Manager{
  127. cfg: cfg,
  128. store: store,
  129. i18n: i,
  130. notifCB: notifCB,
  131. logger: l,
  132. messengers: make(map[string]messenger.Messenger),
  133. camps: make(map[int]*models.Campaign),
  134. links: make(map[string]string),
  135. subFetchQueue: make(chan *models.Campaign, cfg.Concurrency),
  136. campMsgQueue: make(chan CampaignMessage, cfg.Concurrency*2),
  137. msgQueue: make(chan Message, cfg.Concurrency),
  138. bounceQueue: make(chan models.Bounce, cfg.Concurrency),
  139. campMsgErrorQueue: make(chan msgError, cfg.MaxSendErrors),
  140. campMsgErrorCounts: make(map[int]int),
  141. slidingWindowStart: time.Now(),
  142. }
  143. }
  144. // NewCampaignMessage creates and returns a CampaignMessage that is made available
  145. // to message templates while they're compiled. It represents a message from
  146. // a campaign that's bound to a single Subscriber.
  147. func (m *Manager) NewCampaignMessage(c *models.Campaign, s models.Subscriber) (CampaignMessage, error) {
  148. msg := CampaignMessage{
  149. Campaign: c,
  150. Subscriber: s,
  151. subject: c.Subject,
  152. from: c.FromEmail,
  153. to: s.Email,
  154. unsubURL: fmt.Sprintf(m.cfg.UnsubURL, c.UUID, s.UUID),
  155. }
  156. if err := msg.render(); err != nil {
  157. return msg, err
  158. }
  159. return msg, nil
  160. }
  161. // AddMessenger adds a Messenger messaging backend to the manager.
  162. func (m *Manager) AddMessenger(msg messenger.Messenger) error {
  163. id := msg.Name()
  164. if _, ok := m.messengers[id]; ok {
  165. return fmt.Errorf("messenger '%s' is already loaded", id)
  166. }
  167. m.messengers[id] = msg
  168. return nil
  169. }
  170. // PushMessage pushes an arbitrary non-campaign Message to be sent out by the workers.
  171. // It times out if the queue is busy.
  172. func (m *Manager) PushMessage(msg Message) error {
  173. t := time.NewTicker(pushTimeout)
  174. defer t.Stop()
  175. select {
  176. case m.msgQueue <- msg:
  177. case <-t.C:
  178. m.logger.Printf("message push timed out: '%s'", msg.Subject)
  179. return errors.New("message push timed out")
  180. }
  181. return nil
  182. }
  183. // PushCampaignMessage pushes a campaign messages to be sent out by the workers.
  184. // It times out if the queue is busy.
  185. func (m *Manager) PushCampaignMessage(msg CampaignMessage) error {
  186. t := time.NewTicker(pushTimeout)
  187. defer t.Stop()
  188. select {
  189. case m.campMsgQueue <- msg:
  190. case <-t.C:
  191. m.logger.Printf("message push timed out: '%s'", msg.Subject())
  192. return errors.New("message push timed out")
  193. }
  194. return nil
  195. }
  196. // HasMessenger checks if a given messenger is registered.
  197. func (m *Manager) HasMessenger(id string) bool {
  198. _, ok := m.messengers[id]
  199. return ok
  200. }
  201. // HasRunningCampaigns checks if there are any active campaigns.
  202. func (m *Manager) HasRunningCampaigns() bool {
  203. m.campsMut.Lock()
  204. defer m.campsMut.Unlock()
  205. return len(m.camps) > 0
  206. }
  207. // PushBounce records a bounce event.
  208. func (m *Manager) PushBounce(b models.Bounce) error {
  209. t := time.NewTicker(pushTimeout)
  210. defer t.Stop()
  211. select {
  212. case m.bounceQueue <- b:
  213. case <-t.C:
  214. m.logger.Printf("bounce pushed timed out: %s / %s", b.SubscriberUUID, b.Email)
  215. return errors.New("bounce push timed out")
  216. }
  217. return nil
  218. }
  219. // Run is a blocking function (that should be invoked as a goroutine)
  220. // that scans the data source at regular intervals for pending campaigns,
  221. // and queues them for processing. The process queue fetches batches of
  222. // subscribers and pushes messages to them for each queued campaign
  223. // until all subscribers are exhausted, at which point, a campaign is marked
  224. // as "finished".
  225. func (m *Manager) Run(tick time.Duration) {
  226. go m.scanCampaigns(tick)
  227. // Spawn N message workers.
  228. for i := 0; i < m.cfg.Concurrency; i++ {
  229. go m.worker()
  230. }
  231. // Fetch the next set of subscribers for a campaign and process them.
  232. for c := range m.subFetchQueue {
  233. has, err := m.nextSubscribers(c, m.cfg.BatchSize)
  234. if err != nil {
  235. m.logger.Printf("error processing campaign batch (%s): %v", c.Name, err)
  236. continue
  237. }
  238. if has {
  239. // There are more subscribers to fetch.
  240. m.subFetchQueue <- c
  241. } else if m.isCampaignProcessing(c.ID) {
  242. // There are no more subscribers. Either the campaign status
  243. // has changed or all subscribers have been processed.
  244. newC, err := m.exhaustCampaign(c, "")
  245. if err != nil {
  246. m.logger.Printf("error exhausting campaign (%s): %v", c.Name, err)
  247. continue
  248. }
  249. m.sendNotif(newC, newC.Status, "")
  250. }
  251. }
  252. }
  253. // worker is a blocking function that perpetually listents to events (message, bounce) on different
  254. // queues and processes them.
  255. func (m *Manager) worker() {
  256. // Counter to keep track of the message / sec rate limit.
  257. numMsg := 0
  258. for {
  259. select {
  260. // Campaign message.
  261. case msg, ok := <-m.campMsgQueue:
  262. if !ok {
  263. return
  264. }
  265. // Pause on hitting the message rate.
  266. if numMsg >= m.cfg.MessageRate {
  267. time.Sleep(time.Second)
  268. numMsg = 0
  269. }
  270. numMsg++
  271. // Outgoing message.
  272. out := messenger.Message{
  273. From: msg.from,
  274. To: []string{msg.to},
  275. Subject: msg.subject,
  276. ContentType: msg.Campaign.ContentType,
  277. Body: msg.body,
  278. AltBody: msg.altBody,
  279. Subscriber: msg.Subscriber,
  280. Campaign: msg.Campaign,
  281. }
  282. h := textproto.MIMEHeader{}
  283. h.Set(models.EmailHeaderCampaignUUID, msg.Campaign.UUID)
  284. h.Set(models.EmailHeaderSubscriberUUID, msg.Subscriber.UUID)
  285. // Attach List-Unsubscribe headers?
  286. if m.cfg.UnsubHeader {
  287. h.Set("List-Unsubscribe-Post", "List-Unsubscribe=One-Click")
  288. h.Set("List-Unsubscribe", `<`+msg.unsubURL+`>`)
  289. }
  290. out.Headers = h
  291. if err := m.messengers[msg.Campaign.Messenger].Push(out); err != nil {
  292. m.logger.Printf("error sending message in campaign %s: subscriber %s: %v",
  293. msg.Campaign.Name, msg.Subscriber.UUID, err)
  294. select {
  295. case m.campMsgErrorQueue <- msgError{camp: msg.Campaign, err: err}:
  296. default:
  297. }
  298. }
  299. // Arbitrary message.
  300. case msg, ok := <-m.msgQueue:
  301. if !ok {
  302. return
  303. }
  304. err := m.messengers[msg.Messenger].Push(messenger.Message{
  305. From: msg.From,
  306. To: msg.To,
  307. Subject: msg.Subject,
  308. ContentType: msg.ContentType,
  309. Body: msg.Body,
  310. AltBody: msg.AltBody,
  311. Subscriber: msg.Subscriber,
  312. Campaign: msg.Campaign,
  313. })
  314. if err != nil {
  315. m.logger.Printf("error sending message '%s': %v", msg.Subject, err)
  316. }
  317. // Bounce event.
  318. case b, ok := <-m.bounceQueue:
  319. if !ok {
  320. return
  321. }
  322. subID, count, err := m.store.RecordBounce(b)
  323. if err != nil {
  324. m.logger.Printf("error recording bounce %s / %s", b.SubscriberUUID, b.Email)
  325. }
  326. if count >= m.cfg.BounceCount {
  327. switch m.cfg.BounceAction {
  328. case BounceTypeBlocklist:
  329. err = m.store.BlocklistSubscriber(subID)
  330. case BounceTypeDelete:
  331. err = m.store.DeleteSubscriber(subID)
  332. }
  333. if err != nil {
  334. m.logger.Printf("error executing bounce for subscriber: %s", b.SubscriberUUID)
  335. }
  336. }
  337. }
  338. }
  339. }
  340. // TemplateFuncs returns the template functions to be applied into
  341. // compiled campaign templates.
  342. func (m *Manager) TemplateFuncs(c *models.Campaign) template.FuncMap {
  343. f := template.FuncMap{
  344. "TrackLink": func(url string, msg *CampaignMessage) string {
  345. subUUID := msg.Subscriber.UUID
  346. if !m.cfg.IndividualTracking {
  347. subUUID = dummyUUID
  348. }
  349. return m.trackLink(url, msg.Campaign.UUID, subUUID)
  350. },
  351. "TrackView": func(msg *CampaignMessage) template.HTML {
  352. subUUID := msg.Subscriber.UUID
  353. if !m.cfg.IndividualTracking {
  354. subUUID = dummyUUID
  355. }
  356. return template.HTML(fmt.Sprintf(`<img src="%s" alt="" />`,
  357. fmt.Sprintf(m.cfg.ViewTrackURL, msg.Campaign.UUID, subUUID)))
  358. },
  359. "UnsubscribeURL": func(msg *CampaignMessage) string {
  360. return msg.unsubURL
  361. },
  362. "OptinURL": func(msg *CampaignMessage) string {
  363. // Add list IDs.
  364. // TODO: Show private lists list on optin e-mail
  365. return fmt.Sprintf(m.cfg.OptinURL, msg.Subscriber.UUID, "")
  366. },
  367. "MessageURL": func(msg *CampaignMessage) string {
  368. return fmt.Sprintf(m.cfg.MessageURL, c.UUID, msg.Subscriber.UUID)
  369. },
  370. "Date": func(layout string) string {
  371. if layout == "" {
  372. layout = time.ANSIC
  373. }
  374. return time.Now().Format(layout)
  375. },
  376. "L": func() *i18n.I18n {
  377. return m.i18n
  378. },
  379. "Safe": func(safeHTML string) template.HTML {
  380. return template.HTML(safeHTML)
  381. },
  382. }
  383. for k, v := range sprig.GenericFuncMap() {
  384. f[k] = v
  385. }
  386. return f
  387. }
  388. // Close closes and exits the campaign manager.
  389. func (m *Manager) Close() {
  390. close(m.subFetchQueue)
  391. close(m.campMsgErrorQueue)
  392. close(m.msgQueue)
  393. }
  394. // scanCampaigns is a blocking function that periodically scans the data source
  395. // for campaigns to process and dispatches them to the manager.
  396. func (m *Manager) scanCampaigns(tick time.Duration) {
  397. t := time.NewTicker(tick)
  398. defer t.Stop()
  399. for {
  400. select {
  401. // Periodically scan the data source for campaigns to process.
  402. case <-t.C:
  403. campaigns, err := m.store.NextCampaigns(m.getPendingCampaignIDs())
  404. if err != nil {
  405. m.logger.Printf("error fetching campaigns: %v", err)
  406. continue
  407. }
  408. for _, c := range campaigns {
  409. if err := m.addCampaign(c); err != nil {
  410. m.logger.Printf("error processing campaign (%s): %v", c.Name, err)
  411. continue
  412. }
  413. m.logger.Printf("start processing campaign (%s)", c.Name)
  414. // If subscriber processing is busy, move on. Blocking and waiting
  415. // can end up in a race condition where the waiting campaign's
  416. // state in the data source has changed.
  417. select {
  418. case m.subFetchQueue <- c:
  419. default:
  420. }
  421. }
  422. // Aggregate errors from sending messages to check against the error threshold
  423. // after which a campaign is paused.
  424. case e, ok := <-m.campMsgErrorQueue:
  425. if !ok {
  426. return
  427. }
  428. if m.cfg.MaxSendErrors < 1 {
  429. continue
  430. }
  431. // If the error threshold is met, pause the campaign.
  432. m.campMsgErrorCounts[e.camp.ID]++
  433. if m.campMsgErrorCounts[e.camp.ID] >= m.cfg.MaxSendErrors {
  434. m.logger.Printf("error counted exceeded %d. pausing campaign %s",
  435. m.cfg.MaxSendErrors, e.camp.Name)
  436. if m.isCampaignProcessing(e.camp.ID) {
  437. m.exhaustCampaign(e.camp, models.CampaignStatusPaused)
  438. }
  439. delete(m.campMsgErrorCounts, e.camp.ID)
  440. // Notify admins.
  441. m.sendNotif(e.camp, models.CampaignStatusPaused, "Too many errors")
  442. }
  443. }
  444. }
  445. }
  446. // addCampaign adds a campaign to the process queue.
  447. func (m *Manager) addCampaign(c *models.Campaign) error {
  448. // Validate messenger.
  449. if _, ok := m.messengers[c.Messenger]; !ok {
  450. m.store.UpdateCampaignStatus(c.ID, models.CampaignStatusCancelled)
  451. return fmt.Errorf("unknown messenger %s on campaign %s", c.Messenger, c.Name)
  452. }
  453. // Load the template.
  454. if err := c.CompileTemplate(m.TemplateFuncs(c)); err != nil {
  455. return err
  456. }
  457. // Add the campaign to the active map.
  458. m.campsMut.Lock()
  459. m.camps[c.ID] = c
  460. m.campsMut.Unlock()
  461. return nil
  462. }
  463. // getPendingCampaignIDs returns the IDs of campaigns currently being processed.
  464. func (m *Manager) getPendingCampaignIDs() []int64 {
  465. // Needs to return an empty slice in case there are no campaigns.
  466. m.campsMut.RLock()
  467. ids := make([]int64, 0, len(m.camps))
  468. for _, c := range m.camps {
  469. ids = append(ids, int64(c.ID))
  470. }
  471. m.campsMut.RUnlock()
  472. return ids
  473. }
  474. // nextSubscribers processes the next batch of subscribers in a given campaign.
  475. // It returns a bool indicating whether any subscribers were processed
  476. // in the current batch or not. A false indicates that all subscribers
  477. // have been processed, or that a campaign has been paused or cancelled.
  478. func (m *Manager) nextSubscribers(c *models.Campaign, batchSize int) (bool, error) {
  479. // Fetch a batch of subscribers.
  480. subs, err := m.store.NextSubscribers(c.ID, batchSize)
  481. if err != nil {
  482. return false, fmt.Errorf("error fetching campaign subscribers (%s): %v", c.Name, err)
  483. }
  484. // There are no subscribers.
  485. if len(subs) == 0 {
  486. return false, nil
  487. }
  488. // Is there a sliding window limit configured?
  489. hasSliding := m.cfg.SlidingWindow &&
  490. m.cfg.SlidingWindowRate > 0 &&
  491. m.cfg.SlidingWindowDuration.Seconds() > 1
  492. // Push messages.
  493. for _, s := range subs {
  494. // Send the message.
  495. msg, err := m.NewCampaignMessage(c, s)
  496. if err != nil {
  497. m.logger.Printf("error rendering message (%s) (%s): %v", c.Name, s.Email, err)
  498. continue
  499. }
  500. // Push the message to the queue while blocking and waiting until
  501. // the queue is drained.
  502. m.campMsgQueue <- msg
  503. // Check if the sliding window is active.
  504. if hasSliding {
  505. diff := time.Now().Sub(m.slidingWindowStart)
  506. // Window has expired. Reset the clock.
  507. if diff >= m.cfg.SlidingWindowDuration {
  508. m.slidingWindowStart = time.Now()
  509. m.slidingWindowNumMsg = 0
  510. continue
  511. }
  512. // Have the messages exceeded the limit?
  513. m.slidingWindowNumMsg++
  514. if m.slidingWindowNumMsg >= m.cfg.SlidingWindowRate {
  515. wait := m.cfg.SlidingWindowDuration - diff
  516. m.logger.Printf("messages exceeded (%d) for the window (%v since %s). Sleeping for %s.",
  517. m.slidingWindowNumMsg,
  518. m.cfg.SlidingWindowDuration,
  519. m.slidingWindowStart.Format(time.RFC822Z),
  520. wait.Round(time.Second)*1)
  521. m.slidingWindowNumMsg = 0
  522. time.Sleep(wait)
  523. }
  524. }
  525. }
  526. return true, nil
  527. }
  528. // isCampaignProcessing checks if the campaign is bing processed.
  529. func (m *Manager) isCampaignProcessing(id int) bool {
  530. m.campsMut.RLock()
  531. _, ok := m.camps[id]
  532. m.campsMut.RUnlock()
  533. return ok
  534. }
  535. func (m *Manager) exhaustCampaign(c *models.Campaign, status string) (*models.Campaign, error) {
  536. m.campsMut.Lock()
  537. delete(m.camps, c.ID)
  538. m.campsMut.Unlock()
  539. // A status has been passed. Change the campaign's status
  540. // without further checks.
  541. if status != "" {
  542. if err := m.store.UpdateCampaignStatus(c.ID, status); err != nil {
  543. m.logger.Printf("error updating campaign (%s) status to %s: %v", c.Name, status, err)
  544. } else {
  545. m.logger.Printf("set campaign (%s) to %s", c.Name, status)
  546. }
  547. return c, nil
  548. }
  549. // Fetch the up-to-date campaign status from the source.
  550. cm, err := m.store.GetCampaign(c.ID)
  551. if err != nil {
  552. return nil, err
  553. }
  554. // If a running campaign has exhausted subscribers, it's finished.
  555. if cm.Status == models.CampaignStatusRunning {
  556. cm.Status = models.CampaignStatusFinished
  557. if err := m.store.UpdateCampaignStatus(c.ID, models.CampaignStatusFinished); err != nil {
  558. m.logger.Printf("error finishing campaign (%s): %v", c.Name, err)
  559. } else {
  560. m.logger.Printf("campaign (%s) finished", c.Name)
  561. }
  562. } else {
  563. m.logger.Printf("stop processing campaign (%s)", c.Name)
  564. }
  565. return cm, nil
  566. }
  567. // trackLink register a URL and return its UUID to be used in message templates
  568. // for tracking links.
  569. func (m *Manager) trackLink(url, campUUID, subUUID string) string {
  570. m.linksMut.RLock()
  571. if uu, ok := m.links[url]; ok {
  572. m.linksMut.RUnlock()
  573. return fmt.Sprintf(m.cfg.LinkTrackURL, uu, campUUID, subUUID)
  574. }
  575. m.linksMut.RUnlock()
  576. // Register link.
  577. uu, err := m.store.CreateLink(url)
  578. if err != nil {
  579. m.logger.Printf("error registering tracking for link '%s': %v", url, err)
  580. // If the registration fails, fail over to the original URL.
  581. return url
  582. }
  583. m.linksMut.Lock()
  584. m.links[url] = uu
  585. m.linksMut.Unlock()
  586. return fmt.Sprintf(m.cfg.LinkTrackURL, uu, campUUID, subUUID)
  587. }
  588. // sendNotif sends a notification to registered admin e-mails.
  589. func (m *Manager) sendNotif(c *models.Campaign, status, reason string) error {
  590. var (
  591. subject = fmt.Sprintf("%s: %s", strings.Title(status), c.Name)
  592. data = map[string]interface{}{
  593. "ID": c.ID,
  594. "Name": c.Name,
  595. "Status": status,
  596. "Sent": c.Sent,
  597. "ToSend": c.ToSend,
  598. "Reason": reason,
  599. }
  600. )
  601. return m.notifCB(subject, data)
  602. }
  603. // render takes a Message, executes its pre-compiled Campaign.Tpl
  604. // and applies the resultant bytes to Message.body to be used in messages.
  605. func (m *CampaignMessage) render() error {
  606. out := bytes.Buffer{}
  607. // Render the subject if it's a template.
  608. if m.Campaign.SubjectTpl != nil {
  609. if err := m.Campaign.SubjectTpl.ExecuteTemplate(&out, models.ContentTpl, m); err != nil {
  610. return err
  611. }
  612. m.subject = out.String()
  613. out.Reset()
  614. }
  615. // Compile the main template.
  616. if err := m.Campaign.Tpl.ExecuteTemplate(&out, models.BaseTpl, m); err != nil {
  617. return err
  618. }
  619. m.body = out.Bytes()
  620. // Is there an alt body?
  621. if m.Campaign.ContentType != models.CampaignContentTypePlain && m.Campaign.AltBody.Valid {
  622. if m.Campaign.AltBodyTpl != nil {
  623. b := bytes.Buffer{}
  624. if err := m.Campaign.AltBodyTpl.ExecuteTemplate(&b, models.ContentTpl, m); err != nil {
  625. return err
  626. }
  627. m.altBody = b.Bytes()
  628. } else {
  629. m.altBody = []byte(m.Campaign.AltBody.String)
  630. }
  631. }
  632. return nil
  633. }
  634. // Subject returns a copy of the message subject
  635. func (m *CampaignMessage) Subject() string {
  636. return m.subject
  637. }
  638. // Body returns a copy of the message body.
  639. func (m *CampaignMessage) Body() []byte {
  640. out := make([]byte, len(m.body))
  641. copy(out, m.body)
  642. return out
  643. }
  644. // AltBody returns a copy of the message's alt body.
  645. func (m *CampaignMessage) AltBody() []byte {
  646. out := make([]byte, len(m.altBody))
  647. copy(out, m.altBody)
  648. return out
  649. }