diff --git a/hack/vendor.sh b/hack/vendor.sh
index 665aa079cea889b52b73c82ae5b8483f2ec18aef..7bbee137280937acfad4b2fe6f56c7cd8ed345db 100755
--- a/hack/vendor.sh
+++ b/hack/vendor.sh
@@ -139,7 +139,7 @@ clone git github.com/docker/docker-credential-helpers v0.3.0
 clone git github.com/docker/containerd 0ac3cd1be170d180b2baed755e8f0da547ceb267
 
 # cluster
-clone git github.com/docker/swarmkit bfbec9f2b6a487100a80027f6ee16cc0c646a8f5
+clone git github.com/docker/swarmkit de507ff6b0ee99002d56a784e095c753eab1ad61
 clone git github.com/golang/mock bd3c8e81be01eef76d4b503f5e687d2d1354d2d9
 clone git github.com/gogo/protobuf 43a2e0b1c32252bfbbdf81f7faa7a88fb3fa4028
 clone git github.com/cloudflare/cfssl b895b0549c0ff676f92cf09ba971ae02bb41367b
diff --git a/vendor/src/github.com/docker/swarmkit/agent/agent.go b/vendor/src/github.com/docker/swarmkit/agent/agent.go
index cd1b61dd7e5693c71007d11a0d0bcc394dbae7a9..a85eafcf3bc0727026501258f29ab32f1b1e2ddf 100644
--- a/vendor/src/github.com/docker/swarmkit/agent/agent.go
+++ b/vendor/src/github.com/docker/swarmkit/agent/agent.go
@@ -319,7 +319,8 @@ func (a *Agent) UpdateTaskStatus(ctx context.Context, taskID string, status *api
 			if err == errTaskUnknown {
 				err = nil // dispatcher no longer cares about this task.
 			} else {
-				log.G(ctx).WithError(err).Error("sending task status update failed")
+				log.G(ctx).WithError(err).Error("closing session after fatal error")
+				session.close()
 			}
 		} else {
 			log.G(ctx).Debug("task status reported")
diff --git a/vendor/src/github.com/docker/swarmkit/agent/session.go b/vendor/src/github.com/docker/swarmkit/agent/session.go
index d091a560786a99332f0b57b12667aa0d3463445b..6ad6a4f22ed65738fbe1614ef694d62fd1672bbb 100644
--- a/vendor/src/github.com/docker/swarmkit/agent/session.go
+++ b/vendor/src/github.com/docker/swarmkit/agent/session.go
@@ -2,6 +2,7 @@ package agent
 
 import (
 	"errors"
+	"sync"
 	"time"
 
 	"github.com/docker/swarmkit/api"
@@ -40,6 +41,7 @@ type session struct {
 
 	registered chan struct{} // closed registration
 	closed     chan struct{}
+	closeOnce  sync.Once
 }
 
 func newSession(ctx context.Context, agent *Agent, delay time.Duration) *session {
@@ -300,15 +302,14 @@ func (s *session) sendTaskStatuses(ctx context.Context, updates ...*api.UpdateTa
 }
 
 func (s *session) close() error {
-	select {
-	case <-s.closed:
-		return errSessionClosed
-	default:
+	s.closeOnce.Do(func() {
 		if s.conn != nil {
 			s.agent.config.Managers.ObserveIfExists(api.Peer{Addr: s.addr}, -picker.DefaultObservationWeight)
 			s.conn.Close()
 		}
 
+		close(s.closed)
-		return nil
-	}
+	})
+
+	return nil
 }
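The session.go hunk above replaces the select-on-closed-channel guard with sync.Once, so close() stays safe now that agent.go also calls session.close() after a fatal status-update error. Below is a minimal, self-contained sketch of that idempotent-close pattern; the session and conn types here are hypothetical stand-ins, not the swarmkit ones.

package main

import (
	"fmt"
	"sync"
)

// conn stands in for the gRPC connection the real session holds.
type conn struct{}

func (c *conn) Close() error { return nil }

type session struct {
	conn      *conn
	closed    chan struct{} // closed exactly once; observed by waiters
	closeOnce sync.Once
}

// close may be called concurrently from any number of goroutines.
// sync.Once guarantees the teardown and close(s.closed) run at most once,
// so a late caller can neither panic on a double close nor block.
func (s *session) close() error {
	s.closeOnce.Do(func() {
		if s.conn != nil {
			s.conn.Close()
		}
		close(s.closed)
	})
	return nil
}

func main() {
	s := &session{conn: &conn{}, closed: make(chan struct{})}

	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			s.close() // racing callers are all safe
		}()
	}
	wg.Wait()

	<-s.closed // the channel was closed exactly once
	fmt.Println("session closed")
}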
diff --git a/vendor/src/github.com/docker/swarmkit/manager/allocator/network.go b/vendor/src/github.com/docker/swarmkit/manager/allocator/network.go
index 44ab90595e6a829847b6be46a06b66a2f5917fb3..1ff6a02a1ae5f8ea70904861daf2c1cbd4315154 100644
--- a/vendor/src/github.com/docker/swarmkit/manager/allocator/network.go
+++ b/vendor/src/github.com/docker/swarmkit/manager/allocator/network.go
@@ -552,7 +552,9 @@ func (a *Allocator) allocateNode(ctx context.Context, nc *networkContext, node *
 
 func (a *Allocator) allocateService(ctx context.Context, nc *networkContext, s *api.Service) error {
 	if s.Spec.Endpoint != nil {
+		// The service has a user-defined endpoint spec.
 		if s.Endpoint == nil {
+			// The service currently has no allocated endpoint; allocate one.
 			s.Endpoint = &api.Endpoint{
 				Spec: s.Spec.Endpoint.Copy(),
 			}
@@ -575,6 +577,12 @@ func (a *Allocator) allocateService(ctx context.Context, nc *networkContext, s *
 					&api.Endpoint_VirtualIP{NetworkID: nc.ingressNetwork.ID})
 			}
 		}
+	} else if s.Endpoint != nil {
+		// The service has no user-defined endpoint but still holds allocated
+		// network resources, which need to be deallocated.
+		if err := nc.nwkAllocator.ServiceDeallocate(s); err != nil {
+			return err
+		}
 	}
 
 	if err := nc.nwkAllocator.ServiceAllocate(s); err != nil {
diff --git a/vendor/src/github.com/docker/swarmkit/manager/allocator/networkallocator/portallocator.go b/vendor/src/github.com/docker/swarmkit/manager/allocator/networkallocator/portallocator.go
index 10843b73e407630b774e9c123fcb50b7f90f9c60..cbc12575f102c646d253ec04e37be5bb15368299 100644
--- a/vendor/src/github.com/docker/swarmkit/manager/allocator/networkallocator/portallocator.go
+++ b/vendor/src/github.com/docker/swarmkit/manager/allocator/networkallocator/portallocator.go
@@ -155,7 +155,18 @@ func (pa *portAllocator) serviceDeallocatePorts(s *api.Service) {
 }
 
 func (pa *portAllocator) isPortsAllocated(s *api.Service) bool {
-	if s.Endpoint == nil {
+	// If the service has neither a user-defined endpoint nor an allocated
+	// endpoint, we assume it is allocated and return true.
+	if s.Endpoint == nil && s.Spec.Endpoint == nil {
+		return true
+	}
+
+	// If the service has an allocated endpoint but no user-defined endpoint,
+	// the allocation is redundant and needs to be deallocated.
+	// If the service has a user-defined endpoint but no allocated endpoint,
+	// it has not been allocated yet. Either way, report false.
+	if (s.Endpoint != nil && s.Spec.Endpoint == nil) ||
+		(s.Endpoint == nil && s.Spec.Endpoint != nil) {
 		return false
 	}
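The isPortsAllocated hunk above replaces the simple nil check with a comparison of the user-defined endpoint spec against the allocated endpoint. The decision table it encodes can be sketched as follows; Service, Endpoint and EndpointSpec here are hypothetical stand-ins rather than the swarmkit api types.

package main

import "fmt"

// Hypothetical stand-ins for api.Service, api.Endpoint and api.EndpointSpec.
type EndpointSpec struct{}
type Endpoint struct{}

type Service struct {
	SpecEndpoint *EndpointSpec // what the user asked for (s.Spec.Endpoint)
	Endpoint     *Endpoint     // what is currently allocated (s.Endpoint)
}

// portsReconciled mirrors the decision in isPortsAllocated: allocation is
// settled only when the user-defined endpoint and the allocated endpoint are
// both absent or both present; a mismatch in either direction means the
// allocator still has work to do (allocate or deallocate).
func portsReconciled(s *Service) bool {
	switch {
	case s.SpecEndpoint == nil && s.Endpoint == nil:
		return true // nothing requested, nothing allocated
	case s.SpecEndpoint == nil && s.Endpoint != nil:
		return false // stale allocation that needs to be deallocated
	case s.SpecEndpoint != nil && s.Endpoint == nil:
		return false // requested but not yet allocated
	default:
		// Both are present; the real function goes on to compare the
		// individual ports, which this sketch leaves out.
		return true
	}
}

func main() {
	fmt.Println(portsReconciled(&Service{}))                              // true
	fmt.Println(portsReconciled(&Service{Endpoint: &Endpoint{}}))         // false
	fmt.Println(portsReconciled(&Service{SpecEndpoint: &EndpointSpec{}})) // false
}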
diff --git a/vendor/src/github.com/docker/swarmkit/manager/controlapi/service.go b/vendor/src/github.com/docker/swarmkit/manager/controlapi/service.go
index 13cfe2ed1dd589e54bb573fa19e25bdae2397630..e7bfe172834c3e24fb66f7f3f03b43f362040054 100644
--- a/vendor/src/github.com/docker/swarmkit/manager/controlapi/service.go
+++ b/vendor/src/github.com/docker/swarmkit/manager/controlapi/service.go
@@ -149,13 +149,13 @@ func validateEndpointSpec(epSpec *api.EndpointSpec) error {
 		return grpc.Errorf(codes.InvalidArgument, "EndpointSpec: ports can't be used with dnsrr mode")
 	}
 
-	portSet := make(map[api.PortConfig]struct{})
+	portSet := make(map[uint32]struct{})
 	for _, port := range epSpec.Ports {
-		if _, ok := portSet[*port]; ok {
-			return grpc.Errorf(codes.InvalidArgument, "EndpointSpec: duplicate ports provided")
+		if _, ok := portSet[port.PublishedPort]; ok {
+			return grpc.Errorf(codes.InvalidArgument, "EndpointSpec: duplicate published ports provided")
 		}
-		portSet[*port] = struct{}{}
+		portSet[port.PublishedPort] = struct{}{}
 	}
 
 	return nil
diff --git a/vendor/src/github.com/docker/swarmkit/manager/orchestrator/updater.go b/vendor/src/github.com/docker/swarmkit/manager/orchestrator/updater.go
index 8bee4776b6fb68b1828b9d2119bc2dcd7c1894a0..c917087c03bb7018bc5d07a06255526f577b34ea 100644
--- a/vendor/src/github.com/docker/swarmkit/manager/orchestrator/updater.go
+++ b/vendor/src/github.com/docker/swarmkit/manager/orchestrator/updater.go
@@ -44,7 +44,7 @@ func (u *UpdateSupervisor) Update(ctx context.Context, cluster *api.Cluster, ser
 	id := service.ID
 
 	if update, ok := u.updates[id]; ok {
-		if !update.isServiceDirty(service) {
+		if reflect.DeepEqual(service.Spec, update.newService.Spec) {
 			// There's already an update working towards this goal.
 			return
 		}
@@ -297,11 +297,6 @@ func (u *Updater) isTaskDirty(t *api.Task) bool {
 		(t.Endpoint != nil && !reflect.DeepEqual(u.newService.Spec.Endpoint, t.Endpoint.Spec))
 }
 
-func (u *Updater) isServiceDirty(service *api.Service) bool {
-	return !reflect.DeepEqual(u.newService.Spec.Task, service.Spec.Task) ||
-		!reflect.DeepEqual(u.newService.Spec.Endpoint, service.Spec.Endpoint)
-}
-
 func (u *Updater) startUpdate(ctx context.Context, serviceID string) {
 	err := u.store.Update(func(tx store.Tx) error {
 		service := store.GetService(tx, serviceID)
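The validateEndpointSpec hunk in controlapi/service.go above keys the duplicate check on PublishedPort alone instead of on the whole PortConfig value, so two entries that publish the same port with different target ports are now rejected. A small sketch of that check; PortConfig here is a hypothetical stand-in for the swarmkit api type.

package main

import (
	"errors"
	"fmt"
)

// PortConfig is a hypothetical stand-in for api.PortConfig.
type PortConfig struct {
	TargetPort    uint32
	PublishedPort uint32
}

// validatePorts rejects two port configs that publish the same port even if
// their other fields differ; keying the set on the whole struct (the old
// behaviour) would let such pairs through.
func validatePorts(ports []*PortConfig) error {
	portSet := make(map[uint32]struct{})
	for _, port := range ports {
		if _, ok := portSet[port.PublishedPort]; ok {
			return errors.New("duplicate published ports provided")
		}
		portSet[port.PublishedPort] = struct{}{}
	}
	return nil
}

func main() {
	err := validatePorts([]*PortConfig{
		{TargetPort: 80, PublishedPort: 8080},
		{TargetPort: 443, PublishedPort: 8080}, // same published port, different target
	})
	fmt.Println(err) // duplicate published ports provided
}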
diff --git a/vendor/src/github.com/docker/swarmkit/manager/state/raft/raft.go b/vendor/src/github.com/docker/swarmkit/manager/state/raft/raft.go
index fcc633127f585662d0cc25aaf24dd9bc44cf6386..cbda2c1db2b1da8eb90d368d11ead91e84dc875b 100644
--- a/vendor/src/github.com/docker/swarmkit/manager/state/raft/raft.go
+++ b/vendor/src/github.com/docker/swarmkit/manager/state/raft/raft.go
@@ -363,39 +363,55 @@ func (n *Node) Run(ctx context.Context) error {
 				n.confState = rd.Snapshot.Metadata.ConfState
 			}
 
-			// Process committed entries
-			for _, entry := range rd.CommittedEntries {
-				if err := n.processCommitted(entry); err != nil {
-					n.Config.Logger.Error(err)
-				}
-			}
+			// If we cease to be the leader, we must cancel any
+			// proposals that are currently waiting for a quorum to
+			// acknowledge them. It is still possible for these to
+			// become committed, but if that happens we will apply
+			// them as any follower would.
 
-			// Trigger a snapshot every once in awhile
-			if n.snapshotInProgress == nil &&
-				raftConfig.SnapshotInterval > 0 &&
-				n.appliedIndex-n.snapshotIndex >= raftConfig.SnapshotInterval {
-				n.doSnapshot(&raftConfig)
-			}
+			// It is important that we cancel these proposals before
+			// calling processCommitted, so processCommitted does
+			// not deadlock.
 
-			// If we cease to be the leader, we must cancel
-			// any proposals that are currently waiting for
-			// a quorum to acknowledge them. It is still
-			// possible for these to become committed, but
-			// if that happens we will apply them as any
-			// follower would.
 			if rd.SoftState != nil {
 				if wasLeader && rd.SoftState.RaftState != raft.StateLeader {
 					wasLeader = false
-					n.wait.cancelAll()
 					if atomic.LoadUint32(&n.signalledLeadership) == 1 {
 						atomic.StoreUint32(&n.signalledLeadership, 0)
 						n.leadershipBroadcast.Write(IsFollower)
 					}
+
+					// It is important that we set n.signalledLeadership to 0
+					// before calling n.wait.cancelAll. When a new raft
+					// request is registered, it checks n.signalledLeadership
+					// afterwards, and cancels the registration if it is 0.
+					// If cancelAll was called first, this call might run
+					// before the new request registers, but
+					// signalledLeadership would be set after the check.
+					// Setting signalledLeadership before calling cancelAll
+					// ensures that if a new request is registered during
+					// this transition, it will either be cancelled by
+					// cancelAll, or by its own check of signalledLeadership.
+					n.wait.cancelAll()
 				} else if !wasLeader && rd.SoftState.RaftState == raft.StateLeader {
 					wasLeader = true
 				}
 			}
 
+			// Process committed entries
+			for _, entry := range rd.CommittedEntries {
+				if err := n.processCommitted(entry); err != nil {
+					n.Config.Logger.Error(err)
+				}
+			}
+
+			// Trigger a snapshot every once in awhile
+			if n.snapshotInProgress == nil &&
+				raftConfig.SnapshotInterval > 0 &&
+				n.appliedIndex-n.snapshotIndex >= raftConfig.SnapshotInterval {
+				n.doSnapshot(&raftConfig)
+			}
+
 			if wasLeader && atomic.LoadUint32(&n.signalledLeadership) != 1 {
 				// If all the entries in the log have become
 				// committed, broadcast our leadership status.
@@ -1129,7 +1145,11 @@ func (n *Node) processInternalRaftRequest(ctx context.Context, r *api.InternalRa
 	r.ID = n.reqIDGen.Next()
 
-	ch := n.wait.register(r.ID, cb)
+	// This must be derived from the context which is cancelled by stop()
+	// to avoid a deadlock on shutdown.
+	waitCtx, cancel := context.WithCancel(n.Ctx)
+
+	ch := n.wait.register(r.ID, cb, cancel)
 
 	// Do this check after calling register to avoid a race.
 	if atomic.LoadUint32(&n.signalledLeadership) != 1 {
@@ -1148,24 +1168,19 @@ func (n *Node) processInternalRaftRequest(ctx context.Context, r *api.InternalRa
 		return nil, ErrRequestTooLarge
 	}
 
-	// This must use the context which is cancelled by stop() to avoid a
-	// deadlock on shutdown.
-	err = n.Propose(n.Ctx, data)
+	err = n.Propose(waitCtx, data)
 	if err != nil {
 		n.wait.cancel(r.ID)
 		return nil, err
 	}
 
 	select {
-	case x, ok := <-ch:
-		if ok {
-			res := x.(*applyResult)
-			return res.resp, res.err
-		}
-		return nil, ErrLostLeadership
-	case <-n.Ctx.Done():
+	case x := <-ch:
+		res := x.(*applyResult)
+		return res.resp, res.err
+	case <-waitCtx.Done():
 		n.wait.cancel(r.ID)
-		return nil, ErrStopped
+		return nil, ErrLostLeadership
 	case <-ctx.Done():
 		n.wait.cancel(r.ID)
 		return nil, ctx.Err()
@@ -1177,10 +1192,12 @@ func (n *Node) processInternalRaftRequest(ctx context.Context, r *api.InternalRa
 
 // until the change is performed or there is an error.
 func (n *Node) configure(ctx context.Context, cc raftpb.ConfChange) error {
 	cc.ID = n.reqIDGen.Next()
-	ch := n.wait.register(cc.ID, nil)
+
+	ctx, cancel := context.WithCancel(ctx)
+	ch := n.wait.register(cc.ID, nil, cancel)
 
 	if err := n.ProposeConfChange(ctx, cc); err != nil {
-		n.wait.trigger(cc.ID, nil)
+		n.wait.cancel(cc.ID)
 		return err
 	}
@@ -1194,7 +1211,7 @@ func (n *Node) configure(ctx context.Context, cc raftpb.ConfChange) error {
 		}
 		return nil
 	case <-ctx.Done():
-		n.wait.trigger(cc.ID, nil)
+		n.wait.cancel(cc.ID)
 		return ctx.Err()
 	case <-n.Ctx.Done():
 		return ErrStopped
@@ -1237,6 +1254,11 @@ func (n *Node) processEntry(entry raftpb.Entry) error {
 		// position and cancelling the transaction. Create a new
 		// transaction to commit the data.
 
+		// It should not be possible for processInternalRaftRequest
+		// to be running in this situation, but out of caution we
+		// cancel any current invocations to avoid a deadlock.
+		n.wait.cancelAll()
+
 		err := n.memoryStore.ApplyStoreActions(r.Action)
 		if err != nil {
 			log.G(context.Background()).Errorf("error applying actions from raft: %v", err)
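The processInternalRaftRequest and configure hunks above register a per-request cancel function with the waiter and block on a context derived from the node's context, so shutdown or lost leadership wakes the proposer through that context rather than by closing the result channel. A simplified, self-contained sketch of the pattern follows; the waiter, propose and errLostLeadership names are illustrative stand-ins, not the swarmkit implementation.

package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"
)

var errLostLeadership = errors.New("raft: lost leadership")

type waitItem struct {
	ch     chan interface{}
	cancel context.CancelFunc
}

// waiter is a simplified stand-in for the wait type in wait.go.
type waiter struct {
	mu sync.Mutex
	m  map[uint64]waitItem
}

func (w *waiter) register(id uint64, cancel context.CancelFunc) <-chan interface{} {
	w.mu.Lock()
	defer w.mu.Unlock()
	ch := make(chan interface{}, 1)
	w.m[id] = waitItem{ch: ch, cancel: cancel}
	return ch
}

// cancel invokes the registered cancel function instead of closing the
// channel, so a proposer is woken through its context, never by a closed ch.
func (w *waiter) cancel(id uint64) {
	w.mu.Lock()
	item, ok := w.m[id]
	delete(w.m, id)
	w.mu.Unlock()
	if ok && item.cancel != nil {
		item.cancel()
	}
}

func (w *waiter) trigger(id uint64, x interface{}) {
	w.mu.Lock()
	item, ok := w.m[id]
	delete(w.m, id)
	w.mu.Unlock()
	if ok {
		item.ch <- x
	}
}

// propose mirrors the shape of processInternalRaftRequest after this change:
// the wait context derives from the node-lifetime context, and every exit
// path other than a delivered result goes through w.cancel.
func propose(nodeCtx, callerCtx context.Context, w *waiter, id uint64) (interface{}, error) {
	waitCtx, cancel := context.WithCancel(nodeCtx)
	defer cancel() // the waiter may also call this via w.cancel

	ch := w.register(id, cancel)

	select {
	case x := <-ch:
		return x, nil
	case <-waitCtx.Done():
		w.cancel(id)
		return nil, errLostLeadership
	case <-callerCtx.Done():
		w.cancel(id)
		return nil, callerCtx.Err()
	}
}

func main() {
	w := &waiter{m: make(map[uint64]waitItem)}
	nodeCtx := context.Background()

	// A result delivered through the channel completes the proposal.
	go func() {
		time.Sleep(10 * time.Millisecond)
		w.trigger(1, "applied")
	}()
	fmt.Println(propose(nodeCtx, context.Background(), w, 1))

	// Cancelling the registered waiter unblocks the proposer through its
	// wait context, and it reports lost leadership.
	go func() {
		time.Sleep(10 * time.Millisecond)
		w.cancel(2)
	}()
	fmt.Println(propose(nodeCtx, context.Background(), w, 2))
}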
diff --git a/vendor/src/github.com/docker/swarmkit/manager/state/raft/wait.go b/vendor/src/github.com/docker/swarmkit/manager/state/raft/wait.go
index 297f0cf970014b83dbb7a6e70f65104460ce80d7..ecd39284c450babc1aad9e95a08a4aee2c070c84 100644
--- a/vendor/src/github.com/docker/swarmkit/manager/state/raft/wait.go
+++ b/vendor/src/github.com/docker/swarmkit/manager/state/raft/wait.go
@@ -10,6 +10,8 @@ type waitItem struct {
 	ch chan interface{}
 	// callback which is called synchronously when the wait is triggered
 	cb func()
+	// callback which is called to cancel a waiter
+	cancel func()
 }
 
 type wait struct {
@@ -21,13 +23,13 @@ func newWait() *wait {
 	return &wait{m: make(map[uint64]waitItem)}
 }
 
-func (w *wait) register(id uint64, cb func()) <-chan interface{} {
+func (w *wait) register(id uint64, cb func(), cancel func()) <-chan interface{} {
 	w.l.Lock()
 	defer w.l.Unlock()
 	_, ok := w.m[id]
 	if !ok {
 		ch := make(chan interface{}, 1)
-		w.m[id] = waitItem{ch: ch, cb: cb}
+		w.m[id] = waitItem{ch: ch, cb: cb, cancel: cancel}
 		return ch
 	}
 	panic(fmt.Sprintf("duplicate id %x", id))
@@ -43,7 +45,6 @@ func (w *wait) trigger(id uint64, x interface{}) bool {
 			waitItem.cb()
 		}
 		waitItem.ch <- x
-		close(waitItem.ch)
 		return true
 	}
 	return false
@@ -54,8 +55,8 @@ func (w *wait) cancel(id uint64) {
 	waitItem, ok := w.m[id]
 	delete(w.m, id)
 	w.l.Unlock()
-	if ok {
-		close(waitItem.ch)
+	if ok && waitItem.cancel != nil {
+		waitItem.cancel()
 	}
 }
 
@@ -65,6 +66,8 @@ func (w *wait) cancelAll() {
 
 	for id, waitItem := range w.m {
 		delete(w.m, id)
-		close(waitItem.ch)
+		if waitItem.cancel != nil {
+			waitItem.cancel()
+		}
 	}
 }
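The Run hunk earlier in this diff stresses the ordering "clear signalledLeadership, then cancelAll", and the wait.go changes above make cancellation run a per-waiter cancel function. The standalone sketch below (a hypothetical registry, not swarmkit code) shows why that ordering leaves no proposal waiting forever: a registration that races with leadership loss is cancelled either by cancelAll or by its own leadership check.

package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
)

// registry is a minimal stand-in for the map of waiters kept by wait.go.
type registry struct {
	mu sync.Mutex
	m  map[uint64]context.CancelFunc
}

func (r *registry) register(id uint64, cancel context.CancelFunc) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.m[id] = cancel
}

func (r *registry) cancel(id uint64) {
	r.mu.Lock()
	cancel, ok := r.m[id]
	delete(r.m, id)
	r.mu.Unlock()
	if ok {
		cancel()
	}
}

func (r *registry) cancelAll() {
	r.mu.Lock()
	defer r.mu.Unlock()
	for id, cancel := range r.m {
		delete(r.m, id)
		cancel()
	}
}

func main() {
	var signalledLeadership uint32 = 1
	reg := &registry{m: make(map[uint64]context.CancelFunc)}

	var wg sync.WaitGroup

	// Proposer: register first, then check leadership, the order used by
	// processInternalRaftRequest after this change.
	wg.Add(1)
	go func() {
		defer wg.Done()
		ctx, cancel := context.WithCancel(context.Background())
		reg.register(42, cancel)
		if atomic.LoadUint32(&signalledLeadership) != 1 {
			reg.cancel(42) // leadership already lost: cancel ourselves
		}
		// Either the check above or cancelAll below is guaranteed to have
		// cancelled ctx, so this wait always returns instead of hanging.
		<-ctx.Done()
		fmt.Println("proposal cancelled:", ctx.Err())
	}()

	// Leadership loss: clear the flag first, then cancel the registry.
	// In the opposite order, a proposal could register after cancelAll ran
	// but before the flag was cleared, and then wait forever.
	wg.Add(1)
	go func() {
		defer wg.Done()
		atomic.StoreUint32(&signalledLeadership, 0)
		reg.cancelAll()
	}()

	wg.Wait()
}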