manager.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484
  1. package manager
  2. import (
  3. "bytes"
  4. "errors"
  5. "fmt"
  6. "html/template"
  7. "log"
  8. "strings"
  9. "sync"
  10. "time"
  11. "github.com/knadh/listmonk/internal/messenger"
  12. "github.com/knadh/listmonk/models"
  13. )
  14. const (
  15. batchSize = 10000
  16. // BaseTPL is the name of the base template.
  17. BaseTPL = "base"
  18. // ContentTpl is the name of the compiled message.
  19. ContentTpl = "content"
  20. )
  21. // DataSource represents a data backend, such as a database,
  22. // that provides subscriber and campaign records.
  23. type DataSource interface {
  24. NextCampaigns(excludeIDs []int64) ([]*models.Campaign, error)
  25. NextSubscribers(campID, limit int) ([]models.Subscriber, error)
  26. GetCampaign(campID int) (*models.Campaign, error)
  27. UpdateCampaignStatus(campID int, status string) error
  28. CreateLink(url string) (string, error)
  29. }
  30. // Manager handles the scheduling, processing, and queuing of campaigns
  31. // and message pushes.
  32. type Manager struct {
  33. cfg Config
  34. src DataSource
  35. messengers map[string]messenger.Messenger
  36. notifCB models.AdminNotifCallback
  37. logger *log.Logger
  38. // Campaigns that are currently running.
  39. camps map[int]*models.Campaign
  40. campsMutex sync.RWMutex
  41. // Links generated using Track() are cached here so as to not query
  42. // the database for the link UUID for every message sent. This has to
  43. // be locked as it may be used externally when previewing campaigns.
  44. links map[string]string
  45. linksMutex sync.RWMutex
  46. subFetchQueue chan *models.Campaign
  47. campMsgQueue chan CampaignMessage
  48. campMsgErrorQueue chan msgError
  49. campMsgErrorCounts map[int]int
  50. msgQueue chan Message
  51. }
  52. // CampaignMessage represents an instance of campaign message to be pushed out,
  53. // specific to a subscriber, via the campaign's messenger.
  54. type CampaignMessage struct {
  55. Campaign *models.Campaign
  56. Subscriber models.Subscriber
  57. from string
  58. to string
  59. body []byte
  60. unsubURL string
  61. }
  62. // Message represents a generic message to be pushed to a messenger.
  63. type Message struct {
  64. From string
  65. To []string
  66. Subject string
  67. Body []byte
  68. Messenger string
  69. }
  70. // Config has parameters for configuring the manager.
  71. type Config struct {
  72. Concurrency int
  73. MaxSendErrors int
  74. RequeueOnError bool
  75. FromEmail string
  76. LinkTrackURL string
  77. UnsubURL string
  78. OptinURL string
  79. ViewTrackURL string
  80. }
  81. type msgError struct {
  82. camp *models.Campaign
  83. err error
  84. }
  85. // New returns a new instance of Mailer.
  86. func New(cfg Config, src DataSource, notifCB models.AdminNotifCallback, l *log.Logger) *Manager {
  87. return &Manager{
  88. cfg: cfg,
  89. src: src,
  90. notifCB: notifCB,
  91. logger: l,
  92. messengers: make(map[string]messenger.Messenger),
  93. camps: make(map[int]*models.Campaign),
  94. links: make(map[string]string),
  95. subFetchQueue: make(chan *models.Campaign, cfg.Concurrency),
  96. campMsgQueue: make(chan CampaignMessage, cfg.Concurrency*2),
  97. msgQueue: make(chan Message, cfg.Concurrency),
  98. campMsgErrorQueue: make(chan msgError, cfg.MaxSendErrors),
  99. campMsgErrorCounts: make(map[int]int),
  100. }
  101. }
  102. // NewCampaignMessage creates and returns a CampaignMessage that is made available
  103. // to message templates while they're compiled. It represents a message from
  104. // a campaign that's bound to a single Subscriber.
  105. func (m *Manager) NewCampaignMessage(c *models.Campaign, s models.Subscriber) CampaignMessage {
  106. return CampaignMessage{
  107. Campaign: c,
  108. Subscriber: s,
  109. from: c.FromEmail,
  110. to: s.Email,
  111. unsubURL: fmt.Sprintf(m.cfg.UnsubURL, c.UUID, s.UUID),
  112. }
  113. }
  114. // AddMessenger adds a Messenger messaging backend to the manager.
  115. func (m *Manager) AddMessenger(msg messenger.Messenger) error {
  116. id := msg.Name()
  117. if _, ok := m.messengers[id]; ok {
  118. return fmt.Errorf("messenger '%s' is already loaded", id)
  119. }
  120. m.messengers[id] = msg
  121. return nil
  122. }
  123. // PushMessage pushes a Message to be sent out by the workers.
  124. func (m *Manager) PushMessage(msg Message) error {
  125. select {
  126. case m.msgQueue <- msg:
  127. case <-time.After(time.Second * 3):
  128. m.logger.Println("message push timed out: %'s'", msg.Subject)
  129. return errors.New("message push timed out")
  130. }
  131. return nil
  132. }
  133. // GetMessengerNames returns the list of registered messengers.
  134. func (m *Manager) GetMessengerNames() []string {
  135. names := make([]string, 0, len(m.messengers))
  136. for n := range m.messengers {
  137. names = append(names, n)
  138. }
  139. return names
  140. }
  141. // HasMessenger checks if a given messenger is registered.
  142. func (m *Manager) HasMessenger(id string) bool {
  143. _, ok := m.messengers[id]
  144. return ok
  145. }
  146. // Run is a blocking function (and hence should be invoked as a goroutine)
  147. // that scans the source db at regular intervals for pending campaigns,
  148. // and queues them for processing. The process queue fetches batches of
  149. // subscribers and pushes messages to them for each queued campaign
  150. // until all subscribers are exhausted, at which point, a campaign is marked
  151. // as "finished".
  152. func (m *Manager) Run(tick time.Duration) {
  153. go func() {
  154. t := time.NewTicker(tick)
  155. for {
  156. select {
  157. // Periodically scan the data source for campaigns to process.
  158. case <-t.C:
  159. campaigns, err := m.src.NextCampaigns(m.getPendingCampaignIDs())
  160. if err != nil {
  161. m.logger.Printf("error fetching campaigns: %v", err)
  162. continue
  163. }
  164. for _, c := range campaigns {
  165. if err := m.addCampaign(c); err != nil {
  166. m.logger.Printf("error processing campaign (%s): %v", c.Name, err)
  167. continue
  168. }
  169. m.logger.Printf("start processing campaign (%s)", c.Name)
  170. // If subscriber processing is busy, move on. Blocking and waiting
  171. // can end up in a race condition where the waiting campaign's
  172. // state in the data source has changed.
  173. select {
  174. case m.subFetchQueue <- c:
  175. default:
  176. }
  177. }
  178. // Aggregate errors from sending messages to check against the error threshold
  179. // after which a campaign is paused.
  180. case e := <-m.campMsgErrorQueue:
  181. if m.cfg.MaxSendErrors < 1 {
  182. continue
  183. }
  184. // If the error threshold is met, pause the campaign.
  185. m.campMsgErrorCounts[e.camp.ID]++
  186. if m.campMsgErrorCounts[e.camp.ID] >= m.cfg.MaxSendErrors {
  187. m.logger.Printf("error counted exceeded %d. pausing campaign %s",
  188. m.cfg.MaxSendErrors, e.camp.Name)
  189. if m.isCampaignProcessing(e.camp.ID) {
  190. m.exhaustCampaign(e.camp, models.CampaignStatusPaused)
  191. }
  192. delete(m.campMsgErrorCounts, e.camp.ID)
  193. // Notify admins.
  194. m.sendNotif(e.camp, models.CampaignStatusPaused, "Too many errors")
  195. }
  196. }
  197. }
  198. }()
  199. // Fetch the next set of subscribers for a campaign and process them.
  200. for c := range m.subFetchQueue {
  201. has, err := m.nextSubscribers(c, batchSize)
  202. if err != nil {
  203. m.logger.Printf("error processing campaign batch (%s): %v", c.Name, err)
  204. continue
  205. }
  206. if has {
  207. // There are more subscribers to fetch.
  208. m.subFetchQueue <- c
  209. } else if m.isCampaignProcessing(c.ID) {
  210. // There are no more subscribers. Either the campaign status
  211. // has changed or all subscribers have been processed.
  212. newC, err := m.exhaustCampaign(c, "")
  213. if err != nil {
  214. m.logger.Printf("error exhausting campaign (%s): %v", c.Name, err)
  215. continue
  216. }
  217. m.sendNotif(newC, newC.Status, "")
  218. }
  219. }
  220. }
  221. // SpawnWorkers spawns workers goroutines that push out messages.
  222. func (m *Manager) SpawnWorkers() {
  223. for i := 0; i < m.cfg.Concurrency; i++ {
  224. go func() {
  225. for {
  226. select {
  227. // Campaign message.
  228. case msg := <-m.campMsgQueue:
  229. if !m.isCampaignProcessing(msg.Campaign.ID) {
  230. continue
  231. }
  232. err := m.messengers[msg.Campaign.MessengerID].Push(
  233. msg.from, []string{msg.to}, msg.Campaign.Subject, msg.body, nil)
  234. if err != nil {
  235. m.logger.Printf("error sending message in campaign %s: %v", msg.Campaign.Name, err)
  236. select {
  237. case m.campMsgErrorQueue <- msgError{camp: msg.Campaign, err: err}:
  238. default:
  239. }
  240. }
  241. // Arbitrary message.
  242. case msg := <-m.msgQueue:
  243. err := m.messengers[msg.Messenger].Push(
  244. msg.From, msg.To, msg.Subject, msg.Body, nil)
  245. if err != nil {
  246. m.logger.Printf("error sending message '%s': %v", msg.Subject, err)
  247. }
  248. }
  249. }
  250. }()
  251. }
  252. }
  253. // TemplateFuncs returns the template functions to be applied into
  254. // compiled campaign templates.
  255. func (m *Manager) TemplateFuncs(c *models.Campaign) template.FuncMap {
  256. return template.FuncMap{
  257. "TrackLink": func(url string, msg *CampaignMessage) string {
  258. return m.trackLink(url, msg.Campaign.UUID, msg.Subscriber.UUID)
  259. },
  260. "TrackView": func(msg *CampaignMessage) template.HTML {
  261. return template.HTML(fmt.Sprintf(`<img src="%s" alt="" />`,
  262. fmt.Sprintf(m.cfg.ViewTrackURL, msg.Campaign.UUID, msg.Subscriber.UUID)))
  263. },
  264. "UnsubscribeURL": func(msg *CampaignMessage) string {
  265. return msg.unsubURL
  266. },
  267. "OptinURL": func(msg *CampaignMessage) string {
  268. // Add list IDs.
  269. // TODO: Show private lists list on optin e-mail
  270. return fmt.Sprintf(m.cfg.OptinURL, msg.Subscriber.UUID, "")
  271. },
  272. "Date": func(layout string) string {
  273. if layout == "" {
  274. layout = time.ANSIC
  275. }
  276. return time.Now().Format(layout)
  277. },
  278. }
  279. }
  280. // addCampaign adds a campaign to the process queue.
  281. func (m *Manager) addCampaign(c *models.Campaign) error {
  282. // Validate messenger.
  283. if _, ok := m.messengers[c.MessengerID]; !ok {
  284. m.src.UpdateCampaignStatus(c.ID, models.CampaignStatusCancelled)
  285. return fmt.Errorf("unknown messenger %s on campaign %s", c.MessengerID, c.Name)
  286. }
  287. // Load the template.
  288. if err := c.CompileTemplate(m.TemplateFuncs(c)); err != nil {
  289. return err
  290. }
  291. // Add the campaign to the active map.
  292. m.campsMutex.Lock()
  293. m.camps[c.ID] = c
  294. m.campsMutex.Unlock()
  295. return nil
  296. }
  297. // getPendingCampaignIDs returns the IDs of campaigns currently being processed.
  298. func (m *Manager) getPendingCampaignIDs() []int64 {
  299. // Needs to return an empty slice in case there are no campaigns.
  300. m.campsMutex.RLock()
  301. ids := make([]int64, 0, len(m.camps))
  302. for _, c := range m.camps {
  303. ids = append(ids, int64(c.ID))
  304. }
  305. m.campsMutex.RUnlock()
  306. return ids
  307. }
  308. // nextSubscribers processes the next batch of subscribers in a given campaign.
  309. // If returns a bool indicating whether there any subscribers were processed
  310. // in the current batch or not. This can happen when all the subscribers
  311. // have been processed, or if a campaign has been paused or cancelled abruptly.
  312. func (m *Manager) nextSubscribers(c *models.Campaign, batchSize int) (bool, error) {
  313. // Fetch a batch of subscribers.
  314. subs, err := m.src.NextSubscribers(c.ID, batchSize)
  315. if err != nil {
  316. return false, fmt.Errorf("error fetching campaign subscribers (%s): %v", c.Name, err)
  317. }
  318. // There are no subscribers.
  319. if len(subs) == 0 {
  320. return false, nil
  321. }
  322. // Push messages.
  323. for _, s := range subs {
  324. msg := m.NewCampaignMessage(c, s)
  325. if err := msg.Render(); err != nil {
  326. m.logger.Printf("error rendering message (%s) (%s): %v", c.Name, s.Email, err)
  327. continue
  328. }
  329. // Push the message to the queue while blocking and waiting until
  330. // the queue is drained.
  331. m.campMsgQueue <- msg
  332. }
  333. return true, nil
  334. }
  335. // isCampaignProcessing checks if the campaign is bing processed.
  336. func (m *Manager) isCampaignProcessing(id int) bool {
  337. m.campsMutex.RLock()
  338. _, ok := m.camps[id]
  339. m.campsMutex.RUnlock()
  340. return ok
  341. }
  342. func (m *Manager) exhaustCampaign(c *models.Campaign, status string) (*models.Campaign, error) {
  343. m.campsMutex.Lock()
  344. delete(m.camps, c.ID)
  345. m.campsMutex.Unlock()
  346. // A status has been passed. Change the campaign's status
  347. // without further checks.
  348. if status != "" {
  349. if err := m.src.UpdateCampaignStatus(c.ID, status); err != nil {
  350. m.logger.Printf("error updating campaign (%s) status to %s: %v", c.Name, status, err)
  351. } else {
  352. m.logger.Printf("set campaign (%s) to %s", c.Name, status)
  353. }
  354. return c, nil
  355. }
  356. // Fetch the up-to-date campaign status from the source.
  357. cm, err := m.src.GetCampaign(c.ID)
  358. if err != nil {
  359. return nil, err
  360. }
  361. // If a running campaign has exhausted subscribers, it's finished.
  362. if cm.Status == models.CampaignStatusRunning {
  363. cm.Status = models.CampaignStatusFinished
  364. if err := m.src.UpdateCampaignStatus(c.ID, models.CampaignStatusFinished); err != nil {
  365. m.logger.Printf("error finishing campaign (%s): %v", c.Name, err)
  366. } else {
  367. m.logger.Printf("campaign (%s) finished", c.Name)
  368. }
  369. } else {
  370. m.logger.Printf("stop processing campaign (%s)", c.Name)
  371. }
  372. return cm, nil
  373. }
  374. // trackLink register a URL and return its UUID to be used in message templates
  375. // for tracking links.
  376. func (m *Manager) trackLink(url, campUUID, subUUID string) string {
  377. m.linksMutex.RLock()
  378. if uu, ok := m.links[url]; ok {
  379. m.linksMutex.RUnlock()
  380. return fmt.Sprintf(m.cfg.LinkTrackURL, uu, campUUID, subUUID)
  381. }
  382. m.linksMutex.RUnlock()
  383. // Register link.
  384. uu, err := m.src.CreateLink(url)
  385. if err != nil {
  386. m.logger.Printf("error registering tracking for link '%s': %v", url, err)
  387. // If the registration fails, fail over to the original URL.
  388. return url
  389. }
  390. m.linksMutex.Lock()
  391. m.links[url] = uu
  392. m.linksMutex.Unlock()
  393. return fmt.Sprintf(m.cfg.LinkTrackURL, uu, campUUID, subUUID)
  394. }
  395. // sendNotif sends a notification to registered admin e-mails.
  396. func (m *Manager) sendNotif(c *models.Campaign, status, reason string) error {
  397. var (
  398. subject = fmt.Sprintf("%s: %s", strings.Title(status), c.Name)
  399. data = map[string]interface{}{
  400. "ID": c.ID,
  401. "Name": c.Name,
  402. "Status": status,
  403. "Sent": c.Sent,
  404. "ToSend": c.ToSend,
  405. "Reason": reason,
  406. }
  407. )
  408. return m.notifCB(subject, data)
  409. }
  410. // Render takes a Message, executes its pre-compiled Campaign.Tpl
  411. // and applies the resultant bytes to Message.body to be used in messages.
  412. func (m *CampaignMessage) Render() error {
  413. out := bytes.Buffer{}
  414. if err := m.Campaign.Tpl.ExecuteTemplate(&out, models.BaseTpl, m); err != nil {
  415. return err
  416. }
  417. m.body = out.Bytes()
  418. return nil
  419. }
  420. // Body returns a copy of the message body.
  421. func (m *CampaignMessage) Body() []byte {
  422. out := make([]byte, len(m.body))
  423. copy(out, m.body)
  424. return out
  425. }