diff --git a/vendor.conf b/vendor.conf index 5b2b04c90b..a740458cbf 100644 --- a/vendor.conf +++ b/vendor.conf @@ -101,7 +101,7 @@ github.com/docker/containerd 595e75c212d19a81d2b808a518fe1afc1391dad5 github.com/tonistiigi/fifo 1405643975692217d6720f8b54aeee1bf2cd5cf4 # cluster -github.com/docker/swarmkit 1f3e4e67eeac60456460a270179711d0808129f9 +github.com/docker/swarmkit f93948cb430facd540e3c65c606384a89b3ac40f github.com/golang/mock bd3c8e81be01eef76d4b503f5e687d2d1354d2d9 github.com/gogo/protobuf v0.3 github.com/cloudflare/cfssl 7fb22c8cba7ecaf98e4082d22d65800cf45e042a diff --git a/vendor/github.com/docker/swarmkit/manager/allocator/network.go b/vendor/github.com/docker/swarmkit/manager/allocator/network.go index e00b7de42c..f8c6a61ac5 100644 --- a/vendor/github.com/docker/swarmkit/manager/allocator/network.go +++ b/vendor/github.com/docker/swarmkit/manager/allocator/network.go @@ -26,7 +26,11 @@ const ( allocatedStatusMessage = "pending task scheduling" ) -var errNoChanges = errors.New("task unchanged") +var ( + errNoChanges = errors.New("task unchanged") + + retryInterval = 5 * time.Minute +) func newIngressNetwork() *api.Network { return &api.Network{ @@ -57,19 +61,28 @@ type networkContext struct { // the actual network allocation. nwkAllocator *networkallocator.NetworkAllocator - // A table of unallocated tasks which will be revisited if any thing + // A set of tasks which are ready to be allocated as a batch. This is + // distinct from "unallocatedTasks" which are tasks that failed to + // allocate on the first try, being held for a future retry. + pendingTasks map[string]*api.Task + + // A set of unallocated tasks which will be revisited if any thing // changes in system state that might help task allocation. unallocatedTasks map[string]*api.Task - // A table of unallocated services which will be revisited if + // A set of unallocated services which will be revisited if // any thing changes in system state that might help service // allocation. unallocatedServices map[string]*api.Service - // A table of unallocated networks which will be revisited if + // A set of unallocated networks which will be revisited if // any thing changes in system state that might help network // allocation. unallocatedNetworks map[string]*api.Network + + // lastRetry is the last timestamp when unallocated + // tasks/services/networks were retried. + lastRetry time.Time } func (a *Allocator) doNetworkInit(ctx context.Context) (err error) { @@ -80,10 +93,12 @@ func (a *Allocator) doNetworkInit(ctx context.Context) (err error) { nc := &networkContext{ nwkAllocator: na, + pendingTasks: make(map[string]*api.Task), unallocatedTasks: make(map[string]*api.Task), unallocatedServices: make(map[string]*api.Service), unallocatedNetworks: make(map[string]*api.Network), ingressNetwork: newIngressNetwork(), + lastRetry: time.Now(), } a.netCtx = nc defer func() { @@ -266,7 +281,7 @@ func (a *Allocator) doNetworkInit(ctx context.Context) (err error) { } for _, t := range tasks { - if taskDead(t) { + if t.Status.State > api.TaskStateRunning { continue } @@ -351,6 +366,8 @@ func (a *Allocator) doNetworkAlloc(ctx context.Context, ev events.Event) { if err := nc.nwkAllocator.Deallocate(n); err != nil { log.G(ctx).WithError(err).Errorf("Failed during network free for network %s", n.ID) } + + delete(nc.unallocatedNetworks, n.ID) case state.EventCreateService: s := v.Service.Copy() @@ -387,6 +404,9 @@ func (a *Allocator) doNetworkAlloc(ctx context.Context, ev events.Event) { return a.commitAllocatedService(ctx, batch, s) }); err != nil { log.G(ctx).WithError(err).Errorf("Failed to commit allocation during update for service %s", s.ID) + nc.unallocatedServices[s.ID] = s + } else { + delete(nc.unallocatedServices, s.ID) } case state.EventDeleteService: s := v.Service.Copy() @@ -403,10 +423,20 @@ func (a *Allocator) doNetworkAlloc(ctx context.Context, ev events.Event) { case state.EventCreateTask, state.EventUpdateTask, state.EventDeleteTask: a.doTaskAlloc(ctx, ev) case state.EventCommit: - a.procUnallocatedNetworks(ctx) - a.procUnallocatedServices(ctx) - a.procUnallocatedTasksNetwork(ctx) - return + a.procTasksNetwork(ctx, false) + + if time.Since(nc.lastRetry) > retryInterval { + a.procUnallocatedNetworks(ctx) + a.procUnallocatedServices(ctx) + a.procTasksNetwork(ctx, true) + nc.lastRetry = time.Now() + } + + // Any left over tasks are moved to the unallocated set + for _, t := range nc.pendingTasks { + nc.unallocatedTasks[t.ID] = t + } + nc.pendingTasks = make(map[string]*api.Task) } } @@ -456,17 +486,6 @@ func (a *Allocator) doNodeAlloc(ctx context.Context, ev events.Event) { } } -// taskRunning checks whether a task is either actively running, or in the -// process of starting up. -func taskRunning(t *api.Task) bool { - return t.DesiredState <= api.TaskStateRunning && t.Status.State <= api.TaskStateRunning -} - -// taskDead checks whether a task is not actively running as far as allocator purposes are concerned. -func taskDead(t *api.Task) bool { - return t.DesiredState > api.TaskStateRunning && t.Status.State > api.TaskStateRunning -} - // taskReadyForNetworkVote checks if the task is ready for a network // vote to move it to PENDING state. func taskReadyForNetworkVote(t *api.Task, s *api.Service, nc *networkContext) bool { @@ -569,17 +588,17 @@ func (a *Allocator) doTaskAlloc(ctx context.Context, ev events.Event) { nc := a.netCtx - // If the task has stopped running or it's being deleted then - // we should free the network resources associated with the - // task right away. - if taskDead(t) || isDelete { + // If the task has stopped running then we should free the network + // resources associated with the task right away. + if t.Status.State > api.TaskStateRunning || isDelete { if nc.nwkAllocator.IsTaskAllocated(t) { if err := nc.nwkAllocator.DeallocateTask(t); err != nil { log.G(ctx).WithError(err).Errorf("Failed freeing network resources for task %s", t.ID) } } - // Cleanup any task references that might exist in unallocatedTasks + // Cleanup any task references that might exist + delete(nc.pendingTasks, t.ID) delete(nc.unallocatedTasks, t.ID) return } @@ -587,6 +606,7 @@ func (a *Allocator) doTaskAlloc(ctx context.Context, ev events.Event) { // If we are already in allocated state, there is // absolutely nothing else to do. if t.Status.State >= api.TaskStatePending { + delete(nc.pendingTasks, t.ID) delete(nc.unallocatedTasks, t.ID) return } @@ -605,7 +625,7 @@ func (a *Allocator) doTaskAlloc(ctx context.Context, ev events.Event) { // available in store. But we still need to // cleanup network resources associated with // the task. - if taskRunning(t) && !isDelete { + if t.Status.State <= api.TaskStateRunning && !isDelete { log.G(ctx).Errorf("Event %T: Failed to get service %s for task %s state %s: could not find service %s", ev, t.ServiceID, t.ID, t.Status.State, t.ServiceID) return } @@ -616,7 +636,7 @@ func (a *Allocator) doTaskAlloc(ctx context.Context, ev events.Event) { // based on service spec. a.taskCreateNetworkAttachments(t, s) - nc.unallocatedTasks[t.ID] = t + nc.pendingTasks[t.ID] = t } func (a *Allocator) allocateNode(ctx context.Context, node *api.Node) error { @@ -948,15 +968,25 @@ func (a *Allocator) procUnallocatedServices(ctx context.Context) { } } -func (a *Allocator) procUnallocatedTasksNetwork(ctx context.Context) { +func (a *Allocator) procTasksNetwork(ctx context.Context, onRetry bool) { nc := a.netCtx - allocatedTasks := make([]*api.Task, 0, len(nc.unallocatedTasks)) + quiet := false + toAllocate := nc.pendingTasks + if onRetry { + toAllocate = nc.unallocatedTasks + quiet = true + } + allocatedTasks := make([]*api.Task, 0, len(toAllocate)) - for _, t := range nc.unallocatedTasks { + for _, t := range toAllocate { if err := a.allocateTask(ctx, t); err == nil { allocatedTasks = append(allocatedTasks, t) } else if err != errNoChanges { - log.G(ctx).WithError(err).Error("task allocation failure") + if quiet { + log.G(ctx).WithError(err).Debug("task allocation failure") + } else { + log.G(ctx).WithError(err).Error("task allocation failure") + } } } @@ -978,11 +1008,11 @@ func (a *Allocator) procUnallocatedTasksNetwork(ctx context.Context) { }) if err != nil { - log.G(ctx).WithError(err).Error("failed a store batch operation while processing unallocated tasks") + log.G(ctx).WithError(err).Error("failed a store batch operation while processing tasks") } for _, t := range allocatedTasks[:committed] { - delete(nc.unallocatedTasks, t.ID) + delete(toAllocate, t.ID) } } diff --git a/vendor/github.com/docker/swarmkit/manager/controlapi/network.go b/vendor/github.com/docker/swarmkit/manager/controlapi/network.go index 00634f184b..b5cd25fd07 100644 --- a/vendor/github.com/docker/swarmkit/manager/controlapi/network.go +++ b/vendor/github.com/docker/swarmkit/manager/controlapi/network.go @@ -167,8 +167,10 @@ func (s *Server) RemoveNetwork(ctx context.Context, request *api.RemoveNetworkRe return grpc.Errorf(codes.Internal, "could not find tasks using network %s: %v", request.NetworkID, err) } - if len(tasks) != 0 { - return grpc.Errorf(codes.FailedPrecondition, "network %s is in use by task %s", request.NetworkID, tasks[0].ID) + for _, t := range tasks { + if t.DesiredState <= api.TaskStateRunning && t.Status.State <= api.TaskStateRunning { + return grpc.Errorf(codes.FailedPrecondition, "network %s is in use by task %s", request.NetworkID, t.ID) + } } nw := store.GetNetwork(tx, request.NetworkID)