// Package runner schedules campaigns from a data source, renders a message
// for every subscriber and pushes it out through the registered messengers.
  1. package runner
  2. import (
  3. "bytes"
  4. "fmt"
  5. "html/template"
  6. "log"
  7. "sync"
  8. "time"
  9. "github.com/knadh/listmonk/messenger"
  10. "github.com/knadh/listmonk/models"
  11. )
  12. const (
  13. batchSize = 10000
  14. // BaseTPL is the name of the base template.
  15. BaseTPL = "base"
  16. // ContentTpl is the name of the compiled message.
  17. ContentTpl = "content"
  18. )
  19. // DataSource represents a data backend, such as a database,
  20. // that provides subscriber and campaign records.
  21. type DataSource interface {
  22. NextCampaigns(excludeIDs []int64) ([]*models.Campaign, error)
  23. NextSubscribers(campID, limit int) ([]*models.Subscriber, error)
  24. GetCampaign(campID int) (*models.Campaign, error)
  25. UpdateCampaignStatus(campID int, status string) error
  26. CreateLink(url string) (string, error)
  27. }
  28. // Runner handles the scheduling, processing, and queuing of campaigns
  29. // and message pushes.
  30. type Runner struct {
  31. cfg Config
  32. src DataSource
  33. messengers map[string]messenger.Messenger
  34. logger *log.Logger
  35. // Campaigns that are currently running.
  36. camps map[int]*models.Campaign
  37. // Links generated using Track() are cached here so as to not query
  38. // the database for the link UUID for every message sent. This has to
  39. // be locked as it may be used externally when previewing campaigns.
  40. links map[string]string
  41. linksMutex sync.RWMutex
  42. subFetchQueue chan *models.Campaign
  43. msgQueue chan *Message
  44. msgErrorQueue chan msgError
  45. msgErrorCounts map[int]int
  46. }
  47. // Message represents an active subscriber that's being processed.
  48. type Message struct {
  49. Campaign *models.Campaign
  50. Subscriber *models.Subscriber
  51. UnsubscribeURL string
  52. Body []byte
  53. from string
  54. to string
  55. }
  56. // Config has parameters for configuring the runner.
  57. type Config struct {
  58. Concurrency int
  59. MaxSendErrors int
  60. RequeueOnError bool
  61. LinkTrackURL string
  62. UnsubscribeURL string
  63. ViewTrackURL string
  64. }
  65. type msgError struct {
  66. camp *models.Campaign
  67. err error
  68. }
  69. // New returns a new instance of Mailer.
  70. func New(cfg Config, src DataSource, l *log.Logger) *Runner {
  71. r := Runner{
  72. cfg: cfg,
  73. src: src,
  74. logger: l,
  75. messengers: make(map[string]messenger.Messenger),
  76. camps: make(map[int]*models.Campaign, 0),
  77. links: make(map[string]string, 0),
  78. subFetchQueue: make(chan *models.Campaign, cfg.Concurrency),
  79. msgQueue: make(chan *Message, cfg.Concurrency),
  80. msgErrorQueue: make(chan msgError, cfg.MaxSendErrors),
  81. msgErrorCounts: make(map[int]int),
  82. }
  83. return &r
  84. }
  85. // NewMessage creates and returns a Message that is made available
  86. // to message templates while they're compiled.
  87. func (r *Runner) NewMessage(c *models.Campaign, s *models.Subscriber) *Message {
  88. return &Message{
  89. from: c.FromEmail,
  90. to: s.Email,
  91. Campaign: c,
  92. Subscriber: s,
  93. UnsubscribeURL: fmt.Sprintf(r.cfg.UnsubscribeURL, c.UUID, s.UUID),
  94. }
  95. }
  96. // AddMessenger adds a Messenger messaging backend to the runner process.
  97. func (r *Runner) AddMessenger(msg messenger.Messenger) error {
  98. id := msg.Name()
  99. if _, ok := r.messengers[id]; ok {
  100. return fmt.Errorf("messenger '%s' is already loaded", id)
  101. }
  102. r.messengers[id] = msg
  103. return nil
  104. }
  105. // GetMessengerNames returns the list of registered messengers.
  106. func (r *Runner) GetMessengerNames() []string {
  107. var names []string
  108. for n := range r.messengers {
  109. names = append(names, n)
  110. }
  111. return names
  112. }
  113. // HasMessenger checks if a given messenger is registered.
  114. func (r *Runner) HasMessenger(id string) bool {
  115. _, ok := r.messengers[id]
  116. return ok
  117. }
  118. // Run is a blocking function (and hence should be invoked as a goroutine)
  119. // that scans the source db at regular intervals for pending campaigns,
  120. // and queues them for processing. The process queue fetches batches of
  121. // subscribers and pushes messages to them for each queued campaign
  122. // until all subscribers are exhausted, at which point, a campaign is marked
  123. // as "finished".
  124. func (r *Runner) Run(tick time.Duration) {
  125. go func() {
  126. t := time.NewTicker(tick)
  127. for {
  128. select {
  129. // Periodically scan the data source for campaigns to process.
  130. case <-t.C:
  131. campaigns, err := r.src.NextCampaigns(r.getPendingCampaignIDs())
  132. if err != nil {
  133. r.logger.Printf("error fetching campaigns: %v", err)
  134. continue
  135. }
  136. for _, c := range campaigns {
  137. if err := r.addCampaign(c); err != nil {
  138. r.logger.Printf("error processing campaign (%s): %v", c.Name, err)
  139. continue
  140. }
  141. r.logger.Printf("start processing campaign (%s)", c.Name)
  142. // If subscriber processing is busy, move on. Blocking and waiting
  143. // can end up in a race condition where the waiting campaign's
  144. // state in the data source has changed.
  145. select {
  146. case r.subFetchQueue <- c:
  147. default:
  148. }
  149. }
  150. // Aggregate errors from sending messages to check against the error threshold
  151. // after which a campaign is paused.
  152. case e := <-r.msgErrorQueue:
  153. if r.cfg.MaxSendErrors < 1 {
  154. continue
  155. }
  156. // If the error threshold is met, pause the campaign.
  157. r.msgErrorCounts[e.camp.ID]++
  158. if r.msgErrorCounts[e.camp.ID] >= r.cfg.MaxSendErrors {
  159. r.logger.Printf("error counted exceeded %d. pausing campaign %s",
  160. r.cfg.MaxSendErrors, e.camp.Name)
  161. if r.isCampaignProcessing(e.camp.ID) {
  162. r.exhaustCampaign(e.camp, models.CampaignStatusPaused)
  163. }
  164. delete(r.msgErrorCounts, e.camp.ID)
  165. }
  166. }
  167. }
  168. }()
  169. // Fetch the next set of subscribers for a campaign and process them.
  170. for c := range r.subFetchQueue {
  171. has, err := r.nextSubscribers(c, batchSize)
  172. if err != nil {
  173. r.logger.Printf("error processing campaign batch (%s): %v", c.Name, err)
  174. continue
  175. }
  176. if has {
  177. // There are more subscribers to fetch.
  178. r.subFetchQueue <- c
  179. } else {
  180. // There are no more subscribers. Either the campaign status
  181. // has changed or all subscribers have been processed.
  182. if err := r.exhaustCampaign(c, ""); err != nil {
  183. r.logger.Printf("error exhausting campaign (%s): %v", c.Name, err)
  184. }
  185. }
  186. }
  187. }
  188. // SpawnWorkers spawns workers goroutines that push out messages.
  189. func (r *Runner) SpawnWorkers() {
  190. for i := 0; i < r.cfg.Concurrency; i++ {
  191. go func() {
  192. for m := range r.msgQueue {
  193. if !r.isCampaignProcessing(m.Campaign.ID) {
  194. continue
  195. }
  196. err := r.messengers[m.Campaign.MessengerID].Push(
  197. m.from,
  198. m.to,
  199. m.Campaign.Subject,
  200. m.Body)
  201. if err != nil {
  202. r.logger.Printf("error sending message in campaign %s: %v",
  203. m.Campaign.Name, err)
  204. select {
  205. case r.msgErrorQueue <- msgError{camp: m.Campaign, err: err}:
  206. default:
  207. }
  208. }
  209. }
  210. }()
  211. }
  212. }
  213. // addCampaign adds a campaign to the process queue.
  214. func (r *Runner) addCampaign(c *models.Campaign) error {
  215. // Validate messenger.
  216. if _, ok := r.messengers[c.MessengerID]; !ok {
  217. r.src.UpdateCampaignStatus(c.ID, models.CampaignStatusCancelled)
  218. return fmt.Errorf("unknown messenger %s on campaign %s", c.MessengerID, c.Name)
  219. }
  220. // Load the template.
  221. if err := c.CompileTemplate(r.TemplateFuncs(c)); err != nil {
  222. return err
  223. }
  224. // Add the campaign to the active map.
  225. r.camps[c.ID] = c
  226. return nil
  227. }
  228. // getPendingCampaignIDs returns the IDs of campaigns currently being processed.
  229. func (r *Runner) getPendingCampaignIDs() []int64 {
  230. // Needs to return an empty slice in case there are no campaigns.
  231. ids := make([]int64, 0)
  232. for _, c := range r.camps {
  233. ids = append(ids, int64(c.ID))
  234. }
  235. return ids
  236. }
  237. // nextSubscribers processes the next batch of subscribers in a given campaign.
  238. // If returns a bool indicating whether there any subscribers were processed
  239. // in the current batch or not. This can happen when all the subscribers
  240. // have been processed, or if a campaign has been paused or cancelled abruptly.
  241. func (r *Runner) nextSubscribers(c *models.Campaign, batchSize int) (bool, error) {
  242. // Fetch a batch of subscribers.
  243. subs, err := r.src.NextSubscribers(c.ID, batchSize)
  244. if err != nil {
  245. return false, fmt.Errorf("error fetching campaign subscribers (%s): %v", c.Name, err)
  246. }
  247. // There are no subscribers.
  248. if len(subs) == 0 {
  249. return false, nil
  250. }
  251. // Push messages.
  252. for _, s := range subs {
  253. m := r.NewMessage(c, s)
  254. if err := m.Render(); err != nil {
  255. r.logger.Printf("error rendering message (%s) (%s): %v", c.Name, s.Email, err)
  256. continue
  257. }
  258. // Push the message to the queue while blocking and waiting until
  259. // the queue is drained.
  260. r.msgQueue <- m
  261. }
  262. return true, nil
  263. }
  264. // isCampaignProcessing checks if the campaign is bing processed.
  265. func (r *Runner) isCampaignProcessing(id int) bool {
  266. _, ok := r.camps[id]
  267. return ok
  268. }
  269. func (r *Runner) exhaustCampaign(c *models.Campaign, status string) error {
  270. delete(r.camps, c.ID)
  271. // A status has been passed. Change the campaign's status
  272. // without further checks.
  273. if status != "" {
  274. if err := r.src.UpdateCampaignStatus(c.ID, status); err != nil {
  275. r.logger.Printf("error updating campaign (%s) status to %s: %v", c.Name, status, err)
  276. } else {
  277. r.logger.Printf("set campaign (%s) to %s", c.Name, status)
  278. }
  279. return nil
  280. }
  281. // Fetch the up-to-date campaign status from the source.
  282. cm, err := r.src.GetCampaign(c.ID)
  283. if err != nil {
  284. return err
  285. }
  286. // If a running campaign has exhausted subscribers, it's finished.
  287. if cm.Status == models.CampaignStatusRunning {
  288. if err := r.src.UpdateCampaignStatus(c.ID, models.CampaignStatusFinished); err != nil {
  289. r.logger.Printf("error finishing campaign (%s): %v", c.Name, err)
  290. } else {
  291. r.logger.Printf("campaign (%s) finished", c.Name)
  292. }
  293. } else {
  294. r.logger.Printf("stop processing campaign (%s)", c.Name)
  295. }
  296. return nil
  297. }
  298. // Render takes a Message, executes its pre-compiled Campaign.Tpl
  299. // and applies the resultant bytes to Message.body to be used in messages.
  300. func (m *Message) Render() error {
  301. out := bytes.Buffer{}
  302. if err := m.Campaign.Tpl.ExecuteTemplate(&out, models.BaseTpl, m); err != nil {
  303. return err
  304. }
  305. m.Body = out.Bytes()
  306. return nil
  307. }
  308. // trackLink register a URL and return its UUID to be used in message templates
  309. // for tracking links.
  310. func (r *Runner) trackLink(url, campUUID, subUUID string) string {
  311. r.linksMutex.RLock()
  312. if uu, ok := r.links[url]; ok {
  313. r.linksMutex.RUnlock()
  314. return fmt.Sprintf(r.cfg.LinkTrackURL, uu, campUUID, subUUID)
  315. }
  316. r.linksMutex.RUnlock()
  317. // Register link.
  318. uu, err := r.src.CreateLink(url)
  319. if err != nil {
  320. r.logger.Printf("error registering tracking for link '%s': %v", url, err)
  321. // If the registration fails, fail over to the original URL.
  322. return url
  323. }
  324. r.linksMutex.Lock()
  325. r.links[url] = uu
  326. r.linksMutex.Unlock()
  327. return fmt.Sprintf(r.cfg.LinkTrackURL, uu, campUUID, subUUID)
  328. }
  329. // TemplateFuncs returns the template functions to be applied into
  330. // compiled campaign templates.
  331. func (r *Runner) TemplateFuncs(c *models.Campaign) template.FuncMap {
  332. return template.FuncMap{
  333. "TrackLink": func(url, campUUID, subUUID string) string {
  334. return r.trackLink(url, campUUID, subUUID)
  335. },
  336. "TrackView": func(campUUID, subUUID string) template.HTML {
  337. return template.HTML(fmt.Sprintf(`<img src="%s" alt="campaign" />`,
  338. fmt.Sprintf(r.cfg.ViewTrackURL, campUUID, subUUID)))
  339. },
  340. "Date": func(layout string) string {
  341. if layout == "" {
  342. layout = time.ANSIC
  343. }
  344. return time.Now().Format(layout)
  345. },
  346. }
  347. }