Browse Source

Make DB pull batch size in campaign manager configurable

Kailash Nadh 5 years ago
parent
commit
79dd916d09
3 changed files with 20 additions and 3 deletions
  1. 5 0
      config.toml.sample
  2. 1 0
      init.go
  3. 14 3
      internal/manager/manager.go

+ 5 - 0
config.toml.sample

@@ -42,6 +42,11 @@ message_rate = 5
 # investigation or intervention. Set to 0 to never pause.
 # investigation or intervention. Set to 0 to never pause.
 max_send_errors = 1000
 max_send_errors = 1000
 
 
+# The number of subscribers to pull from the databse in a single iteration.
+# Each iteration pulls subscribers from the database, sends messages to them,
+# and then moves on to the next iteration to pull the next batch.
+# This should ideally be higher than the maximum achievable throughput (concurrency * message_rate)
+batch_size = 1000
 
 
 [privacy]
 [privacy]
 # Allow subscribers to unsubscribe from all mailing lists and mark themselves
 # Allow subscribers to unsubscribe from all mailing lists and mark themselves

+ 1 - 0
init.go

@@ -195,6 +195,7 @@ func initCampaignManager(q *Queries, cs *constants, app *App) *manager.Manager {
 	}
 	}
 
 
 	return manager.New(manager.Config{
 	return manager.New(manager.Config{
+		BatchSize:     ko.Int("app.batch_size"),
 		Concurrency:   ko.Int("app.concurrency"),
 		Concurrency:   ko.Int("app.concurrency"),
 		MessageRate:   ko.Int("app.message_rate"),
 		MessageRate:   ko.Int("app.message_rate"),
 		MaxSendErrors: ko.Int("app.max_send_errors"),
 		MaxSendErrors: ko.Int("app.max_send_errors"),

+ 14 - 3
internal/manager/manager.go

@@ -15,8 +15,6 @@ import (
 )
 )
 
 
 const (
 const (
-	batchSize = 10000
-
 	// BaseTPL is the name of the base template.
 	// BaseTPL is the name of the base template.
 	BaseTPL = "base"
 	BaseTPL = "base"
 
 
@@ -84,6 +82,9 @@ type Message struct {
 
 
 // Config has parameters for configuring the manager.
 // Config has parameters for configuring the manager.
 type Config struct {
 type Config struct {
+	// Number of subscribers to pull from the DB in a single iteration.
+	BatchSize int
+
 	Concurrency    int
 	Concurrency    int
 	MessageRate    int
 	MessageRate    int
 	MaxSendErrors  int
 	MaxSendErrors  int
@@ -103,6 +104,16 @@ type msgError struct {
 
 
 // New returns a new instance of Mailer.
 // New returns a new instance of Mailer.
 func New(cfg Config, src DataSource, notifCB models.AdminNotifCallback, l *log.Logger) *Manager {
 func New(cfg Config, src DataSource, notifCB models.AdminNotifCallback, l *log.Logger) *Manager {
+	if cfg.BatchSize < 1 {
+		cfg.BatchSize = 1000
+	}
+	if cfg.Concurrency < 1 {
+		cfg.Concurrency = 1
+	}
+	if cfg.MessageRate < 1 {
+		cfg.MessageRate = 1
+	}
+
 	return &Manager{
 	return &Manager{
 		cfg:                cfg,
 		cfg:                cfg,
 		src:                src,
 		src:                src,
@@ -232,7 +243,7 @@ func (m *Manager) Run(tick time.Duration) {
 
 
 	// Fetch the next set of subscribers for a campaign and process them.
 	// Fetch the next set of subscribers for a campaign and process them.
 	for c := range m.subFetchQueue {
 	for c := range m.subFetchQueue {
-		has, err := m.nextSubscribers(c, batchSize)
+		has, err := m.nextSubscribers(c, m.cfg.BatchSize)
 		if err != nil {
 		if err != nil {
 			m.logger.Printf("error processing campaign batch (%s): %v", c.Name, err)
 			m.logger.Printf("error processing campaign batch (%s): %v", c.Name, err)
 			continue
 			continue