|
@@ -47,20 +47,26 @@ type Manager struct {
|
|
|
logger *log.Logger
|
|
|
|
|
|
// Campaigns that are currently running.
|
|
|
- camps map[int]*models.Campaign
|
|
|
- campsMutex sync.RWMutex
|
|
|
+ camps map[int]*models.Campaign
|
|
|
+ campsMut sync.RWMutex
|
|
|
|
|
|
// Links generated using Track() are cached here so as to not query
|
|
|
// the database for the link UUID for every message sent. This has to
|
|
|
// be locked as it may be used externally when previewing campaigns.
|
|
|
- links map[string]string
|
|
|
- linksMutex sync.RWMutex
|
|
|
+ links map[string]string
|
|
|
+ linksMut sync.RWMutex
|
|
|
|
|
|
subFetchQueue chan *models.Campaign
|
|
|
campMsgQueue chan CampaignMessage
|
|
|
campMsgErrorQueue chan msgError
|
|
|
campMsgErrorCounts map[int]int
|
|
|
msgQueue chan Message
|
|
|
+
|
|
|
+ // Sliding window keeps track of the total number of messages sent in a period
|
|
|
+ // and on reaching the specified limit, waits until the window is over before
|
|
|
+ // sending further messages.
|
|
|
+ slidingWindowNumMsg int
|
|
|
+ slidingWindowStart time.Time
|
|
|
}
|
|
|
|
|
|
// CampaignMessage represents an instance of campaign message to be pushed out,
|
|
@@ -90,18 +96,21 @@ type Config struct {
|
|
|
// Number of subscribers to pull from the DB in a single iteration.
|
|
|
BatchSize int
|
|
|
|
|
|
- Concurrency int
|
|
|
- MessageRate int
|
|
|
- MaxSendErrors int
|
|
|
- RequeueOnError bool
|
|
|
- FromEmail string
|
|
|
- IndividualTracking bool
|
|
|
- LinkTrackURL string
|
|
|
- UnsubURL string
|
|
|
- OptinURL string
|
|
|
- MessageURL string
|
|
|
- ViewTrackURL string
|
|
|
- UnsubHeader bool
|
|
|
+ Concurrency int
|
|
|
+ MessageRate int
|
|
|
+ MaxSendErrors int
|
|
|
+ SlidingWindow bool
|
|
|
+ SlidingWindowDuration time.Duration
|
|
|
+ SlidingWindowRate int
|
|
|
+ RequeueOnError bool
|
|
|
+ FromEmail string
|
|
|
+ IndividualTracking bool
|
|
|
+ LinkTrackURL string
|
|
|
+ UnsubURL string
|
|
|
+ OptinURL string
|
|
|
+ MessageURL string
|
|
|
+ ViewTrackURL string
|
|
|
+ UnsubHeader bool
|
|
|
}
|
|
|
|
|
|
type msgError struct {
|
|
@@ -135,6 +144,7 @@ func New(cfg Config, src DataSource, notifCB models.AdminNotifCallback, i *i18n.
|
|
|
msgQueue: make(chan Message, cfg.Concurrency),
|
|
|
campMsgErrorQueue: make(chan msgError, cfg.MaxSendErrors),
|
|
|
campMsgErrorCounts: make(map[int]int),
|
|
|
+ slidingWindowStart: time.Now(),
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -185,8 +195,8 @@ func (m *Manager) HasMessenger(id string) bool {
|
|
|
|
|
|
// HasRunningCampaigns checks if there are any active campaigns.
|
|
|
func (m *Manager) HasRunningCampaigns() bool {
|
|
|
- m.campsMutex.Lock()
|
|
|
- defer m.campsMutex.Unlock()
|
|
|
+ m.campsMut.Lock()
|
|
|
+ defer m.campsMut.Unlock()
|
|
|
return len(m.camps) > 0
|
|
|
}
|
|
|
|
|
@@ -422,28 +432,28 @@ func (m *Manager) addCampaign(c *models.Campaign) error {
|
|
|
}
|
|
|
|
|
|
// Add the campaign to the active map.
|
|
|
- m.campsMutex.Lock()
|
|
|
+ m.campsMut.Lock()
|
|
|
m.camps[c.ID] = c
|
|
|
- m.campsMutex.Unlock()
|
|
|
+ m.campsMut.Unlock()
|
|
|
return nil
|
|
|
}
|
|
|
|
|
|
// getPendingCampaignIDs returns the IDs of campaigns currently being processed.
|
|
|
func (m *Manager) getPendingCampaignIDs() []int64 {
|
|
|
// Needs to return an empty slice in case there are no campaigns.
|
|
|
- m.campsMutex.RLock()
|
|
|
+ m.campsMut.RLock()
|
|
|
ids := make([]int64, 0, len(m.camps))
|
|
|
for _, c := range m.camps {
|
|
|
ids = append(ids, int64(c.ID))
|
|
|
}
|
|
|
- m.campsMutex.RUnlock()
|
|
|
+ m.campsMut.RUnlock()
|
|
|
return ids
|
|
|
}
|
|
|
|
|
|
// nextSubscribers processes the next batch of subscribers in a given campaign.
|
|
|
-// If returns a bool indicating whether there any subscribers were processed
|
|
|
-// in the current batch or not. This can happen when all the subscribers
|
|
|
-// have been processed, or if a campaign has been paused or cancelled abruptly.
|
|
|
+// It returns a bool indicating whether any subscribers were processed
|
|
|
+// in the current batch or not. A false indicates that all subscribers
|
|
|
+// have been processed, or that a campaign has been paused or cancelled.
|
|
|
func (m *Manager) nextSubscribers(c *models.Campaign, batchSize int) (bool, error) {
|
|
|
// Fetch a batch of subscribers.
|
|
|
subs, err := m.src.NextSubscribers(c.ID, batchSize)
|
|
@@ -456,8 +466,14 @@ func (m *Manager) nextSubscribers(c *models.Campaign, batchSize int) (bool, erro
|
|
|
return false, nil
|
|
|
}
|
|
|
|
|
|
+ // Is there a sliding window limit configured?
|
|
|
+ hasSliding := m.cfg.SlidingWindow &&
|
|
|
+ m.cfg.SlidingWindowRate > 0 &&
|
|
|
+ m.cfg.SlidingWindowDuration.Seconds() > 1
|
|
|
+
|
|
|
// Push messages.
|
|
|
for _, s := range subs {
|
|
|
+ // Send the message.
|
|
|
msg := m.NewCampaignMessage(c, s)
|
|
|
if err := msg.Render(); err != nil {
|
|
|
m.logger.Printf("error rendering message (%s) (%s): %v", c.Name, s.Email, err)
|
|
@@ -467,6 +483,33 @@ func (m *Manager) nextSubscribers(c *models.Campaign, batchSize int) (bool, erro
|
|
|
// Push the message to the queue while blocking and waiting until
|
|
|
// the queue is drained.
|
|
|
m.campMsgQueue <- msg
|
|
|
+
|
|
|
+ // Check if the sliding window is active.
|
|
|
+ if hasSliding {
|
|
|
+ diff := time.Now().Sub(m.slidingWindowStart)
|
|
|
+
|
|
|
+ // Window has expired. Reset the clock.
|
|
|
+ if diff >= m.cfg.SlidingWindowDuration {
|
|
|
+ m.slidingWindowStart = time.Now()
|
|
|
+ m.slidingWindowNumMsg = 0
|
|
|
+ continue
|
|
|
+ }
|
|
|
+
|
|
|
+ // Have the messages exceeded the limit?
|
|
|
+ m.slidingWindowNumMsg++
|
|
|
+ if m.slidingWindowNumMsg >= m.cfg.SlidingWindowRate {
|
|
|
+ wait := m.cfg.SlidingWindowDuration - diff
|
|
|
+
|
|
|
+ m.logger.Printf("messages exceeded (%d) for the window (%v since %s). Sleeping for %s.",
|
|
|
+ m.slidingWindowNumMsg,
|
|
|
+ m.cfg.SlidingWindowDuration,
|
|
|
+ m.slidingWindowStart.Format(time.RFC822Z),
|
|
|
+ wait.Round(time.Second)*1)
|
|
|
+
|
|
|
+ m.slidingWindowNumMsg = 0
|
|
|
+ time.Sleep(wait)
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
return true, nil
|
|
@@ -474,16 +517,16 @@ func (m *Manager) nextSubscribers(c *models.Campaign, batchSize int) (bool, erro
|
|
|
|
|
|
// isCampaignProcessing checks if the campaign is bing processed.
|
|
|
func (m *Manager) isCampaignProcessing(id int) bool {
|
|
|
- m.campsMutex.RLock()
|
|
|
+ m.campsMut.RLock()
|
|
|
_, ok := m.camps[id]
|
|
|
- m.campsMutex.RUnlock()
|
|
|
+ m.campsMut.RUnlock()
|
|
|
return ok
|
|
|
}
|
|
|
|
|
|
func (m *Manager) exhaustCampaign(c *models.Campaign, status string) (*models.Campaign, error) {
|
|
|
- m.campsMutex.Lock()
|
|
|
+ m.campsMut.Lock()
|
|
|
delete(m.camps, c.ID)
|
|
|
- m.campsMutex.Unlock()
|
|
|
+ m.campsMut.Unlock()
|
|
|
|
|
|
// A status has been passed. Change the campaign's status
|
|
|
// without further checks.
|
|
@@ -520,12 +563,12 @@ func (m *Manager) exhaustCampaign(c *models.Campaign, status string) (*models.Ca
|
|
|
// trackLink register a URL and return its UUID to be used in message templates
|
|
|
// for tracking links.
|
|
|
func (m *Manager) trackLink(url, campUUID, subUUID string) string {
|
|
|
- m.linksMutex.RLock()
|
|
|
+ m.linksMut.RLock()
|
|
|
if uu, ok := m.links[url]; ok {
|
|
|
- m.linksMutex.RUnlock()
|
|
|
+ m.linksMut.RUnlock()
|
|
|
return fmt.Sprintf(m.cfg.LinkTrackURL, uu, campUUID, subUUID)
|
|
|
}
|
|
|
- m.linksMutex.RUnlock()
|
|
|
+ m.linksMut.RUnlock()
|
|
|
|
|
|
// Register link.
|
|
|
uu, err := m.src.CreateLink(url)
|
|
@@ -536,9 +579,9 @@ func (m *Manager) trackLink(url, campUUID, subUUID string) string {
|
|
|
return url
|
|
|
}
|
|
|
|
|
|
- m.linksMutex.Lock()
|
|
|
+ m.linksMut.Lock()
|
|
|
m.links[url] = uu
|
|
|
- m.linksMutex.Unlock()
|
|
|
+ m.linksMut.Unlock()
|
|
|
|
|
|
return fmt.Sprintf(m.cfg.LinkTrackURL, uu, campUUID, subUUID)
|
|
|
}
|