Merge pull request #37399 from cyli/bump-swarmkit

Bump swarmkit to include task reaper fixes and more metrics.
This commit is contained in:
Sebastiaan van Stijn 2018-07-06 09:30:40 +02:00 committed by GitHub
commit 13f9a25a42
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 548 additions and 36 deletions

View file

@ -125,7 +125,7 @@ github.com/stevvooe/ttrpc d4528379866b0ce7e9d71f3eb96f0582fc374577
github.com/gogo/googleapis 08a7655d27152912db7aaf4f983275eaf8d128ef
# cluster
github.com/docker/swarmkit edd5641391926a50bc5f7040e20b7efc05003c26
github.com/docker/swarmkit 199cf49cd99690135d99e52a1907ec82e8113c4f
github.com/gogo/protobuf v1.0.0
github.com/cloudflare/cfssl 7fb22c8cba7ecaf98e4082d22d65800cf45e042a
github.com/fernet/fernet-go 1b2437bc582b3cfbb341ee5a29f8ef5b42912ff2

View file

@ -2014,6 +2014,10 @@ func sozObjects(x uint64) (n int) {
type NodeCheckFunc func(t1, t2 *Node) bool
type EventNode interface {
IsEventNode() bool
}
type EventCreateNode struct {
Node *Node
Checks []NodeCheckFunc
@ -2033,6 +2037,14 @@ func (e EventCreateNode) Matches(apiEvent go_events.Event) bool {
return true
}
func (e EventCreateNode) IsEventCreate() bool {
return true
}
func (e EventCreateNode) IsEventNode() bool {
return true
}
type EventUpdateNode struct {
Node *Node
OldNode *Node
@ -2053,6 +2065,14 @@ func (e EventUpdateNode) Matches(apiEvent go_events.Event) bool {
return true
}
func (e EventUpdateNode) IsEventUpdate() bool {
return true
}
func (e EventUpdateNode) IsEventNode() bool {
return true
}
type EventDeleteNode struct {
Node *Node
Checks []NodeCheckFunc
@ -2071,6 +2091,15 @@ func (e EventDeleteNode) Matches(apiEvent go_events.Event) bool {
}
return true
}
func (e EventDeleteNode) IsEventDelete() bool {
return true
}
func (e EventDeleteNode) IsEventNode() bool {
return true
}
func (m *Node) CopyStoreObject() StoreObject {
return m.Copy()
}
@ -2261,6 +2290,10 @@ func (indexer NodeCustomIndexer) FromObject(obj interface{}) (bool, [][]byte, er
type ServiceCheckFunc func(t1, t2 *Service) bool
type EventService interface {
IsEventService() bool
}
type EventCreateService struct {
Service *Service
Checks []ServiceCheckFunc
@ -2280,6 +2313,14 @@ func (e EventCreateService) Matches(apiEvent go_events.Event) bool {
return true
}
func (e EventCreateService) IsEventCreate() bool {
return true
}
func (e EventCreateService) IsEventService() bool {
return true
}
type EventUpdateService struct {
Service *Service
OldService *Service
@ -2300,6 +2341,14 @@ func (e EventUpdateService) Matches(apiEvent go_events.Event) bool {
return true
}
func (e EventUpdateService) IsEventUpdate() bool {
return true
}
func (e EventUpdateService) IsEventService() bool {
return true
}
type EventDeleteService struct {
Service *Service
Checks []ServiceCheckFunc
@ -2318,6 +2367,15 @@ func (e EventDeleteService) Matches(apiEvent go_events.Event) bool {
}
return true
}
func (e EventDeleteService) IsEventDelete() bool {
return true
}
func (e EventDeleteService) IsEventService() bool {
return true
}
func (m *Service) CopyStoreObject() StoreObject {
return m.Copy()
}
@ -2478,6 +2536,10 @@ func (indexer ServiceCustomIndexer) FromObject(obj interface{}) (bool, [][]byte,
type TaskCheckFunc func(t1, t2 *Task) bool
type EventTask interface {
IsEventTask() bool
}
type EventCreateTask struct {
Task *Task
Checks []TaskCheckFunc
@ -2497,6 +2559,14 @@ func (e EventCreateTask) Matches(apiEvent go_events.Event) bool {
return true
}
func (e EventCreateTask) IsEventCreate() bool {
return true
}
func (e EventCreateTask) IsEventTask() bool {
return true
}
type EventUpdateTask struct {
Task *Task
OldTask *Task
@ -2517,6 +2587,14 @@ func (e EventUpdateTask) Matches(apiEvent go_events.Event) bool {
return true
}
func (e EventUpdateTask) IsEventUpdate() bool {
return true
}
func (e EventUpdateTask) IsEventTask() bool {
return true
}
type EventDeleteTask struct {
Task *Task
Checks []TaskCheckFunc
@ -2535,6 +2613,15 @@ func (e EventDeleteTask) Matches(apiEvent go_events.Event) bool {
}
return true
}
func (e EventDeleteTask) IsEventDelete() bool {
return true
}
func (e EventDeleteTask) IsEventTask() bool {
return true
}
func (m *Task) CopyStoreObject() StoreObject {
return m.Copy()
}
@ -2738,6 +2825,10 @@ func (indexer TaskCustomIndexer) FromObject(obj interface{}) (bool, [][]byte, er
type NetworkCheckFunc func(t1, t2 *Network) bool
type EventNetwork interface {
IsEventNetwork() bool
}
type EventCreateNetwork struct {
Network *Network
Checks []NetworkCheckFunc
@ -2757,6 +2848,14 @@ func (e EventCreateNetwork) Matches(apiEvent go_events.Event) bool {
return true
}
func (e EventCreateNetwork) IsEventCreate() bool {
return true
}
func (e EventCreateNetwork) IsEventNetwork() bool {
return true
}
type EventUpdateNetwork struct {
Network *Network
OldNetwork *Network
@ -2777,6 +2876,14 @@ func (e EventUpdateNetwork) Matches(apiEvent go_events.Event) bool {
return true
}
func (e EventUpdateNetwork) IsEventUpdate() bool {
return true
}
func (e EventUpdateNetwork) IsEventNetwork() bool {
return true
}
type EventDeleteNetwork struct {
Network *Network
Checks []NetworkCheckFunc
@ -2795,6 +2902,15 @@ func (e EventDeleteNetwork) Matches(apiEvent go_events.Event) bool {
}
return true
}
func (e EventDeleteNetwork) IsEventDelete() bool {
return true
}
func (e EventDeleteNetwork) IsEventNetwork() bool {
return true
}
func (m *Network) CopyStoreObject() StoreObject {
return m.Copy()
}
@ -2955,6 +3071,10 @@ func (indexer NetworkCustomIndexer) FromObject(obj interface{}) (bool, [][]byte,
type ClusterCheckFunc func(t1, t2 *Cluster) bool
type EventCluster interface {
IsEventCluster() bool
}
type EventCreateCluster struct {
Cluster *Cluster
Checks []ClusterCheckFunc
@ -2974,6 +3094,14 @@ func (e EventCreateCluster) Matches(apiEvent go_events.Event) bool {
return true
}
func (e EventCreateCluster) IsEventCreate() bool {
return true
}
func (e EventCreateCluster) IsEventCluster() bool {
return true
}
type EventUpdateCluster struct {
Cluster *Cluster
OldCluster *Cluster
@ -2994,6 +3122,14 @@ func (e EventUpdateCluster) Matches(apiEvent go_events.Event) bool {
return true
}
func (e EventUpdateCluster) IsEventUpdate() bool {
return true
}
func (e EventUpdateCluster) IsEventCluster() bool {
return true
}
type EventDeleteCluster struct {
Cluster *Cluster
Checks []ClusterCheckFunc
@ -3012,6 +3148,15 @@ func (e EventDeleteCluster) Matches(apiEvent go_events.Event) bool {
}
return true
}
func (e EventDeleteCluster) IsEventDelete() bool {
return true
}
func (e EventDeleteCluster) IsEventCluster() bool {
return true
}
func (m *Cluster) CopyStoreObject() StoreObject {
return m.Copy()
}
@ -3172,6 +3317,10 @@ func (indexer ClusterCustomIndexer) FromObject(obj interface{}) (bool, [][]byte,
type SecretCheckFunc func(t1, t2 *Secret) bool
type EventSecret interface {
IsEventSecret() bool
}
type EventCreateSecret struct {
Secret *Secret
Checks []SecretCheckFunc
@ -3191,6 +3340,14 @@ func (e EventCreateSecret) Matches(apiEvent go_events.Event) bool {
return true
}
func (e EventCreateSecret) IsEventCreate() bool {
return true
}
func (e EventCreateSecret) IsEventSecret() bool {
return true
}
type EventUpdateSecret struct {
Secret *Secret
OldSecret *Secret
@ -3211,6 +3368,14 @@ func (e EventUpdateSecret) Matches(apiEvent go_events.Event) bool {
return true
}
func (e EventUpdateSecret) IsEventUpdate() bool {
return true
}
func (e EventUpdateSecret) IsEventSecret() bool {
return true
}
type EventDeleteSecret struct {
Secret *Secret
Checks []SecretCheckFunc
@ -3229,6 +3394,15 @@ func (e EventDeleteSecret) Matches(apiEvent go_events.Event) bool {
}
return true
}
func (e EventDeleteSecret) IsEventDelete() bool {
return true
}
func (e EventDeleteSecret) IsEventSecret() bool {
return true
}
func (m *Secret) CopyStoreObject() StoreObject {
return m.Copy()
}
@ -3389,6 +3563,10 @@ func (indexer SecretCustomIndexer) FromObject(obj interface{}) (bool, [][]byte,
type ConfigCheckFunc func(t1, t2 *Config) bool
type EventConfig interface {
IsEventConfig() bool
}
type EventCreateConfig struct {
Config *Config
Checks []ConfigCheckFunc
@ -3408,6 +3586,14 @@ func (e EventCreateConfig) Matches(apiEvent go_events.Event) bool {
return true
}
func (e EventCreateConfig) IsEventCreate() bool {
return true
}
func (e EventCreateConfig) IsEventConfig() bool {
return true
}
type EventUpdateConfig struct {
Config *Config
OldConfig *Config
@ -3428,6 +3614,14 @@ func (e EventUpdateConfig) Matches(apiEvent go_events.Event) bool {
return true
}
func (e EventUpdateConfig) IsEventUpdate() bool {
return true
}
func (e EventUpdateConfig) IsEventConfig() bool {
return true
}
type EventDeleteConfig struct {
Config *Config
Checks []ConfigCheckFunc
@ -3446,6 +3640,15 @@ func (e EventDeleteConfig) Matches(apiEvent go_events.Event) bool {
}
return true
}
func (e EventDeleteConfig) IsEventDelete() bool {
return true
}
func (e EventDeleteConfig) IsEventConfig() bool {
return true
}
func (m *Config) CopyStoreObject() StoreObject {
return m.Copy()
}
@ -3606,6 +3809,10 @@ func (indexer ConfigCustomIndexer) FromObject(obj interface{}) (bool, [][]byte,
type ResourceCheckFunc func(t1, t2 *Resource) bool
type EventResource interface {
IsEventResource() bool
}
type EventCreateResource struct {
Resource *Resource
Checks []ResourceCheckFunc
@ -3625,6 +3832,14 @@ func (e EventCreateResource) Matches(apiEvent go_events.Event) bool {
return true
}
func (e EventCreateResource) IsEventCreate() bool {
return true
}
func (e EventCreateResource) IsEventResource() bool {
return true
}
type EventUpdateResource struct {
Resource *Resource
OldResource *Resource
@ -3645,6 +3860,14 @@ func (e EventUpdateResource) Matches(apiEvent go_events.Event) bool {
return true
}
func (e EventUpdateResource) IsEventUpdate() bool {
return true
}
func (e EventUpdateResource) IsEventResource() bool {
return true
}
type EventDeleteResource struct {
Resource *Resource
Checks []ResourceCheckFunc
@ -3663,6 +3886,15 @@ func (e EventDeleteResource) Matches(apiEvent go_events.Event) bool {
}
return true
}
func (e EventDeleteResource) IsEventDelete() bool {
return true
}
func (e EventDeleteResource) IsEventResource() bool {
return true
}
func (m *Resource) CopyStoreObject() StoreObject {
return m.Copy()
}
@ -3829,6 +4061,10 @@ func (indexer ResourceCustomIndexer) FromObject(obj interface{}) (bool, [][]byte
type ExtensionCheckFunc func(t1, t2 *Extension) bool
type EventExtension interface {
IsEventExtension() bool
}
type EventCreateExtension struct {
Extension *Extension
Checks []ExtensionCheckFunc
@ -3848,6 +4084,14 @@ func (e EventCreateExtension) Matches(apiEvent go_events.Event) bool {
return true
}
func (e EventCreateExtension) IsEventCreate() bool {
return true
}
func (e EventCreateExtension) IsEventExtension() bool {
return true
}
type EventUpdateExtension struct {
Extension *Extension
OldExtension *Extension
@ -3868,6 +4112,14 @@ func (e EventUpdateExtension) Matches(apiEvent go_events.Event) bool {
return true
}
func (e EventUpdateExtension) IsEventUpdate() bool {
return true
}
func (e EventUpdateExtension) IsEventExtension() bool {
return true
}
type EventDeleteExtension struct {
Extension *Extension
Checks []ExtensionCheckFunc
@ -3886,6 +4138,15 @@ func (e EventDeleteExtension) Matches(apiEvent go_events.Event) bool {
}
return true
}
func (e EventDeleteExtension) IsEventDelete() bool {
return true
}
func (e EventDeleteExtension) IsEventExtension() bool {
return true
}
func (m *Extension) CopyStoreObject() StoreObject {
return m.Copy()
}

View file

@ -38,6 +38,21 @@ type Event interface {
Matches(events.Event) bool
}
// EventCreate is an interface implemented by every creation event type
type EventCreate interface {
IsEventCreate() bool
}
// EventUpdate is an interface impelemented by every update event type
type EventUpdate interface {
IsEventUpdate() bool
}
// EventDelete is an interface implemented by every delete event type
type EventDelete interface {
IsEventDelete()
}
func customIndexer(kind string, annotations *Annotations) (bool, [][]byte, error) {
var converted [][]byte

View file

@ -1145,11 +1145,8 @@ func (d *Dispatcher) Heartbeat(ctx context.Context, r *api.HeartbeatRequest) (*a
d.rpcRW.RLock()
defer d.rpcRW.RUnlock()
// Its OK to call isRunning() here instead of isRunningLocked()
// because of the rpcRW readlock above.
// TODO(anshul) other uses of isRunningLocked() can probably
// also be removed.
if !d.isRunning() {
// TODO(anshul) Explore if its possible to check context here without locking.
if _, err := d.isRunningLocked(); err != nil {
return nil, status.Errorf(codes.Aborted, "dispatcher is stopped")
}

View file

@ -5,6 +5,7 @@ import (
"strings"
"github.com/docker/go-events"
metrics "github.com/docker/go-metrics"
"github.com/docker/swarmkit/api"
"github.com/docker/swarmkit/manager/state/store"
@ -12,14 +13,28 @@ import (
var (
ns = metrics.NewNamespace("swarm", "manager", nil)
// counts of the various objects in swarmkit
nodesMetric metrics.LabeledGauge
tasksMetric metrics.LabeledGauge
// none of these objects have state, so they're just regular gauges
servicesMetric metrics.Gauge
networksMetric metrics.Gauge
secretsMetric metrics.Gauge
configsMetric metrics.Gauge
)
func init() {
nodesMetric = ns.NewLabeledGauge("nodes", "The number of nodes", "", "state")
for _, state := range api.NodeStatus_State_name {
nodesMetric.WithValues(strings.ToLower(state)).Set(0)
}
tasksMetric = ns.NewLabeledGauge("tasks", "The number of tasks in the cluster object store", metrics.Total, "state")
servicesMetric = ns.NewGauge("services", "The number of services in the cluster object store", metrics.Total)
networksMetric = ns.NewGauge("networks", "The number of networks in the cluster object store", metrics.Total)
secretsMetric = ns.NewGauge("secrets", "The number of secrets in the cluster object store", metrics.Total)
configsMetric = ns.NewGauge("configs", "The number of configs in the cluster object store", metrics.Total)
resetMetrics()
metrics.Register(ns)
}
@ -42,20 +57,6 @@ func NewCollector(store *store.MemoryStore) *Collector {
}
}
func (c *Collector) updateNodeState(prevNode, newNode *api.Node) {
// Skip updates if nothing changed.
if prevNode != nil && newNode != nil && prevNode.Status.State == newNode.Status.State {
return
}
if prevNode != nil {
nodesMetric.WithValues(strings.ToLower(prevNode.Status.State.String())).Dec(1)
}
if newNode != nil {
nodesMetric.WithValues(strings.ToLower(newNode.Status.State.String())).Inc(1)
}
}
// Run contains the collector event loop
func (c *Collector) Run(ctx context.Context) error {
defer close(c.doneChan)
@ -65,9 +66,46 @@ func (c *Collector) Run(ctx context.Context) error {
if err != nil {
return err
}
for _, node := range nodes {
c.updateNodeState(nil, node)
tasks, err := store.FindTasks(readTx, store.All)
if err != nil {
return err
}
services, err := store.FindServices(readTx, store.All)
if err != nil {
return err
}
networks, err := store.FindNetworks(readTx, store.All)
if err != nil {
return err
}
secrets, err := store.FindSecrets(readTx, store.All)
if err != nil {
return err
}
configs, err := store.FindConfigs(readTx, store.All)
if err != nil {
return err
}
for _, obj := range nodes {
c.handleEvent(obj.EventCreate())
}
for _, obj := range tasks {
c.handleEvent(obj.EventCreate())
}
for _, obj := range services {
c.handleEvent(obj.EventCreate())
}
for _, obj := range networks {
c.handleEvent(obj.EventCreate())
}
for _, obj := range secrets {
c.handleEvent(obj.EventCreate())
}
for _, obj := range configs {
c.handleEvent(obj.EventCreate())
}
return nil
})
if err != nil {
@ -78,14 +116,7 @@ func (c *Collector) Run(ctx context.Context) error {
for {
select {
case event := <-watcher:
switch v := event.(type) {
case api.EventCreateNode:
c.updateNodeState(nil, v.Node)
case api.EventUpdateNode:
c.updateNodeState(v.OldNode, v.Node)
case api.EventDeleteNode:
c.updateNodeState(v.Node, nil)
}
c.handleEvent(event)
case <-c.stopChan:
return nil
}
@ -98,7 +129,131 @@ func (c *Collector) Stop() {
<-c.doneChan
// Clean the metrics on exit.
resetMetrics()
}
// resetMetrics resets all metrics to their default (base) value
func resetMetrics() {
for _, state := range api.NodeStatus_State_name {
nodesMetric.WithValues(strings.ToLower(state)).Set(0)
}
for _, state := range api.TaskState_name {
tasksMetric.WithValues(strings.ToLower(state)).Set(0)
}
servicesMetric.Set(0)
networksMetric.Set(0)
secretsMetric.Set(0)
configsMetric.Set(0)
}
// handleEvent handles a single incoming cluster event.
func (c *Collector) handleEvent(event events.Event) {
switch event.(type) {
case api.EventNode:
c.handleNodeEvent(event)
case api.EventTask:
c.handleTaskEvent(event)
case api.EventService:
c.handleServiceEvent(event)
case api.EventNetwork:
c.handleNetworkEvent(event)
case api.EventSecret:
c.handleSecretsEvent(event)
case api.EventConfig:
c.handleConfigsEvent(event)
}
}
func (c *Collector) handleNodeEvent(event events.Event) {
var prevNode, newNode *api.Node
switch v := event.(type) {
case api.EventCreateNode:
prevNode, newNode = nil, v.Node
case api.EventUpdateNode:
prevNode, newNode = v.OldNode, v.Node
case api.EventDeleteNode:
prevNode, newNode = v.Node, nil
}
// Skip updates if nothing changed.
if prevNode != nil && newNode != nil && prevNode.Status.State == newNode.Status.State {
return
}
if prevNode != nil {
nodesMetric.WithValues(strings.ToLower(prevNode.Status.State.String())).Dec(1)
}
if newNode != nil {
nodesMetric.WithValues(strings.ToLower(newNode.Status.State.String())).Inc(1)
}
return
}
func (c *Collector) handleTaskEvent(event events.Event) {
var prevTask, newTask *api.Task
switch v := event.(type) {
case api.EventCreateTask:
prevTask, newTask = nil, v.Task
case api.EventUpdateTask:
prevTask, newTask = v.OldTask, v.Task
case api.EventDeleteTask:
prevTask, newTask = v.Task, nil
}
// Skip updates if nothing changed.
if prevTask != nil && newTask != nil && prevTask.Status.State == newTask.Status.State {
return
}
if prevTask != nil {
tasksMetric.WithValues(
strings.ToLower(prevTask.Status.State.String()),
).Dec(1)
}
if newTask != nil {
tasksMetric.WithValues(
strings.ToLower(newTask.Status.State.String()),
).Inc(1)
}
return
}
func (c *Collector) handleServiceEvent(event events.Event) {
switch event.(type) {
case api.EventCreateService:
servicesMetric.Inc(1)
case api.EventDeleteService:
servicesMetric.Dec(1)
}
}
func (c *Collector) handleNetworkEvent(event events.Event) {
switch event.(type) {
case api.EventCreateNetwork:
networksMetric.Inc(1)
case api.EventDeleteNetwork:
networksMetric.Dec(1)
}
}
func (c *Collector) handleSecretsEvent(event events.Event) {
switch event.(type) {
case api.EventCreateSecret:
secretsMetric.Inc(1)
case api.EventDeleteSecret:
secretsMetric.Dec(1)
}
}
func (c *Collector) handleConfigsEvent(event events.Event) {
switch event.(type) {
case api.EventCreateConfig:
configsMetric.Inc(1)
case api.EventDeleteConfig:
configsMetric.Dec(1)
}
}

View file

@ -40,6 +40,12 @@ type TaskReaper struct {
cleanup []string
stopChan chan struct{}
doneChan chan struct{}
// tickSignal is a channel that, if non-nil and available, will be written
// to to signal that a tick has occurred. its sole purpose is for testing
// code, to verify that take cleanup attempts are happening when they
// should be.
tickSignal chan struct{}
}
// New creates a new TaskReaper.
@ -115,7 +121,34 @@ func (tr *TaskReaper) Run(ctx context.Context) {
// Clean up when we hit TaskHistoryRetentionLimit or when the timer expires,
// whichever happens first.
//
// Specifically, the way this should work:
// - Create a timer and immediately stop it. We don't want to fire the
// cleanup routine yet, because we just did a cleanup as part of the
// initialization above.
// - Launch into an event loop
// - When we receive an event, handle the event as needed
// - After receiving the event:
// - If minimum batch size (maxDirty) is exceeded with dirty + cleanup,
// then immediately launch into the cleanup routine
// - Otherwise, if the timer is stopped, start it (reset).
// - If the timer expires and the timer channel is signaled, then Stop the
// timer (so that it will be ready to be started again as needed), and
// execute the cleanup routine (tick)
timer := time.NewTimer(reaperBatchingInterval)
timer.Stop()
// If stop is somehow called AFTER the timer has expired, there will be a
// value in the timer.C channel. If there is such a value, we should drain
// it out. This select statement allows us to drain that value if it's
// present, or continue straight through otherwise.
select {
case <-timer.C:
default:
}
// keep track with a boolean of whether the timer is currently stopped
isTimerStopped := true
// Watch for:
// 1. EventCreateTask for cleaning slots, which is the best time to cleanup that node/slot.
@ -153,16 +186,35 @@ func (tr *TaskReaper) Run(ctx context.Context) {
}
if len(tr.dirty)+len(tr.cleanup) > maxDirty {
// stop the timer, so we don't fire it. if we get another event
// after we do this cleaning, we will reset the timer then
timer.Stop()
// if the timer had fired, drain out the value.
select {
case <-timer.C:
default:
}
isTimerStopped = true
tr.tick()
} else {
if isTimerStopped {
timer.Reset(reaperBatchingInterval)
isTimerStopped = false
}
}
case <-timer.C:
timer.Stop()
// we can safely ignore draining off of the timer channel, because
// we already know that the timer has expired.
isTimerStopped = true
tr.tick()
case <-tr.stopChan:
// even though this doesn't really matter in this context, it's
// good hygiene to drain the value.
timer.Stop()
select {
case <-timer.C:
default:
}
return
}
}
@ -170,6 +222,16 @@ func (tr *TaskReaper) Run(ctx context.Context) {
// tick performs task history cleanup.
func (tr *TaskReaper) tick() {
// this signals that a tick has occurred. it exists solely for testing.
if tr.tickSignal != nil {
// try writing to this channel, but if it's full, fall straight through
// and ignore it.
select {
case tr.tickSignal <- struct{}{}:
default:
}
}
if len(tr.dirty) == 0 && len(tr.cleanup) == 0 {
return
}
@ -184,10 +246,24 @@ func (tr *TaskReaper) tick() {
}
// Check history of dirty tasks for cleanup.
// Note: Clean out the dirty set at the end of this tick iteration
// in all but one scenarios (documented below).
// When tick() finishes, the tasks in the slot were either cleaned up,
// or it was skipped because it didn't meet the criteria for cleaning.
// Either way, we can discard the dirty set because future events on
// that slot will cause the task to be readded to the dirty set
// at that point.
//
// The only case when we keep the slot dirty is when there are more
// than one running tasks present for a given slot.
// In that case, we need to keep the slot dirty to allow it to be
// cleaned when tick() is called next and one or more the tasks
// in that slot have stopped running.
tr.store.View(func(tx store.ReadTx) {
for dirty := range tr.dirty {
service := store.GetService(tx, dirty.ServiceID)
if service == nil {
delete(tr.dirty, dirty)
continue
}
@ -211,6 +287,7 @@ func (tr *TaskReaper) tick() {
// Negative value for TaskHistoryRetentionLimit is an indication to never clean up task history.
if taskHistory < 0 {
delete(tr.dirty, dirty)
continue
}
@ -240,6 +317,7 @@ func (tr *TaskReaper) tick() {
}
if int64(len(historicTasks)) <= taskHistory {
delete(tr.dirty, dirty)
continue
}
@ -270,6 +348,12 @@ func (tr *TaskReaper) tick() {
}
}
// The only case when we keep the slot dirty at the end of tick()
// is when there are more than one running tasks present
// for a given slot.
// In that case, we keep the slot dirty to allow it to be
// cleaned when tick() is called next and one or more of
// the tasks in that slot have stopped running.
if runningTasks <= 1 {
delete(tr.dirty, dirty)
}