Alerts flush: Optimization of the flush mechanism (batch and limit to one job) + add cscli alerts flush
command (#1024)
- Don't allow running more than one alert flush job at a time to prevent runaway CPU usage in some case. (fix High CPU after Upgrade to 1.2.0 #1022) - Add a cscli alerts flush command to manually flush the alerts in the database (fixes Improvement/Manual flush mechanism #1023 ). - Enable cascading deletion on alerts as we upgraded ent: Deleting an alert in the database will automatically delete all related decisions, events and meta - Add an index on alerts.id to try to improve flush performance with very big sqlite database. - Flush alert now operates in batch
This commit is contained in:
parent
76a80380e7
commit
25a2d528b0
7 changed files with 121 additions and 26 deletions
|
@ -13,6 +13,7 @@ import (
|
|||
|
||||
"github.com/crowdsecurity/crowdsec/pkg/apiclient"
|
||||
"github.com/crowdsecurity/crowdsec/pkg/cwversion"
|
||||
"github.com/crowdsecurity/crowdsec/pkg/database"
|
||||
"github.com/crowdsecurity/crowdsec/pkg/models"
|
||||
"github.com/go-openapi/strfmt"
|
||||
"github.com/olekukonko/tablewriter"
|
||||
|
@ -469,5 +470,40 @@ cscli alerts delete -s crowdsecurity/ssh-bf"`,
|
|||
|
||||
cmdAlerts.AddCommand(cmdAlertsInspect)
|
||||
|
||||
var maxItems int
|
||||
var maxAge string
|
||||
var cmdAlertsFlush = &cobra.Command{
|
||||
Use: `flush`,
|
||||
Short: `Flush alerts
|
||||
/!\ This command can be used only on the same machine than the local API`,
|
||||
Example: `cscli alerts flush --max-items 1000 --max-age 7d`,
|
||||
DisableAutoGenTag: true,
|
||||
Run: func(cmd *cobra.Command, args []string) {
|
||||
var err error
|
||||
if err := csConfig.LoadAPIServer(); err != nil || csConfig.DisableAPI {
|
||||
log.Fatal("Local API is disabled, please run this command on the local API machine")
|
||||
}
|
||||
if err := csConfig.LoadDBConfig(); err != nil {
|
||||
log.Fatalf(err.Error())
|
||||
}
|
||||
dbClient, err = database.NewClient(csConfig.DbConfig)
|
||||
if err != nil {
|
||||
log.Fatalf("unable to create new database client: %s", err)
|
||||
}
|
||||
log.Info("Flushing alerts. !! This may take a long time !!")
|
||||
err = dbClient.FlushAlerts(maxAge, maxItems)
|
||||
if err != nil {
|
||||
log.Fatalf("unable to flush alerts: %s", err)
|
||||
}
|
||||
log.Info("Alerts flushed")
|
||||
},
|
||||
}
|
||||
|
||||
cmdAlertsFlush.Flags().SortFlags = false
|
||||
cmdAlertsFlush.Flags().IntVar(&maxItems, "max-items", 5000, "Maximum number of alert items to keep in the database")
|
||||
cmdAlertsFlush.Flags().StringVar(&maxAge, "max-age", "7d", "Maximum age of alert items to keep in the database")
|
||||
|
||||
cmdAlerts.AddCommand(cmdAlertsFlush)
|
||||
|
||||
return cmdAlerts
|
||||
}
|
||||
|
|
2
go.mod
2
go.mod
|
@ -26,7 +26,7 @@ require (
|
|||
github.com/enescakir/emoji v1.0.0
|
||||
github.com/fsnotify/fsnotify v1.4.9
|
||||
github.com/gin-gonic/gin v1.6.3
|
||||
github.com/go-co-op/gocron v0.5.1
|
||||
github.com/go-co-op/gocron v1.9.0
|
||||
github.com/go-openapi/errors v0.19.9
|
||||
github.com/go-openapi/strfmt v0.19.11
|
||||
github.com/go-openapi/swag v0.19.12
|
||||
|
|
10
go.sum
10
go.sum
|
@ -169,8 +169,8 @@ github.com/gin-gonic/gin v1.6.3/go.mod h1:75u5sXoLsGZoRN5Sgbi1eraJ4GU3++wFwWzhwv
|
|||
github.com/globalsign/mgo v0.0.0-20180905125535-1ca0a4f7cbcb/go.mod h1:xkRDCp4j0OGD1HRkm4kmhM+pmpv3AKq5SU7GMg4oO/Q=
|
||||
github.com/globalsign/mgo v0.0.0-20181015135952-eeefdecb41b8/go.mod h1:xkRDCp4j0OGD1HRkm4kmhM+pmpv3AKq5SU7GMg4oO/Q=
|
||||
github.com/go-bindata/go-bindata v1.0.1-0.20190711162640-ee3c2418e368/go.mod h1:7xCgX1lzlrXPHkfvn3EhumqHkmSlzt8at9q7v0ax19c=
|
||||
github.com/go-co-op/gocron v0.5.1 h1:Cni1V7mt184+HnYTDYe6MH7siofCvf94PrGyIDI1v1U=
|
||||
github.com/go-co-op/gocron v0.5.1/go.mod h1:6Btk4lVj3bnFAgbVfr76W8impTyhYrEi1pV5Pt4Tp/M=
|
||||
github.com/go-co-op/gocron v1.9.0 h1:+V+DDenw3ryB7B+tK1bAIC5p0ruw4oX9IqAsdRnGIf0=
|
||||
github.com/go-co-op/gocron v1.9.0/go.mod h1:DbJm9kdgr1sEvWpHCA7dFFs/PGHPMil9/97EXCRPr4k=
|
||||
github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU=
|
||||
github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as=
|
||||
github.com/go-kit/kit v0.9.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as=
|
||||
|
@ -632,6 +632,8 @@ github.com/rivo/tview v0.0.0-20200219210816-cd38d7432498/go.mod h1:6lkG1x+13OShE
|
|||
github.com/rivo/uniseg v0.1.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
|
||||
github.com/rivo/uniseg v0.2.0 h1:S1pD9weZBuJdFmowNwbpi7BJ8TNftyUImj/0WQi72jY=
|
||||
github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
|
||||
github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
|
||||
github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
|
||||
github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
|
||||
github.com/rogpeppe/go-internal v1.1.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
|
||||
github.com/rogpeppe/go-internal v1.2.2/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
|
||||
|
@ -815,6 +817,7 @@ golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJ
|
|||
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ=
|
||||
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
|
@ -1004,8 +1007,9 @@ gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
|
|||
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
|
||||
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||
gopkg.in/yaml.v3 v3.0.0-20200605160147-a5ece683394c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||
gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776 h1:tQIYjPdBoyREyB9XMu+nnTclpTYkz2zFM+lzLJFO4gQ=
|
||||
gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b h1:h8qDotaEPuJATrMmW04NCwg7v22aHH28wwpauUhK9Oo=
|
||||
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||
gotest.tools/v3 v3.0.2/go.mod h1:3SzNCllyD9/Y+b5r9JIKQ474KzkZyqLqEfYqMsX94Bk=
|
||||
gotest.tools/v3 v3.0.3 h1:4AuOwCGf4lLR9u3YOe2awrHygurzhO/HeQ6laiA6Sx0=
|
||||
gotest.tools/v3 v3.0.3/go.mod h1:Z7Lb0S5l+klDB31fvDQX8ss/FlKDxtlFlw3Oa8Ymbl8=
|
||||
|
|
|
@ -730,6 +730,24 @@ func (c *Client) QueryAlertWithFilter(filter map[string][]string) ([]*ent.Alert,
|
|||
return ret, nil
|
||||
}
|
||||
|
||||
func (c *Client) DeleteAlertGraphBatch(alertItems []*ent.Alert) (int, error) {
|
||||
idList := make([]int, 0)
|
||||
for _, alert := range alertItems {
|
||||
idList = append(idList, int(alert.ID))
|
||||
}
|
||||
|
||||
deleted, err := c.Ent.Alert.Delete().
|
||||
Where(alert.IDIn(idList...)).Exec(c.CTX)
|
||||
if err != nil {
|
||||
c.Log.Warningf("DeleteAlertGraph : %s", err)
|
||||
return deleted, errors.Wrapf(DeleteFail, "alert graph delete batch")
|
||||
}
|
||||
|
||||
c.Log.Debug("Done batch delete alerts")
|
||||
|
||||
return deleted, nil
|
||||
}
|
||||
|
||||
func (c *Client) DeleteAlertGraph(alertItem *ent.Alert) error {
|
||||
// delete the associated events
|
||||
_, err := c.Ent.Event.Delete().
|
||||
|
@ -810,12 +828,15 @@ func (c *Client) FlushAlerts(MaxAge string, MaxItems int) error {
|
|||
var totalAlerts int
|
||||
var err error
|
||||
|
||||
c.Log.Info("Flushing orphan alerts")
|
||||
c.FlushOrphans()
|
||||
c.Log.Info("Done flushing orphan alerts")
|
||||
totalAlerts, err = c.TotalAlerts()
|
||||
if err != nil {
|
||||
c.Log.Warningf("FlushAlerts (max items count) : %s", err)
|
||||
return errors.Wrap(err, "unable to get alerts count")
|
||||
}
|
||||
c.Log.Infof("FlushAlerts (Total alerts): %d", totalAlerts)
|
||||
if MaxAge != "" {
|
||||
filter := map[string][]string{
|
||||
"created_before": {MaxAge},
|
||||
|
@ -825,29 +846,38 @@ func (c *Client) FlushAlerts(MaxAge string, MaxItems int) error {
|
|||
c.Log.Warningf("FlushAlerts (max age) : %s", err)
|
||||
return errors.Wrapf(err, "unable to flush alerts with filter until: %s", MaxAge)
|
||||
}
|
||||
c.Log.Infof("FlushAlerts (deleted max age alerts): %d", nbDeleted)
|
||||
deletedByAge = nbDeleted
|
||||
}
|
||||
if MaxItems > 0 {
|
||||
if totalAlerts > MaxItems {
|
||||
nbToDelete := totalAlerts - MaxItems
|
||||
batchSize := 500
|
||||
if batchSize > nbToDelete {
|
||||
batchSize = nbToDelete
|
||||
}
|
||||
deleted := 0
|
||||
for deleted < nbToDelete {
|
||||
c.Log.Infof("FlushAlerts (before query with filter) to delete: %d", nbToDelete)
|
||||
alerts, err := c.QueryAlertWithFilter(map[string][]string{
|
||||
"sort": {"ASC"},
|
||||
"limit": {strconv.Itoa(nbToDelete)},
|
||||
"limit": {strconv.Itoa(batchSize)},
|
||||
}) // we want to delete older alerts if we reach the max number of items
|
||||
if err != nil {
|
||||
c.Log.Warningf("FlushAlerts (max items query) : %s", err)
|
||||
return errors.Wrap(err, "unable to get all alerts")
|
||||
}
|
||||
for itemNb, alert := range alerts {
|
||||
if itemNb < nbToDelete {
|
||||
err := c.DeleteAlertGraph(alert)
|
||||
deletedAlerts, err := c.DeleteAlertGraphBatch(alerts)
|
||||
if err != nil {
|
||||
c.Log.Warningf("FlushAlerts : %s", err)
|
||||
return errors.Wrap(err, "unable to flush alert")
|
||||
c.Log.Warningf("FlushAlerts (max items query) : %s", err)
|
||||
return errors.Wrap(err, "unable to delete alerts")
|
||||
}
|
||||
deletedByNbItem++
|
||||
deleted += deletedAlerts
|
||||
if nbToDelete-deleted < batchSize {
|
||||
batchSize = nbToDelete - deleted
|
||||
}
|
||||
}
|
||||
deletedByNbItem = deleted
|
||||
}
|
||||
}
|
||||
if deletedByNbItem > 0 {
|
||||
|
|
|
@ -100,7 +100,8 @@ func (c *Client) StartFlushScheduler(config *csconfig.FlushDBCfg) (*gocron.Sched
|
|||
}
|
||||
// Init & Start cronjob every minute
|
||||
scheduler := gocron.NewScheduler(time.UTC)
|
||||
scheduler.Every(1).Minute().Do(c.FlushAlerts, maxAge, maxItems)
|
||||
job, _ := scheduler.Every(1).Minute().Do(c.FlushAlerts, maxAge, maxItems)
|
||||
job.SingletonMode()
|
||||
scheduler.StartAsync()
|
||||
|
||||
return scheduler, nil
|
||||
|
|
|
@ -48,6 +48,13 @@ var (
|
|||
OnDelete: schema.SetNull,
|
||||
},
|
||||
},
|
||||
Indexes: []*schema.Index{
|
||||
{
|
||||
Name: "alert_id",
|
||||
Unique: false,
|
||||
Columns: []*schema.Column{AlertsColumns[0]},
|
||||
},
|
||||
},
|
||||
}
|
||||
// BouncersColumns holds the columns for the "bouncers" table.
|
||||
BouncersColumns = []*schema.Column{
|
||||
|
@ -98,7 +105,7 @@ var (
|
|||
Symbol: "decisions_alerts_decisions",
|
||||
Columns: []*schema.Column{DecisionsColumns[15]},
|
||||
RefColumns: []*schema.Column{AlertsColumns[0]},
|
||||
OnDelete: schema.SetNull,
|
||||
OnDelete: schema.Cascade,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
@ -121,7 +128,7 @@ var (
|
|||
Symbol: "events_alerts_events",
|
||||
Columns: []*schema.Column{EventsColumns[5]},
|
||||
RefColumns: []*schema.Column{AlertsColumns[0]},
|
||||
OnDelete: schema.SetNull,
|
||||
OnDelete: schema.Cascade,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
@ -163,7 +170,7 @@ var (
|
|||
Symbol: "meta_alerts_metas",
|
||||
Columns: []*schema.Column{MetaColumns[5]},
|
||||
RefColumns: []*schema.Column{AlertsColumns[0]},
|
||||
OnDelete: schema.SetNull,
|
||||
OnDelete: schema.Cascade,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
|
|
@ -4,8 +4,10 @@ import (
|
|||
"time"
|
||||
|
||||
"entgo.io/ent"
|
||||
"entgo.io/ent/dialect/entsql"
|
||||
"entgo.io/ent/schema/edge"
|
||||
"entgo.io/ent/schema/field"
|
||||
"entgo.io/ent/schema/index"
|
||||
)
|
||||
|
||||
// Alert holds the schema definition for the Alert entity.
|
||||
|
@ -56,8 +58,23 @@ func (Alert) Edges() []ent.Edge {
|
|||
edge.From("owner", Machine.Type).
|
||||
Ref("alerts").
|
||||
Unique(),
|
||||
edge.To("decisions", Decision.Type),
|
||||
edge.To("events", Event.Type),
|
||||
edge.To("metas", Meta.Type),
|
||||
edge.To("decisions", Decision.Type).
|
||||
Annotations(entsql.Annotation{
|
||||
OnDelete: entsql.Cascade,
|
||||
}),
|
||||
edge.To("events", Event.Type).
|
||||
Annotations(entsql.Annotation{
|
||||
OnDelete: entsql.Cascade,
|
||||
}),
|
||||
edge.To("metas", Meta.Type).
|
||||
Annotations(entsql.Annotation{
|
||||
OnDelete: entsql.Cascade,
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
||||
func (Alert) Indexes() []ent.Index {
|
||||
return []ent.Index{
|
||||
index.Fields("id"),
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue