ignore duplicate data points
This commit is contained in:
parent
c325c2765d
commit
d9a3819ef5
4 changed files with 57 additions and 15 deletions
|
@ -131,11 +131,13 @@ func (c *Controller) UsageMetrics(gctx *gin.Context) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if _, err := c.DBClient.CreateMetric(generatedType, generatedBy, collectedAt, string(jsonPayload)); err != nil {
|
if _, err := c.DBClient.CreateMetric(generatedType, generatedBy, collectedAt, string(jsonPayload)); err != nil {
|
||||||
log.Errorf("Failed to store usage metrics: %s", err)
|
log.Error(err)
|
||||||
c.HandleDBErrors(gctx, err)
|
c.HandleDBErrors(gctx, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// empty body
|
// if CreateMetrics() returned nil, the metric was already there, we're good
|
||||||
|
// and don't split hair about 201 vs 200/204
|
||||||
|
|
||||||
gctx.Status(http.StatusCreated)
|
gctx.Status(http.StatusCreated)
|
||||||
}
|
}
|
||||||
|
|
|
@ -12,6 +12,11 @@ import (
|
||||||
"github.com/crowdsecurity/go-cs-lib/ptr"
|
"github.com/crowdsecurity/go-cs-lib/ptr"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
defaultMetricsInterval = 30 * time.Minute
|
||||||
|
minimumMetricsInterval = 15 * time.Minute
|
||||||
|
)
|
||||||
|
|
||||||
// CrowdsecServiceCfg contains the location of parsers/scenarios/... and acquisition files
|
// CrowdsecServiceCfg contains the location of parsers/scenarios/... and acquisition files
|
||||||
type CrowdsecServiceCfg struct {
|
type CrowdsecServiceCfg struct {
|
||||||
Enable *bool `yaml:"enable"`
|
Enable *bool `yaml:"enable"`
|
||||||
|
@ -143,11 +148,6 @@ func (c *Config) LoadCrowdsec() error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
const (
|
|
||||||
defaultMetricsInterval = 30 * time.Second
|
|
||||||
minimumMetricsInterval = 15 * time.Second
|
|
||||||
)
|
|
||||||
|
|
||||||
func (c *CrowdsecServiceCfg) setMetricsInterval() {
|
func (c *CrowdsecServiceCfg) setMetricsInterval() {
|
||||||
switch {
|
switch {
|
||||||
case c.MetricsInterval == nil:
|
case c.MetricsInterval == nil:
|
||||||
|
|
|
@ -7,15 +7,24 @@ import (
|
||||||
"github.com/go-co-op/gocron"
|
"github.com/go-co-op/gocron"
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
|
|
||||||
|
"github.com/crowdsecurity/go-cs-lib/ptr"
|
||||||
|
|
||||||
"github.com/crowdsecurity/crowdsec/pkg/csconfig"
|
"github.com/crowdsecurity/crowdsec/pkg/csconfig"
|
||||||
"github.com/crowdsecurity/crowdsec/pkg/database/ent/alert"
|
"github.com/crowdsecurity/crowdsec/pkg/database/ent/alert"
|
||||||
"github.com/crowdsecurity/crowdsec/pkg/database/ent/bouncer"
|
"github.com/crowdsecurity/crowdsec/pkg/database/ent/bouncer"
|
||||||
"github.com/crowdsecurity/crowdsec/pkg/database/ent/decision"
|
"github.com/crowdsecurity/crowdsec/pkg/database/ent/decision"
|
||||||
"github.com/crowdsecurity/crowdsec/pkg/database/ent/event"
|
"github.com/crowdsecurity/crowdsec/pkg/database/ent/event"
|
||||||
"github.com/crowdsecurity/crowdsec/pkg/database/ent/machine"
|
"github.com/crowdsecurity/crowdsec/pkg/database/ent/machine"
|
||||||
|
"github.com/crowdsecurity/crowdsec/pkg/database/ent/metric"
|
||||||
"github.com/crowdsecurity/crowdsec/pkg/types"
|
"github.com/crowdsecurity/crowdsec/pkg/types"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
// how long to keep metrics in the local database
|
||||||
|
defaultMetricsMaxAge = 7 * 24 * time.Hour
|
||||||
|
flushInterval = 1 * time.Minute
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
func (c *Client) StartFlushScheduler(config *csconfig.FlushDBCfg) (*gocron.Scheduler, error) {
|
func (c *Client) StartFlushScheduler(config *csconfig.FlushDBCfg) (*gocron.Scheduler, error) {
|
||||||
maxItems := 0
|
maxItems := 0
|
||||||
|
@ -32,7 +41,7 @@ func (c *Client) StartFlushScheduler(config *csconfig.FlushDBCfg) (*gocron.Sched
|
||||||
|
|
||||||
// Init & Start cronjob every minute for alerts
|
// Init & Start cronjob every minute for alerts
|
||||||
scheduler := gocron.NewScheduler(time.UTC)
|
scheduler := gocron.NewScheduler(time.UTC)
|
||||||
job, err := scheduler.Every(1).Minute().Do(c.FlushAlerts, maxAge, maxItems)
|
job, err := scheduler.Every(flushInterval).Do(c.FlushAlerts, maxAge, maxItems)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("while starting FlushAlerts scheduler: %w", err)
|
return nil, fmt.Errorf("while starting FlushAlerts scheduler: %w", err)
|
||||||
}
|
}
|
||||||
|
@ -77,19 +86,45 @@ func (c *Client) StartFlushScheduler(config *csconfig.FlushDBCfg) (*gocron.Sched
|
||||||
log.Warning("bouncers auto-delete for login/password auth is not supported (use cert or api)")
|
log.Warning("bouncers auto-delete for login/password auth is not supported (use cert or api)")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
baJob, err := scheduler.Every(1).Minute().Do(c.FlushAgentsAndBouncers, config.AgentsGC, config.BouncersGC)
|
baJob, err := scheduler.Every(flushInterval).Do(c.FlushAgentsAndBouncers, config.AgentsGC, config.BouncersGC)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("while starting FlushAgentsAndBouncers scheduler: %w", err)
|
return nil, fmt.Errorf("while starting FlushAgentsAndBouncers scheduler: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
baJob.SingletonMode()
|
baJob.SingletonMode()
|
||||||
scheduler.StartAsync()
|
|
||||||
|
|
||||||
// TODO: flush metrics here (MetricsMaxAge)
|
metricsJob, err := scheduler.Every(flushInterval).Do(c.flushMetrics, config.MetricsMaxAge)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("while starting flushMetrics scheduler: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
metricsJob.SingletonMode()
|
||||||
|
|
||||||
|
scheduler.StartAsync()
|
||||||
|
|
||||||
return scheduler, nil
|
return scheduler, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// flushMetrics deletes metrics older than maxAge, regardless if they have been pushed to CAPI or not
|
||||||
|
func (c *Client) flushMetrics(maxAge *time.Duration) {
|
||||||
|
if maxAge == nil {
|
||||||
|
maxAge = ptr.Of(defaultMetricsMaxAge)
|
||||||
|
}
|
||||||
|
|
||||||
|
c.Log.Debugf("flushing metrics older than %s", maxAge)
|
||||||
|
|
||||||
|
deleted, err := c.Ent.Metric.Delete().Where(
|
||||||
|
metric.CollectedAtLTE(time.Now().UTC().Add(-*maxAge)),
|
||||||
|
).Exec(c.CTX)
|
||||||
|
if err != nil {
|
||||||
|
c.Log.Errorf("while flushing metrics: %s", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if deleted > 0 {
|
||||||
|
c.Log.Debugf("flushed %d metrics snapshots", deleted)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (c *Client) FlushOrphans() {
|
func (c *Client) FlushOrphans() {
|
||||||
/* While it has only been linked to some very corner-case bug : https://github.com/crowdsecurity/crowdsec/issues/778 */
|
/* While it has only been linked to some very corner-case bug : https://github.com/crowdsecurity/crowdsec/issues/778 */
|
||||||
|
|
|
@ -1,10 +1,9 @@
|
||||||
package database
|
package database
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"fmt"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/pkg/errors"
|
|
||||||
|
|
||||||
"github.com/crowdsecurity/crowdsec/pkg/database/ent"
|
"github.com/crowdsecurity/crowdsec/pkg/database/ent"
|
||||||
"github.com/crowdsecurity/crowdsec/pkg/database/ent/metric"
|
"github.com/crowdsecurity/crowdsec/pkg/database/ent/metric"
|
||||||
)
|
)
|
||||||
|
@ -26,9 +25,15 @@ func (c *Client) CreateMetric(generatedType metric.GeneratedType, generatedBy st
|
||||||
SetPayload(payload).
|
SetPayload(payload).
|
||||||
Save(c.CTX)
|
Save(c.CTX)
|
||||||
|
|
||||||
if err != nil {
|
switch {
|
||||||
|
case ent.IsConstraintError(err):
|
||||||
|
// pretty safe guess, it's the unique index
|
||||||
|
c.Log.Infof("storing metrics snapshot for '%s' at %s: already exists", generatedBy, collectedAt)
|
||||||
|
// it's polite to accept a duplicate snapshot without any error
|
||||||
|
return nil, nil
|
||||||
|
case err != nil:
|
||||||
c.Log.Warningf("CreateMetric: %s", err)
|
c.Log.Warningf("CreateMetric: %s", err)
|
||||||
return nil, errors.Wrapf(InsertFail, "creating metrics set for '%s' at %s", generatedBy, collectedAt)
|
return nil, fmt.Errorf("storing metrics snapshot for '%s' at %s: %w", generatedBy, collectedAt, InsertFail)
|
||||||
}
|
}
|
||||||
|
|
||||||
return metric, nil
|
return metric, nil
|
||||||
|
|
Loading…
Reference in a new issue