Merge pull request #31742 from aaronlehmann/vendor-swarmkit-f93948c

[17.03.x] Vendor swarmkit f93948c
This commit is contained in:
Victor Vieux 2017-03-10 14:10:41 -08:00 committed by GitHub
commit 3c3f3204bf
3 changed files with 68 additions and 36 deletions

View file

@ -101,7 +101,7 @@ github.com/docker/containerd 595e75c212d19a81d2b808a518fe1afc1391dad5
github.com/tonistiigi/fifo 1405643975692217d6720f8b54aeee1bf2cd5cf4
# cluster
github.com/docker/swarmkit 1f3e4e67eeac60456460a270179711d0808129f9
github.com/docker/swarmkit f93948cb430facd540e3c65c606384a89b3ac40f
github.com/golang/mock bd3c8e81be01eef76d4b503f5e687d2d1354d2d9
github.com/gogo/protobuf v0.3
github.com/cloudflare/cfssl 7fb22c8cba7ecaf98e4082d22d65800cf45e042a

View file

@ -26,7 +26,11 @@ const (
allocatedStatusMessage = "pending task scheduling"
)
var errNoChanges = errors.New("task unchanged")
var (
errNoChanges = errors.New("task unchanged")
retryInterval = 5 * time.Minute
)
func newIngressNetwork() *api.Network {
return &api.Network{
@ -57,19 +61,28 @@ type networkContext struct {
// the actual network allocation.
nwkAllocator *networkallocator.NetworkAllocator
// A table of unallocated tasks which will be revisited if any thing
// A set of tasks which are ready to be allocated as a batch. This is
// distinct from "unallocatedTasks" which are tasks that failed to
// allocate on the first try, being held for a future retry.
pendingTasks map[string]*api.Task
// A set of unallocated tasks which will be revisited if any thing
// changes in system state that might help task allocation.
unallocatedTasks map[string]*api.Task
// A table of unallocated services which will be revisited if
// A set of unallocated services which will be revisited if
// any thing changes in system state that might help service
// allocation.
unallocatedServices map[string]*api.Service
// A table of unallocated networks which will be revisited if
// A set of unallocated networks which will be revisited if
// any thing changes in system state that might help network
// allocation.
unallocatedNetworks map[string]*api.Network
// lastRetry is the last timestamp when unallocated
// tasks/services/networks were retried.
lastRetry time.Time
}
func (a *Allocator) doNetworkInit(ctx context.Context) (err error) {
@ -80,10 +93,12 @@ func (a *Allocator) doNetworkInit(ctx context.Context) (err error) {
nc := &networkContext{
nwkAllocator: na,
pendingTasks: make(map[string]*api.Task),
unallocatedTasks: make(map[string]*api.Task),
unallocatedServices: make(map[string]*api.Service),
unallocatedNetworks: make(map[string]*api.Network),
ingressNetwork: newIngressNetwork(),
lastRetry: time.Now(),
}
a.netCtx = nc
defer func() {
@ -266,7 +281,7 @@ func (a *Allocator) doNetworkInit(ctx context.Context) (err error) {
}
for _, t := range tasks {
if taskDead(t) {
if t.Status.State > api.TaskStateRunning {
continue
}
@ -351,6 +366,8 @@ func (a *Allocator) doNetworkAlloc(ctx context.Context, ev events.Event) {
if err := nc.nwkAllocator.Deallocate(n); err != nil {
log.G(ctx).WithError(err).Errorf("Failed during network free for network %s", n.ID)
}
delete(nc.unallocatedNetworks, n.ID)
case state.EventCreateService:
s := v.Service.Copy()
@ -387,6 +404,9 @@ func (a *Allocator) doNetworkAlloc(ctx context.Context, ev events.Event) {
return a.commitAllocatedService(ctx, batch, s)
}); err != nil {
log.G(ctx).WithError(err).Errorf("Failed to commit allocation during update for service %s", s.ID)
nc.unallocatedServices[s.ID] = s
} else {
delete(nc.unallocatedServices, s.ID)
}
case state.EventDeleteService:
s := v.Service.Copy()
@ -403,10 +423,20 @@ func (a *Allocator) doNetworkAlloc(ctx context.Context, ev events.Event) {
case state.EventCreateTask, state.EventUpdateTask, state.EventDeleteTask:
a.doTaskAlloc(ctx, ev)
case state.EventCommit:
a.procUnallocatedNetworks(ctx)
a.procUnallocatedServices(ctx)
a.procUnallocatedTasksNetwork(ctx)
return
a.procTasksNetwork(ctx, false)
if time.Since(nc.lastRetry) > retryInterval {
a.procUnallocatedNetworks(ctx)
a.procUnallocatedServices(ctx)
a.procTasksNetwork(ctx, true)
nc.lastRetry = time.Now()
}
// Any left over tasks are moved to the unallocated set
for _, t := range nc.pendingTasks {
nc.unallocatedTasks[t.ID] = t
}
nc.pendingTasks = make(map[string]*api.Task)
}
}
@ -456,17 +486,6 @@ func (a *Allocator) doNodeAlloc(ctx context.Context, ev events.Event) {
}
}
// taskRunning checks whether a task is either actively running, or in the
// process of starting up.
func taskRunning(t *api.Task) bool {
return t.DesiredState <= api.TaskStateRunning && t.Status.State <= api.TaskStateRunning
}
// taskDead checks whether a task is not actively running as far as allocator purposes are concerned.
func taskDead(t *api.Task) bool {
return t.DesiredState > api.TaskStateRunning && t.Status.State > api.TaskStateRunning
}
// taskReadyForNetworkVote checks if the task is ready for a network
// vote to move it to PENDING state.
func taskReadyForNetworkVote(t *api.Task, s *api.Service, nc *networkContext) bool {
@ -569,17 +588,17 @@ func (a *Allocator) doTaskAlloc(ctx context.Context, ev events.Event) {
nc := a.netCtx
// If the task has stopped running or it's being deleted then
// we should free the network resources associated with the
// task right away.
if taskDead(t) || isDelete {
// If the task has stopped running then we should free the network
// resources associated with the task right away.
if t.Status.State > api.TaskStateRunning || isDelete {
if nc.nwkAllocator.IsTaskAllocated(t) {
if err := nc.nwkAllocator.DeallocateTask(t); err != nil {
log.G(ctx).WithError(err).Errorf("Failed freeing network resources for task %s", t.ID)
}
}
// Cleanup any task references that might exist in unallocatedTasks
// Cleanup any task references that might exist
delete(nc.pendingTasks, t.ID)
delete(nc.unallocatedTasks, t.ID)
return
}
@ -587,6 +606,7 @@ func (a *Allocator) doTaskAlloc(ctx context.Context, ev events.Event) {
// If we are already in allocated state, there is
// absolutely nothing else to do.
if t.Status.State >= api.TaskStatePending {
delete(nc.pendingTasks, t.ID)
delete(nc.unallocatedTasks, t.ID)
return
}
@ -605,7 +625,7 @@ func (a *Allocator) doTaskAlloc(ctx context.Context, ev events.Event) {
// available in store. But we still need to
// cleanup network resources associated with
// the task.
if taskRunning(t) && !isDelete {
if t.Status.State <= api.TaskStateRunning && !isDelete {
log.G(ctx).Errorf("Event %T: Failed to get service %s for task %s state %s: could not find service %s", ev, t.ServiceID, t.ID, t.Status.State, t.ServiceID)
return
}
@ -616,7 +636,7 @@ func (a *Allocator) doTaskAlloc(ctx context.Context, ev events.Event) {
// based on service spec.
a.taskCreateNetworkAttachments(t, s)
nc.unallocatedTasks[t.ID] = t
nc.pendingTasks[t.ID] = t
}
func (a *Allocator) allocateNode(ctx context.Context, node *api.Node) error {
@ -948,15 +968,25 @@ func (a *Allocator) procUnallocatedServices(ctx context.Context) {
}
}
func (a *Allocator) procUnallocatedTasksNetwork(ctx context.Context) {
func (a *Allocator) procTasksNetwork(ctx context.Context, onRetry bool) {
nc := a.netCtx
allocatedTasks := make([]*api.Task, 0, len(nc.unallocatedTasks))
quiet := false
toAllocate := nc.pendingTasks
if onRetry {
toAllocate = nc.unallocatedTasks
quiet = true
}
allocatedTasks := make([]*api.Task, 0, len(toAllocate))
for _, t := range nc.unallocatedTasks {
for _, t := range toAllocate {
if err := a.allocateTask(ctx, t); err == nil {
allocatedTasks = append(allocatedTasks, t)
} else if err != errNoChanges {
log.G(ctx).WithError(err).Error("task allocation failure")
if quiet {
log.G(ctx).WithError(err).Debug("task allocation failure")
} else {
log.G(ctx).WithError(err).Error("task allocation failure")
}
}
}
@ -978,11 +1008,11 @@ func (a *Allocator) procUnallocatedTasksNetwork(ctx context.Context) {
})
if err != nil {
log.G(ctx).WithError(err).Error("failed a store batch operation while processing unallocated tasks")
log.G(ctx).WithError(err).Error("failed a store batch operation while processing tasks")
}
for _, t := range allocatedTasks[:committed] {
delete(nc.unallocatedTasks, t.ID)
delete(toAllocate, t.ID)
}
}

View file

@ -167,8 +167,10 @@ func (s *Server) RemoveNetwork(ctx context.Context, request *api.RemoveNetworkRe
return grpc.Errorf(codes.Internal, "could not find tasks using network %s: %v", request.NetworkID, err)
}
if len(tasks) != 0 {
return grpc.Errorf(codes.FailedPrecondition, "network %s is in use by task %s", request.NetworkID, tasks[0].ID)
for _, t := range tasks {
if t.DesiredState <= api.TaskStateRunning && t.Status.State <= api.TaskStateRunning {
return grpc.Errorf(codes.FailedPrecondition, "network %s is in use by task %s", request.NetworkID, t.ID)
}
}
nw := store.GetNetwork(tx, request.NetworkID)