From e54f0cc44e58d896ea191a6e45246f54f73cad2c Mon Sep 17 00:00:00 2001 From: he2ss Date: Wed, 4 Oct 2023 16:11:39 +0200 Subject: [PATCH] enable by default ha feature --- cmd/crowdsec-cli/papi.go | 4 +-- pkg/apiserver/apic.go | 27 +++++---------- pkg/apiserver/apic_metrics.go | 29 +++++++--------- pkg/apiserver/apic_test.go | 2 +- pkg/apiserver/apiserver.go | 2 +- pkg/csconfig/api.go | 51 +++++++++++++--------------- pkg/database/ent/alert_query.go | 37 -------------------- pkg/database/ent/bouncer_query.go | 37 -------------------- pkg/database/ent/configitem_query.go | 37 -------------------- pkg/database/ent/decision_query.go | 37 -------------------- pkg/database/ent/event_query.go | 37 -------------------- pkg/database/ent/lock_query.go | 37 -------------------- pkg/database/ent/machine_query.go | 37 -------------------- pkg/database/ent/meta_query.go | 37 -------------------- pkg/database/lock.go | 17 ++++++---- 15 files changed, 59 insertions(+), 369 deletions(-) diff --git a/cmd/crowdsec-cli/papi.go b/cmd/crowdsec-cli/papi.go index b3405f1aa..d38da0df9 100644 --- a/cmd/crowdsec-cli/papi.go +++ b/cmd/crowdsec-cli/papi.go @@ -53,7 +53,7 @@ func NewPapiStatusCmd() *cobra.Command { log.Fatalf("unable to initialize database client : %s", err) } - apic, err := apiserver.NewAPIC(csConfig.API.Server.OnlineClient, dbClient, csConfig.API.Server.ConsoleConfig, csConfig.API.Server.CapiWhitelists, csConfig.API.Server.HighAvailability) + apic, err := apiserver.NewAPIC(csConfig.API.Server.OnlineClient, dbClient, csConfig.API.Server.ConsoleConfig, csConfig.API.Server.CapiWhitelists) if err != nil { log.Fatalf("unable to initialize API client : %s", err) @@ -103,7 +103,7 @@ func NewPapiSyncCmd() *cobra.Command { log.Fatalf("unable to initialize database client : %s", err) } - apic, err := apiserver.NewAPIC(csConfig.API.Server.OnlineClient, dbClient, csConfig.API.Server.ConsoleConfig, csConfig.API.Server.CapiWhitelists, csConfig.API.Server.HighAvailability) + apic, err := apiserver.NewAPIC(csConfig.API.Server.OnlineClient, dbClient, csConfig.API.Server.ConsoleConfig, csConfig.API.Server.CapiWhitelists) if err != nil { log.Fatalf("unable to initialize API client : %s", err) diff --git a/pkg/apiserver/apic.go b/pkg/apiserver/apic.go index b913305d5..b67f8e359 100644 --- a/pkg/apiserver/apic.go +++ b/pkg/apiserver/apic.go @@ -56,7 +56,6 @@ type apic struct { metricsIntervalFirst time.Duration dbClient *database.Client apiClient *apiclient.ApiClient - isHAEnabled bool AlertsAddChan chan []*models.Alert mu sync.Mutex @@ -163,7 +162,7 @@ func alertToSignal(alert *models.Alert, scenarioTrust string, shareContext bool) return signal } -func NewAPIC(config *csconfig.OnlineApiClientCfg, dbClient *database.Client, consoleConfig *csconfig.ConsoleConfig, apicWhitelist *csconfig.CapiWhitelist, haConfig *csconfig.HighAvailabilityCfg) (*apic, error) { +func NewAPIC(config *csconfig.OnlineApiClientCfg, dbClient *database.Client, consoleConfig *csconfig.ConsoleConfig, apicWhitelist *csconfig.CapiWhitelist) (*apic, error) { var err error ret := &apic{ @@ -187,10 +186,6 @@ func NewAPIC(config *csconfig.OnlineApiClientCfg, dbClient *database.Client, con whitelists: apicWhitelist, } - if haConfig != nil && *haConfig.Enabled { - ret.isHAEnabled = *haConfig.Enabled - } - password := strfmt.Password(config.Credentials.Password) apiURL, err := url.Parse(config.Credentials.URL) if err != nil { @@ -576,13 +571,11 @@ func (a *apic) PullTop(forcePull bool) error { } } - if a.isHAEnabled { - log.Debug("Acquiring lock for pullCAPI") - err = a.dbClient.AcquirePullCAPILock() - if a.dbClient.IsLocked(err) { - log.Info("PullCAPI is already running, skipping") - return nil - } + log.Debug("Acquiring lock for pullCAPI") + err = a.dbClient.AcquirePullCAPILock() + if a.dbClient.IsLocked(err) { + log.Info("PullCAPI is already running, skipping") + return nil } log.Infof("Starting community-blocklist update") @@ -632,11 +625,9 @@ func (a *apic) PullTop(forcePull bool) error { return fmt.Errorf("while updating blocklists: %w", err) } - if a.isHAEnabled { - log.Debug("Releasing lock for pullCAPI") - if err := a.dbClient.ReleasePullCAPILock(); err != nil { - return fmt.Errorf("while releasing lock: %w", err) - } + log.Debug("Releasing lock for pullCAPI") + if err := a.dbClient.ReleasePullCAPILock(); err != nil { + return fmt.Errorf("while releasing lock: %w", err) } return nil } diff --git a/pkg/apiserver/apic_metrics.go b/pkg/apiserver/apic_metrics.go index ad90efb08..53ef0d7d6 100644 --- a/pkg/apiserver/apic_metrics.go +++ b/pkg/apiserver/apic_metrics.go @@ -127,16 +127,15 @@ func (a *apic) SendMetrics(stop chan (bool)) { } case <-metTicker.C: metTicker.Stop() - if a.isHAEnabled { - log.Debug("capi metrics: acquiring lock") - err := a.dbClient.AcquirePushMetricsLock() - if a.dbClient.IsLocked(err) { - log.Infof("another instance of crowdsec is already pushing metrics, skipping") - metTicker.Reset(nextMetInt()) - } - if err != nil { - log.Errorf("unable to acquire pushMetrics lock (%s)", err) - } + log.Debug("capi metrics: acquiring lock") + err := a.dbClient.AcquirePushMetricsLock() + if a.dbClient.IsLocked(err) { + log.Infof("another instance of crowdsec is already pushing metrics, skipping") + metTicker.Reset(nextMetInt()) + return + } + if err != nil { + log.Errorf("unable to acquire pushMetrics lock (%s)", err) } metrics, err := a.GetMetrics() if err != nil { @@ -147,12 +146,10 @@ func (a *apic) SendMetrics(stop chan (bool)) { if err != nil { log.Errorf("capi metrics: failed: %s", err) } - if a.isHAEnabled { - log.Debug("capi metrics: releasing lock") - err = a.dbClient.ReleasePushMetricsLock() - if err != nil { - log.Errorf("unable to release metrics lock (%s)", err) - } + log.Debug("capi metrics: releasing lock") + err = a.dbClient.ReleasePushMetricsLock() + if err != nil { + log.Errorf("unable to release metrics lock (%s)", err) } metTicker.Reset(nextMetInt()) case <-a.metricsTomb.Dying(): // if one apic routine is dying, do we kill the others? diff --git a/pkg/apiserver/apic_test.go b/pkg/apiserver/apic_test.go index a9d6460cf..8aeb092cd 100644 --- a/pkg/apiserver/apic_test.go +++ b/pkg/apiserver/apic_test.go @@ -236,7 +236,7 @@ func TestNewAPIC(t *testing.T) { ), )) tc.action() - _, err := NewAPIC(testConfig, tc.args.dbClient, tc.args.consoleConfig, nil, nil) + _, err := NewAPIC(testConfig, tc.args.dbClient, tc.args.consoleConfig, nil) cstest.RequireErrorContains(t, err, tc.expectedErr) }) } diff --git a/pkg/apiserver/apiserver.go b/pkg/apiserver/apiserver.go index 3f6b43d37..d802822f8 100644 --- a/pkg/apiserver/apiserver.go +++ b/pkg/apiserver/apiserver.go @@ -213,7 +213,7 @@ func NewServer(config *csconfig.LocalApiServerCfg) (*APIServer, error) { if config.OnlineClient != nil && config.OnlineClient.Credentials != nil { log.Printf("Loading CAPI manager") - apiClient, err = NewAPIC(config.OnlineClient, dbClient, config.ConsoleConfig, config.CapiWhitelists, config.HighAvailability) + apiClient, err = NewAPIC(config.OnlineClient, dbClient, config.ConsoleConfig, config.CapiWhitelists) if err != nil { return &APIServer{}, err } diff --git a/pkg/csconfig/api.go b/pkg/csconfig/api.go index 77916f061..06d6a9712 100644 --- a/pkg/csconfig/api.go +++ b/pkg/csconfig/api.go @@ -186,34 +186,29 @@ type CapiWhitelist struct { /*local api service configuration*/ type LocalApiServerCfg struct { - Enable *bool `yaml:"enable"` - ListenURI string `yaml:"listen_uri,omitempty"` // 127.0.0.1:8080 - TLS *TLSCfg `yaml:"tls"` - DbConfig *DatabaseCfg `yaml:"-"` - LogDir string `yaml:"-"` - LogMedia string `yaml:"-"` - OnlineClient *OnlineApiClientCfg `yaml:"online_client"` - ProfilesPath string `yaml:"profiles_path,omitempty"` - ConsoleConfigPath string `yaml:"console_path,omitempty"` - ConsoleConfig *ConsoleConfig `yaml:"-"` - Profiles []*ProfileCfg `yaml:"-"` - LogLevel *log.Level `yaml:"log_level"` - UseForwardedForHeaders bool `yaml:"use_forwarded_for_headers,omitempty"` - TrustedProxies *[]string `yaml:"trusted_proxies,omitempty"` - CompressLogs *bool `yaml:"-"` - LogMaxSize int `yaml:"-"` - LogMaxAge int `yaml:"-"` - LogMaxFiles int `yaml:"-"` - TrustedIPs []string `yaml:"trusted_ips,omitempty"` - PapiLogLevel *log.Level `yaml:"papi_log_level"` - DisableRemoteLapiRegistration bool `yaml:"disable_remote_lapi_registration,omitempty"` - CapiWhitelistsPath string `yaml:"capi_whitelists_path,omitempty"` - CapiWhitelists *CapiWhitelist `yaml:"-"` - HighAvailability *HighAvailabilityCfg `yaml:"high_availability,omitempty"` -} - -type HighAvailabilityCfg struct { - Enabled *bool `yaml:"enabled"` + Enable *bool `yaml:"enable"` + ListenURI string `yaml:"listen_uri,omitempty"` // 127.0.0.1:8080 + TLS *TLSCfg `yaml:"tls"` + DbConfig *DatabaseCfg `yaml:"-"` + LogDir string `yaml:"-"` + LogMedia string `yaml:"-"` + OnlineClient *OnlineApiClientCfg `yaml:"online_client"` + ProfilesPath string `yaml:"profiles_path,omitempty"` + ConsoleConfigPath string `yaml:"console_path,omitempty"` + ConsoleConfig *ConsoleConfig `yaml:"-"` + Profiles []*ProfileCfg `yaml:"-"` + LogLevel *log.Level `yaml:"log_level"` + UseForwardedForHeaders bool `yaml:"use_forwarded_for_headers,omitempty"` + TrustedProxies *[]string `yaml:"trusted_proxies,omitempty"` + CompressLogs *bool `yaml:"-"` + LogMaxSize int `yaml:"-"` + LogMaxAge int `yaml:"-"` + LogMaxFiles int `yaml:"-"` + TrustedIPs []string `yaml:"trusted_ips,omitempty"` + PapiLogLevel *log.Level `yaml:"papi_log_level"` + DisableRemoteLapiRegistration bool `yaml:"disable_remote_lapi_registration,omitempty"` + CapiWhitelistsPath string `yaml:"capi_whitelists_path,omitempty"` + CapiWhitelists *CapiWhitelist `yaml:"-"` } type TLSCfg struct { diff --git a/pkg/database/ent/alert_query.go b/pkg/database/ent/alert_query.go index 42be7d4ae..68789196d 100644 --- a/pkg/database/ent/alert_query.go +++ b/pkg/database/ent/alert_query.go @@ -8,7 +8,6 @@ import ( "fmt" "math" - "entgo.io/ent/dialect" "entgo.io/ent/dialect/sql" "entgo.io/ent/dialect/sql/sqlgraph" "entgo.io/ent/schema/field" @@ -34,7 +33,6 @@ type AlertQuery struct { withEvents *EventQuery withMetas *MetaQuery withFKs bool - modifiers []func(*sql.Selector) // intermediate query (i.e. traversal path). sql *sql.Selector path func(context.Context) (*sql.Selector, error) @@ -486,9 +484,6 @@ func (aq *AlertQuery) sqlAll(ctx context.Context, hooks ...queryHook) ([]*Alert, node.Edges.loadedTypes = loadedTypes return node.assignValues(columns, values) } - if len(aq.modifiers) > 0 { - _spec.Modifiers = aq.modifiers - } for i := range hooks { hooks[i](ctx, _spec) } @@ -641,9 +636,6 @@ func (aq *AlertQuery) loadMetas(ctx context.Context, query *MetaQuery, nodes []* func (aq *AlertQuery) sqlCount(ctx context.Context) (int, error) { _spec := aq.querySpec() - if len(aq.modifiers) > 0 { - _spec.Modifiers = aq.modifiers - } _spec.Node.Columns = aq.fields if len(aq.fields) > 0 { _spec.Unique = aq.unique != nil && *aq.unique @@ -725,9 +717,6 @@ func (aq *AlertQuery) sqlQuery(ctx context.Context) *sql.Selector { if aq.unique != nil && *aq.unique { selector.Distinct() } - for _, m := range aq.modifiers { - m(selector) - } for _, p := range aq.predicates { p(selector) } @@ -745,32 +734,6 @@ func (aq *AlertQuery) sqlQuery(ctx context.Context) *sql.Selector { return selector } -// ForUpdate locks the selected rows against concurrent updates, and prevent them from being -// updated, deleted or "selected ... for update" by other sessions, until the transaction is -// either committed or rolled-back. -func (aq *AlertQuery) ForUpdate(opts ...sql.LockOption) *AlertQuery { - if aq.driver.Dialect() == dialect.Postgres { - aq.Unique(false) - } - aq.modifiers = append(aq.modifiers, func(s *sql.Selector) { - s.ForUpdate(opts...) - }) - return aq -} - -// ForShare behaves similarly to ForUpdate, except that it acquires a shared mode lock -// on any rows that are read. Other sessions can read the rows, but cannot modify them -// until your transaction commits. -func (aq *AlertQuery) ForShare(opts ...sql.LockOption) *AlertQuery { - if aq.driver.Dialect() == dialect.Postgres { - aq.Unique(false) - } - aq.modifiers = append(aq.modifiers, func(s *sql.Selector) { - s.ForShare(opts...) - }) - return aq -} - // AlertGroupBy is the group-by builder for Alert entities. type AlertGroupBy struct { config diff --git a/pkg/database/ent/bouncer_query.go b/pkg/database/ent/bouncer_query.go index a07bc1ccd..2747a3e0b 100644 --- a/pkg/database/ent/bouncer_query.go +++ b/pkg/database/ent/bouncer_query.go @@ -7,7 +7,6 @@ import ( "fmt" "math" - "entgo.io/ent/dialect" "entgo.io/ent/dialect/sql" "entgo.io/ent/dialect/sql/sqlgraph" "entgo.io/ent/schema/field" @@ -24,7 +23,6 @@ type BouncerQuery struct { order []OrderFunc fields []string predicates []predicate.Bouncer - modifiers []func(*sql.Selector) // intermediate query (i.e. traversal path). sql *sql.Selector path func(context.Context) (*sql.Selector, error) @@ -326,9 +324,6 @@ func (bq *BouncerQuery) sqlAll(ctx context.Context, hooks ...queryHook) ([]*Boun nodes = append(nodes, node) return node.assignValues(columns, values) } - if len(bq.modifiers) > 0 { - _spec.Modifiers = bq.modifiers - } for i := range hooks { hooks[i](ctx, _spec) } @@ -343,9 +338,6 @@ func (bq *BouncerQuery) sqlAll(ctx context.Context, hooks ...queryHook) ([]*Boun func (bq *BouncerQuery) sqlCount(ctx context.Context) (int, error) { _spec := bq.querySpec() - if len(bq.modifiers) > 0 { - _spec.Modifiers = bq.modifiers - } _spec.Node.Columns = bq.fields if len(bq.fields) > 0 { _spec.Unique = bq.unique != nil && *bq.unique @@ -427,9 +419,6 @@ func (bq *BouncerQuery) sqlQuery(ctx context.Context) *sql.Selector { if bq.unique != nil && *bq.unique { selector.Distinct() } - for _, m := range bq.modifiers { - m(selector) - } for _, p := range bq.predicates { p(selector) } @@ -447,32 +436,6 @@ func (bq *BouncerQuery) sqlQuery(ctx context.Context) *sql.Selector { return selector } -// ForUpdate locks the selected rows against concurrent updates, and prevent them from being -// updated, deleted or "selected ... for update" by other sessions, until the transaction is -// either committed or rolled-back. -func (bq *BouncerQuery) ForUpdate(opts ...sql.LockOption) *BouncerQuery { - if bq.driver.Dialect() == dialect.Postgres { - bq.Unique(false) - } - bq.modifiers = append(bq.modifiers, func(s *sql.Selector) { - s.ForUpdate(opts...) - }) - return bq -} - -// ForShare behaves similarly to ForUpdate, except that it acquires a shared mode lock -// on any rows that are read. Other sessions can read the rows, but cannot modify them -// until your transaction commits. -func (bq *BouncerQuery) ForShare(opts ...sql.LockOption) *BouncerQuery { - if bq.driver.Dialect() == dialect.Postgres { - bq.Unique(false) - } - bq.modifiers = append(bq.modifiers, func(s *sql.Selector) { - s.ForShare(opts...) - }) - return bq -} - // BouncerGroupBy is the group-by builder for Bouncer entities. type BouncerGroupBy struct { config diff --git a/pkg/database/ent/configitem_query.go b/pkg/database/ent/configitem_query.go index d8e00a975..6c9e6732a 100644 --- a/pkg/database/ent/configitem_query.go +++ b/pkg/database/ent/configitem_query.go @@ -7,7 +7,6 @@ import ( "fmt" "math" - "entgo.io/ent/dialect" "entgo.io/ent/dialect/sql" "entgo.io/ent/dialect/sql/sqlgraph" "entgo.io/ent/schema/field" @@ -24,7 +23,6 @@ type ConfigItemQuery struct { order []OrderFunc fields []string predicates []predicate.ConfigItem - modifiers []func(*sql.Selector) // intermediate query (i.e. traversal path). sql *sql.Selector path func(context.Context) (*sql.Selector, error) @@ -326,9 +324,6 @@ func (ciq *ConfigItemQuery) sqlAll(ctx context.Context, hooks ...queryHook) ([]* nodes = append(nodes, node) return node.assignValues(columns, values) } - if len(ciq.modifiers) > 0 { - _spec.Modifiers = ciq.modifiers - } for i := range hooks { hooks[i](ctx, _spec) } @@ -343,9 +338,6 @@ func (ciq *ConfigItemQuery) sqlAll(ctx context.Context, hooks ...queryHook) ([]* func (ciq *ConfigItemQuery) sqlCount(ctx context.Context) (int, error) { _spec := ciq.querySpec() - if len(ciq.modifiers) > 0 { - _spec.Modifiers = ciq.modifiers - } _spec.Node.Columns = ciq.fields if len(ciq.fields) > 0 { _spec.Unique = ciq.unique != nil && *ciq.unique @@ -427,9 +419,6 @@ func (ciq *ConfigItemQuery) sqlQuery(ctx context.Context) *sql.Selector { if ciq.unique != nil && *ciq.unique { selector.Distinct() } - for _, m := range ciq.modifiers { - m(selector) - } for _, p := range ciq.predicates { p(selector) } @@ -447,32 +436,6 @@ func (ciq *ConfigItemQuery) sqlQuery(ctx context.Context) *sql.Selector { return selector } -// ForUpdate locks the selected rows against concurrent updates, and prevent them from being -// updated, deleted or "selected ... for update" by other sessions, until the transaction is -// either committed or rolled-back. -func (ciq *ConfigItemQuery) ForUpdate(opts ...sql.LockOption) *ConfigItemQuery { - if ciq.driver.Dialect() == dialect.Postgres { - ciq.Unique(false) - } - ciq.modifiers = append(ciq.modifiers, func(s *sql.Selector) { - s.ForUpdate(opts...) - }) - return ciq -} - -// ForShare behaves similarly to ForUpdate, except that it acquires a shared mode lock -// on any rows that are read. Other sessions can read the rows, but cannot modify them -// until your transaction commits. -func (ciq *ConfigItemQuery) ForShare(opts ...sql.LockOption) *ConfigItemQuery { - if ciq.driver.Dialect() == dialect.Postgres { - ciq.Unique(false) - } - ciq.modifiers = append(ciq.modifiers, func(s *sql.Selector) { - s.ForShare(opts...) - }) - return ciq -} - // ConfigItemGroupBy is the group-by builder for ConfigItem entities. type ConfigItemGroupBy struct { config diff --git a/pkg/database/ent/decision_query.go b/pkg/database/ent/decision_query.go index e0fa924ed..91aebded9 100644 --- a/pkg/database/ent/decision_query.go +++ b/pkg/database/ent/decision_query.go @@ -7,7 +7,6 @@ import ( "fmt" "math" - "entgo.io/ent/dialect" "entgo.io/ent/dialect/sql" "entgo.io/ent/dialect/sql/sqlgraph" "entgo.io/ent/schema/field" @@ -26,7 +25,6 @@ type DecisionQuery struct { fields []string predicates []predicate.Decision withOwner *AlertQuery - modifiers []func(*sql.Selector) // intermediate query (i.e. traversal path). sql *sql.Selector path func(context.Context) (*sql.Selector, error) @@ -366,9 +364,6 @@ func (dq *DecisionQuery) sqlAll(ctx context.Context, hooks ...queryHook) ([]*Dec node.Edges.loadedTypes = loadedTypes return node.assignValues(columns, values) } - if len(dq.modifiers) > 0 { - _spec.Modifiers = dq.modifiers - } for i := range hooks { hooks[i](ctx, _spec) } @@ -416,9 +411,6 @@ func (dq *DecisionQuery) loadOwner(ctx context.Context, query *AlertQuery, nodes func (dq *DecisionQuery) sqlCount(ctx context.Context) (int, error) { _spec := dq.querySpec() - if len(dq.modifiers) > 0 { - _spec.Modifiers = dq.modifiers - } _spec.Node.Columns = dq.fields if len(dq.fields) > 0 { _spec.Unique = dq.unique != nil && *dq.unique @@ -500,9 +492,6 @@ func (dq *DecisionQuery) sqlQuery(ctx context.Context) *sql.Selector { if dq.unique != nil && *dq.unique { selector.Distinct() } - for _, m := range dq.modifiers { - m(selector) - } for _, p := range dq.predicates { p(selector) } @@ -520,32 +509,6 @@ func (dq *DecisionQuery) sqlQuery(ctx context.Context) *sql.Selector { return selector } -// ForUpdate locks the selected rows against concurrent updates, and prevent them from being -// updated, deleted or "selected ... for update" by other sessions, until the transaction is -// either committed or rolled-back. -func (dq *DecisionQuery) ForUpdate(opts ...sql.LockOption) *DecisionQuery { - if dq.driver.Dialect() == dialect.Postgres { - dq.Unique(false) - } - dq.modifiers = append(dq.modifiers, func(s *sql.Selector) { - s.ForUpdate(opts...) - }) - return dq -} - -// ForShare behaves similarly to ForUpdate, except that it acquires a shared mode lock -// on any rows that are read. Other sessions can read the rows, but cannot modify them -// until your transaction commits. -func (dq *DecisionQuery) ForShare(opts ...sql.LockOption) *DecisionQuery { - if dq.driver.Dialect() == dialect.Postgres { - dq.Unique(false) - } - dq.modifiers = append(dq.modifiers, func(s *sql.Selector) { - s.ForShare(opts...) - }) - return dq -} - // DecisionGroupBy is the group-by builder for Decision entities. type DecisionGroupBy struct { config diff --git a/pkg/database/ent/event_query.go b/pkg/database/ent/event_query.go index 974c6d4d6..045d750f8 100644 --- a/pkg/database/ent/event_query.go +++ b/pkg/database/ent/event_query.go @@ -7,7 +7,6 @@ import ( "fmt" "math" - "entgo.io/ent/dialect" "entgo.io/ent/dialect/sql" "entgo.io/ent/dialect/sql/sqlgraph" "entgo.io/ent/schema/field" @@ -26,7 +25,6 @@ type EventQuery struct { fields []string predicates []predicate.Event withOwner *AlertQuery - modifiers []func(*sql.Selector) // intermediate query (i.e. traversal path). sql *sql.Selector path func(context.Context) (*sql.Selector, error) @@ -366,9 +364,6 @@ func (eq *EventQuery) sqlAll(ctx context.Context, hooks ...queryHook) ([]*Event, node.Edges.loadedTypes = loadedTypes return node.assignValues(columns, values) } - if len(eq.modifiers) > 0 { - _spec.Modifiers = eq.modifiers - } for i := range hooks { hooks[i](ctx, _spec) } @@ -416,9 +411,6 @@ func (eq *EventQuery) loadOwner(ctx context.Context, query *AlertQuery, nodes [] func (eq *EventQuery) sqlCount(ctx context.Context) (int, error) { _spec := eq.querySpec() - if len(eq.modifiers) > 0 { - _spec.Modifiers = eq.modifiers - } _spec.Node.Columns = eq.fields if len(eq.fields) > 0 { _spec.Unique = eq.unique != nil && *eq.unique @@ -500,9 +492,6 @@ func (eq *EventQuery) sqlQuery(ctx context.Context) *sql.Selector { if eq.unique != nil && *eq.unique { selector.Distinct() } - for _, m := range eq.modifiers { - m(selector) - } for _, p := range eq.predicates { p(selector) } @@ -520,32 +509,6 @@ func (eq *EventQuery) sqlQuery(ctx context.Context) *sql.Selector { return selector } -// ForUpdate locks the selected rows against concurrent updates, and prevent them from being -// updated, deleted or "selected ... for update" by other sessions, until the transaction is -// either committed or rolled-back. -func (eq *EventQuery) ForUpdate(opts ...sql.LockOption) *EventQuery { - if eq.driver.Dialect() == dialect.Postgres { - eq.Unique(false) - } - eq.modifiers = append(eq.modifiers, func(s *sql.Selector) { - s.ForUpdate(opts...) - }) - return eq -} - -// ForShare behaves similarly to ForUpdate, except that it acquires a shared mode lock -// on any rows that are read. Other sessions can read the rows, but cannot modify them -// until your transaction commits. -func (eq *EventQuery) ForShare(opts ...sql.LockOption) *EventQuery { - if eq.driver.Dialect() == dialect.Postgres { - eq.Unique(false) - } - eq.modifiers = append(eq.modifiers, func(s *sql.Selector) { - s.ForShare(opts...) - }) - return eq -} - // EventGroupBy is the group-by builder for Event entities. type EventGroupBy struct { config diff --git a/pkg/database/ent/lock_query.go b/pkg/database/ent/lock_query.go index 655b458c0..cd97d6e59 100644 --- a/pkg/database/ent/lock_query.go +++ b/pkg/database/ent/lock_query.go @@ -7,7 +7,6 @@ import ( "fmt" "math" - "entgo.io/ent/dialect" "entgo.io/ent/dialect/sql" "entgo.io/ent/dialect/sql/sqlgraph" "entgo.io/ent/schema/field" @@ -24,7 +23,6 @@ type LockQuery struct { order []OrderFunc fields []string predicates []predicate.Lock - modifiers []func(*sql.Selector) // intermediate query (i.e. traversal path). sql *sql.Selector path func(context.Context) (*sql.Selector, error) @@ -326,9 +324,6 @@ func (lq *LockQuery) sqlAll(ctx context.Context, hooks ...queryHook) ([]*Lock, e nodes = append(nodes, node) return node.assignValues(columns, values) } - if len(lq.modifiers) > 0 { - _spec.Modifiers = lq.modifiers - } for i := range hooks { hooks[i](ctx, _spec) } @@ -343,9 +338,6 @@ func (lq *LockQuery) sqlAll(ctx context.Context, hooks ...queryHook) ([]*Lock, e func (lq *LockQuery) sqlCount(ctx context.Context) (int, error) { _spec := lq.querySpec() - if len(lq.modifiers) > 0 { - _spec.Modifiers = lq.modifiers - } _spec.Node.Columns = lq.fields if len(lq.fields) > 0 { _spec.Unique = lq.unique != nil && *lq.unique @@ -427,9 +419,6 @@ func (lq *LockQuery) sqlQuery(ctx context.Context) *sql.Selector { if lq.unique != nil && *lq.unique { selector.Distinct() } - for _, m := range lq.modifiers { - m(selector) - } for _, p := range lq.predicates { p(selector) } @@ -447,32 +436,6 @@ func (lq *LockQuery) sqlQuery(ctx context.Context) *sql.Selector { return selector } -// ForUpdate locks the selected rows against concurrent updates, and prevent them from being -// updated, deleted or "selected ... for update" by other sessions, until the transaction is -// either committed or rolled-back. -func (lq *LockQuery) ForUpdate(opts ...sql.LockOption) *LockQuery { - if lq.driver.Dialect() == dialect.Postgres { - lq.Unique(false) - } - lq.modifiers = append(lq.modifiers, func(s *sql.Selector) { - s.ForUpdate(opts...) - }) - return lq -} - -// ForShare behaves similarly to ForUpdate, except that it acquires a shared mode lock -// on any rows that are read. Other sessions can read the rows, but cannot modify them -// until your transaction commits. -func (lq *LockQuery) ForShare(opts ...sql.LockOption) *LockQuery { - if lq.driver.Dialect() == dialect.Postgres { - lq.Unique(false) - } - lq.modifiers = append(lq.modifiers, func(s *sql.Selector) { - s.ForShare(opts...) - }) - return lq -} - // LockGroupBy is the group-by builder for Lock entities. type LockGroupBy struct { config diff --git a/pkg/database/ent/machine_query.go b/pkg/database/ent/machine_query.go index ea2d32208..283914219 100644 --- a/pkg/database/ent/machine_query.go +++ b/pkg/database/ent/machine_query.go @@ -8,7 +8,6 @@ import ( "fmt" "math" - "entgo.io/ent/dialect" "entgo.io/ent/dialect/sql" "entgo.io/ent/dialect/sql/sqlgraph" "entgo.io/ent/schema/field" @@ -27,7 +26,6 @@ type MachineQuery struct { fields []string predicates []predicate.Machine withAlerts *AlertQuery - modifiers []func(*sql.Selector) // intermediate query (i.e. traversal path). sql *sql.Selector path func(context.Context) (*sql.Selector, error) @@ -367,9 +365,6 @@ func (mq *MachineQuery) sqlAll(ctx context.Context, hooks ...queryHook) ([]*Mach node.Edges.loadedTypes = loadedTypes return node.assignValues(columns, values) } - if len(mq.modifiers) > 0 { - _spec.Modifiers = mq.modifiers - } for i := range hooks { hooks[i](ctx, _spec) } @@ -423,9 +418,6 @@ func (mq *MachineQuery) loadAlerts(ctx context.Context, query *AlertQuery, nodes func (mq *MachineQuery) sqlCount(ctx context.Context) (int, error) { _spec := mq.querySpec() - if len(mq.modifiers) > 0 { - _spec.Modifiers = mq.modifiers - } _spec.Node.Columns = mq.fields if len(mq.fields) > 0 { _spec.Unique = mq.unique != nil && *mq.unique @@ -507,9 +499,6 @@ func (mq *MachineQuery) sqlQuery(ctx context.Context) *sql.Selector { if mq.unique != nil && *mq.unique { selector.Distinct() } - for _, m := range mq.modifiers { - m(selector) - } for _, p := range mq.predicates { p(selector) } @@ -527,32 +516,6 @@ func (mq *MachineQuery) sqlQuery(ctx context.Context) *sql.Selector { return selector } -// ForUpdate locks the selected rows against concurrent updates, and prevent them from being -// updated, deleted or "selected ... for update" by other sessions, until the transaction is -// either committed or rolled-back. -func (mq *MachineQuery) ForUpdate(opts ...sql.LockOption) *MachineQuery { - if mq.driver.Dialect() == dialect.Postgres { - mq.Unique(false) - } - mq.modifiers = append(mq.modifiers, func(s *sql.Selector) { - s.ForUpdate(opts...) - }) - return mq -} - -// ForShare behaves similarly to ForUpdate, except that it acquires a shared mode lock -// on any rows that are read. Other sessions can read the rows, but cannot modify them -// until your transaction commits. -func (mq *MachineQuery) ForShare(opts ...sql.LockOption) *MachineQuery { - if mq.driver.Dialect() == dialect.Postgres { - mq.Unique(false) - } - mq.modifiers = append(mq.modifiers, func(s *sql.Selector) { - s.ForShare(opts...) - }) - return mq -} - // MachineGroupBy is the group-by builder for Machine entities. type MachineGroupBy struct { config diff --git a/pkg/database/ent/meta_query.go b/pkg/database/ent/meta_query.go index 8259d48c9..d6fd4f3d5 100644 --- a/pkg/database/ent/meta_query.go +++ b/pkg/database/ent/meta_query.go @@ -7,7 +7,6 @@ import ( "fmt" "math" - "entgo.io/ent/dialect" "entgo.io/ent/dialect/sql" "entgo.io/ent/dialect/sql/sqlgraph" "entgo.io/ent/schema/field" @@ -26,7 +25,6 @@ type MetaQuery struct { fields []string predicates []predicate.Meta withOwner *AlertQuery - modifiers []func(*sql.Selector) // intermediate query (i.e. traversal path). sql *sql.Selector path func(context.Context) (*sql.Selector, error) @@ -366,9 +364,6 @@ func (mq *MetaQuery) sqlAll(ctx context.Context, hooks ...queryHook) ([]*Meta, e node.Edges.loadedTypes = loadedTypes return node.assignValues(columns, values) } - if len(mq.modifiers) > 0 { - _spec.Modifiers = mq.modifiers - } for i := range hooks { hooks[i](ctx, _spec) } @@ -416,9 +411,6 @@ func (mq *MetaQuery) loadOwner(ctx context.Context, query *AlertQuery, nodes []* func (mq *MetaQuery) sqlCount(ctx context.Context) (int, error) { _spec := mq.querySpec() - if len(mq.modifiers) > 0 { - _spec.Modifiers = mq.modifiers - } _spec.Node.Columns = mq.fields if len(mq.fields) > 0 { _spec.Unique = mq.unique != nil && *mq.unique @@ -500,9 +492,6 @@ func (mq *MetaQuery) sqlQuery(ctx context.Context) *sql.Selector { if mq.unique != nil && *mq.unique { selector.Distinct() } - for _, m := range mq.modifiers { - m(selector) - } for _, p := range mq.predicates { p(selector) } @@ -520,32 +509,6 @@ func (mq *MetaQuery) sqlQuery(ctx context.Context) *sql.Selector { return selector } -// ForUpdate locks the selected rows against concurrent updates, and prevent them from being -// updated, deleted or "selected ... for update" by other sessions, until the transaction is -// either committed or rolled-back. -func (mq *MetaQuery) ForUpdate(opts ...sql.LockOption) *MetaQuery { - if mq.driver.Dialect() == dialect.Postgres { - mq.Unique(false) - } - mq.modifiers = append(mq.modifiers, func(s *sql.Selector) { - s.ForUpdate(opts...) - }) - return mq -} - -// ForShare behaves similarly to ForUpdate, except that it acquires a shared mode lock -// on any rows that are read. Other sessions can read the rows, but cannot modify them -// until your transaction commits. -func (mq *MetaQuery) ForShare(opts ...sql.LockOption) *MetaQuery { - if mq.driver.Dialect() == dialect.Postgres { - mq.Unique(false) - } - mq.modifiers = append(mq.modifiers, func(s *sql.Selector) { - s.ForShare(opts...) - }) - return mq -} - // MetaGroupBy is the group-by builder for Meta entities. type MetaGroupBy struct { config diff --git a/pkg/database/lock.go b/pkg/database/lock.go index 2f7bf34d6..bf9f6d3ed 100644 --- a/pkg/database/lock.go +++ b/pkg/database/lock.go @@ -12,8 +12,8 @@ import ( ) const ( - CAPIPullLockTimeout = 130 - MetricsLockTimeout = 40 + CAPIPullLockTimeout = 120 + MetricsLockTimeout = 30 ) func (c *Client) AcquireLock(name string) error { @@ -22,8 +22,11 @@ func (c *Client) AcquireLock(name string) error { SetName(name). SetCreatedAt(types.UtcNow()). Save(c.CTX) + if ent.IsConstraintError(err) { + return err + } if err != nil { - return errors.Wrapf(QueryFail, "insert lock: %s", err) + return errors.Wrapf(InsertFail, "insert lock: %s", err) } return nil } @@ -31,19 +34,19 @@ func (c *Client) AcquireLock(name string) error { func (c *Client) ReleaseLock(name string) error { _, err := c.Ent.Lock.Delete().Where(lock.NameEQ(name)).Exec(c.CTX) if err != nil { - return errors.Wrapf(QueryFail, "delete lock: %s", err) + return errors.Wrapf(DeleteFail, "delete lock: %s", err) } return nil } func (c *Client) ReleaseLockWithTimeout(name string, timeout int) error { - log.Debugf("releasing (%s) orphin locks", name) + log.Debugf("(%s) releasing orphin locks", name) _, err := c.Ent.Lock.Delete().Where( lock.NameEQ(name), - lock.CreatedAtGT(time.Now().Add(-time.Duration(timeout)*time.Minute)), + lock.CreatedAtLT(time.Now().Add(-time.Duration(timeout)*time.Minute)), ).Exec(c.CTX) if err != nil { - return errors.Wrapf(QueryFail, "delete lock: %s", err) + return errors.Wrapf(DeleteFail, "delete lock: %s", err) } return nil }