enable by default ha feature

This commit is contained in:
he2ss 2023-10-04 16:11:39 +02:00
parent 8ce3f6ed85
commit e54f0cc44e
15 changed files with 59 additions and 369 deletions

View file

@ -53,7 +53,7 @@ func NewPapiStatusCmd() *cobra.Command {
log.Fatalf("unable to initialize database client : %s", err)
}
apic, err := apiserver.NewAPIC(csConfig.API.Server.OnlineClient, dbClient, csConfig.API.Server.ConsoleConfig, csConfig.API.Server.CapiWhitelists, csConfig.API.Server.HighAvailability)
apic, err := apiserver.NewAPIC(csConfig.API.Server.OnlineClient, dbClient, csConfig.API.Server.ConsoleConfig, csConfig.API.Server.CapiWhitelists)
if err != nil {
log.Fatalf("unable to initialize API client : %s", err)
@ -103,7 +103,7 @@ func NewPapiSyncCmd() *cobra.Command {
log.Fatalf("unable to initialize database client : %s", err)
}
apic, err := apiserver.NewAPIC(csConfig.API.Server.OnlineClient, dbClient, csConfig.API.Server.ConsoleConfig, csConfig.API.Server.CapiWhitelists, csConfig.API.Server.HighAvailability)
apic, err := apiserver.NewAPIC(csConfig.API.Server.OnlineClient, dbClient, csConfig.API.Server.ConsoleConfig, csConfig.API.Server.CapiWhitelists)
if err != nil {
log.Fatalf("unable to initialize API client : %s", err)

View file

@ -56,7 +56,6 @@ type apic struct {
metricsIntervalFirst time.Duration
dbClient *database.Client
apiClient *apiclient.ApiClient
isHAEnabled bool
AlertsAddChan chan []*models.Alert
mu sync.Mutex
@ -163,7 +162,7 @@ func alertToSignal(alert *models.Alert, scenarioTrust string, shareContext bool)
return signal
}
func NewAPIC(config *csconfig.OnlineApiClientCfg, dbClient *database.Client, consoleConfig *csconfig.ConsoleConfig, apicWhitelist *csconfig.CapiWhitelist, haConfig *csconfig.HighAvailabilityCfg) (*apic, error) {
func NewAPIC(config *csconfig.OnlineApiClientCfg, dbClient *database.Client, consoleConfig *csconfig.ConsoleConfig, apicWhitelist *csconfig.CapiWhitelist) (*apic, error) {
var err error
ret := &apic{
@ -187,10 +186,6 @@ func NewAPIC(config *csconfig.OnlineApiClientCfg, dbClient *database.Client, con
whitelists: apicWhitelist,
}
if haConfig != nil && *haConfig.Enabled {
ret.isHAEnabled = *haConfig.Enabled
}
password := strfmt.Password(config.Credentials.Password)
apiURL, err := url.Parse(config.Credentials.URL)
if err != nil {
@ -576,13 +571,11 @@ func (a *apic) PullTop(forcePull bool) error {
}
}
if a.isHAEnabled {
log.Debug("Acquiring lock for pullCAPI")
err = a.dbClient.AcquirePullCAPILock()
if a.dbClient.IsLocked(err) {
log.Info("PullCAPI is already running, skipping")
return nil
}
log.Debug("Acquiring lock for pullCAPI")
err = a.dbClient.AcquirePullCAPILock()
if a.dbClient.IsLocked(err) {
log.Info("PullCAPI is already running, skipping")
return nil
}
log.Infof("Starting community-blocklist update")
@ -632,11 +625,9 @@ func (a *apic) PullTop(forcePull bool) error {
return fmt.Errorf("while updating blocklists: %w", err)
}
if a.isHAEnabled {
log.Debug("Releasing lock for pullCAPI")
if err := a.dbClient.ReleasePullCAPILock(); err != nil {
return fmt.Errorf("while releasing lock: %w", err)
}
log.Debug("Releasing lock for pullCAPI")
if err := a.dbClient.ReleasePullCAPILock(); err != nil {
return fmt.Errorf("while releasing lock: %w", err)
}
return nil
}

View file

@ -127,16 +127,15 @@ func (a *apic) SendMetrics(stop chan (bool)) {
}
case <-metTicker.C:
metTicker.Stop()
if a.isHAEnabled {
log.Debug("capi metrics: acquiring lock")
err := a.dbClient.AcquirePushMetricsLock()
if a.dbClient.IsLocked(err) {
log.Infof("another instance of crowdsec is already pushing metrics, skipping")
metTicker.Reset(nextMetInt())
}
if err != nil {
log.Errorf("unable to acquire pushMetrics lock (%s)", err)
}
log.Debug("capi metrics: acquiring lock")
err := a.dbClient.AcquirePushMetricsLock()
if a.dbClient.IsLocked(err) {
log.Infof("another instance of crowdsec is already pushing metrics, skipping")
metTicker.Reset(nextMetInt())
return
}
if err != nil {
log.Errorf("unable to acquire pushMetrics lock (%s)", err)
}
metrics, err := a.GetMetrics()
if err != nil {
@ -147,12 +146,10 @@ func (a *apic) SendMetrics(stop chan (bool)) {
if err != nil {
log.Errorf("capi metrics: failed: %s", err)
}
if a.isHAEnabled {
log.Debug("capi metrics: releasing lock")
err = a.dbClient.ReleasePushMetricsLock()
if err != nil {
log.Errorf("unable to release metrics lock (%s)", err)
}
log.Debug("capi metrics: releasing lock")
err = a.dbClient.ReleasePushMetricsLock()
if err != nil {
log.Errorf("unable to release metrics lock (%s)", err)
}
metTicker.Reset(nextMetInt())
case <-a.metricsTomb.Dying(): // if one apic routine is dying, do we kill the others?

View file

@ -236,7 +236,7 @@ func TestNewAPIC(t *testing.T) {
),
))
tc.action()
_, err := NewAPIC(testConfig, tc.args.dbClient, tc.args.consoleConfig, nil, nil)
_, err := NewAPIC(testConfig, tc.args.dbClient, tc.args.consoleConfig, nil)
cstest.RequireErrorContains(t, err, tc.expectedErr)
})
}

View file

@ -213,7 +213,7 @@ func NewServer(config *csconfig.LocalApiServerCfg) (*APIServer, error) {
if config.OnlineClient != nil && config.OnlineClient.Credentials != nil {
log.Printf("Loading CAPI manager")
apiClient, err = NewAPIC(config.OnlineClient, dbClient, config.ConsoleConfig, config.CapiWhitelists, config.HighAvailability)
apiClient, err = NewAPIC(config.OnlineClient, dbClient, config.ConsoleConfig, config.CapiWhitelists)
if err != nil {
return &APIServer{}, err
}

View file

@ -186,34 +186,29 @@ type CapiWhitelist struct {
/*local api service configuration*/
type LocalApiServerCfg struct {
Enable *bool `yaml:"enable"`
ListenURI string `yaml:"listen_uri,omitempty"` // 127.0.0.1:8080
TLS *TLSCfg `yaml:"tls"`
DbConfig *DatabaseCfg `yaml:"-"`
LogDir string `yaml:"-"`
LogMedia string `yaml:"-"`
OnlineClient *OnlineApiClientCfg `yaml:"online_client"`
ProfilesPath string `yaml:"profiles_path,omitempty"`
ConsoleConfigPath string `yaml:"console_path,omitempty"`
ConsoleConfig *ConsoleConfig `yaml:"-"`
Profiles []*ProfileCfg `yaml:"-"`
LogLevel *log.Level `yaml:"log_level"`
UseForwardedForHeaders bool `yaml:"use_forwarded_for_headers,omitempty"`
TrustedProxies *[]string `yaml:"trusted_proxies,omitempty"`
CompressLogs *bool `yaml:"-"`
LogMaxSize int `yaml:"-"`
LogMaxAge int `yaml:"-"`
LogMaxFiles int `yaml:"-"`
TrustedIPs []string `yaml:"trusted_ips,omitempty"`
PapiLogLevel *log.Level `yaml:"papi_log_level"`
DisableRemoteLapiRegistration bool `yaml:"disable_remote_lapi_registration,omitempty"`
CapiWhitelistsPath string `yaml:"capi_whitelists_path,omitempty"`
CapiWhitelists *CapiWhitelist `yaml:"-"`
HighAvailability *HighAvailabilityCfg `yaml:"high_availability,omitempty"`
}
type HighAvailabilityCfg struct {
Enabled *bool `yaml:"enabled"`
Enable *bool `yaml:"enable"`
ListenURI string `yaml:"listen_uri,omitempty"` // 127.0.0.1:8080
TLS *TLSCfg `yaml:"tls"`
DbConfig *DatabaseCfg `yaml:"-"`
LogDir string `yaml:"-"`
LogMedia string `yaml:"-"`
OnlineClient *OnlineApiClientCfg `yaml:"online_client"`
ProfilesPath string `yaml:"profiles_path,omitempty"`
ConsoleConfigPath string `yaml:"console_path,omitempty"`
ConsoleConfig *ConsoleConfig `yaml:"-"`
Profiles []*ProfileCfg `yaml:"-"`
LogLevel *log.Level `yaml:"log_level"`
UseForwardedForHeaders bool `yaml:"use_forwarded_for_headers,omitempty"`
TrustedProxies *[]string `yaml:"trusted_proxies,omitempty"`
CompressLogs *bool `yaml:"-"`
LogMaxSize int `yaml:"-"`
LogMaxAge int `yaml:"-"`
LogMaxFiles int `yaml:"-"`
TrustedIPs []string `yaml:"trusted_ips,omitempty"`
PapiLogLevel *log.Level `yaml:"papi_log_level"`
DisableRemoteLapiRegistration bool `yaml:"disable_remote_lapi_registration,omitempty"`
CapiWhitelistsPath string `yaml:"capi_whitelists_path,omitempty"`
CapiWhitelists *CapiWhitelist `yaml:"-"`
}
type TLSCfg struct {

View file

@ -8,7 +8,6 @@ import (
"fmt"
"math"
"entgo.io/ent/dialect"
"entgo.io/ent/dialect/sql"
"entgo.io/ent/dialect/sql/sqlgraph"
"entgo.io/ent/schema/field"
@ -34,7 +33,6 @@ type AlertQuery struct {
withEvents *EventQuery
withMetas *MetaQuery
withFKs bool
modifiers []func(*sql.Selector)
// intermediate query (i.e. traversal path).
sql *sql.Selector
path func(context.Context) (*sql.Selector, error)
@ -486,9 +484,6 @@ func (aq *AlertQuery) sqlAll(ctx context.Context, hooks ...queryHook) ([]*Alert,
node.Edges.loadedTypes = loadedTypes
return node.assignValues(columns, values)
}
if len(aq.modifiers) > 0 {
_spec.Modifiers = aq.modifiers
}
for i := range hooks {
hooks[i](ctx, _spec)
}
@ -641,9 +636,6 @@ func (aq *AlertQuery) loadMetas(ctx context.Context, query *MetaQuery, nodes []*
func (aq *AlertQuery) sqlCount(ctx context.Context) (int, error) {
_spec := aq.querySpec()
if len(aq.modifiers) > 0 {
_spec.Modifiers = aq.modifiers
}
_spec.Node.Columns = aq.fields
if len(aq.fields) > 0 {
_spec.Unique = aq.unique != nil && *aq.unique
@ -725,9 +717,6 @@ func (aq *AlertQuery) sqlQuery(ctx context.Context) *sql.Selector {
if aq.unique != nil && *aq.unique {
selector.Distinct()
}
for _, m := range aq.modifiers {
m(selector)
}
for _, p := range aq.predicates {
p(selector)
}
@ -745,32 +734,6 @@ func (aq *AlertQuery) sqlQuery(ctx context.Context) *sql.Selector {
return selector
}
// ForUpdate locks the selected rows against concurrent updates, and prevent them from being
// updated, deleted or "selected ... for update" by other sessions, until the transaction is
// either committed or rolled-back.
func (aq *AlertQuery) ForUpdate(opts ...sql.LockOption) *AlertQuery {
if aq.driver.Dialect() == dialect.Postgres {
aq.Unique(false)
}
aq.modifiers = append(aq.modifiers, func(s *sql.Selector) {
s.ForUpdate(opts...)
})
return aq
}
// ForShare behaves similarly to ForUpdate, except that it acquires a shared mode lock
// on any rows that are read. Other sessions can read the rows, but cannot modify them
// until your transaction commits.
func (aq *AlertQuery) ForShare(opts ...sql.LockOption) *AlertQuery {
if aq.driver.Dialect() == dialect.Postgres {
aq.Unique(false)
}
aq.modifiers = append(aq.modifiers, func(s *sql.Selector) {
s.ForShare(opts...)
})
return aq
}
// AlertGroupBy is the group-by builder for Alert entities.
type AlertGroupBy struct {
config

View file

@ -7,7 +7,6 @@ import (
"fmt"
"math"
"entgo.io/ent/dialect"
"entgo.io/ent/dialect/sql"
"entgo.io/ent/dialect/sql/sqlgraph"
"entgo.io/ent/schema/field"
@ -24,7 +23,6 @@ type BouncerQuery struct {
order []OrderFunc
fields []string
predicates []predicate.Bouncer
modifiers []func(*sql.Selector)
// intermediate query (i.e. traversal path).
sql *sql.Selector
path func(context.Context) (*sql.Selector, error)
@ -326,9 +324,6 @@ func (bq *BouncerQuery) sqlAll(ctx context.Context, hooks ...queryHook) ([]*Boun
nodes = append(nodes, node)
return node.assignValues(columns, values)
}
if len(bq.modifiers) > 0 {
_spec.Modifiers = bq.modifiers
}
for i := range hooks {
hooks[i](ctx, _spec)
}
@ -343,9 +338,6 @@ func (bq *BouncerQuery) sqlAll(ctx context.Context, hooks ...queryHook) ([]*Boun
func (bq *BouncerQuery) sqlCount(ctx context.Context) (int, error) {
_spec := bq.querySpec()
if len(bq.modifiers) > 0 {
_spec.Modifiers = bq.modifiers
}
_spec.Node.Columns = bq.fields
if len(bq.fields) > 0 {
_spec.Unique = bq.unique != nil && *bq.unique
@ -427,9 +419,6 @@ func (bq *BouncerQuery) sqlQuery(ctx context.Context) *sql.Selector {
if bq.unique != nil && *bq.unique {
selector.Distinct()
}
for _, m := range bq.modifiers {
m(selector)
}
for _, p := range bq.predicates {
p(selector)
}
@ -447,32 +436,6 @@ func (bq *BouncerQuery) sqlQuery(ctx context.Context) *sql.Selector {
return selector
}
// ForUpdate locks the selected rows against concurrent updates, and prevent them from being
// updated, deleted or "selected ... for update" by other sessions, until the transaction is
// either committed or rolled-back.
func (bq *BouncerQuery) ForUpdate(opts ...sql.LockOption) *BouncerQuery {
if bq.driver.Dialect() == dialect.Postgres {
bq.Unique(false)
}
bq.modifiers = append(bq.modifiers, func(s *sql.Selector) {
s.ForUpdate(opts...)
})
return bq
}
// ForShare behaves similarly to ForUpdate, except that it acquires a shared mode lock
// on any rows that are read. Other sessions can read the rows, but cannot modify them
// until your transaction commits.
func (bq *BouncerQuery) ForShare(opts ...sql.LockOption) *BouncerQuery {
if bq.driver.Dialect() == dialect.Postgres {
bq.Unique(false)
}
bq.modifiers = append(bq.modifiers, func(s *sql.Selector) {
s.ForShare(opts...)
})
return bq
}
// BouncerGroupBy is the group-by builder for Bouncer entities.
type BouncerGroupBy struct {
config

View file

@ -7,7 +7,6 @@ import (
"fmt"
"math"
"entgo.io/ent/dialect"
"entgo.io/ent/dialect/sql"
"entgo.io/ent/dialect/sql/sqlgraph"
"entgo.io/ent/schema/field"
@ -24,7 +23,6 @@ type ConfigItemQuery struct {
order []OrderFunc
fields []string
predicates []predicate.ConfigItem
modifiers []func(*sql.Selector)
// intermediate query (i.e. traversal path).
sql *sql.Selector
path func(context.Context) (*sql.Selector, error)
@ -326,9 +324,6 @@ func (ciq *ConfigItemQuery) sqlAll(ctx context.Context, hooks ...queryHook) ([]*
nodes = append(nodes, node)
return node.assignValues(columns, values)
}
if len(ciq.modifiers) > 0 {
_spec.Modifiers = ciq.modifiers
}
for i := range hooks {
hooks[i](ctx, _spec)
}
@ -343,9 +338,6 @@ func (ciq *ConfigItemQuery) sqlAll(ctx context.Context, hooks ...queryHook) ([]*
func (ciq *ConfigItemQuery) sqlCount(ctx context.Context) (int, error) {
_spec := ciq.querySpec()
if len(ciq.modifiers) > 0 {
_spec.Modifiers = ciq.modifiers
}
_spec.Node.Columns = ciq.fields
if len(ciq.fields) > 0 {
_spec.Unique = ciq.unique != nil && *ciq.unique
@ -427,9 +419,6 @@ func (ciq *ConfigItemQuery) sqlQuery(ctx context.Context) *sql.Selector {
if ciq.unique != nil && *ciq.unique {
selector.Distinct()
}
for _, m := range ciq.modifiers {
m(selector)
}
for _, p := range ciq.predicates {
p(selector)
}
@ -447,32 +436,6 @@ func (ciq *ConfigItemQuery) sqlQuery(ctx context.Context) *sql.Selector {
return selector
}
// ForUpdate locks the selected rows against concurrent updates, and prevent them from being
// updated, deleted or "selected ... for update" by other sessions, until the transaction is
// either committed or rolled-back.
func (ciq *ConfigItemQuery) ForUpdate(opts ...sql.LockOption) *ConfigItemQuery {
if ciq.driver.Dialect() == dialect.Postgres {
ciq.Unique(false)
}
ciq.modifiers = append(ciq.modifiers, func(s *sql.Selector) {
s.ForUpdate(opts...)
})
return ciq
}
// ForShare behaves similarly to ForUpdate, except that it acquires a shared mode lock
// on any rows that are read. Other sessions can read the rows, but cannot modify them
// until your transaction commits.
func (ciq *ConfigItemQuery) ForShare(opts ...sql.LockOption) *ConfigItemQuery {
if ciq.driver.Dialect() == dialect.Postgres {
ciq.Unique(false)
}
ciq.modifiers = append(ciq.modifiers, func(s *sql.Selector) {
s.ForShare(opts...)
})
return ciq
}
// ConfigItemGroupBy is the group-by builder for ConfigItem entities.
type ConfigItemGroupBy struct {
config

View file

@ -7,7 +7,6 @@ import (
"fmt"
"math"
"entgo.io/ent/dialect"
"entgo.io/ent/dialect/sql"
"entgo.io/ent/dialect/sql/sqlgraph"
"entgo.io/ent/schema/field"
@ -26,7 +25,6 @@ type DecisionQuery struct {
fields []string
predicates []predicate.Decision
withOwner *AlertQuery
modifiers []func(*sql.Selector)
// intermediate query (i.e. traversal path).
sql *sql.Selector
path func(context.Context) (*sql.Selector, error)
@ -366,9 +364,6 @@ func (dq *DecisionQuery) sqlAll(ctx context.Context, hooks ...queryHook) ([]*Dec
node.Edges.loadedTypes = loadedTypes
return node.assignValues(columns, values)
}
if len(dq.modifiers) > 0 {
_spec.Modifiers = dq.modifiers
}
for i := range hooks {
hooks[i](ctx, _spec)
}
@ -416,9 +411,6 @@ func (dq *DecisionQuery) loadOwner(ctx context.Context, query *AlertQuery, nodes
func (dq *DecisionQuery) sqlCount(ctx context.Context) (int, error) {
_spec := dq.querySpec()
if len(dq.modifiers) > 0 {
_spec.Modifiers = dq.modifiers
}
_spec.Node.Columns = dq.fields
if len(dq.fields) > 0 {
_spec.Unique = dq.unique != nil && *dq.unique
@ -500,9 +492,6 @@ func (dq *DecisionQuery) sqlQuery(ctx context.Context) *sql.Selector {
if dq.unique != nil && *dq.unique {
selector.Distinct()
}
for _, m := range dq.modifiers {
m(selector)
}
for _, p := range dq.predicates {
p(selector)
}
@ -520,32 +509,6 @@ func (dq *DecisionQuery) sqlQuery(ctx context.Context) *sql.Selector {
return selector
}
// ForUpdate locks the selected rows against concurrent updates, and prevent them from being
// updated, deleted or "selected ... for update" by other sessions, until the transaction is
// either committed or rolled-back.
func (dq *DecisionQuery) ForUpdate(opts ...sql.LockOption) *DecisionQuery {
if dq.driver.Dialect() == dialect.Postgres {
dq.Unique(false)
}
dq.modifiers = append(dq.modifiers, func(s *sql.Selector) {
s.ForUpdate(opts...)
})
return dq
}
// ForShare behaves similarly to ForUpdate, except that it acquires a shared mode lock
// on any rows that are read. Other sessions can read the rows, but cannot modify them
// until your transaction commits.
func (dq *DecisionQuery) ForShare(opts ...sql.LockOption) *DecisionQuery {
if dq.driver.Dialect() == dialect.Postgres {
dq.Unique(false)
}
dq.modifiers = append(dq.modifiers, func(s *sql.Selector) {
s.ForShare(opts...)
})
return dq
}
// DecisionGroupBy is the group-by builder for Decision entities.
type DecisionGroupBy struct {
config

View file

@ -7,7 +7,6 @@ import (
"fmt"
"math"
"entgo.io/ent/dialect"
"entgo.io/ent/dialect/sql"
"entgo.io/ent/dialect/sql/sqlgraph"
"entgo.io/ent/schema/field"
@ -26,7 +25,6 @@ type EventQuery struct {
fields []string
predicates []predicate.Event
withOwner *AlertQuery
modifiers []func(*sql.Selector)
// intermediate query (i.e. traversal path).
sql *sql.Selector
path func(context.Context) (*sql.Selector, error)
@ -366,9 +364,6 @@ func (eq *EventQuery) sqlAll(ctx context.Context, hooks ...queryHook) ([]*Event,
node.Edges.loadedTypes = loadedTypes
return node.assignValues(columns, values)
}
if len(eq.modifiers) > 0 {
_spec.Modifiers = eq.modifiers
}
for i := range hooks {
hooks[i](ctx, _spec)
}
@ -416,9 +411,6 @@ func (eq *EventQuery) loadOwner(ctx context.Context, query *AlertQuery, nodes []
func (eq *EventQuery) sqlCount(ctx context.Context) (int, error) {
_spec := eq.querySpec()
if len(eq.modifiers) > 0 {
_spec.Modifiers = eq.modifiers
}
_spec.Node.Columns = eq.fields
if len(eq.fields) > 0 {
_spec.Unique = eq.unique != nil && *eq.unique
@ -500,9 +492,6 @@ func (eq *EventQuery) sqlQuery(ctx context.Context) *sql.Selector {
if eq.unique != nil && *eq.unique {
selector.Distinct()
}
for _, m := range eq.modifiers {
m(selector)
}
for _, p := range eq.predicates {
p(selector)
}
@ -520,32 +509,6 @@ func (eq *EventQuery) sqlQuery(ctx context.Context) *sql.Selector {
return selector
}
// ForUpdate locks the selected rows against concurrent updates, and prevent them from being
// updated, deleted or "selected ... for update" by other sessions, until the transaction is
// either committed or rolled-back.
func (eq *EventQuery) ForUpdate(opts ...sql.LockOption) *EventQuery {
if eq.driver.Dialect() == dialect.Postgres {
eq.Unique(false)
}
eq.modifiers = append(eq.modifiers, func(s *sql.Selector) {
s.ForUpdate(opts...)
})
return eq
}
// ForShare behaves similarly to ForUpdate, except that it acquires a shared mode lock
// on any rows that are read. Other sessions can read the rows, but cannot modify them
// until your transaction commits.
func (eq *EventQuery) ForShare(opts ...sql.LockOption) *EventQuery {
if eq.driver.Dialect() == dialect.Postgres {
eq.Unique(false)
}
eq.modifiers = append(eq.modifiers, func(s *sql.Selector) {
s.ForShare(opts...)
})
return eq
}
// EventGroupBy is the group-by builder for Event entities.
type EventGroupBy struct {
config

View file

@ -7,7 +7,6 @@ import (
"fmt"
"math"
"entgo.io/ent/dialect"
"entgo.io/ent/dialect/sql"
"entgo.io/ent/dialect/sql/sqlgraph"
"entgo.io/ent/schema/field"
@ -24,7 +23,6 @@ type LockQuery struct {
order []OrderFunc
fields []string
predicates []predicate.Lock
modifiers []func(*sql.Selector)
// intermediate query (i.e. traversal path).
sql *sql.Selector
path func(context.Context) (*sql.Selector, error)
@ -326,9 +324,6 @@ func (lq *LockQuery) sqlAll(ctx context.Context, hooks ...queryHook) ([]*Lock, e
nodes = append(nodes, node)
return node.assignValues(columns, values)
}
if len(lq.modifiers) > 0 {
_spec.Modifiers = lq.modifiers
}
for i := range hooks {
hooks[i](ctx, _spec)
}
@ -343,9 +338,6 @@ func (lq *LockQuery) sqlAll(ctx context.Context, hooks ...queryHook) ([]*Lock, e
func (lq *LockQuery) sqlCount(ctx context.Context) (int, error) {
_spec := lq.querySpec()
if len(lq.modifiers) > 0 {
_spec.Modifiers = lq.modifiers
}
_spec.Node.Columns = lq.fields
if len(lq.fields) > 0 {
_spec.Unique = lq.unique != nil && *lq.unique
@ -427,9 +419,6 @@ func (lq *LockQuery) sqlQuery(ctx context.Context) *sql.Selector {
if lq.unique != nil && *lq.unique {
selector.Distinct()
}
for _, m := range lq.modifiers {
m(selector)
}
for _, p := range lq.predicates {
p(selector)
}
@ -447,32 +436,6 @@ func (lq *LockQuery) sqlQuery(ctx context.Context) *sql.Selector {
return selector
}
// ForUpdate locks the selected rows against concurrent updates, and prevent them from being
// updated, deleted or "selected ... for update" by other sessions, until the transaction is
// either committed or rolled-back.
func (lq *LockQuery) ForUpdate(opts ...sql.LockOption) *LockQuery {
if lq.driver.Dialect() == dialect.Postgres {
lq.Unique(false)
}
lq.modifiers = append(lq.modifiers, func(s *sql.Selector) {
s.ForUpdate(opts...)
})
return lq
}
// ForShare behaves similarly to ForUpdate, except that it acquires a shared mode lock
// on any rows that are read. Other sessions can read the rows, but cannot modify them
// until your transaction commits.
func (lq *LockQuery) ForShare(opts ...sql.LockOption) *LockQuery {
if lq.driver.Dialect() == dialect.Postgres {
lq.Unique(false)
}
lq.modifiers = append(lq.modifiers, func(s *sql.Selector) {
s.ForShare(opts...)
})
return lq
}
// LockGroupBy is the group-by builder for Lock entities.
type LockGroupBy struct {
config

View file

@ -8,7 +8,6 @@ import (
"fmt"
"math"
"entgo.io/ent/dialect"
"entgo.io/ent/dialect/sql"
"entgo.io/ent/dialect/sql/sqlgraph"
"entgo.io/ent/schema/field"
@ -27,7 +26,6 @@ type MachineQuery struct {
fields []string
predicates []predicate.Machine
withAlerts *AlertQuery
modifiers []func(*sql.Selector)
// intermediate query (i.e. traversal path).
sql *sql.Selector
path func(context.Context) (*sql.Selector, error)
@ -367,9 +365,6 @@ func (mq *MachineQuery) sqlAll(ctx context.Context, hooks ...queryHook) ([]*Mach
node.Edges.loadedTypes = loadedTypes
return node.assignValues(columns, values)
}
if len(mq.modifiers) > 0 {
_spec.Modifiers = mq.modifiers
}
for i := range hooks {
hooks[i](ctx, _spec)
}
@ -423,9 +418,6 @@ func (mq *MachineQuery) loadAlerts(ctx context.Context, query *AlertQuery, nodes
func (mq *MachineQuery) sqlCount(ctx context.Context) (int, error) {
_spec := mq.querySpec()
if len(mq.modifiers) > 0 {
_spec.Modifiers = mq.modifiers
}
_spec.Node.Columns = mq.fields
if len(mq.fields) > 0 {
_spec.Unique = mq.unique != nil && *mq.unique
@ -507,9 +499,6 @@ func (mq *MachineQuery) sqlQuery(ctx context.Context) *sql.Selector {
if mq.unique != nil && *mq.unique {
selector.Distinct()
}
for _, m := range mq.modifiers {
m(selector)
}
for _, p := range mq.predicates {
p(selector)
}
@ -527,32 +516,6 @@ func (mq *MachineQuery) sqlQuery(ctx context.Context) *sql.Selector {
return selector
}
// ForUpdate locks the selected rows against concurrent updates, and prevent them from being
// updated, deleted or "selected ... for update" by other sessions, until the transaction is
// either committed or rolled-back.
func (mq *MachineQuery) ForUpdate(opts ...sql.LockOption) *MachineQuery {
if mq.driver.Dialect() == dialect.Postgres {
mq.Unique(false)
}
mq.modifiers = append(mq.modifiers, func(s *sql.Selector) {
s.ForUpdate(opts...)
})
return mq
}
// ForShare behaves similarly to ForUpdate, except that it acquires a shared mode lock
// on any rows that are read. Other sessions can read the rows, but cannot modify them
// until your transaction commits.
func (mq *MachineQuery) ForShare(opts ...sql.LockOption) *MachineQuery {
if mq.driver.Dialect() == dialect.Postgres {
mq.Unique(false)
}
mq.modifiers = append(mq.modifiers, func(s *sql.Selector) {
s.ForShare(opts...)
})
return mq
}
// MachineGroupBy is the group-by builder for Machine entities.
type MachineGroupBy struct {
config

View file

@ -7,7 +7,6 @@ import (
"fmt"
"math"
"entgo.io/ent/dialect"
"entgo.io/ent/dialect/sql"
"entgo.io/ent/dialect/sql/sqlgraph"
"entgo.io/ent/schema/field"
@ -26,7 +25,6 @@ type MetaQuery struct {
fields []string
predicates []predicate.Meta
withOwner *AlertQuery
modifiers []func(*sql.Selector)
// intermediate query (i.e. traversal path).
sql *sql.Selector
path func(context.Context) (*sql.Selector, error)
@ -366,9 +364,6 @@ func (mq *MetaQuery) sqlAll(ctx context.Context, hooks ...queryHook) ([]*Meta, e
node.Edges.loadedTypes = loadedTypes
return node.assignValues(columns, values)
}
if len(mq.modifiers) > 0 {
_spec.Modifiers = mq.modifiers
}
for i := range hooks {
hooks[i](ctx, _spec)
}
@ -416,9 +411,6 @@ func (mq *MetaQuery) loadOwner(ctx context.Context, query *AlertQuery, nodes []*
func (mq *MetaQuery) sqlCount(ctx context.Context) (int, error) {
_spec := mq.querySpec()
if len(mq.modifiers) > 0 {
_spec.Modifiers = mq.modifiers
}
_spec.Node.Columns = mq.fields
if len(mq.fields) > 0 {
_spec.Unique = mq.unique != nil && *mq.unique
@ -500,9 +492,6 @@ func (mq *MetaQuery) sqlQuery(ctx context.Context) *sql.Selector {
if mq.unique != nil && *mq.unique {
selector.Distinct()
}
for _, m := range mq.modifiers {
m(selector)
}
for _, p := range mq.predicates {
p(selector)
}
@ -520,32 +509,6 @@ func (mq *MetaQuery) sqlQuery(ctx context.Context) *sql.Selector {
return selector
}
// ForUpdate locks the selected rows against concurrent updates, and prevent them from being
// updated, deleted or "selected ... for update" by other sessions, until the transaction is
// either committed or rolled-back.
func (mq *MetaQuery) ForUpdate(opts ...sql.LockOption) *MetaQuery {
if mq.driver.Dialect() == dialect.Postgres {
mq.Unique(false)
}
mq.modifiers = append(mq.modifiers, func(s *sql.Selector) {
s.ForUpdate(opts...)
})
return mq
}
// ForShare behaves similarly to ForUpdate, except that it acquires a shared mode lock
// on any rows that are read. Other sessions can read the rows, but cannot modify them
// until your transaction commits.
func (mq *MetaQuery) ForShare(opts ...sql.LockOption) *MetaQuery {
if mq.driver.Dialect() == dialect.Postgres {
mq.Unique(false)
}
mq.modifiers = append(mq.modifiers, func(s *sql.Selector) {
s.ForShare(opts...)
})
return mq
}
// MetaGroupBy is the group-by builder for Meta entities.
type MetaGroupBy struct {
config

View file

@ -12,8 +12,8 @@ import (
)
const (
CAPIPullLockTimeout = 130
MetricsLockTimeout = 40
CAPIPullLockTimeout = 120
MetricsLockTimeout = 30
)
func (c *Client) AcquireLock(name string) error {
@ -22,8 +22,11 @@ func (c *Client) AcquireLock(name string) error {
SetName(name).
SetCreatedAt(types.UtcNow()).
Save(c.CTX)
if ent.IsConstraintError(err) {
return err
}
if err != nil {
return errors.Wrapf(QueryFail, "insert lock: %s", err)
return errors.Wrapf(InsertFail, "insert lock: %s", err)
}
return nil
}
@ -31,19 +34,19 @@ func (c *Client) AcquireLock(name string) error {
func (c *Client) ReleaseLock(name string) error {
_, err := c.Ent.Lock.Delete().Where(lock.NameEQ(name)).Exec(c.CTX)
if err != nil {
return errors.Wrapf(QueryFail, "delete lock: %s", err)
return errors.Wrapf(DeleteFail, "delete lock: %s", err)
}
return nil
}
func (c *Client) ReleaseLockWithTimeout(name string, timeout int) error {
log.Debugf("releasing (%s) orphin locks", name)
log.Debugf("(%s) releasing orphin locks", name)
_, err := c.Ent.Lock.Delete().Where(
lock.NameEQ(name),
lock.CreatedAtGT(time.Now().Add(-time.Duration(timeout)*time.Minute)),
lock.CreatedAtLT(time.Now().Add(-time.Duration(timeout)*time.Minute)),
).Exec(c.CTX)
if err != nil {
return errors.Wrapf(QueryFail, "delete lock: %s", err)
return errors.Wrapf(DeleteFail, "delete lock: %s", err)
}
return nil
}