Add accurate realtime message rate counter.

The `rate` field `/api/campaigns/running/stats` returned was computed
based on the total time spent from the start of the campaign to the
current time. This meant that for large campaigns, if there were
pauses or slowdowns in between, the rate would be skewed heavily
making it useless to figure out the current send rate.

This commit introduces a realtime running rate counter in the campaign
manager that returns accurate (running) send rates for the last minute.

The `rate` field in the API now shows the live running rate and a
new `net_rate` field shows the rate from the beginning of the campaign.
This commit is contained in:
Kailash Nadh 2022-02-06 11:38:02 +05:30
parent 1b163d1895
commit 0f6a0376da
20 changed files with 74 additions and 18 deletions

View file

@ -67,7 +67,8 @@ type campaignStats struct {
Sent int `db:"sent" json:"sent"`
Started null.Time `db:"started_at" json:"started_at"`
UpdatedAt null.Time `db:"updated_at" json:"updated_at"`
Rate float64 `json:"rate"`
Rate int `json:"rate"`
NetRate int `json:"net_rate"`
}
type campsWrap struct {
@ -522,17 +523,21 @@ func handleGetRunningCampaignStats(c echo.Context) error {
// Compute rate.
for i, c := range out {
if c.Started.Valid && c.UpdatedAt.Valid {
diff := c.UpdatedAt.Time.Sub(c.Started.Time).Minutes()
if diff > 0 {
var (
sent = float64(c.Sent)
rate = sent / diff
)
if rate > sent || rate > float64(c.ToSend) {
rate = sent
}
out[i].Rate = rate
diff := int(c.UpdatedAt.Time.Sub(c.Started.Time).Minutes())
if diff < 1 {
diff = 1
}
rate := c.Sent / diff
if rate > c.Sent || rate > c.ToSend {
rate = c.Sent
}
// Rate since the starting of the campaign.
out[i].NetRate = rate
// Realtime running rate over the last minute.
out[i].Rate = app.manager.GetCampaignStats(c.ID).SendRate
}
}

View file

@ -110,7 +110,7 @@
<span>{{ $utils.niceDate(stats.updatedAt, true) }}</span>
</p>
<p v-if="stats.startedAt && stats.updatedAt"
class="is-capitalized" title="Duration">
class="is-capitalized">
<label><b-icon icon="alarm" size="is-small" /></label>
<span>{{ $utils.duration(stats.startedAt, stats.updatedAt) }}</span>
</p>
@ -142,10 +142,15 @@
</router-link>
</span>
</p>
<p title="Speed" v-if="stats.rate">
<p v-if="stats.rate">
<label><b-icon icon="speedometer" size="is-small"></b-icon></label>
<span class="send-rate">
{{ stats.rate.toFixed(0) }} / min
<b-tooltip
:label="`${stats.netRate} / ${$t('campaigns.rateMinuteShort')} @
${$utils.duration(stats.startedAt, stats.updatedAt)}`"
type="is-dark">
{{ stats.rate.toFixed(0) }} / {{ $t('campaigns.rateMinuteShort') }}
</b-tooltip>
</span>
</p>
<p v-if="isRunning(props.row.id)">

1
go.mod
View file

@ -24,6 +24,7 @@ require (
github.com/mattn/go-colorable v0.1.12 // indirect
github.com/mitchellh/mapstructure v1.4.2 // indirect
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect
github.com/paulbellamy/ratecounter v0.2.0 // indirect
github.com/pelletier/go-toml v1.9.4 // indirect
github.com/rhnvrm/simples3 v0.8.1
github.com/spf13/cast v1.4.1 // indirect

4
go.sum
View file

@ -134,6 +134,8 @@ github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWb
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
github.com/oklog/run v1.0.0/go.mod h1:dlhp/R75TPv97u0XWUtDeV/lRKWPKSdTuV0TZvrmrQA=
github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
github.com/paulbellamy/ratecounter v0.2.0 h1:2L/RhJq+HA8gBQImDXtLPrDXK5qAj6ozWVK/zFXVJGs=
github.com/paulbellamy/ratecounter v0.2.0/go.mod h1:Hfx1hDpSGoqxkVVpBi/IlYD7kChlfo5C6hzIHwPqfFE=
github.com/pelletier/go-toml v1.7.0/go.mod h1:vwGMzjaWMwyfHwgIBhI2YUM4fB6nL6lVAvS1LBMMhTE=
github.com/pelletier/go-toml v1.9.4 h1:tjENF6MfZAg8e4ZmZTeWaWiT2vXtsoO6+iuOjFhECwM=
github.com/pelletier/go-toml v1.9.4/go.mod h1:u1nR/EPcESfeI/szUZKdtJ0xRNbUoANCkoOuaOx1Y+c=
@ -142,8 +144,6 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndrE9hABlRI=
github.com/rhnvrm/simples3 v0.6.1/go.mod h1:Y+3vYm2V7Y4VijFoJHHTrja6OgPrJ2cBti8dPGkC3sA=
github.com/rhnvrm/simples3 v0.8.0 h1:SAjJtsqObltKkejIGl3WgyySq2xdrfwZWXi6njFluuA=
github.com/rhnvrm/simples3 v0.8.0/go.mod h1:Y+3vYm2V7Y4VijFoJHHTrja6OgPrJ2cBti8dPGkC3sA=
github.com/rhnvrm/simples3 v0.8.1 h1:jL2yCi9P0pA8hFYkyVWZ4cs5RX3AMgcVsXTOqnCj0/w=
github.com/rhnvrm/simples3 v0.8.1/go.mod h1:Y+3vYm2V7Y4VijFoJHHTrja6OgPrJ2cBti8dPGkC3sA=
github.com/ryanuber/columnize v2.1.0+incompatible/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts=

View file

@ -57,6 +57,7 @@
"campaigns.preview": "Náhled",
"campaigns.progress": "Průběh",
"campaigns.queryPlaceholder": "Jméno nebo předmět",
"campaigns.rateMinuteShort": "min",
"campaigns.rawHTML": "Prvotní HTML",
"campaigns.removeAltText": "Odebrat alternativní zprávu ve formátu prostého textu",
"campaigns.richText": "Formátovaný text",

View file

@ -57,6 +57,7 @@
"campaigns.preview": "Vorschau",
"campaigns.progress": "Fortschritt",
"campaigns.queryPlaceholder": "Name oder Betreff",
"campaigns.rateMinuteShort": "min",
"campaigns.rawHTML": "HTML Code",
"campaigns.removeAltText": "Lösche den alternativen unformatierten Text",
"campaigns.richText": "Rich-Text",

View file

@ -57,6 +57,7 @@
"campaigns.preview": "Preview",
"campaigns.progress": "Progress",
"campaigns.queryPlaceholder": "Name or subject",
"campaigns.rateMinuteShort": "min",
"campaigns.rawHTML": "Raw HTML",
"campaigns.removeAltText": "Remove alternate plain text message",
"campaigns.richText": "Rich text",

View file

@ -57,6 +57,7 @@
"campaigns.preview": "Vista previa",
"campaigns.progress": "Progreso",
"campaigns.queryPlaceholder": "Nombre o asunto",
"campaigns.rateMinuteShort": "min",
"campaigns.rawHTML": "HTML crudo",
"campaigns.removeAltText": "Eliminar mensaje en texto plano alternativo",
"campaigns.richText": "Texto enriquecido",

View file

@ -57,6 +57,7 @@
"campaigns.preview": "Aperçu",
"campaigns.progress": "Avancement",
"campaigns.queryPlaceholder": "Nom ou objet",
"campaigns.rateMinuteShort": "min",
"campaigns.rawHTML": "HTML brut",
"campaigns.removeAltText": "Supprimer le message alternatif en texte brut",
"campaigns.richText": "Texte riche",

View file

@ -57,6 +57,7 @@
"campaigns.preview": "Előnézet",
"campaigns.progress": "Folyamatban",
"campaigns.queryPlaceholder": "Név vagy tárgy",
"campaigns.rateMinuteShort": "min",
"campaigns.rawHTML": "Nyers (Raw) HTML",
"campaigns.removeAltText": "Alternatív egyszerű szöveges üzenet eltávolítása",
"campaigns.richText": "Rich text",

View file

@ -57,6 +57,7 @@
"campaigns.preview": "Anteprima",
"campaigns.progress": "Avanzamento",
"campaigns.queryPlaceholder": "Nome o oggetto",
"campaigns.rateMinuteShort": "min",
"campaigns.rawHTML": "HTML semplice",
"campaigns.removeAltText": "Cancellare il messaggio sostitutivo in testo semplice",
"campaigns.richText": "Testo formattato",

View file

@ -57,6 +57,7 @@
"campaigns.preview": "പ്രിവ്യൂ",
"campaigns.progress": "പുരോഗതി",
"campaigns.queryPlaceholder": "പേരോ വിഷയമോ",
"campaigns.rateMinuteShort": "min",
"campaigns.rawHTML": "അസംസ്കൃത എച്. ടി. എം. എൽ",
"campaigns.removeAltText": "Remove alternate plain text message",
"campaigns.richText": "റിച്ച് ടെക്സ്റ്റ്",

View file

@ -57,6 +57,7 @@
"campaigns.preview": "Voorbeeld",
"campaigns.progress": "Voortgang",
"campaigns.queryPlaceholder": "Naam of onderwerp",
"campaigns.rateMinuteShort": "min",
"campaigns.rawHTML": "HTML code",
"campaigns.removeAltText": "Verwijder plain text bericht",
"campaigns.richText": "Rich text",

View file

@ -57,6 +57,7 @@
"campaigns.preview": "Podgląd",
"campaigns.progress": "Postęp",
"campaigns.queryPlaceholder": "Nazwa lub temat",
"campaigns.rateMinuteShort": "min",
"campaigns.rawHTML": "Raw HTML",
"campaigns.removeAltText": "Usuń alternatywną treść typu plain text",
"campaigns.richText": "Wzbogacony format tekstowy (Rich text)",

View file

@ -57,6 +57,7 @@
"campaigns.preview": "Pré-visualizar",
"campaigns.progress": "Progresso",
"campaigns.queryPlaceholder": "Nome ou assunto",
"campaigns.rateMinuteShort": "min",
"campaigns.rawHTML": "Código HTML",
"campaigns.removeAltText": "Remover mensagem alternativa em texto simples",
"campaigns.richText": "Texto com formatação",

View file

@ -57,6 +57,7 @@
"campaigns.preview": "Pré-visualizar",
"campaigns.progress": "Progresso",
"campaigns.queryPlaceholder": "Nome ou assunto",
"campaigns.rateMinuteShort": "min",
"campaigns.rawHTML": "HTML simples",
"campaigns.removeAltText": "Remover mensagem alternativa em texto simples",
"campaigns.richText": "Texto rico",

View file

@ -57,6 +57,7 @@
"campaigns.preview": "Previzualizare",
"campaigns.progress": "Progres",
"campaigns.queryPlaceholder": "Numele sau subiectul",
"campaigns.rateMinuteShort": "min",
"campaigns.rawHTML": "HTML brut",
"campaigns.removeAltText": "Eliminați un mesaj text alternativ",
"campaigns.richText": "Text îmbogățit",

View file

@ -57,6 +57,7 @@
"campaigns.preview": "Предпросмотр",
"campaigns.progress": "Прогресс",
"campaigns.queryPlaceholder": "Имя темы",
"campaigns.rateMinuteShort": "min",
"campaigns.rawHTML": "Необработанный HTML",
"campaigns.removeAltText": "Удалить альтернативное простое текстовое сообщение",
"campaigns.richText": "Форматированный текст",

View file

@ -57,6 +57,7 @@
"campaigns.preview": "Önizleme",
"campaigns.progress": "İlerleme durumu",
"campaigns.queryPlaceholder": "İsim veya konu",
"campaigns.rateMinuteShort": "min",
"campaigns.rawHTML": "Ham HTML",
"campaigns.removeAltText": "Alternatif düz yazıyı kaldır",
"campaigns.richText": "Zengin metin",

View file

@ -15,6 +15,7 @@ import (
"github.com/knadh/listmonk/internal/i18n"
"github.com/knadh/listmonk/internal/messenger"
"github.com/knadh/listmonk/models"
"github.com/paulbellamy/ratecounter"
)
const (
@ -39,6 +40,11 @@ type Store interface {
DeleteSubscriber(id int64) error
}
// CampStats contains campaign stats like per minute send rate.
type CampStats struct {
SendRate int
}
// Manager handles the scheduling, processing, and queuing of campaigns
// and message pushes.
type Manager struct {
@ -50,8 +56,9 @@ type Manager struct {
logger *log.Logger
// Campaigns that are currently running.
camps map[int]*models.Campaign
campsMut sync.RWMutex
camps map[int]*models.Campaign
campRates map[int]*ratecounter.RateCounter
campsMut sync.RWMutex
// Links generated using Track() are cached here so as to not query
// the database for the link UUID for every message sent. This has to
@ -153,6 +160,7 @@ func New(cfg Config, store Store, notifCB models.AdminNotifCallback, i *i18n.I18
logger: l,
messengers: make(map[string]messenger.Messenger),
camps: make(map[int]*models.Campaign),
campRates: make(map[int]*ratecounter.RateCounter),
links: make(map[string]string),
subFetchQueue: make(chan *models.Campaign, cfg.Concurrency),
campMsgQueue: make(chan CampaignMessage, cfg.Concurrency*2),
@ -237,6 +245,19 @@ func (m *Manager) HasRunningCampaigns() bool {
return len(m.camps) > 0
}
// GetCampaignStats returns campaign statistics.
func (m *Manager) GetCampaignStats(id int) CampStats {
n := 0
m.campsMut.Lock()
if r, ok := m.campRates[id]; ok {
n = int(r.Rate())
}
m.campsMut.Unlock()
return CampStats{SendRate: n}
}
// Run is a blocking function (that should be invoked as a goroutine)
// that scans the data source at regular intervals for pending campaigns,
// and queues them for processing. The process queue fetches batches of
@ -337,9 +358,16 @@ func (m *Manager) worker() {
select {
case m.campMsgErrorQueue <- msgError{camp: msg.Campaign, err: err}:
default:
continue
}
}
m.campsMut.Lock()
if r, ok := m.campRates[msg.Campaign.ID]; ok {
r.Incr(1)
}
m.campsMut.Unlock()
// Arbitrary message.
case msg, ok := <-m.msgQueue:
if !ok {
@ -497,6 +525,7 @@ func (m *Manager) addCampaign(c *models.Campaign) error {
// Add the campaign to the active map.
m.campsMut.Lock()
m.camps[c.ID] = c
m.campRates[c.ID] = ratecounter.NewRateCounter(time.Minute)
m.campsMut.Unlock()
return nil
}
@ -589,6 +618,7 @@ func (m *Manager) isCampaignProcessing(id int) bool {
func (m *Manager) exhaustCampaign(c *models.Campaign, status string) (*models.Campaign, error) {
m.campsMut.Lock()
delete(m.camps, c.ID)
delete(m.campRates, c.ID)
m.campsMut.Unlock()
// A status has been passed. Change the campaign's status