diff --git a/vendor.conf b/vendor.conf index f5ab87bae6..583414f988 100644 --- a/vendor.conf +++ b/vendor.conf @@ -103,7 +103,7 @@ github.com/docker/containerd 03e5862ec0d8d3b3f750e19fca3ee367e13c090e github.com/tonistiigi/fifo 1405643975692217d6720f8b54aeee1bf2cd5cf4 # cluster -github.com/docker/swarmkit 62d835f478b2e4fd2768deb88fb3b32e334faaee +github.com/docker/swarmkit 98620dd1ddfcc03d8f4b0d2910ecf6b52918a731 github.com/golang/mock bd3c8e81be01eef76d4b503f5e687d2d1354d2d9 github.com/gogo/protobuf v0.3 github.com/cloudflare/cfssl 7fb22c8cba7ecaf98e4082d22d65800cf45e042a diff --git a/vendor/github.com/docker/swarmkit/agent/worker.go b/vendor/github.com/docker/swarmkit/agent/worker.go index d67513b2f3..74f6e040e0 100644 --- a/vendor/github.com/docker/swarmkit/agent/worker.go +++ b/vendor/github.com/docker/swarmkit/agent/worker.go @@ -148,7 +148,7 @@ func (w *worker) Assign(ctx context.Context, assignments []*api.AssignmentChange // Update updates the set of tasks and secret for the worker. // Tasks in the added set will be added to the worker, and tasks in the removed set // will be removed from the worker -// Serets in the added set will be added to the worker, and secrets in the removed set +// Secrets in the added set will be added to the worker, and secrets in the removed set // will be removed from the worker. func (w *worker) Update(ctx context.Context, assignments []*api.AssignmentChange) error { w.mu.Lock() diff --git a/vendor/github.com/docker/swarmkit/api/specs.pb.go b/vendor/github.com/docker/swarmkit/api/specs.pb.go index efe7d1a147..1754465653 100644 --- a/vendor/github.com/docker/swarmkit/api/specs.pb.go +++ b/vendor/github.com/docker/swarmkit/api/specs.pb.go @@ -140,7 +140,12 @@ type ServiceSpec struct { // *ServiceSpec_Global Mode isServiceSpec_Mode `protobuf_oneof:"mode"` // UpdateConfig controls the rate and policy of updates. 
- Update *UpdateConfig `protobuf:"bytes,6,opt,name=update" json:"update,omitempty"` + Update *UpdateConfig `protobuf:"bytes,6,opt,name=update" json:"update,omitempty"` + // ServiceSpec.Networks has been deprecated and is replaced by + // Networks field in Task (TaskSpec.Networks). + // This field (ServiceSpec.Networks) is kept for compatibility. + // In case TaskSpec.Networks does not exist, ServiceSpec.Networks + // is still honored if it exists. Networks []*NetworkAttachmentConfig `protobuf:"bytes,7,rep,name=networks" json:"networks,omitempty"` // Service endpoint specifies the user provided configuration // to properly discover and load balance a service. diff --git a/vendor/github.com/docker/swarmkit/api/specs.proto b/vendor/github.com/docker/swarmkit/api/specs.proto index 4b91395599..a601e9007f 100644 --- a/vendor/github.com/docker/swarmkit/api/specs.proto +++ b/vendor/github.com/docker/swarmkit/api/specs.proto @@ -72,6 +72,11 @@ message ServiceSpec { // UpdateConfig controls the rate and policy of updates. UpdateConfig update = 6; + // ServiceSpec.Networks has been deprecated and is replaced by + // Networks field in Task (TaskSpec.Networks). + // This field (ServiceSpec.Networks) is kept for compatibility. + // In case TaskSpec.Networks does not exist, ServiceSpec.Networks + // is still honored if it exists. repeated NetworkAttachmentConfig networks = 7 [deprecated=true]; // Service endpoint specifies the user provided configuration diff --git a/vendor/github.com/docker/swarmkit/ca/auth.go b/vendor/github.com/docker/swarmkit/ca/auth.go index bc7c629a54..10cb2c76a1 100644 --- a/vendor/github.com/docker/swarmkit/ca/auth.go +++ b/vendor/github.com/docker/swarmkit/ca/auth.go @@ -19,7 +19,7 @@ import ( type localRequestKeyType struct{} // LocalRequestKey is a context key to mark a request that originating on the -// local node. The assocated value is a RemoteNodeInfo structure describing the +// local node. 
The associated value is a RemoteNodeInfo structure describing the // local node. var LocalRequestKey = localRequestKeyType{} diff --git a/vendor/github.com/docker/swarmkit/ca/external.go b/vendor/github.com/docker/swarmkit/ca/external.go index c2240268ce..a6aedbb4b9 100644 --- a/vendor/github.com/docker/swarmkit/ca/external.go +++ b/vendor/github.com/docker/swarmkit/ca/external.go @@ -103,26 +103,9 @@ func makeExternalSignRequest(ctx context.Context, client *http.Client, url strin if err != nil { return nil, recoverableErr{err: errors.Wrap(err, "unable to perform certificate signing request")} } - - doneReading := make(chan struct{}) - bodyClosed := make(chan struct{}) - go func() { - select { - case <-ctx.Done(): - case <-doneReading: - } - resp.Body.Close() - close(bodyClosed) - }() + defer resp.Body.Close() body, err := ioutil.ReadAll(resp.Body) - close(doneReading) - <-bodyClosed - select { - case <-ctx.Done(): - return nil, ctx.Err() - default: - } if err != nil { return nil, recoverableErr{err: errors.Wrap(err, "unable to read CSR response body")} } diff --git a/vendor/github.com/docker/swarmkit/ca/server.go b/vendor/github.com/docker/swarmkit/ca/server.go index a7097f86a3..a6928949fc 100644 --- a/vendor/github.com/docker/swarmkit/ca/server.go +++ b/vendor/github.com/docker/swarmkit/ca/server.go @@ -353,7 +353,7 @@ func (s *Server) issueRenewCertificate(ctx context.Context, nodeID string, csr [ }, nil } -// GetRootCACertificate returns the certificate of the Root CA. It is used as a convinience for distributing +// GetRootCACertificate returns the certificate of the Root CA. It is used as a convenience for distributing // the root of trust for the swarm. Clients should be using the CA hash to verify if they weren't target to // a MiTM. If they fail to do so, node bootstrap works with TOFU semantics. 
func (s *Server) GetRootCACertificate(ctx context.Context, request *api.GetRootCACertificateRequest) (*api.GetRootCACertificateResponse, error) { diff --git a/vendor/github.com/docker/swarmkit/connectionbroker/broker.go b/vendor/github.com/docker/swarmkit/connectionbroker/broker.go index f22726f2b1..19e1c0a90c 100644 --- a/vendor/github.com/docker/swarmkit/connectionbroker/broker.go +++ b/vendor/github.com/docker/swarmkit/connectionbroker/broker.go @@ -96,9 +96,9 @@ func (c *Conn) Close(success bool) error { } if success { - c.remotes.ObserveIfExists(c.peer, -remotes.DefaultObservationWeight) - } else { c.remotes.ObserveIfExists(c.peer, remotes.DefaultObservationWeight) + } else { + c.remotes.ObserveIfExists(c.peer, -remotes.DefaultObservationWeight) } return c.ClientConn.Close() diff --git a/vendor/github.com/docker/swarmkit/log/context.go b/vendor/github.com/docker/swarmkit/log/context.go index 3da380f112..ce7da930ba 100644 --- a/vendor/github.com/docker/swarmkit/log/context.go +++ b/vendor/github.com/docker/swarmkit/log/context.go @@ -29,6 +29,21 @@ func WithLogger(ctx context.Context, logger *logrus.Entry) context.Context { return context.WithValue(ctx, loggerKey{}, logger) } +// WithFields returns a new context with added fields to logger. +func WithFields(ctx context.Context, fields logrus.Fields) context.Context { + logger := ctx.Value(loggerKey{}) + + if logger == nil { + logger = L + } + return WithLogger(ctx, logger.(*logrus.Entry).WithFields(fields)) +} + +// WithField is convenience wrapper around WithFields. +func WithField(ctx context.Context, key, value string) context.Context { + return WithFields(ctx, logrus.Fields{key: value}) +} + // GetLogger retrieves the current logger from the context. If no logger is // available, the default logger is returned. 
func GetLogger(ctx context.Context) *logrus.Entry { diff --git a/vendor/github.com/docker/swarmkit/manager/allocator/allocator.go b/vendor/github.com/docker/swarmkit/manager/allocator/allocator.go index 08a0ca71c9..fbb69b98d7 100644 --- a/vendor/github.com/docker/swarmkit/manager/allocator/allocator.go +++ b/vendor/github.com/docker/swarmkit/manager/allocator/allocator.go @@ -3,6 +3,7 @@ package allocator import ( "sync" + "github.com/docker/docker/pkg/plugingetter" "github.com/docker/go-events" "github.com/docker/swarmkit/manager/state" "github.com/docker/swarmkit/manager/state/store" @@ -27,6 +28,9 @@ type Allocator struct { stopChan chan struct{} // doneChan is closed when the allocator is finished running. doneChan chan struct{} + + // pluginGetter provides access to docker's plugin inventory. + pluginGetter plugingetter.PluginGetter } // taskBallot controls how the voting for task allocation is @@ -67,14 +71,15 @@ type allocActor struct { // New returns a new instance of Allocator for use during allocation // stage of the manager. 
-func New(store *store.MemoryStore) (*Allocator, error) { +func New(store *store.MemoryStore, pg plugingetter.PluginGetter) (*Allocator, error) { a := &Allocator{ store: store, taskBallot: &taskBallot{ votes: make(map[string][]string), }, - stopChan: make(chan struct{}), - doneChan: make(chan struct{}), + stopChan: make(chan struct{}), + doneChan: make(chan struct{}), + pluginGetter: pg, } return a, nil diff --git a/vendor/github.com/docker/swarmkit/manager/allocator/network.go b/vendor/github.com/docker/swarmkit/manager/allocator/network.go index 0da215131b..6e12c5a0a4 100644 --- a/vendor/github.com/docker/swarmkit/manager/allocator/network.go +++ b/vendor/github.com/docker/swarmkit/manager/allocator/network.go @@ -73,7 +73,7 @@ type networkContext struct { } func (a *Allocator) doNetworkInit(ctx context.Context) (err error) { - na, err := networkallocator.New() + na, err := networkallocator.New(a.pluginGetter) if err != nil { return err } diff --git a/vendor/github.com/docker/swarmkit/manager/allocator/networkallocator/networkallocator.go b/vendor/github.com/docker/swarmkit/manager/allocator/networkallocator/networkallocator.go index f16b0d4175..20d90a2b28 100644 --- a/vendor/github.com/docker/swarmkit/manager/allocator/networkallocator/networkallocator.go +++ b/vendor/github.com/docker/swarmkit/manager/allocator/networkallocator/networkallocator.go @@ -4,7 +4,7 @@ import ( "fmt" "net" - "github.com/docker/docker/pkg/plugins" + "github.com/docker/docker/pkg/plugingetter" "github.com/docker/libnetwork/datastore" "github.com/docker/libnetwork/driverapi" "github.com/docker/libnetwork/drvregistry" @@ -49,7 +49,7 @@ type NetworkAllocator struct { nodes map[string]struct{} } -// Local in-memory state related to netwok that need to be tracked by NetworkAllocator +// Local in-memory state related to network that need to be tracked by NetworkAllocator type network struct { // A local cache of the store object. 
nw *api.Network @@ -69,7 +69,7 @@ type initializer struct { } // New returns a new NetworkAllocator handle -func New() (*NetworkAllocator, error) { +func New(pg plugingetter.PluginGetter) (*NetworkAllocator, error) { na := &NetworkAllocator{ networks: make(map[string]*network), services: make(map[string]struct{}), @@ -79,7 +79,7 @@ func New() (*NetworkAllocator, error) { // There are no driver configurations and notification // functions as of now. - reg, err := drvregistry.New(nil, nil, nil, nil, nil) + reg, err := drvregistry.New(nil, nil, nil, nil, pg) if err != nil { return nil, err } @@ -133,7 +133,7 @@ func (na *NetworkAllocator) getNetwork(id string) *network { } // Deallocate frees all the general and driver specific resources -// whichs were assigned to the passed network. +// which were assigned to the passed network. func (na *NetworkAllocator) Deallocate(n *api.Network) error { localNet := na.getNetwork(n.ID) if localNet == nil { @@ -657,7 +657,11 @@ func (na *NetworkAllocator) resolveDriver(n *api.Network) (driverapi.Driver, str } func (na *NetworkAllocator) loadDriver(name string) error { - _, err := plugins.Get(name, driverapi.NetworkPluginEndpointType) + pg := na.drvRegistry.GetPluginGetter() + if pg == nil { + return fmt.Errorf("plugin store is unintialized") + } + _, err := pg.Get(name, driverapi.NetworkPluginEndpointType, plugingetter.Lookup) return err } diff --git a/vendor/github.com/docker/swarmkit/manager/allocator/networkallocator/portallocator.go b/vendor/github.com/docker/swarmkit/manager/allocator/networkallocator/portallocator.go index 33d4b04f5c..fc0fc79502 100644 --- a/vendor/github.com/docker/swarmkit/manager/allocator/networkallocator/portallocator.go +++ b/vendor/github.com/docker/swarmkit/manager/allocator/networkallocator/portallocator.go @@ -17,12 +17,12 @@ const ( dynamicPortEnd = 32767 // The start of master port range which will hold all the - // allocation state of ports allocated so far regerdless of + // allocation state of 
ports allocated so far regardless of // whether it was user defined or not. masterPortStart = 1 // The end of master port range which will hold all the - // allocation state of ports allocated so far regerdless of + // allocation state of ports allocated so far regardless of // whether it was user defined or not. masterPortEnd = 65535 ) @@ -65,7 +65,7 @@ func (ps allocatedPorts) addState(p *api.PortConfig) { // Note multiple dynamically allocated ports might exists. In this case, // we will remove only at a time so both allocated ports are tracked. // -// Note becasue of the potential co-existence of user-defined and dynamically +// Note because of the potential co-existence of user-defined and dynamically // allocated ports, delState has to be called for user-defined port first. // dynamically allocated ports should be removed later. func (ps allocatedPorts) delState(p *api.PortConfig) *api.PortConfig { @@ -277,7 +277,7 @@ func (pa *portAllocator) isPortsAllocated(s *api.Service) bool { } // If service has allocated endpoint while has no user-defined endpoint, - // we assume allocated endpoints are redudant, and they need deallocated. + // we assume allocated endpoints are redundant, and they need deallocated. // If service has no allocated endpoint while has user-defined endpoint, // we assume it is not allocated. 
if (s.Endpoint != nil && s.Spec.Endpoint == nil) || diff --git a/vendor/github.com/docker/swarmkit/manager/controlapi/service.go b/vendor/github.com/docker/swarmkit/manager/controlapi/service.go index 194a9299a4..ae2b9b2f83 100644 --- a/vendor/github.com/docker/swarmkit/manager/controlapi/service.go +++ b/vendor/github.com/docker/swarmkit/manager/controlapi/service.go @@ -502,7 +502,7 @@ func (s *Server) UpdateService(ctx context.Context, request *api.UpdateServiceRe } if !reflect.DeepEqual(requestSpecNetworks, specNetworks) { - return errNetworkUpdateNotSupported + return grpc.Errorf(codes.Unimplemented, errNetworkUpdateNotSupported.Error()) } // Check to see if all the secrets being added exist as objects @@ -516,11 +516,11 @@ func (s *Server) UpdateService(ctx context.Context, request *api.UpdateServiceRe // with service mode change (comparing current config with previous config). // proper way to change service mode is to delete and re-add. if reflect.TypeOf(service.Spec.Mode) != reflect.TypeOf(request.Spec.Mode) { - return errModeChangeNotAllowed + return grpc.Errorf(codes.Unimplemented, errModeChangeNotAllowed.Error()) } if service.Spec.Annotations.Name != request.Spec.Annotations.Name { - return errRenameNotSupported + return grpc.Errorf(codes.Unimplemented, errRenameNotSupported.Error()) } service.Meta.Version = *request.ServiceVersion diff --git a/vendor/github.com/docker/swarmkit/manager/dispatcher/dispatcher.go b/vendor/github.com/docker/swarmkit/manager/dispatcher/dispatcher.go index 3073a35fae..f20f238eb7 100644 --- a/vendor/github.com/docker/swarmkit/manager/dispatcher/dispatcher.go +++ b/vendor/github.com/docker/swarmkit/manager/dispatcher/dispatcher.go @@ -106,6 +106,7 @@ type nodeUpdate struct { // Dispatcher is responsible for dispatching tasks and tracking agent health. 
type Dispatcher struct { mu sync.Mutex + wg sync.WaitGroup nodes *nodeStore store *store.MemoryStore mgrQueue *watch.Queue @@ -216,6 +217,9 @@ func (d *Dispatcher) Run(ctx context.Context) error { defer cancel() d.ctx, d.cancel = context.WithCancel(ctx) + ctx = d.ctx + d.wg.Add(1) + defer d.wg.Done() d.mu.Unlock() publishManagers := func(peers []*api.Peer) { @@ -240,10 +244,10 @@ func (d *Dispatcher) Run(ctx context.Context) error { case ev := <-peerWatcher: publishManagers(ev.([]*api.Peer)) case <-d.processUpdatesTrigger: - d.processUpdates() + d.processUpdates(ctx) batchTimer.Reset(maxBatchInterval) case <-batchTimer.C: - d.processUpdates() + d.processUpdates(ctx) batchTimer.Reset(maxBatchInterval) case v := <-configWatcher: cluster := v.(state.EventUpdateCluster) @@ -260,7 +264,7 @@ func (d *Dispatcher) Run(ctx context.Context) error { d.networkBootstrapKeys = cluster.Cluster.NetworkBootstrapKeys d.mu.Unlock() d.keyMgrQueue.Publish(cluster.Cluster.NetworkBootstrapKeys) - case <-d.ctx.Done(): + case <-ctx.Done(): return nil } } @@ -287,17 +291,20 @@ func (d *Dispatcher) Stop() error { d.mgrQueue.Close() d.keyMgrQueue.Close() + d.wg.Wait() + return nil } -func (d *Dispatcher) isRunningLocked() error { +func (d *Dispatcher) isRunningLocked() (context.Context, error) { d.mu.Lock() if !d.isRunning() { d.mu.Unlock() - return grpc.Errorf(codes.Aborted, "dispatcher is stopped") + return nil, grpc.Errorf(codes.Aborted, "dispatcher is stopped") } + ctx := d.ctx d.mu.Unlock() - return nil + return ctx, nil } func (d *Dispatcher) markNodesUnknown(ctx context.Context) error { @@ -377,7 +384,7 @@ func (d *Dispatcher) isRunning() bool { // markNodeReady updates the description of a node, updates its address, and sets status to READY // this is used during registration when a new node description is provided // and during node updates when the node description changes -func (d *Dispatcher) markNodeReady(nodeID string, description *api.NodeDescription, addr string) error { +func 
(d *Dispatcher) markNodeReady(ctx context.Context, nodeID string, description *api.NodeDescription, addr string) error { d.nodeUpdatesLock.Lock() d.nodeUpdates[nodeID] = nodeUpdate{ status: &api.NodeStatus{ @@ -396,8 +403,8 @@ func (d *Dispatcher) markNodeReady(nodeID string, description *api.NodeDescripti if numUpdates >= maxBatchItems { select { case d.processUpdatesTrigger <- struct{}{}: - case <-d.ctx.Done(): - return d.ctx.Err() + case <-ctx.Done(): + return ctx.Err() } } @@ -405,8 +412,8 @@ func (d *Dispatcher) markNodeReady(nodeID string, description *api.NodeDescripti // Wait until the node update batch happens before unblocking register. d.processUpdatesLock.Lock() select { - case <-d.ctx.Done(): - return d.ctx.Err() + case <-ctx.Done(): + return ctx.Err() default: } d.processUpdatesCond.Wait() @@ -431,7 +438,8 @@ func nodeIPFromContext(ctx context.Context) (string, error) { // register is used for registration of node with particular dispatcher. func (d *Dispatcher) register(ctx context.Context, nodeID string, description *api.NodeDescription) (string, error) { // prevent register until we're ready to accept it - if err := d.isRunningLocked(); err != nil { + dctx, err := d.isRunningLocked() + if err != nil { return "", err } @@ -453,7 +461,7 @@ func (d *Dispatcher) register(ctx context.Context, nodeID string, description *a log.G(ctx).Debugf(err.Error()) } - if err := d.markNodeReady(nodeID, description, addr); err != nil { + if err := d.markNodeReady(dctx, nodeID, description, addr); err != nil { return "", err } @@ -496,7 +504,8 @@ func (d *Dispatcher) UpdateTaskStatus(ctx context.Context, r *api.UpdateTaskStat } log := log.G(ctx).WithFields(fields) - if err := d.isRunningLocked(); err != nil { + dctx, err := d.isRunningLocked() + if err != nil { return nil, err } @@ -542,13 +551,13 @@ func (d *Dispatcher) UpdateTaskStatus(ctx context.Context, r *api.UpdateTaskStat if numUpdates >= maxBatchItems { select { case d.processUpdatesTrigger <- struct{}{}: - 
case <-d.ctx.Done(): + case <-dctx.Done(): } } return nil, nil } -func (d *Dispatcher) processUpdates() { +func (d *Dispatcher) processUpdates(ctx context.Context) { var ( taskUpdates map[string]*api.TaskStatus nodeUpdates map[string]nodeUpdate @@ -571,7 +580,7 @@ func (d *Dispatcher) processUpdates() { return } - log := log.G(d.ctx).WithFields(logrus.Fields{ + log := log.G(ctx).WithFields(logrus.Fields{ "method": "(*Dispatcher).processUpdates", }) @@ -661,7 +670,8 @@ func (d *Dispatcher) Tasks(r *api.TasksRequest, stream api.Dispatcher_TasksServe } nodeID := nodeInfo.NodeID - if err := d.isRunningLocked(); err != nil { + dctx, err := d.isRunningLocked() + if err != nil { return err } @@ -763,8 +773,8 @@ func (d *Dispatcher) Tasks(r *api.TasksRequest, stream api.Dispatcher_TasksServe break batchingLoop case <-stream.Context().Done(): return stream.Context().Err() - case <-d.ctx.Done(): - return d.ctx.Err() + case <-dctx.Done(): + return dctx.Err() } } @@ -783,7 +793,8 @@ func (d *Dispatcher) Assignments(r *api.AssignmentsRequest, stream api.Dispatche } nodeID := nodeInfo.NodeID - if err := d.isRunningLocked(); err != nil { + dctx, err := d.isRunningLocked() + if err != nil { return err } @@ -1075,8 +1086,8 @@ func (d *Dispatcher) Assignments(r *api.AssignmentsRequest, stream api.Dispatche break batchingLoop case <-stream.Context().Done(): return stream.Context().Err() - case <-d.ctx.Done(): - return d.ctx.Err() + case <-dctx.Done(): + return dctx.Err() } } @@ -1197,16 +1208,14 @@ func (d *Dispatcher) moveTasksToOrphaned(nodeID string) error { // markNodeNotReady sets the node state to some state other than READY func (d *Dispatcher) markNodeNotReady(id string, state api.NodeStatus_State, message string) error { - if err := d.isRunningLocked(); err != nil { + dctx, err := d.isRunningLocked() + if err != nil { return err } // Node is down. Add it to down nodes so that we can keep // track of tasks assigned to the node. 
- var ( - node *api.Node - err error - ) + var node *api.Node d.store.View(func(readTx store.ReadTx) { node = store.GetNode(readTx, id) if node == nil { @@ -1219,7 +1228,7 @@ func (d *Dispatcher) markNodeNotReady(id string, state api.NodeStatus_State, mes expireFunc := func() { if err := d.moveTasksToOrphaned(id); err != nil { - log.G(context.TODO()).WithError(err).Error(`failed to move all tasks to "ORPHANED" state`) + log.G(dctx).WithError(err).Error(`failed to move all tasks to "ORPHANED" state`) } d.downNodes.Delete(id) @@ -1243,7 +1252,7 @@ func (d *Dispatcher) markNodeNotReady(id string, state api.NodeStatus_State, mes if numUpdates >= maxBatchItems { select { case d.processUpdatesTrigger <- struct{}{}: - case <-d.ctx.Done(): + case <-dctx.Done(): } } @@ -1291,7 +1300,8 @@ func (d *Dispatcher) Session(r *api.SessionRequest, stream api.Dispatcher_Sessio } nodeID := nodeInfo.NodeID - if err := d.isRunningLocked(); err != nil { + dctx, err := d.isRunningLocked() + if err != nil { return err } @@ -1310,7 +1320,7 @@ func (d *Dispatcher) Session(r *api.SessionRequest, stream api.Dispatcher_Sessio log.G(ctx).Debugf(err.Error()) } // update the node description - if err := d.markNodeReady(nodeID, r.Description, addr); err != nil { + if err := d.markNodeReady(dctx, nodeID, r.Description, addr); err != nil { return err } } @@ -1401,7 +1411,7 @@ func (d *Dispatcher) Session(r *api.SessionRequest, stream api.Dispatcher_Sessio return stream.Context().Err() case <-node.Disconnect: disconnect = true - case <-d.ctx.Done(): + case <-dctx.Done(): disconnect = true case ev := <-keyMgrUpdates: netKeys = ev.([]*api.EncryptionKey) diff --git a/vendor/github.com/docker/swarmkit/manager/manager.go b/vendor/github.com/docker/swarmkit/manager/manager.go index b1c65aa14b..f36d8ce554 100644 --- a/vendor/github.com/docker/swarmkit/manager/manager.go +++ b/vendor/github.com/docker/swarmkit/manager/manager.go @@ -14,6 +14,7 @@ import ( "time" "github.com/Sirupsen/logrus" + 
"github.com/docker/docker/pkg/plugingetter" "github.com/docker/go-events" "github.com/docker/swarmkit/api" "github.com/docker/swarmkit/ca" @@ -105,6 +106,9 @@ type Config struct { // Availability allows a user to control the current scheduling status of a node Availability api.NodeSpec_Availability + + // PluginGetter provides access to docker's plugin inventory. + PluginGetter plugingetter.PluginGetter } // Manager is the cluster manager for Swarm. @@ -478,7 +482,7 @@ func (m *Manager) Stop(ctx context.Context, clearData bool) { // starting up. <-m.started - // the mutex stops us from trying to stop while we're alrady stopping, or + // the mutex stops us from trying to stop while we're already stopping, or // from returning before we've finished stopping. m.mu.Lock() defer m.mu.Unlock() @@ -833,7 +837,7 @@ func (m *Manager) becomeLeader(ctx context.Context) { // shutdown underlying manager processes when leadership is // lost. - m.allocator, err = allocator.New(s) + m.allocator, err = allocator.New(s, m.config.PluginGetter) if err != nil { log.G(ctx).WithError(err).Error("failed to create allocator") // TODO(stevvooe): It doesn't seem correct here to fail diff --git a/vendor/github.com/docker/swarmkit/manager/orchestrator/update/updater.go b/vendor/github.com/docker/swarmkit/manager/orchestrator/update/updater.go index e16ef525a6..f655290231 100644 --- a/vendor/github.com/docker/swarmkit/manager/orchestrator/update/updater.go +++ b/vendor/github.com/docker/swarmkit/manager/orchestrator/update/updater.go @@ -406,7 +406,11 @@ func (u *Updater) updateTask(ctx context.Context, slot orchestrator.Slot, update } if delayStartCh != nil { - <-delayStartCh + select { + case <-delayStartCh: + case <-u.stopChan: + return nil + } } // Wait for the new task to come up. 
@@ -456,7 +460,11 @@ func (u *Updater) useExistingTask(ctx context.Context, slot orchestrator.Slot, e } if delayStartCh != nil { - <-delayStartCh + select { + case <-delayStartCh: + case <-u.stopChan: + return nil + } } } diff --git a/vendor/github.com/docker/swarmkit/manager/scheduler/nodeinfo.go b/vendor/github.com/docker/swarmkit/manager/scheduler/nodeinfo.go index 89936a09ad..a3013b284f 100644 --- a/vendor/github.com/docker/swarmkit/manager/scheduler/nodeinfo.go +++ b/vendor/github.com/docker/swarmkit/manager/scheduler/nodeinfo.go @@ -39,7 +39,7 @@ func newNodeInfo(n *api.Node, tasks map[string]*api.Task, availableResources api return nodeInfo } -// addTask removes a task from nodeInfo if it's tracked there, and returns true +// removeTask removes a task from nodeInfo if it's tracked there, and returns true // if nodeInfo was modified. func (nodeInfo *NodeInfo) removeTask(t *api.Task) bool { oldTask, ok := nodeInfo.Tasks[t.ID] diff --git a/vendor/github.com/docker/swarmkit/manager/scheduler/scheduler.go b/vendor/github.com/docker/swarmkit/manager/scheduler/scheduler.go index 6959f3a728..67f4d4a7b1 100644 --- a/vendor/github.com/docker/swarmkit/manager/scheduler/scheduler.go +++ b/vendor/github.com/docker/swarmkit/manager/scheduler/scheduler.go @@ -532,7 +532,8 @@ func (s *Scheduler) scheduleTaskGroup(ctx context.Context, taskGroup map[string] } nodes := s.nodeSet.findBestNodes(len(taskGroup), s.pipeline.Process, nodeLess) - if len(nodes) == 0 { + nodeCount := len(nodes) + if nodeCount == 0 { s.noSuitableNode(ctx, taskGroup, schedulingDecisions) return } @@ -540,7 +541,7 @@ func (s *Scheduler) scheduleTaskGroup(ctx context.Context, taskGroup map[string] failedConstraints := make(map[int]bool) // key is index in nodes slice nodeIter := 0 for taskID, t := range taskGroup { - n := &nodes[nodeIter%len(nodes)] + n := &nodes[nodeIter%nodeCount] log.G(ctx).WithField("task.id", t.ID).Debugf("assigning to node %s", n.ID) newT := *t @@ -555,16 +556,16 @@ func (s 
*Scheduler) scheduleTaskGroup(ctx context.Context, taskGroup map[string] nodeInfo, err := s.nodeSet.nodeInfo(n.ID) if err == nil && nodeInfo.addTask(&newT) { s.nodeSet.updateNode(nodeInfo) - nodes[nodeIter%len(nodes)] = nodeInfo + nodes[nodeIter%nodeCount] = nodeInfo } schedulingDecisions[taskID] = schedulingDecision{old: t, new: &newT} delete(taskGroup, taskID) - if nodeIter+1 < len(nodes) { + if nodeIter+1 < nodeCount { // First pass fills the nodes until they have the same // number of tasks from this service. - nextNode := nodes[(nodeIter+1)%len(nodes)] + nextNode := nodes[(nodeIter+1)%nodeCount] if nodeLess(&nextNode, &nodeInfo) { nodeIter++ } @@ -575,10 +576,10 @@ func (s *Scheduler) scheduleTaskGroup(ctx context.Context, taskGroup map[string] } origNodeIter := nodeIter - for failedConstraints[nodeIter%len(nodes)] || !s.pipeline.Process(&nodes[nodeIter%len(nodes)]) { - failedConstraints[nodeIter%len(nodes)] = true + for failedConstraints[nodeIter%nodeCount] || !s.pipeline.Process(&nodes[nodeIter%nodeCount]) { + failedConstraints[nodeIter%nodeCount] = true nodeIter++ - if nodeIter-origNodeIter == len(nodes) { + if nodeIter-origNodeIter == nodeCount { // None of the nodes meet the constraints anymore. 
s.noSuitableNode(ctx, taskGroup, schedulingDecisions) return diff --git a/vendor/github.com/docker/swarmkit/manager/state/raft/membership/cluster.go b/vendor/github.com/docker/swarmkit/manager/state/raft/membership/cluster.go index 84c9514066..0bf69da151 100644 --- a/vendor/github.com/docker/swarmkit/manager/state/raft/membership/cluster.go +++ b/vendor/github.com/docker/swarmkit/manager/state/raft/membership/cluster.go @@ -2,16 +2,12 @@ package membership import ( "errors" - "fmt" "sync" - "google.golang.org/grpc" - "github.com/coreos/etcd/raft/raftpb" "github.com/docker/swarmkit/api" "github.com/docker/swarmkit/watch" "github.com/gogo/protobuf/proto" - "golang.org/x/net/context" ) var ( @@ -25,26 +21,19 @@ var ( ErrConfigChangeInvalid = errors.New("membership: ConfChange type should be either AddNode, RemoveNode or UpdateNode") // ErrCannotUnmarshalConfig is thrown when a node cannot unmarshal a configuration change ErrCannotUnmarshalConfig = errors.New("membership: cannot unmarshal configuration change") + // ErrMemberRemoved is thrown when a node was removed from the cluster + ErrMemberRemoved = errors.New("raft: member was removed from the cluster") ) -// deferredConn used to store removed members connection for some time. -// We need this in case if removed node is redirector or endpoint of ControlAPI call. 
-type deferredConn struct { - tick int - conn *grpc.ClientConn -} - // Cluster represents a set of active // raft Members type Cluster struct { - mu sync.RWMutex - members map[uint64]*Member - deferedConns map[*deferredConn]struct{} + mu sync.RWMutex + members map[uint64]*Member // removed contains the list of removed Members, // those ids cannot be reused - removed map[uint64]bool - heartbeatTicks int + removed map[uint64]bool PeersBroadcast *watch.Queue } @@ -52,74 +41,19 @@ type Cluster struct { // Member represents a raft Cluster Member type Member struct { *api.RaftMember - - Conn *grpc.ClientConn - tick int - active bool - lastSeenHost string -} - -// HealthCheck sends a health check RPC to the member and returns the response. -func (member *Member) HealthCheck(ctx context.Context) error { - healthClient := api.NewHealthClient(member.Conn) - resp, err := healthClient.Check(ctx, &api.HealthCheckRequest{Service: "Raft"}) - if err != nil { - return err - } - if resp.Status != api.HealthCheckResponse_SERVING { - return fmt.Errorf("health check returned status %s", resp.Status.String()) - } - return nil } // NewCluster creates a new Cluster neighbors list for a raft Member. -// Member marked as inactive if there was no call ReportActive for heartbeatInterval. 
-func NewCluster(heartbeatTicks int) *Cluster { +func NewCluster() *Cluster { // TODO(abronan): generate Cluster ID for federation return &Cluster{ members: make(map[uint64]*Member), removed: make(map[uint64]bool), - deferedConns: make(map[*deferredConn]struct{}), - heartbeatTicks: heartbeatTicks, PeersBroadcast: watch.NewQueue(), } } -func (c *Cluster) handleInactive() { - for _, m := range c.members { - if !m.active { - continue - } - m.tick++ - if m.tick > c.heartbeatTicks { - m.active = false - if m.Conn != nil { - m.Conn.Close() - } - } - } -} - -func (c *Cluster) handleDeferredConns() { - for dc := range c.deferedConns { - dc.tick++ - if dc.tick > c.heartbeatTicks { - dc.conn.Close() - delete(c.deferedConns, dc) - } - } -} - -// Tick increases ticks for all members. After heartbeatTicks node marked as -// inactive. -func (c *Cluster) Tick() { - c.mu.Lock() - defer c.mu.Unlock() - c.handleInactive() - c.handleDeferredConns() -} - // Members returns the list of raft Members in the Cluster. func (c *Cluster) Members() map[uint64]*Member { members := make(map[uint64]*Member) @@ -168,8 +102,6 @@ func (c *Cluster) AddMember(member *Member) error { if c.removed[member.RaftID] { return ErrIDRemoved } - member.active = true - member.tick = 0 c.members[member.RaftID] = member @@ -187,6 +119,33 @@ func (c *Cluster) RemoveMember(id uint64) error { return c.clearMember(id) } +// UpdateMember updates member address. 
+func (c *Cluster) UpdateMember(id uint64, m *api.RaftMember) error { + c.mu.Lock() + defer c.mu.Unlock() + + if c.removed[id] { + return ErrIDRemoved + } + + oldMember, ok := c.members[id] + if !ok { + return ErrIDNotFound + } + + if oldMember.NodeID != m.NodeID { + // Should never happen; this is a sanity check + return errors.New("node ID mismatch match on node update") + } + + if oldMember.Addr == m.Addr { + // nothing to do + return nil + } + oldMember.RaftMember = m + return nil +} + // ClearMember removes a node from the Cluster Memberlist, but does NOT add it // to the removed list. func (c *Cluster) ClearMember(id uint64) error { @@ -197,45 +156,10 @@ func (c *Cluster) ClearMember(id uint64) error { } func (c *Cluster) clearMember(id uint64) error { - m, ok := c.members[id] - if ok { - if m.Conn != nil { - // defer connection close to after heartbeatTicks - dConn := &deferredConn{conn: m.Conn} - c.deferedConns[dConn] = struct{}{} - } + if _, ok := c.members[id]; ok { delete(c.members, id) + c.broadcastUpdate() } - c.broadcastUpdate() - return nil -} - -// ReplaceMemberConnection replaces the member's GRPC connection. -func (c *Cluster) ReplaceMemberConnection(id uint64, oldConn *Member, newConn *Member, newAddr string, force bool) error { - c.mu.Lock() - defer c.mu.Unlock() - - oldMember, ok := c.members[id] - if !ok { - return ErrIDNotFound - } - - if !force && oldConn.Conn != oldMember.Conn { - // The connection was already replaced. Don't do it again. - newConn.Conn.Close() - return nil - } - - if oldMember.Conn != nil { - oldMember.Conn.Close() - } - - newMember := *oldMember - newMember.RaftMember = oldMember.RaftMember.Copy() - newMember.RaftMember.Addr = newAddr - newMember.Conn = newConn.Conn - c.members[id] = &newMember - return nil } @@ -249,60 +173,12 @@ func (c *Cluster) IsIDRemoved(id uint64) bool { // Clear resets the list of active Members and removed Members. 
func (c *Cluster) Clear() { c.mu.Lock() - for _, member := range c.members { - if member.Conn != nil { - member.Conn.Close() - } - } - - for dc := range c.deferedConns { - dc.conn.Close() - } c.members = make(map[uint64]*Member) c.removed = make(map[uint64]bool) - c.deferedConns = make(map[*deferredConn]struct{}) c.mu.Unlock() } -// ReportActive reports that member is active (called ProcessRaftMessage), -func (c *Cluster) ReportActive(id uint64, sourceHost string) { - c.mu.Lock() - defer c.mu.Unlock() - m, ok := c.members[id] - if !ok { - return - } - m.tick = 0 - m.active = true - if sourceHost != "" { - m.lastSeenHost = sourceHost - } -} - -// Active returns true if node is active. -func (c *Cluster) Active(id uint64) bool { - c.mu.RLock() - defer c.mu.RUnlock() - m, ok := c.members[id] - if !ok { - return false - } - return m.active -} - -// LastSeenHost returns the last observed source address that the specified -// member connected from. -func (c *Cluster) LastSeenHost(id uint64) string { - c.mu.RLock() - defer c.mu.RUnlock() - m, ok := c.members[id] - if ok { - return m.lastSeenHost - } - return "" -} - // ValidateConfigurationChange takes a proposed ConfChange and // ensures that it is valid. 
func (c *Cluster) ValidateConfigurationChange(cc raftpb.ConfChange) error { @@ -334,34 +210,3 @@ func (c *Cluster) ValidateConfigurationChange(cc raftpb.ConfChange) error { } return nil } - -// CanRemoveMember checks if removing a Member would not result in a loss -// of quorum, this check is needed before submitting a configuration change -// that might block or harm the Cluster on Member recovery -func (c *Cluster) CanRemoveMember(from uint64, id uint64) bool { - members := c.Members() - nreachable := 0 // reachable managers after removal - - for _, m := range members { - if m.RaftID == id { - continue - } - - // Local node from where the remove is issued - if m.RaftID == from { - nreachable++ - continue - } - - if c.Active(m.RaftID) { - nreachable++ - } - } - - nquorum := (len(members)-1)/2 + 1 - if nreachable < nquorum { - return false - } - - return true -} diff --git a/vendor/github.com/docker/swarmkit/manager/state/raft/raft.go b/vendor/github.com/docker/swarmkit/manager/state/raft/raft.go index a19f07b6e0..6d9cdc2dad 100644 --- a/vendor/github.com/docker/swarmkit/manager/state/raft/raft.go +++ b/vendor/github.com/docker/swarmkit/manager/state/raft/raft.go @@ -27,6 +27,7 @@ import ( "github.com/docker/swarmkit/manager/raftselector" "github.com/docker/swarmkit/manager/state/raft/membership" "github.com/docker/swarmkit/manager/state/raft/storage" + "github.com/docker/swarmkit/manager/state/raft/transport" "github.com/docker/swarmkit/manager/state/store" "github.com/docker/swarmkit/watch" "github.com/gogo/protobuf/proto" @@ -51,8 +52,6 @@ var ( ErrRequestTooLarge = errors.New("raft: raft message is too large and can't be sent") // ErrCannotRemoveMember is thrown when we try to remove a member from the cluster but this would result in a loss of quorum ErrCannotRemoveMember = errors.New("raft: member cannot be removed, because removing it may result in loss of quorum") - // ErrMemberRemoved is thrown when a node was removed from the cluster - ErrMemberRemoved = 
errors.New("raft: member was removed from the cluster") // ErrNoClusterLeader is thrown when the cluster has no elected leader ErrNoClusterLeader = errors.New("raft: no elected cluster leader") // ErrMemberUnknown is sent in response to a message from an @@ -88,8 +87,9 @@ type EncryptionKeyRotator interface { // Node represents the Raft Node useful // configuration. type Node struct { - raftNode raft.Node - cluster *membership.Cluster + raftNode raft.Node + cluster *membership.Cluster + transport *transport.Transport raftStore *raft.MemoryStorage memoryStore *store.MemoryStore @@ -100,6 +100,7 @@ type Node struct { campaignWhenAble bool signalledLeadership uint32 isMember uint32 + bootstrapMembers []*api.RaftMember // waitProp waits for all the proposals to be terminated before // shutting down the node. @@ -113,9 +114,11 @@ type Node struct { ticker clock.Ticker doneCh chan struct{} // RemovedFromRaft notifies about node deletion from raft cluster - RemovedFromRaft chan struct{} - removeRaftFunc func() - cancelFunc func() + RemovedFromRaft chan struct{} + cancelFunc func() + // removeRaftCh notifies about node deletion from raft cluster + removeRaftCh chan struct{} + removeRaftOnce sync.Once leadershipBroadcast *watch.Queue // used to coordinate shutdown @@ -131,7 +134,6 @@ type Node struct { // to stop. 
stopped chan struct{} - lastSendToMember map[uint64]chan struct{} raftLogger *storage.EncryptedRaftLogger keyRotator EncryptionKeyRotator rotationQueued bool @@ -189,7 +191,7 @@ func NewNode(opts NodeOptions) *Node { raftStore := raft.NewMemoryStorage() n := &Node{ - cluster: membership.NewCluster(2 * cfg.ElectionTick), + cluster: membership.NewCluster(), raftStore: raftStore, opts: opts, Config: &raft.Config{ @@ -204,7 +206,6 @@ func NewNode(opts NodeOptions) *Node { RemovedFromRaft: make(chan struct{}), stopped: make(chan struct{}), leadershipBroadcast: watch.NewQueue(), - lastSendToMember: make(map[uint64]chan struct{}), keyRotator: opts.KeyRotator, } n.memoryStore = store.NewMemoryStore(n) @@ -218,16 +219,6 @@ func NewNode(opts NodeOptions) *Node { n.reqIDGen = idutil.NewGenerator(uint16(n.Config.ID), time.Now()) n.wait = newWait() - n.removeRaftFunc = func(n *Node) func() { - var removeRaftOnce sync.Once - return func() { - removeRaftOnce.Do(func() { - atomic.StoreUint32(&n.isMember, 0) - close(n.RemovedFromRaft) - }) - } - }(n) - n.cancelFunc = func(n *Node) func() { var cancelOnce sync.Once return func() { @@ -240,6 +231,34 @@ func NewNode(opts NodeOptions) *Node { return n } +// IsIDRemoved reports if member with id was removed from cluster. +// Part of transport.Raft interface. +func (n *Node) IsIDRemoved(id uint64) bool { + return n.cluster.IsIDRemoved(id) +} + +// NodeRemoved signals that node was removed from cluster and should stop. +// Part of transport.Raft interface. +func (n *Node) NodeRemoved() { + n.removeRaftOnce.Do(func() { + atomic.StoreUint32(&n.isMember, 0) + close(n.RemovedFromRaft) + }) +} + +// ReportSnapshot reports snapshot status to underlying raft node. +// Part of transport.Raft interface. +func (n *Node) ReportSnapshot(id uint64, status raft.SnapshotStatus) { + n.raftNode.ReportSnapshot(id, status) +} + +// ReportUnreachable reports to underlying raft node that member with id is +// unreachable. +// Part of transport.Raft interface. 
+func (n *Node) ReportUnreachable(id uint64) { + n.raftNode.ReportUnreachable(id) +} + // WithContext returns context which is cancelled when parent context cancelled // or node is stopped. func (n *Node) WithContext(ctx context.Context) (context.Context, context.CancelFunc) { @@ -255,13 +274,29 @@ func (n *Node) WithContext(ctx context.Context) (context.Context, context.Cancel return ctx, cancel } +func (n *Node) initTransport() { + transportConfig := &transport.Config{ + HeartbeatInterval: time.Duration(n.Config.ElectionTick) * n.opts.TickInterval, + SendTimeout: n.opts.SendTimeout, + Credentials: n.opts.TLSCredentials, + Raft: n, + } + n.transport = transport.New(transportConfig) +} + // JoinAndStart joins and starts the raft server func (n *Node) JoinAndStart(ctx context.Context) (err error) { ctx, cancel := n.WithContext(ctx) defer func() { cancel() if err != nil { + n.stopMu.Lock() + // to shutdown transport + close(n.stopped) + n.stopMu.Unlock() n.done() + } else { + atomic.StoreUint32(&n.isMember, 1) } }() @@ -281,58 +316,59 @@ func (n *Node) JoinAndStart(ctx context.Context) (err error) { n.snapshotMeta = snapshot.Metadata n.writtenWALIndex, _ = n.raftStore.LastIndex() // lastIndex always returns nil as an error - if loadAndStartErr == storage.ErrNoWAL { + // restore from snapshot + if loadAndStartErr == nil { if n.opts.JoinAddr != "" { - c, err := n.ConnectToMember(n.opts.JoinAddr, 10*time.Second) - if err != nil { - return err - } - client := api.NewRaftMembershipClient(c.Conn) - defer func() { - _ = c.Conn.Close() - }() - - joinCtx, joinCancel := context.WithTimeout(ctx, 10*time.Second) - defer joinCancel() - resp, err := client.Join(joinCtx, &api.JoinRequest{ - Addr: n.opts.Addr, - }) - if err != nil { - return err - } - - n.Config.ID = resp.RaftID - - if _, err := n.newRaftLogs(n.opts.ID); err != nil { - return err - } - - n.raftNode = raft.StartNode(n.Config, []raft.Peer{}) - - if err := n.registerNodes(resp.Members); err != nil { - 
n.raftLogger.Close(ctx) - return err - } - } else { - // First member in the cluster, self-assign ID - n.Config.ID = uint64(rand.Int63()) + 1 - peer, err := n.newRaftLogs(n.opts.ID) - if err != nil { - return err - } - n.raftNode = raft.StartNode(n.Config, []raft.Peer{peer}) - n.campaignWhenAble = true + log.G(ctx).Warning("ignoring request to join cluster, because raft state already exists") } - atomic.StoreUint32(&n.isMember, 1) + n.campaignWhenAble = true + n.initTransport() + n.raftNode = raft.RestartNode(n.Config) return nil } - if n.opts.JoinAddr != "" { - log.G(ctx).Warning("ignoring request to join cluster, because raft state already exists") + // first member of cluster + if n.opts.JoinAddr == "" { + // First member in the cluster, self-assign ID + n.Config.ID = uint64(rand.Int63()) + 1 + peer, err := n.newRaftLogs(n.opts.ID) + if err != nil { + return err + } + n.campaignWhenAble = true + n.initTransport() + n.raftNode = raft.StartNode(n.Config, []raft.Peer{peer}) + return nil } - n.campaignWhenAble = true - n.raftNode = raft.RestartNode(n.Config) - atomic.StoreUint32(&n.isMember, 1) + + // join to existing cluster + + conn, err := dial(n.opts.JoinAddr, "tcp", n.opts.TLSCredentials, 10*time.Second) + if err != nil { + return err + } + defer conn.Close() + client := api.NewRaftMembershipClient(conn) + + joinCtx, joinCancel := context.WithTimeout(ctx, 10*time.Second) + defer joinCancel() + resp, err := client.Join(joinCtx, &api.JoinRequest{ + Addr: n.opts.Addr, + }) + if err != nil { + return err + } + + n.Config.ID = resp.RaftID + + if _, err := n.newRaftLogs(n.opts.ID); err != nil { + return err + } + n.bootstrapMembers = resp.Members + + n.initTransport() + n.raftNode = raft.StartNode(n.Config, nil) + return nil } @@ -372,6 +408,9 @@ func (n *Node) done() { n.leadershipBroadcast.Close() n.cluster.PeersBroadcast.Close() n.memoryStore.Close() + if n.transport != nil { + n.transport.Stop() + } close(n.doneCh) } @@ -391,6 +430,12 @@ func (n *Node) Run(ctx 
context.Context) error { ctx = log.WithLogger(ctx, logrus.WithField("raft_id", fmt.Sprintf("%x", n.Config.ID))) ctx, cancel := context.WithCancel(ctx) + for _, node := range n.bootstrapMembers { + if err := n.registerNode(node); err != nil { + log.G(ctx).WithError(err).Errorf("failed to register member %x", node.RaftID) + } + } + defer func() { cancel() n.stop(ctx) @@ -414,7 +459,6 @@ func (n *Node) Run(ctx context.Context) error { select { case <-n.ticker.C(): n.raftNode.Tick() - n.cluster.Tick() case rd := <-n.raftNode.Ready(): raftConfig := n.getCurrentRaftConfig() @@ -423,10 +467,10 @@ func (n *Node) Run(ctx context.Context) error { return errors.Wrap(err, "failed to save entries to storage") } - if len(rd.Messages) != 0 { + for _, msg := range rd.Messages { // Send raft messages to peers - if err := n.send(ctx, rd.Messages); err != nil { - log.G(ctx).WithError(err).Error("failed to send message to members") + if err := n.transport.Send(msg); err != nil { + log.G(ctx).WithError(err).Error("failed to send message to member") } } @@ -435,8 +479,8 @@ func (n *Node) Run(ctx context.Context) error { // saveToStorage. 
if !raft.IsEmptySnap(rd.Snapshot) { // Load the snapshot data into the store - if err := n.restoreFromSnapshot(rd.Snapshot.Data, false); err != nil { - log.G(ctx).WithError(err).Error("failed to restore from snapshot") + if err := n.restoreFromSnapshot(ctx, rd.Snapshot.Data); err != nil { + log.G(ctx).WithError(err).Error("failed to restore cluster from snapshot") } n.appliedIndex = rd.Snapshot.Metadata.Index n.snapshotMeta = rd.Snapshot.Metadata @@ -555,6 +599,40 @@ func (n *Node) Run(ctx context.Context) error { } } +func (n *Node) restoreFromSnapshot(ctx context.Context, data []byte) error { + snapCluster, err := n.clusterSnapshot(data) + if err != nil { + return err + } + + oldMembers := n.cluster.Members() + + for _, member := range snapCluster.Members { + delete(oldMembers, member.RaftID) + } + + for _, removedMember := range snapCluster.Removed { + n.cluster.RemoveMember(removedMember) + if err := n.transport.RemovePeer(removedMember); err != nil { + log.G(ctx).WithError(err).Errorf("failed to remove peer %x from transport", removedMember) + } + delete(oldMembers, removedMember) + } + + for id, member := range oldMembers { + n.cluster.ClearMember(id) + if err := n.transport.RemovePeer(member.RaftID); err != nil { + log.G(ctx).WithError(err).Errorf("failed to remove peer %x from transport", member.RaftID) + } + } + for _, node := range snapCluster.Members { + if err := n.registerNode(&api.RaftMember{RaftID: node.RaftID, NodeID: node.NodeID, Addr: node.Addr}); err != nil { + log.G(ctx).WithError(err).Error("failed to register node from snapshot") + } + } + return nil +} + func (n *Node) needsSnapshot(ctx context.Context) bool { if n.waitForAppliedIndex == 0 && n.keyRotator.NeedsRotation() { keys := n.keyRotator.GetKeys() @@ -798,22 +876,27 @@ func (n *Node) Join(ctx context.Context, req *api.JoinRequest) (*api.JoinRespons // checkHealth tries to contact an aspiring member through its advertised address // and checks if its raft server is running. 
func (n *Node) checkHealth(ctx context.Context, addr string, timeout time.Duration) error { - conn, err := n.ConnectToMember(addr, timeout) + conn, err := dial(addr, "tcp", n.opts.TLSCredentials, timeout) if err != nil { return err } + defer conn.Close() + if timeout != 0 { tctx, cancel := context.WithTimeout(ctx, timeout) defer cancel() ctx = tctx } - defer conn.Conn.Close() - - if err := conn.HealthCheck(ctx); err != nil { + healthClient := api.NewHealthClient(conn) + resp, err := healthClient.Check(ctx, &api.HealthCheckRequest{Service: "Raft"}) + if err != nil { return errors.Wrap(err, "could not connect to prospective new cluster member using its advertised address") } + if resp.Status != api.HealthCheckResponse_SERVING { + return fmt.Errorf("health check returned status %s", resp.Status.String()) + } return nil } @@ -841,11 +924,15 @@ func (n *Node) addMember(ctx context.Context, addr string, raftID uint64, nodeID return n.configure(ctx, cc) } -// updateMember submits a configuration change to change a member's address. -func (n *Node) updateMember(ctx context.Context, addr string, raftID uint64, nodeID string) error { +// updateNodeBlocking runs synchronous job to update node address in whole cluster. +func (n *Node) updateNodeBlocking(ctx context.Context, id uint64, addr string) error { + m := n.cluster.GetMember(id) + if m == nil { + return errors.Errorf("member %x is not found for update", id) + } node := api.RaftMember{ - RaftID: raftID, - NodeID: nodeID, + RaftID: m.RaftID, + NodeID: m.NodeID, Addr: addr, } @@ -856,7 +943,7 @@ func (n *Node) updateMember(ctx context.Context, addr string, raftID uint64, nod cc := raftpb.ConfChange{ Type: raftpb.ConfChangeUpdateNode, - NodeID: raftID, + NodeID: id, Context: meta, } @@ -864,6 +951,18 @@ func (n *Node) updateMember(ctx context.Context, addr string, raftID uint64, nod return n.configure(ctx, cc) } +// UpdateNode submits a configuration change to change a member's address. 
+func (n *Node) UpdateNode(id uint64, addr string) { + ctx, cancel := n.WithContext(context.Background()) + defer cancel() + // spawn updating info in raft in background to unblock transport + go func() { + if err := n.updateNodeBlocking(ctx, id, addr); err != nil { + log.G(ctx).WithFields(logrus.Fields{"raft_id": n.Config.ID, "update_id": id}).WithError(err).Error("failed to update member address in cluster") + } + }() +} + // Leave asks to a member of the raft to remove // us from the raft cluster. This method is called // from a member who is willing to leave its raft @@ -897,7 +996,31 @@ func (n *Node) Leave(ctx context.Context, req *api.LeaveRequest) (*api.LeaveResp // CanRemoveMember checks if a member can be removed from // the context of the current node. func (n *Node) CanRemoveMember(id uint64) bool { - return n.cluster.CanRemoveMember(n.Config.ID, id) + members := n.cluster.Members() + nreachable := 0 // reachable managers after removal + + for _, m := range members { + if m.RaftID == id { + continue + } + + // Local node from where the remove is issued + if m.RaftID == n.Config.ID { + nreachable++ + continue + } + + if n.transport.Active(m.RaftID) { + nreachable++ + } + } + + nquorum := (len(members)-1)/2 + 1 + if nreachable < nquorum { + return false + } + + return true } func (n *Node) removeMember(ctx context.Context, id uint64) error { @@ -915,7 +1038,7 @@ func (n *Node) removeMember(ctx context.Context, id uint64) error { n.membershipLock.Lock() defer n.membershipLock.Unlock() - if n.cluster.CanRemoveMember(n.Config.ID, id) { + if n.CanRemoveMember(id) { cc := raftpb.ConfChange{ ID: id, Type: raftpb.ConfChangeRemoveNode, @@ -956,6 +1079,34 @@ func (n *Node) processRaftMessageLogger(ctx context.Context, msg *api.ProcessRaf return log.G(ctx).WithFields(fields) } +func (n *Node) reportNewAddress(ctx context.Context, id uint64) error { + // too early + if !n.IsMember() { + return nil + } + p, ok := peer.FromContext(ctx) + if !ok { + return nil + } + 
oldAddr, err := n.transport.PeerAddr(id) + if err != nil { + return err + } + newHost, _, err := net.SplitHostPort(p.Addr.String()) + if err != nil { + return err + } + _, officialPort, err := net.SplitHostPort(oldAddr) + if err != nil { + return err + } + newAddr := net.JoinHostPort(newHost, officialPort) + if err := n.transport.UpdatePeerAddr(id, newAddr); err != nil { + return err + } + return nil +} + // ProcessRaftMessage calls 'Step' which advances the // raft state machine with the provided message on the // receiving node @@ -969,32 +1120,25 @@ func (n *Node) ProcessRaftMessage(ctx context.Context, msg *api.ProcessRaftMessa // a node in the remove set if n.cluster.IsIDRemoved(msg.Message.From) { n.processRaftMessageLogger(ctx, msg).Debug("received message from removed member") - return nil, grpc.Errorf(codes.NotFound, "%s", ErrMemberRemoved.Error()) + return nil, grpc.Errorf(codes.NotFound, "%s", membership.ErrMemberRemoved.Error()) } - var sourceHost string - peer, ok := peer.FromContext(ctx) - if ok { - sourceHost, _, _ = net.SplitHostPort(peer.Addr.String()) - } - - n.cluster.ReportActive(msg.Message.From, sourceHost) - ctx, cancel := n.WithContext(ctx) defer cancel() + if err := n.reportNewAddress(ctx, msg.Message.From); err != nil { + log.G(ctx).WithError(err).Errorf("failed to report new address of %x to transport", msg.Message.From) + } + // Reject vote requests from unreachable peers if msg.Message.Type == raftpb.MsgVote { member := n.cluster.GetMember(msg.Message.From) - if member == nil || member.Conn == nil { + if member == nil { n.processRaftMessageLogger(ctx, msg).Debug("received message from unknown member") return &api.ProcessRaftMessageResponse{}, nil } - healthCtx, cancel := context.WithTimeout(ctx, time.Duration(n.Config.ElectionTick)*n.opts.TickInterval) - defer cancel() - - if err := member.HealthCheck(healthCtx); err != nil { + if err := n.transport.HealthCheck(ctx, msg.Message.From); err != nil { n.processRaftMessageLogger(ctx, 
msg).WithError(err).Debug("member which sent vote request failed health check") return &api.ProcessRaftMessageResponse{}, nil } @@ -1064,17 +1208,11 @@ func (n *Node) getLeaderConn() (*grpc.ClientConn, error) { if leader == n.Config.ID { return nil, raftselector.ErrIsLeader } - l := n.cluster.GetMember(leader) - if l == nil { - return nil, errors.New("no leader found") + conn, err := n.transport.PeerConn(leader) + if err != nil { + return nil, errors.Wrap(err, "failed to get connection to leader") } - if !n.cluster.Active(leader) { - return nil, errors.New("leader marked as inactive") - } - if l.Conn == nil { - return nil, errors.New("no connection to leader in member list") - } - return l.Conn, nil + return conn, nil } // LeaderConn returns current connection to cluster leader or raftselector.ErrIsLeader @@ -1122,8 +1260,12 @@ func (n *Node) registerNode(node *api.RaftMember) error { // and are adding ourself now with the remotely-reachable // address. if existingMember.Addr != node.Addr { + if node.RaftID != n.Config.ID { + if err := n.transport.UpdatePeer(node.RaftID, node.Addr); err != nil { + return err + } + } member.RaftMember = node - member.Conn = existingMember.Conn n.cluster.AddMember(member) } @@ -1132,11 +1274,7 @@ func (n *Node) registerNode(node *api.RaftMember) error { // Avoid opening a connection to the local node if node.RaftID != n.Config.ID { - // We don't want to impose a timeout on the grpc connection. It - // should keep retrying as long as necessary, in case the peer - // is temporarily unavailable. 
- var err error - if member, err = n.ConnectToMember(node.Addr, 0); err != nil { + if err := n.transport.AddPeer(node.RaftID, node.Addr); err != nil { return err } } @@ -1144,8 +1282,8 @@ func (n *Node) registerNode(node *api.RaftMember) error { member.RaftMember = node err := n.cluster.AddMember(member) if err != nil { - if member.Conn != nil { - _ = member.Conn.Close() + if rerr := n.transport.RemovePeer(node.RaftID); rerr != nil { + return errors.Wrapf(rerr, "failed to remove peer after error %v", err) } return err } @@ -1153,17 +1291,6 @@ func (n *Node) registerNode(node *api.RaftMember) error { return nil } -// registerNodes registers a set of nodes in the cluster -func (n *Node) registerNodes(nodes []*api.RaftMember) error { - for _, node := range nodes { - if err := n.registerNode(node); err != nil { - return err - } - } - - return nil -} - // ProposeValue calls Propose on the raft and waits // on the commit log action before returning a result func (n *Node) ProposeValue(ctx context.Context, storeAction []*api.StoreAction, cb func()) error { @@ -1209,7 +1336,7 @@ func (n *Node) GetMemberlist() map[uint64]*api.RaftMember { leader := false if member.RaftID != n.Config.ID { - if !n.cluster.Active(member.RaftID) { + if !n.transport.Active(member.RaftID) { reachability = api.RaftMemberStatus_UNREACHABLE } } @@ -1294,183 +1421,6 @@ func (n *Node) saveToStorage( return nil } -// Sends a series of messages to members in the raft -func (n *Node) send(ctx context.Context, messages []raftpb.Message) error { - members := n.cluster.Members() - - n.stopMu.RLock() - defer n.stopMu.RUnlock() - - for _, m := range messages { - // Process locally - if m.To == n.Config.ID { - if err := n.raftNode.Step(ctx, m); err != nil { - return err - } - continue - } - - if m.Type == raftpb.MsgProp { - // We don't forward proposals to the leader. Our - // current architecture depends on only the leader - // making proposals, so in-flight proposals can be - // guaranteed not to conflict. 
- continue - } - - ch := make(chan struct{}) - - n.asyncTasks.Add(1) - go n.sendToMember(ctx, members, m, n.lastSendToMember[m.To], ch) - - n.lastSendToMember[m.To] = ch - } - - return nil -} - -func (n *Node) sendToMember(ctx context.Context, members map[uint64]*membership.Member, m raftpb.Message, lastSend <-chan struct{}, thisSend chan<- struct{}) { - defer n.asyncTasks.Done() - defer close(thisSend) - - if lastSend != nil { - waitCtx, waitCancel := context.WithTimeout(ctx, n.opts.SendTimeout) - defer waitCancel() - - select { - case <-lastSend: - case <-waitCtx.Done(): - return - } - - select { - case <-waitCtx.Done(): - return - default: - } - } - - ctx, cancel := context.WithTimeout(ctx, n.opts.SendTimeout) - defer cancel() - - if n.cluster.IsIDRemoved(m.To) { - // Should not send to removed members - return - } - - var conn *membership.Member - if toMember, ok := members[m.To]; ok { - conn = toMember - } else { - // If we are being asked to send to a member that's not in - // our member list, that could indicate that the current leader - // was added while we were offline. Try to resolve its address. 
- log.G(ctx).Warningf("sending message to an unrecognized member ID %x", m.To) - - // Choose a random member - var ( - queryMember *membership.Member - id uint64 - ) - for id, queryMember = range members { - if id != n.Config.ID { - break - } - } - - if queryMember == nil || queryMember.RaftID == n.Config.ID { - log.G(ctx).Error("could not find cluster member to query for leader address") - return - } - - resp, err := api.NewRaftClient(queryMember.Conn).ResolveAddress(ctx, &api.ResolveAddressRequest{RaftID: m.To}) - if err != nil { - log.G(ctx).WithError(err).Errorf("could not resolve address of member ID %x", m.To) - return - } - conn, err = n.ConnectToMember(resp.Addr, n.opts.SendTimeout) - if err != nil { - log.G(ctx).WithError(err).Errorf("could connect to member ID %x at %s", m.To, resp.Addr) - return - } - // The temporary connection is only used for this message. - // Eventually, we should catch up and add a long-lived - // connection to the member list. - defer conn.Conn.Close() - } - - _, err := api.NewRaftClient(conn.Conn).ProcessRaftMessage(ctx, &api.ProcessRaftMessageRequest{Message: &m}) - if err != nil { - if grpc.Code(err) == codes.NotFound && grpc.ErrorDesc(err) == ErrMemberRemoved.Error() { - n.removeRaftFunc() - } - if m.Type == raftpb.MsgSnap { - n.raftNode.ReportSnapshot(m.To, raft.SnapshotFailure) - } - if !n.IsMember() { - // node is removed from cluster or stopped - return - } - n.raftNode.ReportUnreachable(m.To) - - lastSeenHost := n.cluster.LastSeenHost(m.To) - if lastSeenHost != "" { - // Check if address has changed - officialHost, officialPort, _ := net.SplitHostPort(conn.Addr) - if officialHost != lastSeenHost { - reconnectAddr := net.JoinHostPort(lastSeenHost, officialPort) - log.G(ctx).Warningf("detected address change for %x (%s -> %s)", m.To, conn.Addr, reconnectAddr) - if err := n.handleAddressChange(ctx, conn, reconnectAddr); err != nil { - log.G(ctx).WithError(err).Error("failed to hande address change") - } - return - } - } - - 
// Bounce the connection - newConn, err := n.ConnectToMember(conn.Addr, 0) - if err != nil { - log.G(ctx).WithError(err).Errorf("could connect to member ID %x at %s", m.To, conn.Addr) - return - } - err = n.cluster.ReplaceMemberConnection(m.To, conn, newConn, conn.Addr, false) - if err != nil { - log.G(ctx).WithError(err).Error("failed to replace connection to raft member") - newConn.Conn.Close() - } - } else if m.Type == raftpb.MsgSnap { - n.raftNode.ReportSnapshot(m.To, raft.SnapshotFinish) - } -} - -func (n *Node) handleAddressChange(ctx context.Context, member *membership.Member, reconnectAddr string) error { - newConn, err := n.ConnectToMember(reconnectAddr, 0) - if err != nil { - return errors.Wrapf(err, "could connect to member ID %x at observed address %s", member.RaftID, reconnectAddr) - } - - healthCtx, cancelHealth := context.WithTimeout(ctx, time.Duration(n.Config.ElectionTick)*n.opts.TickInterval) - defer cancelHealth() - - if err := newConn.HealthCheck(healthCtx); err != nil { - return errors.Wrapf(err, "%x failed health check at observed address %s", member.RaftID, reconnectAddr) - } - - if err := n.cluster.ReplaceMemberConnection(member.RaftID, member, newConn, reconnectAddr, false); err != nil { - newConn.Conn.Close() - return errors.Wrap(err, "failed to replace connection to raft member") - } - - // If we're the leader, write the address change to raft - updateCtx, cancelUpdate := context.WithTimeout(ctx, time.Duration(n.Config.ElectionTick)*n.opts.TickInterval) - defer cancelUpdate() - if err := n.updateMember(updateCtx, reconnectAddr, member.RaftID, member.NodeID); err != nil { - return errors.Wrap(err, "failed to update member address in raft") - } - - return nil -} - // processInternalRaftRequest sends a message to nodes participating // in the raft to apply a log entry and then waits for it to be applied // on the server. 
It will block until the update is performed, there is @@ -1681,32 +1631,13 @@ func (n *Node) applyUpdateNode(ctx context.Context, cc raftpb.ConfChange) error return err } - oldMember := n.cluster.GetMember(newMember.RaftID) - - if oldMember == nil { - return ErrMemberUnknown - } - if oldMember.NodeID != newMember.NodeID { - // Should never happen; this is a sanity check - log.G(ctx).Errorf("node ID mismatch on node update (old: %x, new: %x)", oldMember.NodeID, newMember.NodeID) - return errors.New("node ID mismatch match on node update") - } - - if oldMember.Addr == newMember.Addr || oldMember.Conn == nil { - // nothing to do + if newMember.RaftID == n.Config.ID { return nil } - - newConn, err := n.ConnectToMember(newMember.Addr, 0) - if err != nil { - return errors.Errorf("could connect to member ID %x at %s: %v", newMember.RaftID, newMember.Addr, err) - } - if err := n.cluster.ReplaceMemberConnection(newMember.RaftID, oldMember, newConn, newMember.Addr, true); err != nil { - newConn.Conn.Close() + if err := n.transport.UpdatePeer(newMember.RaftID, newMember.Addr); err != nil { return err } - - return nil + return n.cluster.UpdateMember(newMember.RaftID, newMember) } // applyRemoveNode is called when we receive a ConfChange @@ -1724,11 +1655,11 @@ func (n *Node) applyRemoveNode(ctx context.Context, cc raftpb.ConfChange) (err e } if cc.NodeID == n.Config.ID { + // wait the commit ack to be sent before closing connection n.asyncTasks.Wait() - n.removeRaftFunc() - + n.NodeRemoved() // if there are only 2 nodes in the cluster, and leader is leaving // before closing the connection, leader has to ensure that follower gets // noticed about this raft conf change commit. 
Otherwise, follower would @@ -1738,24 +1669,15 @@ func (n *Node) applyRemoveNode(ctx context.Context, cc raftpb.ConfChange) (err e // while n.asyncTasks.Wait() could be helpful in this case // it's the best-effort strategy, because this send could be fail due to some errors (such as time limit exceeds) // TODO(Runshen Zhu): use leadership transfer to solve this case, after vendoring raft 3.0+ + } else { + if err := n.transport.RemovePeer(cc.NodeID); err != nil { + return err + } } return n.cluster.RemoveMember(cc.NodeID) } -// ConnectToMember returns a member object with an initialized -// connection to communicate with other raft members -func (n *Node) ConnectToMember(addr string, timeout time.Duration) (*membership.Member, error) { - conn, err := dial(addr, "tcp", n.opts.TLSCredentials, timeout) - if err != nil { - return nil, err - } - - return &membership.Member{ - Conn: conn, - }, nil -} - // SubscribeLeadership returns channel to which events about leadership change // will be sent in form of raft.LeadershipState. Also cancel func is returned - // it should be called when listener is no longer interested in events. 
diff --git a/vendor/github.com/docker/swarmkit/manager/state/raft/storage.go b/vendor/github.com/docker/swarmkit/manager/state/raft/storage.go index 402b04a33f..e56f624783 100644 --- a/vendor/github.com/docker/swarmkit/manager/state/raft/storage.go +++ b/vendor/github.com/docker/swarmkit/manager/state/raft/storage.go @@ -60,10 +60,26 @@ func (n *Node) loadAndStart(ctx context.Context, forceNewCluster bool) error { n.Config.ID = raftNode.RaftID if snapshot != nil { - // Load the snapshot data into the store - if err := n.restoreFromSnapshot(snapshot.Data, forceNewCluster); err != nil { + snapCluster, err := n.clusterSnapshot(snapshot.Data) + if err != nil { return err } + var bootstrapMembers []*api.RaftMember + if forceNewCluster { + for _, m := range snapCluster.Members { + if m.RaftID != n.Config.ID { + n.cluster.RemoveMember(m.RaftID) + continue + } + bootstrapMembers = append(bootstrapMembers, m) + } + } else { + bootstrapMembers = snapCluster.Members + } + n.bootstrapMembers = bootstrapMembers + for _, removedMember := range snapCluster.Removed { + n.cluster.RemoveMember(removedMember) + } } ents, st := waldata.Entries, waldata.HardState @@ -215,40 +231,18 @@ func (n *Node) doSnapshot(ctx context.Context, raftConfig api.RaftConfig) { <-viewStarted } -func (n *Node) restoreFromSnapshot(data []byte, forceNewCluster bool) error { +func (n *Node) clusterSnapshot(data []byte) (api.ClusterSnapshot, error) { var snapshot api.Snapshot if err := snapshot.Unmarshal(data); err != nil { - return err + return snapshot.Membership, err } if snapshot.Version != api.Snapshot_V0 { - return fmt.Errorf("unrecognized snapshot version %d", snapshot.Version) + return snapshot.Membership, fmt.Errorf("unrecognized snapshot version %d", snapshot.Version) } if err := n.memoryStore.Restore(&snapshot.Store); err != nil { - return err + return snapshot.Membership, err } - oldMembers := n.cluster.Members() - - for _, member := range snapshot.Membership.Members { - if forceNewCluster && 
member.RaftID != n.Config.ID { - n.cluster.RemoveMember(member.RaftID) - } else { - if err := n.registerNode(&api.RaftMember{RaftID: member.RaftID, NodeID: member.NodeID, Addr: member.Addr}); err != nil { - return err - } - } - delete(oldMembers, member.RaftID) - } - - for _, removedMember := range snapshot.Membership.Removed { - n.cluster.RemoveMember(removedMember) - delete(oldMembers, removedMember) - } - - for member := range oldMembers { - n.cluster.ClearMember(member) - } - - return nil + return snapshot.Membership, nil } diff --git a/vendor/github.com/docker/swarmkit/manager/state/raft/transport/peer.go b/vendor/github.com/docker/swarmkit/manager/state/raft/transport/peer.go new file mode 100644 index 0000000000..0ed0dc6c29 --- /dev/null +++ b/vendor/github.com/docker/swarmkit/manager/state/raft/transport/peer.go @@ -0,0 +1,299 @@ +package transport + +import ( + "fmt" + "sync" + "time" + + "golang.org/x/net/context" + + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + + "github.com/coreos/etcd/raft" + "github.com/coreos/etcd/raft/raftpb" + "github.com/docker/swarmkit/api" + "github.com/docker/swarmkit/log" + "github.com/docker/swarmkit/manager/state/raft/membership" + "github.com/pkg/errors" +) + +type peer struct { + id uint64 + + tr *Transport + + msgc chan raftpb.Message + + ctx context.Context + cancel context.CancelFunc + done chan struct{} + + mu sync.Mutex + cc *grpc.ClientConn + addr string + newAddr string + + active bool + becameActive time.Time +} + +func newPeer(id uint64, addr string, tr *Transport) (*peer, error) { + cc, err := tr.dial(addr) + if err != nil { + return nil, errors.Wrapf(err, "failed to create conn for %x with addr %s", id, addr) + } + ctx, cancel := context.WithCancel(tr.ctx) + ctx = log.WithField(ctx, "peer_id", fmt.Sprintf("%x", id)) + p := &peer{ + id: id, + addr: addr, + cc: cc, + tr: tr, + ctx: ctx, + cancel: cancel, + msgc: make(chan raftpb.Message, 4096), + done: make(chan struct{}), + } + go p.run(ctx) + 
return p, nil +} + +func (p *peer) send(m raftpb.Message) (err error) { + p.mu.Lock() + defer func() { + if err != nil { + p.active = false + p.becameActive = time.Time{} + } + p.mu.Unlock() + }() + select { + case <-p.ctx.Done(): + return p.ctx.Err() + default: + } + select { + case p.msgc <- m: + case <-p.ctx.Done(): + return p.ctx.Err() + default: + p.tr.config.ReportUnreachable(p.id) + return errors.Errorf("peer is unreachable") + } + return nil +} + +func (p *peer) update(addr string) error { + p.mu.Lock() + defer p.mu.Unlock() + if p.addr == addr { + return nil + } + cc, err := p.tr.dial(addr) + if err != nil { + return err + } + + p.cc.Close() + p.cc = cc + p.addr = addr + return nil +} + +func (p *peer) updateAddr(addr string) error { + p.mu.Lock() + defer p.mu.Unlock() + if p.addr == addr { + return nil + } + log.G(p.ctx).Debugf("peer %x updated to address %s, it will be used if old failed", p.id, addr) + p.newAddr = addr + return nil +} + +func (p *peer) conn() *grpc.ClientConn { + p.mu.Lock() + defer p.mu.Unlock() + return p.cc +} + +func (p *peer) address() string { + p.mu.Lock() + defer p.mu.Unlock() + return p.addr +} + +func (p *peer) resolveAddr(ctx context.Context, id uint64) (string, error) { + resp, err := api.NewRaftClient(p.conn()).ResolveAddress(ctx, &api.ResolveAddressRequest{RaftID: id}) + if err != nil { + return "", errors.Wrap(err, "failed to resolve address") + } + return resp.Addr, nil +} + +func (p *peer) reportSnapshot(failure bool) { + if failure { + p.tr.config.ReportSnapshot(p.id, raft.SnapshotFailure) + return + } + p.tr.config.ReportSnapshot(p.id, raft.SnapshotFinish) +} + +func (p *peer) sendProcessMessage(ctx context.Context, m raftpb.Message) error { + ctx, cancel := context.WithTimeout(ctx, p.tr.config.SendTimeout) + defer cancel() + _, err := api.NewRaftClient(p.conn()).ProcessRaftMessage(ctx, &api.ProcessRaftMessageRequest{Message: &m}) + if grpc.Code(err) == codes.NotFound && grpc.ErrorDesc(err) == 
membership.ErrMemberRemoved.Error() {
+		p.tr.config.NodeRemoved()
+	}
+	if m.Type == raftpb.MsgSnap {
+		// Report the snapshot delivery outcome to raft: a failed send
+		// makes raft retry the snapshot, a successful one lets it resume
+		// normal log replication to this follower.
+		p.reportSnapshot(err != nil)
+	}
+
+	if err != nil {
+		p.tr.config.ReportUnreachable(m.To)
+		return err
+	}
+	return nil
+}
+
+func healthCheckConn(ctx context.Context, cc *grpc.ClientConn) error {
+	resp, err := api.NewHealthClient(cc).Check(ctx, &api.HealthCheckRequest{Service: "Raft"})
+	if err != nil {
+		return errors.Wrap(err, "failed to check health")
+	}
+	if resp.Status != api.HealthCheckResponse_SERVING {
+		return errors.Errorf("health check returned status %s", resp.Status)
+	}
+	return nil
+}
+
+func (p *peer) healthCheck(ctx context.Context) error {
+	ctx, cancel := context.WithTimeout(ctx, p.tr.config.SendTimeout)
+	defer cancel()
+	return healthCheckConn(ctx, p.conn())
+}
+
+func (p *peer) setActive() {
+	p.mu.Lock()
+	if !p.active {
+		p.active = true
+		p.becameActive = time.Now()
+	}
+	p.mu.Unlock()
+}
+
+func (p *peer) setInactive() {
+	p.mu.Lock()
+	p.active = false
+	p.becameActive = time.Time{}
+	p.mu.Unlock()
+}
+
+func (p *peer) activeTime() time.Time {
+	p.mu.Lock()
+	defer p.mu.Unlock()
+	return p.becameActive
+}
+
+func (p *peer) drain() error {
+	ctx, cancel := context.WithTimeout(context.Background(), 16*time.Second)
+	defer cancel()
+	for {
+		select {
+		case m, ok := <-p.msgc:
+			if !ok {
+				// all messages proceeded
+				return nil
+			}
+			if err := p.sendProcessMessage(ctx, m); err != nil {
+				return errors.Wrap(err, "send drain message")
+			}
+		case <-ctx.Done():
+			return ctx.Err()
+		}
+	}
+}
+
+func (p *peer) handleAddressChange(ctx context.Context) error {
+	p.mu.Lock()
+	newAddr := p.newAddr
+	p.newAddr = ""
+	p.mu.Unlock()
+	if newAddr == "" {
+		return nil
+	}
+	cc, err := p.tr.dial(newAddr)
+	if err != nil {
+		return err
+	}
+	ctx, cancel := context.WithTimeout(ctx, p.tr.config.SendTimeout)
+	defer cancel()
+	if err := healthCheckConn(ctx, cc); err != 
nil { + cc.Close() + return err + } + // there is possibility of race if host changing address too fast, but + // it's unlikely and eventually thing should be settled + p.mu.Lock() + p.cc.Close() + p.cc = cc + p.addr = newAddr + p.tr.config.UpdateNode(p.id, p.addr) + p.mu.Unlock() + return nil +} + +func (p *peer) run(ctx context.Context) { + defer func() { + p.mu.Lock() + p.active = false + p.becameActive = time.Time{} + // at this point we can be sure that nobody will write to msgc + if p.msgc != nil { + close(p.msgc) + } + p.mu.Unlock() + if err := p.drain(); err != nil { + log.G(ctx).WithError(err).Error("failed to drain message queue") + } + close(p.done) + }() + if err := p.healthCheck(ctx); err == nil { + p.setActive() + } + for { + select { + case <-ctx.Done(): + return + default: + } + + select { + case m := <-p.msgc: + // we do not propagate context here, because this operation should be finished + // or timed out for correct raft work. + err := p.sendProcessMessage(context.Background(), m) + if err != nil { + log.G(ctx).WithError(err).Debugf("failed to send message %s", m.Type) + p.setInactive() + if err := p.handleAddressChange(ctx); err != nil { + log.G(ctx).WithError(err).Error("failed to change address after failure") + } + continue + } + p.setActive() + case <-ctx.Done(): + return + } + } +} + +func (p *peer) stop() { + p.cancel() + <-p.done +} diff --git a/vendor/github.com/docker/swarmkit/manager/state/raft/transport/transport.go b/vendor/github.com/docker/swarmkit/manager/state/raft/transport/transport.go new file mode 100644 index 0000000000..ec1e971cf3 --- /dev/null +++ b/vendor/github.com/docker/swarmkit/manager/state/raft/transport/transport.go @@ -0,0 +1,382 @@ +// Package transport provides grpc transport layer for raft. +// All methods are non-blocking. 
+package transport + +import ( + "sync" + "time" + + "golang.org/x/net/context" + + "google.golang.org/grpc" + "google.golang.org/grpc/credentials" + + "github.com/coreos/etcd/raft" + "github.com/coreos/etcd/raft/raftpb" + "github.com/docker/swarmkit/log" + "github.com/pkg/errors" +) + +// ErrIsNotFound indicates that peer was never added to transport. +var ErrIsNotFound = errors.New("peer not found") + +// Raft is interface which represents Raft API for transport package. +type Raft interface { + ReportUnreachable(id uint64) + ReportSnapshot(id uint64, status raft.SnapshotStatus) + IsIDRemoved(id uint64) bool + UpdateNode(id uint64, addr string) + + NodeRemoved() +} + +// Config for Transport +type Config struct { + HeartbeatInterval time.Duration + SendTimeout time.Duration + Credentials credentials.TransportCredentials + RaftID string + + Raft +} + +// Transport is structure which manages remote raft peers and sends messages +// to them. +type Transport struct { + config *Config + + unknownc chan raftpb.Message + + mu sync.Mutex + peers map[uint64]*peer + stopped bool + + ctx context.Context + cancel context.CancelFunc + done chan struct{} + + deferredConns map[*grpc.ClientConn]*time.Timer +} + +// New returns new Transport with specified Config. 
+func New(cfg *Config) *Transport { + ctx, cancel := context.WithCancel(context.Background()) + if cfg.RaftID != "" { + ctx = log.WithField(ctx, "raft_id", cfg.RaftID) + } + t := &Transport{ + peers: make(map[uint64]*peer), + config: cfg, + unknownc: make(chan raftpb.Message), + done: make(chan struct{}), + ctx: ctx, + cancel: cancel, + + deferredConns: make(map[*grpc.ClientConn]*time.Timer), + } + go t.run(ctx) + return t +} + +func (t *Transport) run(ctx context.Context) { + defer func() { + log.G(ctx).Debug("stop transport") + t.mu.Lock() + defer t.mu.Unlock() + t.stopped = true + for _, p := range t.peers { + p.stop() + p.cc.Close() + } + for cc, timer := range t.deferredConns { + timer.Stop() + cc.Close() + } + t.deferredConns = nil + close(t.done) + }() + for { + select { + case <-ctx.Done(): + return + default: + } + + select { + case m := <-t.unknownc: + if err := t.sendUnknownMessage(ctx, m); err != nil { + log.G(ctx).WithError(err).Warnf("ignored message %s to unknown peer %x", m.Type, m.To) + } + case <-ctx.Done(): + return + } + } +} + +// Stop stops transport and waits until it finished +func (t *Transport) Stop() { + t.cancel() + <-t.done +} + +// Send sends raft message to remote peers. 
+func (t *Transport) Send(m raftpb.Message) error { + t.mu.Lock() + defer t.mu.Unlock() + if t.stopped { + return errors.New("transport stopped") + } + if t.config.IsIDRemoved(m.To) { + return errors.Errorf("refusing to send message %s to removed member %x", m.Type, m.To) + } + p, ok := t.peers[m.To] + if !ok { + log.G(t.ctx).Warningf("sending message %s to an unrecognized member ID %x", m.Type, m.To) + select { + // we need to process messages to unknown peers in separate goroutine + // to not block sender + case t.unknownc <- m: + case <-t.ctx.Done(): + return t.ctx.Err() + default: + return errors.New("unknown messages queue is full") + } + return nil + } + if err := p.send(m); err != nil { + return errors.Wrapf(err, "failed to send message %x to %x", m.Type, m.To) + } + return nil +} + +// AddPeer adds new peer with id and address addr to Transport. +// If there is already peer with such id in Transport it will return error if +// address is different (UpdatePeer should be used) or nil otherwise. +func (t *Transport) AddPeer(id uint64, addr string) error { + t.mu.Lock() + defer t.mu.Unlock() + if t.stopped { + return errors.New("transport stopped") + } + if ep, ok := t.peers[id]; ok { + if ep.address() == addr { + return nil + } + return errors.Errorf("peer %x already added with addr %s", id, ep.addr) + } + log.G(t.ctx).Debugf("transport: add peer %x with address %s", id, addr) + p, err := newPeer(id, addr, t) + if err != nil { + return errors.Wrapf(err, "failed to create peer %x with addr %s", id, addr) + } + t.peers[id] = p + return nil +} + +// RemovePeer removes peer from Transport and wait for it to stop. 
+func (t *Transport) RemovePeer(id uint64) error { + t.mu.Lock() + defer t.mu.Unlock() + + if t.stopped { + return errors.New("transport stopped") + } + p, ok := t.peers[id] + if !ok { + return ErrIsNotFound + } + delete(t.peers, id) + cc := p.conn() + p.stop() + timer := time.AfterFunc(8*time.Second, func() { + t.mu.Lock() + if !t.stopped { + delete(t.deferredConns, cc) + cc.Close() + } + t.mu.Unlock() + }) + // store connection and timer for cleaning up on stop + t.deferredConns[cc] = timer + + return nil +} + +// UpdatePeer updates peer with new address. It replaces connection immediately. +func (t *Transport) UpdatePeer(id uint64, addr string) error { + t.mu.Lock() + defer t.mu.Unlock() + + if t.stopped { + return errors.New("transport stopped") + } + p, ok := t.peers[id] + if !ok { + return ErrIsNotFound + } + if err := p.update(addr); err != nil { + return err + } + log.G(t.ctx).Debugf("peer %x updated to address %s", id, addr) + return nil +} + +// UpdatePeerAddr updates peer with new address, but delays connection creation. +// New address won't be used until first failure on old address. +func (t *Transport) UpdatePeerAddr(id uint64, addr string) error { + t.mu.Lock() + defer t.mu.Unlock() + + if t.stopped { + return errors.New("transport stopped") + } + p, ok := t.peers[id] + if !ok { + return ErrIsNotFound + } + if err := p.updateAddr(addr); err != nil { + return err + } + return nil +} + +// PeerConn returns raw grpc connection to peer. +func (t *Transport) PeerConn(id uint64) (*grpc.ClientConn, error) { + t.mu.Lock() + defer t.mu.Unlock() + p, ok := t.peers[id] + if !ok { + return nil, ErrIsNotFound + } + p.mu.Lock() + active := p.active + p.mu.Unlock() + if !active { + return nil, errors.New("peer is inactive") + } + return p.conn(), nil +} + +// PeerAddr returns address of peer with id. 
+func (t *Transport) PeerAddr(id uint64) (string, error) { + t.mu.Lock() + defer t.mu.Unlock() + p, ok := t.peers[id] + if !ok { + return "", ErrIsNotFound + } + return p.address(), nil +} + +// HealthCheck checks health of particular peer. +func (t *Transport) HealthCheck(ctx context.Context, id uint64) error { + t.mu.Lock() + p, ok := t.peers[id] + t.mu.Unlock() + if !ok { + return ErrIsNotFound + } + ctx, cancel := t.withContext(ctx) + defer cancel() + return p.healthCheck(ctx) +} + +// Active returns true if node was recently active and false otherwise. +func (t *Transport) Active(id uint64) bool { + t.mu.Lock() + defer t.mu.Unlock() + p, ok := t.peers[id] + if !ok { + return false + } + p.mu.Lock() + active := p.active + p.mu.Unlock() + return active +} + +func (t *Transport) longestActive() (*peer, error) { + var longest *peer + var longestTime time.Time + t.mu.Lock() + defer t.mu.Unlock() + for _, p := range t.peers { + becameActive := p.activeTime() + if becameActive.IsZero() { + continue + } + if longest == nil { + longest = p + continue + } + if becameActive.Before(longestTime) { + longest = p + longestTime = becameActive + } + } + if longest == nil { + return nil, errors.New("failed to find longest active peer") + } + return longest, nil +} + +func (t *Transport) dial(addr string) (*grpc.ClientConn, error) { + grpcOptions := []grpc.DialOption{ + grpc.WithBackoffMaxDelay(8 * time.Second), + } + if t.config.Credentials != nil { + grpcOptions = append(grpcOptions, grpc.WithTransportCredentials(t.config.Credentials)) + } else { + grpcOptions = append(grpcOptions, grpc.WithInsecure()) + } + + if t.config.SendTimeout > 0 { + grpcOptions = append(grpcOptions, grpc.WithTimeout(t.config.SendTimeout)) + } + + cc, err := grpc.Dial(addr, grpcOptions...) 
+ if err != nil { + return nil, err + } + + return cc, nil +} + +func (t *Transport) withContext(ctx context.Context) (context.Context, context.CancelFunc) { + ctx, cancel := context.WithCancel(ctx) + + go func() { + select { + case <-ctx.Done(): + case <-t.ctx.Done(): + cancel() + } + }() + return ctx, cancel +} + +func (t *Transport) resolvePeer(ctx context.Context, id uint64) (*peer, error) { + longestActive, err := t.longestActive() + if err != nil { + return nil, err + } + ctx, cancel := context.WithTimeout(ctx, t.config.SendTimeout) + defer cancel() + addr, err := longestActive.resolveAddr(ctx, id) + if err != nil { + return nil, err + } + return newPeer(id, addr, t) +} + +func (t *Transport) sendUnknownMessage(ctx context.Context, m raftpb.Message) error { + p, err := t.resolvePeer(ctx, m.To) + if err != nil { + return errors.Wrapf(err, "failed to resolve peer") + } + defer p.cancel() + if err := p.sendProcessMessage(ctx, m); err != nil { + return errors.Wrapf(err, "failed to send message") + } + return nil +} diff --git a/vendor/github.com/docker/swarmkit/node/node.go b/vendor/github.com/docker/swarmkit/node/node.go index be776dc151..614aa3296d 100644 --- a/vendor/github.com/docker/swarmkit/node/node.go +++ b/vendor/github.com/docker/swarmkit/node/node.go @@ -14,6 +14,7 @@ import ( "github.com/Sirupsen/logrus" "github.com/boltdb/bolt" + "github.com/docker/docker/pkg/plugingetter" "github.com/docker/swarmkit/agent" "github.com/docker/swarmkit/agent/exec" "github.com/docker/swarmkit/api" @@ -98,6 +99,9 @@ type Config struct { // Availability allows a user to control the current scheduling status of a node Availability api.NodeSpec_Availability + + // PluginGetter provides access to docker's plugin inventory. 
+ PluginGetter plugingetter.PluginGetter } // Node implements the primary node functionality for a member of a swarm @@ -683,6 +687,7 @@ func (n *Node) runManager(ctx context.Context, securityConfig *ca.SecurityConfig AutoLockManagers: n.config.AutoLockManagers, UnlockKey: n.unlockKey, Availability: n.config.Availability, + PluginGetter: n.config.PluginGetter, }) if err != nil { return err diff --git a/vendor/github.com/docker/swarmkit/remotes/remotes.go b/vendor/github.com/docker/swarmkit/remotes/remotes.go index 9da0b7d494..e79ed3f326 100644 --- a/vendor/github.com/docker/swarmkit/remotes/remotes.go +++ b/vendor/github.com/docker/swarmkit/remotes/remotes.go @@ -91,9 +91,9 @@ func (mwr *remotesWeightedRandom) Select(excludes ...string) (api.Peer, error) { // https://github.com/LK4D4/sample // - // The first link applies exponential distribution weight choice reservior + // The first link applies exponential distribution weight choice reservoir // sampling. This may be relevant if we view the master selection as a - // distributed reservior sampling problem. + // distributed reservoir sampling problem. // bias to zero-weighted remotes have same probability. otherwise, we // always select first entry when all are zero.