Merge pull request #30145 from anusha-ragunathan/ps-swarm

Pass plugingetter as part of swarm node config.
Tibor Vass 2017-01-20 10:35:49 -08:00 committed by GitHub
commit 2b89356c07
29 changed files with 1144 additions and 650 deletions

View file

@ -56,4 +56,5 @@ type Backend interface {
GetRepository(context.Context, reference.NamedTagged, *types.AuthConfig) (distribution.Repository, bool, error)
LookupImage(name string) (*types.ImageInspect, error)
PluginManager() *plugin.Manager
PluginGetter() *plugin.Store
}
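
The hunk above widens the cluster's Backend interface so the swarm layer can reach the daemon's plugin inventory without importing the daemon package directly. A minimal sketch of that wiring pattern, with simplified stand-in types (the real PluginStore and cluster code are far richer):

```go
package main

import "fmt"

// PluginStore stands in for docker's *plugin.Store; the real type tracks
// installed plugins in the daemon (simplified here).
type PluginStore struct {
	plugins map[string]string // name -> capability it provides
}

// Backend mirrors the interface in the hunk above: the swarm cluster code
// depends only on this narrow surface, not on the daemon package itself.
type Backend interface {
	PluginGetter() *PluginStore
}

// Daemon is a stand-in for the docker daemon, which owns the plugin store.
type Daemon struct{ store *PluginStore }

func (d *Daemon) PluginGetter() *PluginStore { return d.store }

// startSwarmNode shows how the cluster layer threads the getter into its node
// configuration, as the nodeRunner hunk below does with swarmkit.
func startSwarmNode(b Backend) {
	pg := b.PluginGetter()
	fmt.Printf("swarm node configured with a plugin store tracking %d plugins\n", len(pg.plugins))
}

func main() {
	d := &Daemon{store: &PluginStore{plugins: map[string]string{"my-net-driver": "NetworkDriver"}}}
	startSwarmNode(d)
}
```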

View file

@ -109,6 +109,7 @@ func (n *nodeRunner) start(conf nodeStartConfig) error {
ElectionTick: 3,
UnlockKey: conf.lockKey,
AutoLockManagers: conf.autolock,
PluginGetter: n.cluster.config.Backend.PluginGetter(),
}
if conf.availability != "" {
avail, ok := swarmapi.NodeSpec_Availability_value[strings.ToUpper(string(conf.availability))]

View file

@ -1276,6 +1276,11 @@ func (daemon *Daemon) PluginManager() *plugin.Manager { // set up before daemon
return daemon.pluginManager
}
// PluginGetter returns current pluginStore associated with the daemon
func (daemon *Daemon) PluginGetter() *plugin.Store {
return daemon.PluginStore
}
// CreateDaemonRoot creates the root for the daemon
func CreateDaemonRoot(config *Config) error {
// get the canonical path to the Docker root directory
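
The daemon simply hands out its PluginStore here, and later hunks pass that value where swarmkit expects a plugingetter.PluginGetter, which implies *plugin.Store satisfies that interface. A compile-time assertion is the usual way to pin such a requirement down; the sketch below shows the idiom with simplified stand-in types rather than the real docker packages:

```go
package main

// CompatPlugin and PluginGetter are simplified stand-ins for the types in
// docker's pkg/plugingetter; Store stands in for the daemon's plugin.Store.
type CompatPlugin interface{ Name() string }

type PluginGetter interface {
	Get(name, capability string, mode int) (CompatPlugin, error)
}

type Store struct{}

func (s *Store) Get(name, capability string, mode int) (CompatPlugin, error) { return nil, nil }

// Compile-time assertion: if Store ever stops satisfying PluginGetter, this
// line fails to build. The PR relies on the real *plugin.Store satisfying the
// real plugingetter.PluginGetter in the same way.
var _ PluginGetter = (*Store)(nil)

func main() {}
```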

View file

@ -102,7 +102,7 @@ github.com/docker/containerd 03e5862ec0d8d3b3f750e19fca3ee367e13c090e
github.com/tonistiigi/fifo 1405643975692217d6720f8b54aeee1bf2cd5cf4
# cluster
github.com/docker/swarmkit 62d835f478b2e4fd2768deb88fb3b32e334faaee
github.com/docker/swarmkit 98620dd1ddfcc03d8f4b0d2910ecf6b52918a731
github.com/golang/mock bd3c8e81be01eef76d4b503f5e687d2d1354d2d9
github.com/gogo/protobuf v0.3
github.com/cloudflare/cfssl 7fb22c8cba7ecaf98e4082d22d65800cf45e042a

View file

@ -148,7 +148,7 @@ func (w *worker) Assign(ctx context.Context, assignments []*api.AssignmentChange
// Update updates the set of tasks and secret for the worker.
// Tasks in the added set will be added to the worker, and tasks in the removed set
// will be removed from the worker
// Serets in the added set will be added to the worker, and secrets in the removed set
// Secrets in the added set will be added to the worker, and secrets in the removed set
// will be removed from the worker.
func (w *worker) Update(ctx context.Context, assignments []*api.AssignmentChange) error {
w.mu.Lock()

View file

@ -140,7 +140,12 @@ type ServiceSpec struct {
// *ServiceSpec_Global
Mode isServiceSpec_Mode `protobuf_oneof:"mode"`
// UpdateConfig controls the rate and policy of updates.
Update *UpdateConfig `protobuf:"bytes,6,opt,name=update" json:"update,omitempty"`
Update *UpdateConfig `protobuf:"bytes,6,opt,name=update" json:"update,omitempty"`
// ServiceSpec.Networks has been deprecated and is replaced by
// Networks field in Task (TaskSpec.Networks).
// This field (ServiceSpec.Networks) is kept for compatibility.
// In case TaskSpec.Networks does not exist, ServiceSpec.Networks
// is still honored if it exists.
Networks []*NetworkAttachmentConfig `protobuf:"bytes,7,rep,name=networks" json:"networks,omitempty"`
// Service endpoint specifies the user provided configuration
// to properly discover and load balance a service.

View file

@ -72,6 +72,11 @@ message ServiceSpec {
// UpdateConfig controls the rate and policy of updates.
UpdateConfig update = 6;
// ServiceSpec.Networks has been deprecated and is replaced by
// Networks field in Task (TaskSpec.Networks).
// This field (ServiceSpec.Networks) is kept for compatibility.
// In case TaskSpec.Networks does not exist, ServiceSpec.Networks
// is still honored if it exists.
repeated NetworkAttachmentConfig networks = 7 [deprecated=true];
// Service endpoint specifies the user provided configuration
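
Both the generated Go code and the proto hunk say the deprecated ServiceSpec.Networks field is still honored when TaskSpec.Networks is absent. A hedged sketch of that fallback, with the types reduced to the two fields involved (the real resolution happens inside swarmkit, not in a helper like this):

```go
package main

import "fmt"

// NetworkAttachmentConfig is a simplified stand-in for the swarmkit type.
type NetworkAttachmentConfig struct{ Target string }

type TaskSpec struct{ Networks []NetworkAttachmentConfig }

type ServiceSpec struct {
	Task     TaskSpec
	Networks []NetworkAttachmentConfig // deprecated, kept for compatibility
}

// effectiveNetworks applies the precedence described in the comments above:
// prefer TaskSpec.Networks, fall back to the deprecated ServiceSpec.Networks.
func effectiveNetworks(s ServiceSpec) []NetworkAttachmentConfig {
	if len(s.Task.Networks) > 0 {
		return s.Task.Networks
	}
	return s.Networks
}

func main() {
	old := ServiceSpec{Networks: []NetworkAttachmentConfig{{Target: "ingress"}}}
	fmt.Println(effectiveNetworks(old)) // falls back to the deprecated field
}
```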

View file

@ -19,7 +19,7 @@ import (
type localRequestKeyType struct{}
// LocalRequestKey is a context key to mark a request that originating on the
// local node. The assocated value is a RemoteNodeInfo structure describing the
// local node. The associated value is a RemoteNodeInfo structure describing the
// local node.
var LocalRequestKey = localRequestKeyType{}

View file

@ -103,26 +103,9 @@ func makeExternalSignRequest(ctx context.Context, client *http.Client, url strin
if err != nil {
return nil, recoverableErr{err: errors.Wrap(err, "unable to perform certificate signing request")}
}
doneReading := make(chan struct{})
bodyClosed := make(chan struct{})
go func() {
select {
case <-ctx.Done():
case <-doneReading:
}
resp.Body.Close()
close(bodyClosed)
}()
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
close(doneReading)
<-bodyClosed
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}
if err != nil {
return nil, recoverableErr{err: errors.Wrap(err, "unable to read CSR response body")}
}
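
The hunk above drops the goroutine that raced to close the response body and replaces it with a plain deferred Close plus an explicit context check once the read returns. A standalone sketch of the resulting shape (the URL, request method, and helper name are illustrative, not taken from the CA code):

```go
package main

import (
	"context"
	"fmt"
	"io/ioutil"
	"net/http"
)

// readWithContext mirrors the simplified pattern: let ReadAll finish, close
// the body via defer, then report cancellation before looking at the read error.
func readWithContext(ctx context.Context, client *http.Client, url string) ([]byte, error) {
	req, err := http.NewRequest("GET", url, nil)
	if err != nil {
		return nil, err
	}
	resp, err := client.Do(req.WithContext(ctx))
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	body, err := ioutil.ReadAll(resp.Body)

	// Cancellation wins over whatever the read returned.
	select {
	case <-ctx.Done():
		return nil, ctx.Err()
	default:
	}
	if err != nil {
		return nil, err
	}
	return body, nil
}

func main() {
	body, err := readWithContext(context.Background(), http.DefaultClient, "https://example.com")
	fmt.Println(len(body), err)
}
```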

View file

@ -353,7 +353,7 @@ func (s *Server) issueRenewCertificate(ctx context.Context, nodeID string, csr [
}, nil
}
// GetRootCACertificate returns the certificate of the Root CA. It is used as a convinience for distributing
// GetRootCACertificate returns the certificate of the Root CA. It is used as a convenience for distributing
// the root of trust for the swarm. Clients should be using the CA hash to verify if they weren't target to
// a MiTM. If they fail to do so, node bootstrap works with TOFU semantics.
func (s *Server) GetRootCACertificate(ctx context.Context, request *api.GetRootCACertificateRequest) (*api.GetRootCACertificateResponse, error) {

View file

@ -96,9 +96,9 @@ func (c *Conn) Close(success bool) error {
}
if success {
c.remotes.ObserveIfExists(c.peer, -remotes.DefaultObservationWeight)
} else {
c.remotes.ObserveIfExists(c.peer, remotes.DefaultObservationWeight)
} else {
c.remotes.ObserveIfExists(c.peer, -remotes.DefaultObservationWeight)
}
return c.ClientConn.Close()

View file

@ -29,6 +29,21 @@ func WithLogger(ctx context.Context, logger *logrus.Entry) context.Context {
return context.WithValue(ctx, loggerKey{}, logger)
}
// WithFields returns a new context with added fields to logger.
func WithFields(ctx context.Context, fields logrus.Fields) context.Context {
logger := ctx.Value(loggerKey{})
if logger == nil {
logger = L
}
return WithLogger(ctx, logger.(*logrus.Entry).WithFields(fields))
}
// WithField is convenience wrapper around WithFields.
func WithField(ctx context.Context, key, value string) context.Context {
return WithFields(ctx, logrus.Fields{key: value})
}
// GetLogger retrieves the current logger from the context. If no logger is
// available, the default logger is returned.
func GetLogger(ctx context.Context) *logrus.Entry {
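
WithFields and WithField let callers enrich the context-scoped logger incrementally instead of rebuilding it. A small usage sketch that re-implements simplified versions of the helpers so it runs on its own (the real ones live in swarmkit's log package and fall back to its package-level entry L):

```go
package main

import (
	"context"

	"github.com/Sirupsen/logrus"
)

type loggerKey struct{}

// WithLogger / WithField / GetLogger mirror the helpers in the hunk above,
// simplified so this example is self-contained.
func WithLogger(ctx context.Context, logger *logrus.Entry) context.Context {
	return context.WithValue(ctx, loggerKey{}, logger)
}

func WithField(ctx context.Context, key, value string) context.Context {
	logger, ok := ctx.Value(loggerKey{}).(*logrus.Entry)
	if !ok {
		logger = logrus.NewEntry(logrus.StandardLogger())
	}
	return WithLogger(ctx, logger.WithField(key, value))
}

func GetLogger(ctx context.Context) *logrus.Entry {
	if logger, ok := ctx.Value(loggerKey{}).(*logrus.Entry); ok {
		return logger
	}
	return logrus.NewEntry(logrus.StandardLogger())
}

func main() {
	ctx := WithField(context.Background(), "module", "dispatcher")
	ctx = WithField(ctx, "method", "Run")
	GetLogger(ctx).Info("fields accumulate on the context-scoped logger")
}
```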

View file

@ -3,6 +3,7 @@ package allocator
import (
"sync"
"github.com/docker/docker/pkg/plugingetter"
"github.com/docker/go-events"
"github.com/docker/swarmkit/manager/state"
"github.com/docker/swarmkit/manager/state/store"
@ -27,6 +28,9 @@ type Allocator struct {
stopChan chan struct{}
// doneChan is closed when the allocator is finished running.
doneChan chan struct{}
// pluginGetter provides access to docker's plugin inventory.
pluginGetter plugingetter.PluginGetter
}
// taskBallot controls how the voting for task allocation is
@ -67,14 +71,15 @@ type allocActor struct {
// New returns a new instance of Allocator for use during allocation
// stage of the manager.
func New(store *store.MemoryStore) (*Allocator, error) {
func New(store *store.MemoryStore, pg plugingetter.PluginGetter) (*Allocator, error) {
a := &Allocator{
store: store,
taskBallot: &taskBallot{
votes: make(map[string][]string),
},
stopChan: make(chan struct{}),
doneChan: make(chan struct{}),
stopChan: make(chan struct{}),
doneChan: make(chan struct{}),
pluginGetter: pg,
}
return a, nil

View file

@ -73,7 +73,7 @@ type networkContext struct {
}
func (a *Allocator) doNetworkInit(ctx context.Context) (err error) {
na, err := networkallocator.New()
na, err := networkallocator.New(a.pluginGetter)
if err != nil {
return err
}

View file

@ -4,7 +4,7 @@ import (
"fmt"
"net"
"github.com/docker/docker/pkg/plugins"
"github.com/docker/docker/pkg/plugingetter"
"github.com/docker/libnetwork/datastore"
"github.com/docker/libnetwork/driverapi"
"github.com/docker/libnetwork/drvregistry"
@ -49,7 +49,7 @@ type NetworkAllocator struct {
nodes map[string]struct{}
}
// Local in-memory state related to netwok that need to be tracked by NetworkAllocator
// Local in-memory state related to network that need to be tracked by NetworkAllocator
type network struct {
// A local cache of the store object.
nw *api.Network
@ -69,7 +69,7 @@ type initializer struct {
}
// New returns a new NetworkAllocator handle
func New() (*NetworkAllocator, error) {
func New(pg plugingetter.PluginGetter) (*NetworkAllocator, error) {
na := &NetworkAllocator{
networks: make(map[string]*network),
services: make(map[string]struct{}),
@ -79,7 +79,7 @@ func New() (*NetworkAllocator, error) {
// There are no driver configurations and notification
// functions as of now.
reg, err := drvregistry.New(nil, nil, nil, nil, nil)
reg, err := drvregistry.New(nil, nil, nil, nil, pg)
if err != nil {
return nil, err
}
@ -133,7 +133,7 @@ func (na *NetworkAllocator) getNetwork(id string) *network {
}
// Deallocate frees all the general and driver specific resources
// whichs were assigned to the passed network.
// which were assigned to the passed network.
func (na *NetworkAllocator) Deallocate(n *api.Network) error {
localNet := na.getNetwork(n.ID)
if localNet == nil {
@ -657,7 +657,11 @@ func (na *NetworkAllocator) resolveDriver(n *api.Network) (driverapi.Driver, str
}
func (na *NetworkAllocator) loadDriver(name string) error {
_, err := plugins.Get(name, driverapi.NetworkPluginEndpointType)
pg := na.drvRegistry.GetPluginGetter()
if pg == nil {
return fmt.Errorf("plugin store is unintialized")
}
_, err := pg.Get(name, driverapi.NetworkPluginEndpointType, plugingetter.Lookup)
return err
}
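
loadDriver now resolves network drivers through the registry's plugin getter in lookup mode instead of the legacy plugins.Get path. A self-contained sketch of what that lookup looks like; PluginGetter, the Lookup constant, and the store are simplified stand-ins for docker's pkg/plugingetter types, and the capability string is illustrative:

```go
package main

import (
	"errors"
	"fmt"
)

// Lookup stands in for plugingetter.Lookup: resolve the plugin without taking
// a reference on it.
const Lookup = 0

type Plugin struct{ name string }

// PluginGetter is a simplified stand-in for docker's plugingetter.PluginGetter.
type PluginGetter interface {
	Get(name, capability string, mode int) (*Plugin, error)
}

type store struct{ plugins map[string]string } // name -> capability

func (s *store) Get(name, capability string, mode int) (*Plugin, error) {
	if s.plugins[name] != capability {
		return nil, fmt.Errorf("plugin %q not found for capability %q", name, capability)
	}
	return &Plugin{name: name}, nil
}

// loadDriver mirrors the updated allocator hook: fail if no getter is wired
// in, otherwise do a lookup-mode get for the network driver endpoint type.
func loadDriver(pg PluginGetter, name string) error {
	if pg == nil {
		return errors.New("plugin store is uninitialized")
	}
	_, err := pg.Get(name, "NetworkDriver", Lookup)
	return err
}

func main() {
	pg := &store{plugins: map[string]string{"my-overlay": "NetworkDriver"}}
	fmt.Println(loadDriver(pg, "my-overlay"))
	fmt.Println(loadDriver(nil, "my-overlay"))
}
```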

View file

@ -17,12 +17,12 @@ const (
dynamicPortEnd = 32767
// The start of master port range which will hold all the
// allocation state of ports allocated so far regerdless of
// allocation state of ports allocated so far regardless of
// whether it was user defined or not.
masterPortStart = 1
// The end of master port range which will hold all the
// allocation state of ports allocated so far regerdless of
// allocation state of ports allocated so far regardless of
// whether it was user defined or not.
masterPortEnd = 65535
)
@ -65,7 +65,7 @@ func (ps allocatedPorts) addState(p *api.PortConfig) {
// Note multiple dynamically allocated ports might exists. In this case,
// we will remove only at a time so both allocated ports are tracked.
//
// Note becasue of the potential co-existence of user-defined and dynamically
// Note because of the potential co-existence of user-defined and dynamically
// allocated ports, delState has to be called for user-defined port first.
// dynamically allocated ports should be removed later.
func (ps allocatedPorts) delState(p *api.PortConfig) *api.PortConfig {
@ -277,7 +277,7 @@ func (pa *portAllocator) isPortsAllocated(s *api.Service) bool {
}
// If service has allocated endpoint while has no user-defined endpoint,
// we assume allocated endpoints are redudant, and they need deallocated.
// we assume allocated endpoints are redundant, and they need deallocated.
// If service has no allocated endpoint while has user-defined endpoint,
// we assume it is not allocated.
if (s.Endpoint != nil && s.Spec.Endpoint == nil) ||

View file

@ -502,7 +502,7 @@ func (s *Server) UpdateService(ctx context.Context, request *api.UpdateServiceRe
}
if !reflect.DeepEqual(requestSpecNetworks, specNetworks) {
return errNetworkUpdateNotSupported
return grpc.Errorf(codes.Unimplemented, errNetworkUpdateNotSupported.Error())
}
// Check to see if all the secrets being added exist as objects
@ -516,11 +516,11 @@ func (s *Server) UpdateService(ctx context.Context, request *api.UpdateServiceRe
// with service mode change (comparing current config with previous config).
// proper way to change service mode is to delete and re-add.
if reflect.TypeOf(service.Spec.Mode) != reflect.TypeOf(request.Spec.Mode) {
return errModeChangeNotAllowed
return grpc.Errorf(codes.Unimplemented, errModeChangeNotAllowed.Error())
}
if service.Spec.Annotations.Name != request.Spec.Annotations.Name {
return errRenameNotSupported
return grpc.Errorf(codes.Unimplemented, errRenameNotSupported.Error())
}
service.Meta.Version = *request.ServiceVersion
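
The control API now wraps its "not supported" rejections in gRPC status codes, so clients can branch on codes.Unimplemented instead of matching error strings. A sketch of both sides using the grpc helpers of that era (grpc.Errorf / grpc.Code); the sentinel error and its message here are hypothetical, the real ones live in swarmkit's controlapi package:

```go
package main

import (
	"errors"
	"fmt"

	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
)

// Hypothetical sentinel; the real errNetworkUpdateNotSupported is defined in
// swarmkit's controlapi package with its own message.
var errNetworkUpdateNotSupported = errors.New("changing the networks of a service is not supported")

func main() {
	// Server side (as in the hunk): attach a code to the sentinel error.
	err := grpc.Errorf(codes.Unimplemented, errNetworkUpdateNotSupported.Error())

	// Client side: branch on the code instead of matching the message text.
	if grpc.Code(err) == codes.Unimplemented {
		fmt.Println("service update rejected as unimplemented:", grpc.ErrorDesc(err))
	}
}
```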

View file

@ -106,6 +106,7 @@ type nodeUpdate struct {
// Dispatcher is responsible for dispatching tasks and tracking agent health.
type Dispatcher struct {
mu sync.Mutex
wg sync.WaitGroup
nodes *nodeStore
store *store.MemoryStore
mgrQueue *watch.Queue
@ -216,6 +217,9 @@ func (d *Dispatcher) Run(ctx context.Context) error {
defer cancel()
d.ctx, d.cancel = context.WithCancel(ctx)
ctx = d.ctx
d.wg.Add(1)
defer d.wg.Done()
d.mu.Unlock()
publishManagers := func(peers []*api.Peer) {
@ -240,10 +244,10 @@ func (d *Dispatcher) Run(ctx context.Context) error {
case ev := <-peerWatcher:
publishManagers(ev.([]*api.Peer))
case <-d.processUpdatesTrigger:
d.processUpdates()
d.processUpdates(ctx)
batchTimer.Reset(maxBatchInterval)
case <-batchTimer.C:
d.processUpdates()
d.processUpdates(ctx)
batchTimer.Reset(maxBatchInterval)
case v := <-configWatcher:
cluster := v.(state.EventUpdateCluster)
@ -260,7 +264,7 @@ func (d *Dispatcher) Run(ctx context.Context) error {
d.networkBootstrapKeys = cluster.Cluster.NetworkBootstrapKeys
d.mu.Unlock()
d.keyMgrQueue.Publish(cluster.Cluster.NetworkBootstrapKeys)
case <-d.ctx.Done():
case <-ctx.Done():
return nil
}
}
@ -287,17 +291,20 @@ func (d *Dispatcher) Stop() error {
d.mgrQueue.Close()
d.keyMgrQueue.Close()
d.wg.Wait()
return nil
}
func (d *Dispatcher) isRunningLocked() error {
func (d *Dispatcher) isRunningLocked() (context.Context, error) {
d.mu.Lock()
if !d.isRunning() {
d.mu.Unlock()
return grpc.Errorf(codes.Aborted, "dispatcher is stopped")
return nil, grpc.Errorf(codes.Aborted, "dispatcher is stopped")
}
ctx := d.ctx
d.mu.Unlock()
return nil
return ctx, nil
}
func (d *Dispatcher) markNodesUnknown(ctx context.Context) error {
@ -377,7 +384,7 @@ func (d *Dispatcher) isRunning() bool {
// markNodeReady updates the description of a node, updates its address, and sets status to READY
// this is used during registration when a new node description is provided
// and during node updates when the node description changes
func (d *Dispatcher) markNodeReady(nodeID string, description *api.NodeDescription, addr string) error {
func (d *Dispatcher) markNodeReady(ctx context.Context, nodeID string, description *api.NodeDescription, addr string) error {
d.nodeUpdatesLock.Lock()
d.nodeUpdates[nodeID] = nodeUpdate{
status: &api.NodeStatus{
@ -396,8 +403,8 @@ func (d *Dispatcher) markNodeReady(nodeID string, description *api.NodeDescripti
if numUpdates >= maxBatchItems {
select {
case d.processUpdatesTrigger <- struct{}{}:
case <-d.ctx.Done():
return d.ctx.Err()
case <-ctx.Done():
return ctx.Err()
}
}
@ -405,8 +412,8 @@ func (d *Dispatcher) markNodeReady(nodeID string, description *api.NodeDescripti
// Wait until the node update batch happens before unblocking register.
d.processUpdatesLock.Lock()
select {
case <-d.ctx.Done():
return d.ctx.Err()
case <-ctx.Done():
return ctx.Err()
default:
}
d.processUpdatesCond.Wait()
@ -431,7 +438,8 @@ func nodeIPFromContext(ctx context.Context) (string, error) {
// register is used for registration of node with particular dispatcher.
func (d *Dispatcher) register(ctx context.Context, nodeID string, description *api.NodeDescription) (string, error) {
// prevent register until we're ready to accept it
if err := d.isRunningLocked(); err != nil {
dctx, err := d.isRunningLocked()
if err != nil {
return "", err
}
@ -453,7 +461,7 @@ func (d *Dispatcher) register(ctx context.Context, nodeID string, description *a
log.G(ctx).Debugf(err.Error())
}
if err := d.markNodeReady(nodeID, description, addr); err != nil {
if err := d.markNodeReady(dctx, nodeID, description, addr); err != nil {
return "", err
}
@ -496,7 +504,8 @@ func (d *Dispatcher) UpdateTaskStatus(ctx context.Context, r *api.UpdateTaskStat
}
log := log.G(ctx).WithFields(fields)
if err := d.isRunningLocked(); err != nil {
dctx, err := d.isRunningLocked()
if err != nil {
return nil, err
}
@ -542,13 +551,13 @@ func (d *Dispatcher) UpdateTaskStatus(ctx context.Context, r *api.UpdateTaskStat
if numUpdates >= maxBatchItems {
select {
case d.processUpdatesTrigger <- struct{}{}:
case <-d.ctx.Done():
case <-dctx.Done():
}
}
return nil, nil
}
func (d *Dispatcher) processUpdates() {
func (d *Dispatcher) processUpdates(ctx context.Context) {
var (
taskUpdates map[string]*api.TaskStatus
nodeUpdates map[string]nodeUpdate
@ -571,7 +580,7 @@ func (d *Dispatcher) processUpdates() {
return
}
log := log.G(d.ctx).WithFields(logrus.Fields{
log := log.G(ctx).WithFields(logrus.Fields{
"method": "(*Dispatcher).processUpdates",
})
@ -661,7 +670,8 @@ func (d *Dispatcher) Tasks(r *api.TasksRequest, stream api.Dispatcher_TasksServe
}
nodeID := nodeInfo.NodeID
if err := d.isRunningLocked(); err != nil {
dctx, err := d.isRunningLocked()
if err != nil {
return err
}
@ -763,8 +773,8 @@ func (d *Dispatcher) Tasks(r *api.TasksRequest, stream api.Dispatcher_TasksServe
break batchingLoop
case <-stream.Context().Done():
return stream.Context().Err()
case <-d.ctx.Done():
return d.ctx.Err()
case <-dctx.Done():
return dctx.Err()
}
}
@ -783,7 +793,8 @@ func (d *Dispatcher) Assignments(r *api.AssignmentsRequest, stream api.Dispatche
}
nodeID := nodeInfo.NodeID
if err := d.isRunningLocked(); err != nil {
dctx, err := d.isRunningLocked()
if err != nil {
return err
}
@ -1075,8 +1086,8 @@ func (d *Dispatcher) Assignments(r *api.AssignmentsRequest, stream api.Dispatche
break batchingLoop
case <-stream.Context().Done():
return stream.Context().Err()
case <-d.ctx.Done():
return d.ctx.Err()
case <-dctx.Done():
return dctx.Err()
}
}
@ -1197,16 +1208,14 @@ func (d *Dispatcher) moveTasksToOrphaned(nodeID string) error {
// markNodeNotReady sets the node state to some state other than READY
func (d *Dispatcher) markNodeNotReady(id string, state api.NodeStatus_State, message string) error {
if err := d.isRunningLocked(); err != nil {
dctx, err := d.isRunningLocked()
if err != nil {
return err
}
// Node is down. Add it to down nodes so that we can keep
// track of tasks assigned to the node.
var (
node *api.Node
err error
)
var node *api.Node
d.store.View(func(readTx store.ReadTx) {
node = store.GetNode(readTx, id)
if node == nil {
@ -1219,7 +1228,7 @@ func (d *Dispatcher) markNodeNotReady(id string, state api.NodeStatus_State, mes
expireFunc := func() {
if err := d.moveTasksToOrphaned(id); err != nil {
log.G(context.TODO()).WithError(err).Error(`failed to move all tasks to "ORPHANED" state`)
log.G(dctx).WithError(err).Error(`failed to move all tasks to "ORPHANED" state`)
}
d.downNodes.Delete(id)
@ -1243,7 +1252,7 @@ func (d *Dispatcher) markNodeNotReady(id string, state api.NodeStatus_State, mes
if numUpdates >= maxBatchItems {
select {
case d.processUpdatesTrigger <- struct{}{}:
case <-d.ctx.Done():
case <-dctx.Done():
}
}
@ -1291,7 +1300,8 @@ func (d *Dispatcher) Session(r *api.SessionRequest, stream api.Dispatcher_Sessio
}
nodeID := nodeInfo.NodeID
if err := d.isRunningLocked(); err != nil {
dctx, err := d.isRunningLocked()
if err != nil {
return err
}
@ -1310,7 +1320,7 @@ func (d *Dispatcher) Session(r *api.SessionRequest, stream api.Dispatcher_Sessio
log.G(ctx).Debugf(err.Error())
}
// update the node description
if err := d.markNodeReady(nodeID, r.Description, addr); err != nil {
if err := d.markNodeReady(dctx, nodeID, r.Description, addr); err != nil {
return err
}
}
@ -1401,7 +1411,7 @@ func (d *Dispatcher) Session(r *api.SessionRequest, stream api.Dispatcher_Sessio
return stream.Context().Err()
case <-node.Disconnect:
disconnect = true
case <-d.ctx.Done():
case <-dctx.Done():
disconnect = true
case ev := <-keyMgrUpdates:
netKeys = ev.([]*api.EncryptionKey)
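
The thread running through this file: isRunningLocked now returns the dispatcher's run context, captured under the mutex, and every long-lived handler selects on that returned dctx instead of re-reading d.ctx later; Run also registers on a WaitGroup so Stop can wait for it to unwind. A condensed, self-contained sketch of the pattern (types and the handler body are simplified stand-ins):

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

type Dispatcher struct {
	mu     sync.Mutex
	wg     sync.WaitGroup
	ctx    context.Context
	cancel context.CancelFunc
}

// isRunningLocked captures the run context under the lock and returns it, so
// callers never touch d.ctx again without synchronization.
func (d *Dispatcher) isRunningLocked() (context.Context, error) {
	d.mu.Lock()
	defer d.mu.Unlock()
	if d.ctx == nil {
		return nil, errors.New("dispatcher is stopped")
	}
	return d.ctx, nil
}

func (d *Dispatcher) Run(parent context.Context, started chan<- struct{}) {
	d.mu.Lock()
	d.ctx, d.cancel = context.WithCancel(parent)
	d.wg.Add(1)
	defer d.wg.Done()
	ctx := d.ctx
	d.mu.Unlock()
	close(started)

	<-ctx.Done() // stand-in for the real event loop
}

func (d *Dispatcher) Stop() {
	d.mu.Lock()
	if d.cancel != nil {
		d.cancel()
		d.ctx = nil
	}
	d.mu.Unlock()
	d.wg.Wait() // Run has fully unwound once this returns
}

// handler is the shape register/Tasks/Assignments/Session take after the change.
func (d *Dispatcher) handler() error {
	dctx, err := d.isRunningLocked()
	if err != nil {
		return err
	}
	select {
	case <-dctx.Done():
		return dctx.Err()
	default:
		return nil // do the real work here, re-checking dctx.Done() in loops
	}
}

func main() {
	d := &Dispatcher{}
	started := make(chan struct{})
	go d.Run(context.Background(), started)
	<-started
	fmt.Println("handler while running:", d.handler())
	d.Stop()
	fmt.Println("handler after stop:  ", d.handler())
}
```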

View file

@ -14,6 +14,7 @@ import (
"time"
"github.com/Sirupsen/logrus"
"github.com/docker/docker/pkg/plugingetter"
"github.com/docker/go-events"
"github.com/docker/swarmkit/api"
"github.com/docker/swarmkit/ca"
@ -105,6 +106,9 @@ type Config struct {
// Availability allows a user to control the current scheduling status of a node
Availability api.NodeSpec_Availability
// PluginGetter provides access to docker's plugin inventory.
PluginGetter plugingetter.PluginGetter
}
// Manager is the cluster manager for Swarm.
@ -478,7 +482,7 @@ func (m *Manager) Stop(ctx context.Context, clearData bool) {
// starting up.
<-m.started
// the mutex stops us from trying to stop while we're alrady stopping, or
// the mutex stops us from trying to stop while we're already stopping, or
// from returning before we've finished stopping.
m.mu.Lock()
defer m.mu.Unlock()
@ -833,7 +837,7 @@ func (m *Manager) becomeLeader(ctx context.Context) {
// shutdown underlying manager processes when leadership is
// lost.
m.allocator, err = allocator.New(s)
m.allocator, err = allocator.New(s, m.config.PluginGetter)
if err != nil {
log.G(ctx).WithError(err).Error("failed to create allocator")
// TODO(stevvooe): It doesn't seem correct here to fail
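
With the manager Config carrying a PluginGetter and allocator.New accepting one, the chain from the daemon's plugin store down to the network allocator is complete. The sketch below compresses that wiring into plain structs purely to show the hops; every type is a stand-in, and the comments name the hunk each hop comes from:

```go
package main

import "fmt"

// PluginGetter is a simplified stand-in for plugingetter.PluginGetter.
type PluginGetter interface {
	Get(name, capability string, mode int) (string, error)
}

type pluginStore struct{} // daemon.PluginStore

func (pluginStore) Get(name, capability string, mode int) (string, error) { return name, nil }

type nodeConfig struct{ PluginGetter PluginGetter }       // swarmkit node Config (nodeRunner.start)
type managerConfig struct{ PluginGetter PluginGetter }    // swarmkit manager Config
type allocator struct{ pluginGetter PluginGetter }        // allocator.New(store, pg)
type networkAllocator struct{ pluginGetter PluginGetter } // networkallocator.New(pg)

func main() {
	store := pluginStore{}                                // daemon's plugin inventory
	node := nodeConfig{PluginGetter: store}               // Backend.PluginGetter() -> node config
	mgr := managerConfig{PluginGetter: node.PluginGetter} // node passes it to the manager
	a := allocator{pluginGetter: mgr.PluginGetter}        // becomeLeader
	na := networkAllocator{pluginGetter: a.pluginGetter}  // doNetworkInit
	driver, _ := na.pluginGetter.Get("my-overlay", "NetworkDriver", 0)
	fmt.Println("resolved network driver plugin:", driver)
}
```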

View file

@ -406,7 +406,11 @@ func (u *Updater) updateTask(ctx context.Context, slot orchestrator.Slot, update
}
if delayStartCh != nil {
<-delayStartCh
select {
case <-delayStartCh:
case <-u.stopChan:
return nil
}
}
// Wait for the new task to come up.
@ -456,7 +460,11 @@ func (u *Updater) useExistingTask(ctx context.Context, slot orchestrator.Slot, e
}
if delayStartCh != nil {
<-delayStartCh
select {
case <-delayStartCh:
case <-u.stopChan:
return nil
}
}
}
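
Both hunks replace a bare receive on delayStartCh with a select that also watches u.stopChan, so a stopped updater no longer hangs waiting for a delayed task to come up. The pattern, extracted into a tiny runnable sketch (names are illustrative):

```go
package main

import (
	"fmt"
	"time"
)

// waitOrStop blocks until the delayed start fires or the updater is stopped,
// mirroring the select added in updateTask and useExistingTask.
func waitOrStop(delayStartCh <-chan struct{}, stopChan <-chan struct{}) bool {
	select {
	case <-delayStartCh:
		return true // task came up, keep updating
	case <-stopChan:
		return false // updater stopped, bail out without blocking
	}
}

func main() {
	delay := make(chan struct{})
	stop := make(chan struct{})
	go func() { time.Sleep(10 * time.Millisecond); close(stop) }()
	fmt.Println("continue update:", waitOrStop(delay, stop))
}
```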

View file

@ -39,7 +39,7 @@ func newNodeInfo(n *api.Node, tasks map[string]*api.Task, availableResources api
return nodeInfo
}
// addTask removes a task from nodeInfo if it's tracked there, and returns true
// removeTask removes a task from nodeInfo if it's tracked there, and returns true
// if nodeInfo was modified.
func (nodeInfo *NodeInfo) removeTask(t *api.Task) bool {
oldTask, ok := nodeInfo.Tasks[t.ID]

View file

@ -532,7 +532,8 @@ func (s *Scheduler) scheduleTaskGroup(ctx context.Context, taskGroup map[string]
}
nodes := s.nodeSet.findBestNodes(len(taskGroup), s.pipeline.Process, nodeLess)
if len(nodes) == 0 {
nodeCount := len(nodes)
if nodeCount == 0 {
s.noSuitableNode(ctx, taskGroup, schedulingDecisions)
return
}
@ -540,7 +541,7 @@ func (s *Scheduler) scheduleTaskGroup(ctx context.Context, taskGroup map[string]
failedConstraints := make(map[int]bool) // key is index in nodes slice
nodeIter := 0
for taskID, t := range taskGroup {
n := &nodes[nodeIter%len(nodes)]
n := &nodes[nodeIter%nodeCount]
log.G(ctx).WithField("task.id", t.ID).Debugf("assigning to node %s", n.ID)
newT := *t
@ -555,16 +556,16 @@ func (s *Scheduler) scheduleTaskGroup(ctx context.Context, taskGroup map[string]
nodeInfo, err := s.nodeSet.nodeInfo(n.ID)
if err == nil && nodeInfo.addTask(&newT) {
s.nodeSet.updateNode(nodeInfo)
nodes[nodeIter%len(nodes)] = nodeInfo
nodes[nodeIter%nodeCount] = nodeInfo
}
schedulingDecisions[taskID] = schedulingDecision{old: t, new: &newT}
delete(taskGroup, taskID)
if nodeIter+1 < len(nodes) {
if nodeIter+1 < nodeCount {
// First pass fills the nodes until they have the same
// number of tasks from this service.
nextNode := nodes[(nodeIter+1)%len(nodes)]
nextNode := nodes[(nodeIter+1)%nodeCount]
if nodeLess(&nextNode, &nodeInfo) {
nodeIter++
}
@ -575,10 +576,10 @@ func (s *Scheduler) scheduleTaskGroup(ctx context.Context, taskGroup map[string]
}
origNodeIter := nodeIter
for failedConstraints[nodeIter%len(nodes)] || !s.pipeline.Process(&nodes[nodeIter%len(nodes)]) {
failedConstraints[nodeIter%len(nodes)] = true
for failedConstraints[nodeIter%nodeCount] || !s.pipeline.Process(&nodes[nodeIter%nodeCount]) {
failedConstraints[nodeIter%nodeCount] = true
nodeIter++
if nodeIter-origNodeIter == len(nodes) {
if nodeIter-origNodeIter == nodeCount {
// None of the nodes meet the constraints anymore.
s.noSuitableNode(ctx, taskGroup, schedulingDecisions)
return

View file

@ -2,16 +2,12 @@ package membership
import (
"errors"
"fmt"
"sync"
"google.golang.org/grpc"
"github.com/coreos/etcd/raft/raftpb"
"github.com/docker/swarmkit/api"
"github.com/docker/swarmkit/watch"
"github.com/gogo/protobuf/proto"
"golang.org/x/net/context"
)
var (
@ -25,26 +21,19 @@ var (
ErrConfigChangeInvalid = errors.New("membership: ConfChange type should be either AddNode, RemoveNode or UpdateNode")
// ErrCannotUnmarshalConfig is thrown when a node cannot unmarshal a configuration change
ErrCannotUnmarshalConfig = errors.New("membership: cannot unmarshal configuration change")
// ErrMemberRemoved is thrown when a node was removed from the cluster
ErrMemberRemoved = errors.New("raft: member was removed from the cluster")
)
// deferredConn used to store removed members connection for some time.
// We need this in case if removed node is redirector or endpoint of ControlAPI call.
type deferredConn struct {
tick int
conn *grpc.ClientConn
}
// Cluster represents a set of active
// raft Members
type Cluster struct {
mu sync.RWMutex
members map[uint64]*Member
deferedConns map[*deferredConn]struct{}
mu sync.RWMutex
members map[uint64]*Member
// removed contains the list of removed Members,
// those ids cannot be reused
removed map[uint64]bool
heartbeatTicks int
removed map[uint64]bool
PeersBroadcast *watch.Queue
}
@ -52,74 +41,19 @@ type Cluster struct {
// Member represents a raft Cluster Member
type Member struct {
*api.RaftMember
Conn *grpc.ClientConn
tick int
active bool
lastSeenHost string
}
// HealthCheck sends a health check RPC to the member and returns the response.
func (member *Member) HealthCheck(ctx context.Context) error {
healthClient := api.NewHealthClient(member.Conn)
resp, err := healthClient.Check(ctx, &api.HealthCheckRequest{Service: "Raft"})
if err != nil {
return err
}
if resp.Status != api.HealthCheckResponse_SERVING {
return fmt.Errorf("health check returned status %s", resp.Status.String())
}
return nil
}
// NewCluster creates a new Cluster neighbors list for a raft Member.
// Member marked as inactive if there was no call ReportActive for heartbeatInterval.
func NewCluster(heartbeatTicks int) *Cluster {
func NewCluster() *Cluster {
// TODO(abronan): generate Cluster ID for federation
return &Cluster{
members: make(map[uint64]*Member),
removed: make(map[uint64]bool),
deferedConns: make(map[*deferredConn]struct{}),
heartbeatTicks: heartbeatTicks,
PeersBroadcast: watch.NewQueue(),
}
}
func (c *Cluster) handleInactive() {
for _, m := range c.members {
if !m.active {
continue
}
m.tick++
if m.tick > c.heartbeatTicks {
m.active = false
if m.Conn != nil {
m.Conn.Close()
}
}
}
}
func (c *Cluster) handleDeferredConns() {
for dc := range c.deferedConns {
dc.tick++
if dc.tick > c.heartbeatTicks {
dc.conn.Close()
delete(c.deferedConns, dc)
}
}
}
// Tick increases ticks for all members. After heartbeatTicks node marked as
// inactive.
func (c *Cluster) Tick() {
c.mu.Lock()
defer c.mu.Unlock()
c.handleInactive()
c.handleDeferredConns()
}
// Members returns the list of raft Members in the Cluster.
func (c *Cluster) Members() map[uint64]*Member {
members := make(map[uint64]*Member)
@ -168,8 +102,6 @@ func (c *Cluster) AddMember(member *Member) error {
if c.removed[member.RaftID] {
return ErrIDRemoved
}
member.active = true
member.tick = 0
c.members[member.RaftID] = member
@ -187,6 +119,33 @@ func (c *Cluster) RemoveMember(id uint64) error {
return c.clearMember(id)
}
// UpdateMember updates member address.
func (c *Cluster) UpdateMember(id uint64, m *api.RaftMember) error {
c.mu.Lock()
defer c.mu.Unlock()
if c.removed[id] {
return ErrIDRemoved
}
oldMember, ok := c.members[id]
if !ok {
return ErrIDNotFound
}
if oldMember.NodeID != m.NodeID {
// Should never happen; this is a sanity check
return errors.New("node ID mismatch match on node update")
}
if oldMember.Addr == m.Addr {
// nothing to do
return nil
}
oldMember.RaftMember = m
return nil
}
// ClearMember removes a node from the Cluster Memberlist, but does NOT add it
// to the removed list.
func (c *Cluster) ClearMember(id uint64) error {
@ -197,45 +156,10 @@ func (c *Cluster) ClearMember(id uint64) error {
}
func (c *Cluster) clearMember(id uint64) error {
m, ok := c.members[id]
if ok {
if m.Conn != nil {
// defer connection close to after heartbeatTicks
dConn := &deferredConn{conn: m.Conn}
c.deferedConns[dConn] = struct{}{}
}
if _, ok := c.members[id]; ok {
delete(c.members, id)
c.broadcastUpdate()
}
c.broadcastUpdate()
return nil
}
// ReplaceMemberConnection replaces the member's GRPC connection.
func (c *Cluster) ReplaceMemberConnection(id uint64, oldConn *Member, newConn *Member, newAddr string, force bool) error {
c.mu.Lock()
defer c.mu.Unlock()
oldMember, ok := c.members[id]
if !ok {
return ErrIDNotFound
}
if !force && oldConn.Conn != oldMember.Conn {
// The connection was already replaced. Don't do it again.
newConn.Conn.Close()
return nil
}
if oldMember.Conn != nil {
oldMember.Conn.Close()
}
newMember := *oldMember
newMember.RaftMember = oldMember.RaftMember.Copy()
newMember.RaftMember.Addr = newAddr
newMember.Conn = newConn.Conn
c.members[id] = &newMember
return nil
}
@ -249,60 +173,12 @@ func (c *Cluster) IsIDRemoved(id uint64) bool {
// Clear resets the list of active Members and removed Members.
func (c *Cluster) Clear() {
c.mu.Lock()
for _, member := range c.members {
if member.Conn != nil {
member.Conn.Close()
}
}
for dc := range c.deferedConns {
dc.conn.Close()
}
c.members = make(map[uint64]*Member)
c.removed = make(map[uint64]bool)
c.deferedConns = make(map[*deferredConn]struct{})
c.mu.Unlock()
}
// ReportActive reports that member is active (called ProcessRaftMessage),
func (c *Cluster) ReportActive(id uint64, sourceHost string) {
c.mu.Lock()
defer c.mu.Unlock()
m, ok := c.members[id]
if !ok {
return
}
m.tick = 0
m.active = true
if sourceHost != "" {
m.lastSeenHost = sourceHost
}
}
// Active returns true if node is active.
func (c *Cluster) Active(id uint64) bool {
c.mu.RLock()
defer c.mu.RUnlock()
m, ok := c.members[id]
if !ok {
return false
}
return m.active
}
// LastSeenHost returns the last observed source address that the specified
// member connected from.
func (c *Cluster) LastSeenHost(id uint64) string {
c.mu.RLock()
defer c.mu.RUnlock()
m, ok := c.members[id]
if ok {
return m.lastSeenHost
}
return ""
}
// ValidateConfigurationChange takes a proposed ConfChange and
// ensures that it is valid.
func (c *Cluster) ValidateConfigurationChange(cc raftpb.ConfChange) error {
@ -334,34 +210,3 @@ func (c *Cluster) ValidateConfigurationChange(cc raftpb.ConfChange) error {
}
return nil
}
// CanRemoveMember checks if removing a Member would not result in a loss
// of quorum, this check is needed before submitting a configuration change
// that might block or harm the Cluster on Member recovery
func (c *Cluster) CanRemoveMember(from uint64, id uint64) bool {
members := c.Members()
nreachable := 0 // reachable managers after removal
for _, m := range members {
if m.RaftID == id {
continue
}
// Local node from where the remove is issued
if m.RaftID == from {
nreachable++
continue
}
if c.Active(m.RaftID) {
nreachable++
}
}
nquorum := (len(members)-1)/2 + 1
if nreachable < nquorum {
return false
}
return true
}

View file

@ -27,6 +27,7 @@ import (
"github.com/docker/swarmkit/manager/raftselector"
"github.com/docker/swarmkit/manager/state/raft/membership"
"github.com/docker/swarmkit/manager/state/raft/storage"
"github.com/docker/swarmkit/manager/state/raft/transport"
"github.com/docker/swarmkit/manager/state/store"
"github.com/docker/swarmkit/watch"
"github.com/gogo/protobuf/proto"
@ -51,8 +52,6 @@ var (
ErrRequestTooLarge = errors.New("raft: raft message is too large and can't be sent")
// ErrCannotRemoveMember is thrown when we try to remove a member from the cluster but this would result in a loss of quorum
ErrCannotRemoveMember = errors.New("raft: member cannot be removed, because removing it may result in loss of quorum")
// ErrMemberRemoved is thrown when a node was removed from the cluster
ErrMemberRemoved = errors.New("raft: member was removed from the cluster")
// ErrNoClusterLeader is thrown when the cluster has no elected leader
ErrNoClusterLeader = errors.New("raft: no elected cluster leader")
// ErrMemberUnknown is sent in response to a message from an
@ -88,8 +87,9 @@ type EncryptionKeyRotator interface {
// Node represents the Raft Node useful
// configuration.
type Node struct {
raftNode raft.Node
cluster *membership.Cluster
raftNode raft.Node
cluster *membership.Cluster
transport *transport.Transport
raftStore *raft.MemoryStorage
memoryStore *store.MemoryStore
@ -100,6 +100,7 @@ type Node struct {
campaignWhenAble bool
signalledLeadership uint32
isMember uint32
bootstrapMembers []*api.RaftMember
// waitProp waits for all the proposals to be terminated before
// shutting down the node.
@ -113,9 +114,11 @@ type Node struct {
ticker clock.Ticker
doneCh chan struct{}
// RemovedFromRaft notifies about node deletion from raft cluster
RemovedFromRaft chan struct{}
removeRaftFunc func()
cancelFunc func()
RemovedFromRaft chan struct{}
cancelFunc func()
// removeRaftCh notifies about node deletion from raft cluster
removeRaftCh chan struct{}
removeRaftOnce sync.Once
leadershipBroadcast *watch.Queue
// used to coordinate shutdown
@ -131,7 +134,6 @@ type Node struct {
// to stop.
stopped chan struct{}
lastSendToMember map[uint64]chan struct{}
raftLogger *storage.EncryptedRaftLogger
keyRotator EncryptionKeyRotator
rotationQueued bool
@ -189,7 +191,7 @@ func NewNode(opts NodeOptions) *Node {
raftStore := raft.NewMemoryStorage()
n := &Node{
cluster: membership.NewCluster(2 * cfg.ElectionTick),
cluster: membership.NewCluster(),
raftStore: raftStore,
opts: opts,
Config: &raft.Config{
@ -204,7 +206,6 @@ func NewNode(opts NodeOptions) *Node {
RemovedFromRaft: make(chan struct{}),
stopped: make(chan struct{}),
leadershipBroadcast: watch.NewQueue(),
lastSendToMember: make(map[uint64]chan struct{}),
keyRotator: opts.KeyRotator,
}
n.memoryStore = store.NewMemoryStore(n)
@ -218,16 +219,6 @@ func NewNode(opts NodeOptions) *Node {
n.reqIDGen = idutil.NewGenerator(uint16(n.Config.ID), time.Now())
n.wait = newWait()
n.removeRaftFunc = func(n *Node) func() {
var removeRaftOnce sync.Once
return func() {
removeRaftOnce.Do(func() {
atomic.StoreUint32(&n.isMember, 0)
close(n.RemovedFromRaft)
})
}
}(n)
n.cancelFunc = func(n *Node) func() {
var cancelOnce sync.Once
return func() {
@ -240,6 +231,34 @@ func NewNode(opts NodeOptions) *Node {
return n
}
// IsIDRemoved reports if member with id was removed from cluster.
// Part of transport.Raft interface.
func (n *Node) IsIDRemoved(id uint64) bool {
return n.cluster.IsIDRemoved(id)
}
// NodeRemoved signals that node was removed from cluster and should stop.
// Part of transport.Raft interface.
func (n *Node) NodeRemoved() {
n.removeRaftOnce.Do(func() {
atomic.StoreUint32(&n.isMember, 0)
close(n.RemovedFromRaft)
})
}
// ReportSnapshot reports snapshot status to underlying raft node.
// Part of transport.Raft interface.
func (n *Node) ReportSnapshot(id uint64, status raft.SnapshotStatus) {
n.raftNode.ReportSnapshot(id, status)
}
// ReportUnreachable reports to underlying raft node that member with id is
// unreachable.
// Part of transport.Raft interface.
func (n *Node) ReportUnreachable(id uint64) {
n.raftNode.ReportUnreachable(id)
}
// WithContext returns context which is cancelled when parent context cancelled
// or node is stopped.
func (n *Node) WithContext(ctx context.Context) (context.Context, context.CancelFunc) {
@ -255,13 +274,29 @@ func (n *Node) WithContext(ctx context.Context) (context.Context, context.Cancel
return ctx, cancel
}
func (n *Node) initTransport() {
transportConfig := &transport.Config{
HeartbeatInterval: time.Duration(n.Config.ElectionTick) * n.opts.TickInterval,
SendTimeout: n.opts.SendTimeout,
Credentials: n.opts.TLSCredentials,
Raft: n,
}
n.transport = transport.New(transportConfig)
}
// JoinAndStart joins and starts the raft server
func (n *Node) JoinAndStart(ctx context.Context) (err error) {
ctx, cancel := n.WithContext(ctx)
defer func() {
cancel()
if err != nil {
n.stopMu.Lock()
// to shutdown transport
close(n.stopped)
n.stopMu.Unlock()
n.done()
} else {
atomic.StoreUint32(&n.isMember, 1)
}
}()
@ -281,58 +316,59 @@ func (n *Node) JoinAndStart(ctx context.Context) (err error) {
n.snapshotMeta = snapshot.Metadata
n.writtenWALIndex, _ = n.raftStore.LastIndex() // lastIndex always returns nil as an error
if loadAndStartErr == storage.ErrNoWAL {
// restore from snapshot
if loadAndStartErr == nil {
if n.opts.JoinAddr != "" {
c, err := n.ConnectToMember(n.opts.JoinAddr, 10*time.Second)
if err != nil {
return err
}
client := api.NewRaftMembershipClient(c.Conn)
defer func() {
_ = c.Conn.Close()
}()
joinCtx, joinCancel := context.WithTimeout(ctx, 10*time.Second)
defer joinCancel()
resp, err := client.Join(joinCtx, &api.JoinRequest{
Addr: n.opts.Addr,
})
if err != nil {
return err
}
n.Config.ID = resp.RaftID
if _, err := n.newRaftLogs(n.opts.ID); err != nil {
return err
}
n.raftNode = raft.StartNode(n.Config, []raft.Peer{})
if err := n.registerNodes(resp.Members); err != nil {
n.raftLogger.Close(ctx)
return err
}
} else {
// First member in the cluster, self-assign ID
n.Config.ID = uint64(rand.Int63()) + 1
peer, err := n.newRaftLogs(n.opts.ID)
if err != nil {
return err
}
n.raftNode = raft.StartNode(n.Config, []raft.Peer{peer})
n.campaignWhenAble = true
log.G(ctx).Warning("ignoring request to join cluster, because raft state already exists")
}
atomic.StoreUint32(&n.isMember, 1)
n.campaignWhenAble = true
n.initTransport()
n.raftNode = raft.RestartNode(n.Config)
return nil
}
if n.opts.JoinAddr != "" {
log.G(ctx).Warning("ignoring request to join cluster, because raft state already exists")
// first member of cluster
if n.opts.JoinAddr == "" {
// First member in the cluster, self-assign ID
n.Config.ID = uint64(rand.Int63()) + 1
peer, err := n.newRaftLogs(n.opts.ID)
if err != nil {
return err
}
n.campaignWhenAble = true
n.initTransport()
n.raftNode = raft.StartNode(n.Config, []raft.Peer{peer})
return nil
}
n.campaignWhenAble = true
n.raftNode = raft.RestartNode(n.Config)
atomic.StoreUint32(&n.isMember, 1)
// join to existing cluster
conn, err := dial(n.opts.JoinAddr, "tcp", n.opts.TLSCredentials, 10*time.Second)
if err != nil {
return err
}
defer conn.Close()
client := api.NewRaftMembershipClient(conn)
joinCtx, joinCancel := context.WithTimeout(ctx, 10*time.Second)
defer joinCancel()
resp, err := client.Join(joinCtx, &api.JoinRequest{
Addr: n.opts.Addr,
})
if err != nil {
return err
}
n.Config.ID = resp.RaftID
if _, err := n.newRaftLogs(n.opts.ID); err != nil {
return err
}
n.bootstrapMembers = resp.Members
n.initTransport()
n.raftNode = raft.StartNode(n.Config, nil)
return nil
}
@ -372,6 +408,9 @@ func (n *Node) done() {
n.leadershipBroadcast.Close()
n.cluster.PeersBroadcast.Close()
n.memoryStore.Close()
if n.transport != nil {
n.transport.Stop()
}
close(n.doneCh)
}
@ -391,6 +430,12 @@ func (n *Node) Run(ctx context.Context) error {
ctx = log.WithLogger(ctx, logrus.WithField("raft_id", fmt.Sprintf("%x", n.Config.ID)))
ctx, cancel := context.WithCancel(ctx)
for _, node := range n.bootstrapMembers {
if err := n.registerNode(node); err != nil {
log.G(ctx).WithError(err).Errorf("failed to register member %x", node.RaftID)
}
}
defer func() {
cancel()
n.stop(ctx)
@ -414,7 +459,6 @@ func (n *Node) Run(ctx context.Context) error {
select {
case <-n.ticker.C():
n.raftNode.Tick()
n.cluster.Tick()
case rd := <-n.raftNode.Ready():
raftConfig := n.getCurrentRaftConfig()
@ -423,10 +467,10 @@ func (n *Node) Run(ctx context.Context) error {
return errors.Wrap(err, "failed to save entries to storage")
}
if len(rd.Messages) != 0 {
for _, msg := range rd.Messages {
// Send raft messages to peers
if err := n.send(ctx, rd.Messages); err != nil {
log.G(ctx).WithError(err).Error("failed to send message to members")
if err := n.transport.Send(msg); err != nil {
log.G(ctx).WithError(err).Error("failed to send message to member")
}
}
@ -435,8 +479,8 @@ func (n *Node) Run(ctx context.Context) error {
// saveToStorage.
if !raft.IsEmptySnap(rd.Snapshot) {
// Load the snapshot data into the store
if err := n.restoreFromSnapshot(rd.Snapshot.Data, false); err != nil {
log.G(ctx).WithError(err).Error("failed to restore from snapshot")
if err := n.restoreFromSnapshot(ctx, rd.Snapshot.Data); err != nil {
log.G(ctx).WithError(err).Error("failed to restore cluster from snapshot")
}
n.appliedIndex = rd.Snapshot.Metadata.Index
n.snapshotMeta = rd.Snapshot.Metadata
@ -555,6 +599,40 @@ func (n *Node) Run(ctx context.Context) error {
}
}
func (n *Node) restoreFromSnapshot(ctx context.Context, data []byte) error {
snapCluster, err := n.clusterSnapshot(data)
if err != nil {
return err
}
oldMembers := n.cluster.Members()
for _, member := range snapCluster.Members {
delete(oldMembers, member.RaftID)
}
for _, removedMember := range snapCluster.Removed {
n.cluster.RemoveMember(removedMember)
if err := n.transport.RemovePeer(removedMember); err != nil {
log.G(ctx).WithError(err).Errorf("failed to remove peer %x from transport", removedMember)
}
delete(oldMembers, removedMember)
}
for id, member := range oldMembers {
n.cluster.ClearMember(id)
if err := n.transport.RemovePeer(member.RaftID); err != nil {
log.G(ctx).WithError(err).Errorf("failed to remove peer %x from transport", member.RaftID)
}
}
for _, node := range snapCluster.Members {
if err := n.registerNode(&api.RaftMember{RaftID: node.RaftID, NodeID: node.NodeID, Addr: node.Addr}); err != nil {
log.G(ctx).WithError(err).Error("failed to register node from snapshot")
}
}
return nil
}
func (n *Node) needsSnapshot(ctx context.Context) bool {
if n.waitForAppliedIndex == 0 && n.keyRotator.NeedsRotation() {
keys := n.keyRotator.GetKeys()
@ -798,22 +876,27 @@ func (n *Node) Join(ctx context.Context, req *api.JoinRequest) (*api.JoinRespons
// checkHealth tries to contact an aspiring member through its advertised address
// and checks if its raft server is running.
func (n *Node) checkHealth(ctx context.Context, addr string, timeout time.Duration) error {
conn, err := n.ConnectToMember(addr, timeout)
conn, err := dial(addr, "tcp", n.opts.TLSCredentials, timeout)
if err != nil {
return err
}
defer conn.Close()
if timeout != 0 {
tctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
ctx = tctx
}
defer conn.Conn.Close()
if err := conn.HealthCheck(ctx); err != nil {
healthClient := api.NewHealthClient(conn)
resp, err := healthClient.Check(ctx, &api.HealthCheckRequest{Service: "Raft"})
if err != nil {
return errors.Wrap(err, "could not connect to prospective new cluster member using its advertised address")
}
if resp.Status != api.HealthCheckResponse_SERVING {
return fmt.Errorf("health check returned status %s", resp.Status.String())
}
return nil
}
@ -841,11 +924,15 @@ func (n *Node) addMember(ctx context.Context, addr string, raftID uint64, nodeID
return n.configure(ctx, cc)
}
// updateMember submits a configuration change to change a member's address.
func (n *Node) updateMember(ctx context.Context, addr string, raftID uint64, nodeID string) error {
// updateNodeBlocking runs synchronous job to update node address in whole cluster.
func (n *Node) updateNodeBlocking(ctx context.Context, id uint64, addr string) error {
m := n.cluster.GetMember(id)
if m == nil {
return errors.Errorf("member %x is not found for update", id)
}
node := api.RaftMember{
RaftID: raftID,
NodeID: nodeID,
RaftID: m.RaftID,
NodeID: m.NodeID,
Addr: addr,
}
@ -856,7 +943,7 @@ func (n *Node) updateMember(ctx context.Context, addr string, raftID uint64, nod
cc := raftpb.ConfChange{
Type: raftpb.ConfChangeUpdateNode,
NodeID: raftID,
NodeID: id,
Context: meta,
}
@ -864,6 +951,18 @@ func (n *Node) updateMember(ctx context.Context, addr string, raftID uint64, nod
return n.configure(ctx, cc)
}
// UpdateNode submits a configuration change to change a member's address.
func (n *Node) UpdateNode(id uint64, addr string) {
ctx, cancel := n.WithContext(context.Background())
defer cancel()
// spawn updating info in raft in background to unblock transport
go func() {
if err := n.updateNodeBlocking(ctx, id, addr); err != nil {
log.G(ctx).WithFields(logrus.Fields{"raft_id": n.Config.ID, "update_id": id}).WithError(err).Error("failed to update member address in cluster")
}
}()
}
// Leave asks to a member of the raft to remove
// us from the raft cluster. This method is called
// from a member who is willing to leave its raft
@ -897,7 +996,31 @@ func (n *Node) Leave(ctx context.Context, req *api.LeaveRequest) (*api.LeaveResp
// CanRemoveMember checks if a member can be removed from
// the context of the current node.
func (n *Node) CanRemoveMember(id uint64) bool {
return n.cluster.CanRemoveMember(n.Config.ID, id)
members := n.cluster.Members()
nreachable := 0 // reachable managers after removal
for _, m := range members {
if m.RaftID == id {
continue
}
// Local node from where the remove is issued
if m.RaftID == n.Config.ID {
nreachable++
continue
}
if n.transport.Active(m.RaftID) {
nreachable++
}
}
nquorum := (len(members)-1)/2 + 1
if nreachable < nquorum {
return false
}
return true
}
func (n *Node) removeMember(ctx context.Context, id uint64) error {
@ -915,7 +1038,7 @@ func (n *Node) removeMember(ctx context.Context, id uint64) error {
n.membershipLock.Lock()
defer n.membershipLock.Unlock()
if n.cluster.CanRemoveMember(n.Config.ID, id) {
if n.CanRemoveMember(id) {
cc := raftpb.ConfChange{
ID: id,
Type: raftpb.ConfChangeRemoveNode,
@ -956,6 +1079,34 @@ func (n *Node) processRaftMessageLogger(ctx context.Context, msg *api.ProcessRaf
return log.G(ctx).WithFields(fields)
}
func (n *Node) reportNewAddress(ctx context.Context, id uint64) error {
// too early
if !n.IsMember() {
return nil
}
p, ok := peer.FromContext(ctx)
if !ok {
return nil
}
oldAddr, err := n.transport.PeerAddr(id)
if err != nil {
return err
}
newHost, _, err := net.SplitHostPort(p.Addr.String())
if err != nil {
return err
}
_, officialPort, err := net.SplitHostPort(oldAddr)
if err != nil {
return err
}
newAddr := net.JoinHostPort(newHost, officialPort)
if err := n.transport.UpdatePeerAddr(id, newAddr); err != nil {
return err
}
return nil
}
// ProcessRaftMessage calls 'Step' which advances the
// raft state machine with the provided message on the
// receiving node
@ -969,32 +1120,25 @@ func (n *Node) ProcessRaftMessage(ctx context.Context, msg *api.ProcessRaftMessa
// a node in the remove set
if n.cluster.IsIDRemoved(msg.Message.From) {
n.processRaftMessageLogger(ctx, msg).Debug("received message from removed member")
return nil, grpc.Errorf(codes.NotFound, "%s", ErrMemberRemoved.Error())
return nil, grpc.Errorf(codes.NotFound, "%s", membership.ErrMemberRemoved.Error())
}
var sourceHost string
peer, ok := peer.FromContext(ctx)
if ok {
sourceHost, _, _ = net.SplitHostPort(peer.Addr.String())
}
n.cluster.ReportActive(msg.Message.From, sourceHost)
ctx, cancel := n.WithContext(ctx)
defer cancel()
if err := n.reportNewAddress(ctx, msg.Message.From); err != nil {
log.G(ctx).WithError(err).Errorf("failed to report new address of %x to transport", msg.Message.From)
}
// Reject vote requests from unreachable peers
if msg.Message.Type == raftpb.MsgVote {
member := n.cluster.GetMember(msg.Message.From)
if member == nil || member.Conn == nil {
if member == nil {
n.processRaftMessageLogger(ctx, msg).Debug("received message from unknown member")
return &api.ProcessRaftMessageResponse{}, nil
}
healthCtx, cancel := context.WithTimeout(ctx, time.Duration(n.Config.ElectionTick)*n.opts.TickInterval)
defer cancel()
if err := member.HealthCheck(healthCtx); err != nil {
if err := n.transport.HealthCheck(ctx, msg.Message.From); err != nil {
n.processRaftMessageLogger(ctx, msg).WithError(err).Debug("member which sent vote request failed health check")
return &api.ProcessRaftMessageResponse{}, nil
}
@ -1064,17 +1208,11 @@ func (n *Node) getLeaderConn() (*grpc.ClientConn, error) {
if leader == n.Config.ID {
return nil, raftselector.ErrIsLeader
}
l := n.cluster.GetMember(leader)
if l == nil {
return nil, errors.New("no leader found")
conn, err := n.transport.PeerConn(leader)
if err != nil {
return nil, errors.Wrap(err, "failed to get connection to leader")
}
if !n.cluster.Active(leader) {
return nil, errors.New("leader marked as inactive")
}
if l.Conn == nil {
return nil, errors.New("no connection to leader in member list")
}
return l.Conn, nil
return conn, nil
}
// LeaderConn returns current connection to cluster leader or raftselector.ErrIsLeader
@ -1122,8 +1260,12 @@ func (n *Node) registerNode(node *api.RaftMember) error {
// and are adding ourself now with the remotely-reachable
// address.
if existingMember.Addr != node.Addr {
if node.RaftID != n.Config.ID {
if err := n.transport.UpdatePeer(node.RaftID, node.Addr); err != nil {
return err
}
}
member.RaftMember = node
member.Conn = existingMember.Conn
n.cluster.AddMember(member)
}
@ -1132,11 +1274,7 @@ func (n *Node) registerNode(node *api.RaftMember) error {
// Avoid opening a connection to the local node
if node.RaftID != n.Config.ID {
// We don't want to impose a timeout on the grpc connection. It
// should keep retrying as long as necessary, in case the peer
// is temporarily unavailable.
var err error
if member, err = n.ConnectToMember(node.Addr, 0); err != nil {
if err := n.transport.AddPeer(node.RaftID, node.Addr); err != nil {
return err
}
}
@ -1144,8 +1282,8 @@ func (n *Node) registerNode(node *api.RaftMember) error {
member.RaftMember = node
err := n.cluster.AddMember(member)
if err != nil {
if member.Conn != nil {
_ = member.Conn.Close()
if rerr := n.transport.RemovePeer(node.RaftID); rerr != nil {
return errors.Wrapf(rerr, "failed to remove peer after error %v", err)
}
return err
}
@ -1153,17 +1291,6 @@ func (n *Node) registerNode(node *api.RaftMember) error {
return nil
}
// registerNodes registers a set of nodes in the cluster
func (n *Node) registerNodes(nodes []*api.RaftMember) error {
for _, node := range nodes {
if err := n.registerNode(node); err != nil {
return err
}
}
return nil
}
// ProposeValue calls Propose on the raft and waits
// on the commit log action before returning a result
func (n *Node) ProposeValue(ctx context.Context, storeAction []*api.StoreAction, cb func()) error {
@ -1209,7 +1336,7 @@ func (n *Node) GetMemberlist() map[uint64]*api.RaftMember {
leader := false
if member.RaftID != n.Config.ID {
if !n.cluster.Active(member.RaftID) {
if !n.transport.Active(member.RaftID) {
reachability = api.RaftMemberStatus_UNREACHABLE
}
}
@ -1294,183 +1421,6 @@ func (n *Node) saveToStorage(
return nil
}
// Sends a series of messages to members in the raft
func (n *Node) send(ctx context.Context, messages []raftpb.Message) error {
members := n.cluster.Members()
n.stopMu.RLock()
defer n.stopMu.RUnlock()
for _, m := range messages {
// Process locally
if m.To == n.Config.ID {
if err := n.raftNode.Step(ctx, m); err != nil {
return err
}
continue
}
if m.Type == raftpb.MsgProp {
// We don't forward proposals to the leader. Our
// current architecture depends on only the leader
// making proposals, so in-flight proposals can be
// guaranteed not to conflict.
continue
}
ch := make(chan struct{})
n.asyncTasks.Add(1)
go n.sendToMember(ctx, members, m, n.lastSendToMember[m.To], ch)
n.lastSendToMember[m.To] = ch
}
return nil
}
func (n *Node) sendToMember(ctx context.Context, members map[uint64]*membership.Member, m raftpb.Message, lastSend <-chan struct{}, thisSend chan<- struct{}) {
defer n.asyncTasks.Done()
defer close(thisSend)
if lastSend != nil {
waitCtx, waitCancel := context.WithTimeout(ctx, n.opts.SendTimeout)
defer waitCancel()
select {
case <-lastSend:
case <-waitCtx.Done():
return
}
select {
case <-waitCtx.Done():
return
default:
}
}
ctx, cancel := context.WithTimeout(ctx, n.opts.SendTimeout)
defer cancel()
if n.cluster.IsIDRemoved(m.To) {
// Should not send to removed members
return
}
var conn *membership.Member
if toMember, ok := members[m.To]; ok {
conn = toMember
} else {
// If we are being asked to send to a member that's not in
// our member list, that could indicate that the current leader
// was added while we were offline. Try to resolve its address.
log.G(ctx).Warningf("sending message to an unrecognized member ID %x", m.To)
// Choose a random member
var (
queryMember *membership.Member
id uint64
)
for id, queryMember = range members {
if id != n.Config.ID {
break
}
}
if queryMember == nil || queryMember.RaftID == n.Config.ID {
log.G(ctx).Error("could not find cluster member to query for leader address")
return
}
resp, err := api.NewRaftClient(queryMember.Conn).ResolveAddress(ctx, &api.ResolveAddressRequest{RaftID: m.To})
if err != nil {
log.G(ctx).WithError(err).Errorf("could not resolve address of member ID %x", m.To)
return
}
conn, err = n.ConnectToMember(resp.Addr, n.opts.SendTimeout)
if err != nil {
log.G(ctx).WithError(err).Errorf("could connect to member ID %x at %s", m.To, resp.Addr)
return
}
// The temporary connection is only used for this message.
// Eventually, we should catch up and add a long-lived
// connection to the member list.
defer conn.Conn.Close()
}
_, err := api.NewRaftClient(conn.Conn).ProcessRaftMessage(ctx, &api.ProcessRaftMessageRequest{Message: &m})
if err != nil {
if grpc.Code(err) == codes.NotFound && grpc.ErrorDesc(err) == ErrMemberRemoved.Error() {
n.removeRaftFunc()
}
if m.Type == raftpb.MsgSnap {
n.raftNode.ReportSnapshot(m.To, raft.SnapshotFailure)
}
if !n.IsMember() {
// node is removed from cluster or stopped
return
}
n.raftNode.ReportUnreachable(m.To)
lastSeenHost := n.cluster.LastSeenHost(m.To)
if lastSeenHost != "" {
// Check if address has changed
officialHost, officialPort, _ := net.SplitHostPort(conn.Addr)
if officialHost != lastSeenHost {
reconnectAddr := net.JoinHostPort(lastSeenHost, officialPort)
log.G(ctx).Warningf("detected address change for %x (%s -> %s)", m.To, conn.Addr, reconnectAddr)
if err := n.handleAddressChange(ctx, conn, reconnectAddr); err != nil {
log.G(ctx).WithError(err).Error("failed to hande address change")
}
return
}
}
// Bounce the connection
newConn, err := n.ConnectToMember(conn.Addr, 0)
if err != nil {
log.G(ctx).WithError(err).Errorf("could connect to member ID %x at %s", m.To, conn.Addr)
return
}
err = n.cluster.ReplaceMemberConnection(m.To, conn, newConn, conn.Addr, false)
if err != nil {
log.G(ctx).WithError(err).Error("failed to replace connection to raft member")
newConn.Conn.Close()
}
} else if m.Type == raftpb.MsgSnap {
n.raftNode.ReportSnapshot(m.To, raft.SnapshotFinish)
}
}
func (n *Node) handleAddressChange(ctx context.Context, member *membership.Member, reconnectAddr string) error {
newConn, err := n.ConnectToMember(reconnectAddr, 0)
if err != nil {
return errors.Wrapf(err, "could connect to member ID %x at observed address %s", member.RaftID, reconnectAddr)
}
healthCtx, cancelHealth := context.WithTimeout(ctx, time.Duration(n.Config.ElectionTick)*n.opts.TickInterval)
defer cancelHealth()
if err := newConn.HealthCheck(healthCtx); err != nil {
return errors.Wrapf(err, "%x failed health check at observed address %s", member.RaftID, reconnectAddr)
}
if err := n.cluster.ReplaceMemberConnection(member.RaftID, member, newConn, reconnectAddr, false); err != nil {
newConn.Conn.Close()
return errors.Wrap(err, "failed to replace connection to raft member")
}
// If we're the leader, write the address change to raft
updateCtx, cancelUpdate := context.WithTimeout(ctx, time.Duration(n.Config.ElectionTick)*n.opts.TickInterval)
defer cancelUpdate()
if err := n.updateMember(updateCtx, reconnectAddr, member.RaftID, member.NodeID); err != nil {
return errors.Wrap(err, "failed to update member address in raft")
}
return nil
}
// processInternalRaftRequest sends a message to nodes participating
// in the raft to apply a log entry and then waits for it to be applied
// on the server. It will block until the update is performed, there is
@ -1681,32 +1631,13 @@ func (n *Node) applyUpdateNode(ctx context.Context, cc raftpb.ConfChange) error
return err
}
oldMember := n.cluster.GetMember(newMember.RaftID)
if oldMember == nil {
return ErrMemberUnknown
}
if oldMember.NodeID != newMember.NodeID {
// Should never happen; this is a sanity check
log.G(ctx).Errorf("node ID mismatch on node update (old: %x, new: %x)", oldMember.NodeID, newMember.NodeID)
return errors.New("node ID mismatch match on node update")
}
if oldMember.Addr == newMember.Addr || oldMember.Conn == nil {
// nothing to do
if newMember.RaftID == n.Config.ID {
return nil
}
newConn, err := n.ConnectToMember(newMember.Addr, 0)
if err != nil {
return errors.Errorf("could connect to member ID %x at %s: %v", newMember.RaftID, newMember.Addr, err)
}
if err := n.cluster.ReplaceMemberConnection(newMember.RaftID, oldMember, newConn, newMember.Addr, true); err != nil {
newConn.Conn.Close()
if err := n.transport.UpdatePeer(newMember.RaftID, newMember.Addr); err != nil {
return err
}
return nil
return n.cluster.UpdateMember(newMember.RaftID, newMember)
}
// applyRemoveNode is called when we receive a ConfChange
@ -1724,11 +1655,11 @@ func (n *Node) applyRemoveNode(ctx context.Context, cc raftpb.ConfChange) (err e
}
if cc.NodeID == n.Config.ID {
// wait the commit ack to be sent before closing connection
n.asyncTasks.Wait()
n.removeRaftFunc()
n.NodeRemoved()
// if there are only 2 nodes in the cluster, and the leader is leaving
// before closing the connection, the leader has to ensure that the follower is
// notified about this raft conf change commit. Otherwise, the follower would
@ -1738,24 +1669,15 @@ func (n *Node) applyRemoveNode(ctx context.Context, cc raftpb.ConfChange) (err e
// while n.asyncTasks.Wait() could be helpful in this case,
// it is only a best-effort strategy, because this send could fail due to errors (such as the time limit being exceeded)
// TODO(Runshen Zhu): use leadership transfer to solve this case, after vendoring raft 3.0+
} else {
if err := n.transport.RemovePeer(cc.NodeID); err != nil {
return err
}
}
return n.cluster.RemoveMember(cc.NodeID)
}
// ConnectToMember returns a member object with an initialized
// connection to communicate with other raft members
func (n *Node) ConnectToMember(addr string, timeout time.Duration) (*membership.Member, error) {
conn, err := dial(addr, "tcp", n.opts.TLSCredentials, timeout)
if err != nil {
return nil, err
}
return &membership.Member{
Conn: conn,
}, nil
}
// SubscribeLeadership returns channel to which events about leadership change
// will be sent in form of raft.LeadershipState. Also cancel func is returned -
// it should be called when listener is no longer interested in events.

View file

@ -60,10 +60,26 @@ func (n *Node) loadAndStart(ctx context.Context, forceNewCluster bool) error {
n.Config.ID = raftNode.RaftID
if snapshot != nil {
// Load the snapshot data into the store
if err := n.restoreFromSnapshot(snapshot.Data, forceNewCluster); err != nil {
snapCluster, err := n.clusterSnapshot(snapshot.Data)
if err != nil {
return err
}
var bootstrapMembers []*api.RaftMember
if forceNewCluster {
for _, m := range snapCluster.Members {
if m.RaftID != n.Config.ID {
n.cluster.RemoveMember(m.RaftID)
continue
}
bootstrapMembers = append(bootstrapMembers, m)
}
} else {
bootstrapMembers = snapCluster.Members
}
n.bootstrapMembers = bootstrapMembers
for _, removedMember := range snapCluster.Removed {
n.cluster.RemoveMember(removedMember)
}
}
ents, st := waldata.Entries, waldata.HardState
@ -215,40 +231,18 @@ func (n *Node) doSnapshot(ctx context.Context, raftConfig api.RaftConfig) {
<-viewStarted
}
func (n *Node) restoreFromSnapshot(data []byte, forceNewCluster bool) error {
func (n *Node) clusterSnapshot(data []byte) (api.ClusterSnapshot, error) {
var snapshot api.Snapshot
if err := snapshot.Unmarshal(data); err != nil {
return err
return snapshot.Membership, err
}
if snapshot.Version != api.Snapshot_V0 {
return fmt.Errorf("unrecognized snapshot version %d", snapshot.Version)
return snapshot.Membership, fmt.Errorf("unrecognized snapshot version %d", snapshot.Version)
}
if err := n.memoryStore.Restore(&snapshot.Store); err != nil {
return err
return snapshot.Membership, err
}
oldMembers := n.cluster.Members()
for _, member := range snapshot.Membership.Members {
if forceNewCluster && member.RaftID != n.Config.ID {
n.cluster.RemoveMember(member.RaftID)
} else {
if err := n.registerNode(&api.RaftMember{RaftID: member.RaftID, NodeID: member.NodeID, Addr: member.Addr}); err != nil {
return err
}
}
delete(oldMembers, member.RaftID)
}
for _, removedMember := range snapshot.Membership.Removed {
n.cluster.RemoveMember(removedMember)
delete(oldMembers, removedMember)
}
for member := range oldMembers {
n.cluster.ClearMember(member)
}
return nil
return snapshot.Membership, nil
}

View file

@ -0,0 +1,299 @@
package transport
import (
"fmt"
"sync"
"time"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"github.com/coreos/etcd/raft"
"github.com/coreos/etcd/raft/raftpb"
"github.com/docker/swarmkit/api"
"github.com/docker/swarmkit/log"
"github.com/docker/swarmkit/manager/state/raft/membership"
"github.com/pkg/errors"
)
type peer struct {
id uint64
tr *Transport
msgc chan raftpb.Message
ctx context.Context
cancel context.CancelFunc
done chan struct{}
mu sync.Mutex
cc *grpc.ClientConn
addr string
newAddr string
active bool
becameActive time.Time
}
func newPeer(id uint64, addr string, tr *Transport) (*peer, error) {
cc, err := tr.dial(addr)
if err != nil {
return nil, errors.Wrapf(err, "failed to create conn for %x with addr %s", id, addr)
}
ctx, cancel := context.WithCancel(tr.ctx)
ctx = log.WithField(ctx, "peer_id", fmt.Sprintf("%x", id))
p := &peer{
id: id,
addr: addr,
cc: cc,
tr: tr,
ctx: ctx,
cancel: cancel,
msgc: make(chan raftpb.Message, 4096),
done: make(chan struct{}),
}
go p.run(ctx)
return p, nil
}
func (p *peer) send(m raftpb.Message) (err error) {
p.mu.Lock()
defer func() {
if err != nil {
p.active = false
p.becameActive = time.Time{}
}
p.mu.Unlock()
}()
select {
case <-p.ctx.Done():
return p.ctx.Err()
default:
}
select {
case p.msgc <- m:
case <-p.ctx.Done():
return p.ctx.Err()
default:
p.tr.config.ReportUnreachable(p.id)
return errors.Errorf("peer is unreachable")
}
return nil
}
func (p *peer) update(addr string) error {
p.mu.Lock()
defer p.mu.Unlock()
if p.addr == addr {
return nil
}
cc, err := p.tr.dial(addr)
if err != nil {
return err
}
p.cc.Close()
p.cc = cc
p.addr = addr
return nil
}
func (p *peer) updateAddr(addr string) error {
p.mu.Lock()
defer p.mu.Unlock()
if p.addr == addr {
return nil
}
log.G(p.ctx).Debugf("peer %x updated to address %s, it will be used if old failed", p.id, addr)
p.newAddr = addr
return nil
}
func (p *peer) conn() *grpc.ClientConn {
p.mu.Lock()
defer p.mu.Unlock()
return p.cc
}
func (p *peer) address() string {
p.mu.Lock()
defer p.mu.Unlock()
return p.addr
}
func (p *peer) resolveAddr(ctx context.Context, id uint64) (string, error) {
resp, err := api.NewRaftClient(p.conn()).ResolveAddress(ctx, &api.ResolveAddressRequest{RaftID: id})
if err != nil {
return "", errors.Wrap(err, "failed to resolve address")
}
return resp.Addr, nil
}
func (p *peer) reportSnapshot(failure bool) {
if failure {
p.tr.config.ReportSnapshot(p.id, raft.SnapshotFailure)
return
}
p.tr.config.ReportSnapshot(p.id, raft.SnapshotFinish)
}
func (p *peer) sendProcessMessage(ctx context.Context, m raftpb.Message) error {
ctx, cancel := context.WithTimeout(ctx, p.tr.config.SendTimeout)
defer cancel()
_, err := api.NewRaftClient(p.conn()).ProcessRaftMessage(ctx, &api.ProcessRaftMessageRequest{Message: &m})
if grpc.Code(err) == codes.NotFound && grpc.ErrorDesc(err) == membership.ErrMemberRemoved.Error() {
p.tr.config.NodeRemoved()
}
if m.Type == raftpb.MsgSnap {
p.reportSnapshot(err != nil)
}
if err != nil {
p.tr.config.ReportUnreachable(m.To)
return err
}
return nil
}
func healthCheckConn(ctx context.Context, cc *grpc.ClientConn) error {
resp, err := api.NewHealthClient(cc).Check(ctx, &api.HealthCheckRequest{Service: "Raft"})
if err != nil {
return errors.Wrap(err, "failed to check health")
}
if resp.Status != api.HealthCheckResponse_SERVING {
return errors.Errorf("health check returned status %s", resp.Status)
}
return nil
}
func (p *peer) healthCheck(ctx context.Context) error {
ctx, cancel := context.WithTimeout(ctx, p.tr.config.SendTimeout)
defer cancel()
return healthCheckConn(ctx, p.conn())
}
func (p *peer) setActive() {
p.mu.Lock()
if !p.active {
p.active = true
p.becameActive = time.Now()
}
p.mu.Unlock()
}
func (p *peer) setInactive() {
p.mu.Lock()
p.active = false
p.becameActive = time.Time{}
p.mu.Unlock()
}
func (p *peer) activeTime() time.Time {
p.mu.Lock()
defer p.mu.Unlock()
return p.becameActive
}
func (p *peer) drain() error {
ctx, cancel := context.WithTimeout(context.Background(), 16*time.Second)
defer cancel()
for {
select {
case m, ok := <-p.msgc:
if !ok {
// all messages processed
return nil
}
if err := p.sendProcessMessage(ctx, m); err != nil {
return errors.Wrap(err, "send drain message")
}
case <-ctx.Done():
return ctx.Err()
}
}
}
func (p *peer) handleAddressChange(ctx context.Context) error {
p.mu.Lock()
newAddr := p.newAddr
p.newAddr = ""
p.mu.Unlock()
if newAddr == "" {
return nil
}
cc, err := p.tr.dial(newAddr)
if err != nil {
return err
}
ctx, cancel := context.WithTimeout(ctx, p.tr.config.SendTimeout)
defer cancel()
if err := healthCheckConn(ctx, cc); err != nil {
cc.Close()
return err
}
// there is a possibility of a race if the host changes its address too fast, but
// it's unlikely and eventually things should settle
p.mu.Lock()
p.cc.Close()
p.cc = cc
p.addr = newAddr
p.tr.config.UpdateNode(p.id, p.addr)
p.mu.Unlock()
return nil
}
func (p *peer) run(ctx context.Context) {
defer func() {
p.mu.Lock()
p.active = false
p.becameActive = time.Time{}
// at this point we can be sure that nobody will write to msgc
if p.msgc != nil {
close(p.msgc)
}
p.mu.Unlock()
if err := p.drain(); err != nil {
log.G(ctx).WithError(err).Error("failed to drain message queue")
}
close(p.done)
}()
if err := p.healthCheck(ctx); err == nil {
p.setActive()
}
for {
select {
case <-ctx.Done():
return
default:
}
select {
case m := <-p.msgc:
// we do not propagate the context here, because this operation should finish
// or time out on its own for raft to work correctly.
err := p.sendProcessMessage(context.Background(), m)
if err != nil {
log.G(ctx).WithError(err).Debugf("failed to send message %s", m.Type)
p.setInactive()
if err := p.handleAddressChange(ctx); err != nil {
log.G(ctx).WithError(err).Error("failed to change address after failure")
}
continue
}
p.setActive()
case <-ctx.Done():
return
}
}
}
func (p *peer) stop() {
p.cancel()
<-p.done
}

View file

@ -0,0 +1,382 @@
// Package transport provides grpc transport layer for raft.
// All methods are non-blocking.
package transport
import (
"sync"
"time"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"github.com/coreos/etcd/raft"
"github.com/coreos/etcd/raft/raftpb"
"github.com/docker/swarmkit/log"
"github.com/pkg/errors"
)
// ErrIsNotFound indicates that peer was never added to transport.
var ErrIsNotFound = errors.New("peer not found")
// Raft is the interface that represents the Raft API used by the transport package.
type Raft interface {
ReportUnreachable(id uint64)
ReportSnapshot(id uint64, status raft.SnapshotStatus)
IsIDRemoved(id uint64) bool
UpdateNode(id uint64, addr string)
NodeRemoved()
}
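// exampleRaft is a minimal, illustrative no-op implementation of the Raft
// interface above. It is only a sketch of the contract a real caller (such as
// the raft Node) is assumed to satisfy; it is not part of the swarmkit API.
type exampleRaft struct{}
func (exampleRaft) ReportUnreachable(id uint64) {}
func (exampleRaft) ReportSnapshot(id uint64, status raft.SnapshotStatus) {}
func (exampleRaft) IsIDRemoved(id uint64) bool { return false }
func (exampleRaft) UpdateNode(id uint64, addr string) {}
func (exampleRaft) NodeRemoved() {}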
// Config for Transport
type Config struct {
HeartbeatInterval time.Duration
SendTimeout time.Duration
Credentials credentials.TransportCredentials
RaftID string
Raft
}
// Transport is a structure which manages remote raft peers and sends messages
// to them.
type Transport struct {
config *Config
unknownc chan raftpb.Message
mu sync.Mutex
peers map[uint64]*peer
stopped bool
ctx context.Context
cancel context.CancelFunc
done chan struct{}
deferredConns map[*grpc.ClientConn]*time.Timer
}
// New returns a new Transport with the specified Config.
func New(cfg *Config) *Transport {
ctx, cancel := context.WithCancel(context.Background())
if cfg.RaftID != "" {
ctx = log.WithField(ctx, "raft_id", cfg.RaftID)
}
t := &Transport{
peers: make(map[uint64]*peer),
config: cfg,
unknownc: make(chan raftpb.Message),
done: make(chan struct{}),
ctx: ctx,
cancel: cancel,
deferredConns: make(map[*grpc.ClientConn]*time.Timer),
}
go t.run(ctx)
return t
}
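// newTransportExample is an illustrative sketch, not part of the original
// code: it shows how a manager-side caller might construct a Transport. The
// concrete intervals, the RaftID, and exampleRaft are assumptions made for
// the example only.
func newTransportExample(creds credentials.TransportCredentials) *Transport {
return New(&Config{
HeartbeatInterval: time.Second,
SendTimeout: 5 * time.Second,
Credentials: creds,
RaftID: "1",
Raft: exampleRaft{},
})
}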
func (t *Transport) run(ctx context.Context) {
defer func() {
log.G(ctx).Debug("stop transport")
t.mu.Lock()
defer t.mu.Unlock()
t.stopped = true
for _, p := range t.peers {
p.stop()
p.cc.Close()
}
for cc, timer := range t.deferredConns {
timer.Stop()
cc.Close()
}
t.deferredConns = nil
close(t.done)
}()
for {
select {
case <-ctx.Done():
return
default:
}
select {
case m := <-t.unknownc:
if err := t.sendUnknownMessage(ctx, m); err != nil {
log.G(ctx).WithError(err).Warnf("ignored message %s to unknown peer %x", m.Type, m.To)
}
case <-ctx.Done():
return
}
}
}
// Stop stops the transport and waits until it has finished.
func (t *Transport) Stop() {
t.cancel()
<-t.done
}
// Send sends a raft message to the remote peer identified by m.To.
func (t *Transport) Send(m raftpb.Message) error {
t.mu.Lock()
defer t.mu.Unlock()
if t.stopped {
return errors.New("transport stopped")
}
if t.config.IsIDRemoved(m.To) {
return errors.Errorf("refusing to send message %s to removed member %x", m.Type, m.To)
}
p, ok := t.peers[m.To]
if !ok {
log.G(t.ctx).Warningf("sending message %s to an unrecognized member ID %x", m.Type, m.To)
select {
// we need to process messages to unknown peers in a separate goroutine
// so as not to block the sender
case t.unknownc <- m:
case <-t.ctx.Done():
return t.ctx.Err()
default:
return errors.New("unknown messages queue is full")
}
return nil
}
if err := p.send(m); err != nil {
return errors.Wrapf(err, "failed to send message %x to %x", m.Type, m.To)
}
return nil
}
// AddPeer adds a new peer with the given id and address addr to the Transport.
// If a peer with that id already exists in the Transport, it returns an error if
// the address is different (UpdatePeer should be used instead) or nil otherwise.
func (t *Transport) AddPeer(id uint64, addr string) error {
t.mu.Lock()
defer t.mu.Unlock()
if t.stopped {
return errors.New("transport stopped")
}
if ep, ok := t.peers[id]; ok {
if ep.address() == addr {
return nil
}
return errors.Errorf("peer %x already added with addr %s", id, ep.addr)
}
log.G(t.ctx).Debugf("transport: add peer %x with address %s", id, addr)
p, err := newPeer(id, addr, t)
if err != nil {
return errors.Wrapf(err, "failed to create peer %x with addr %s", id, addr)
}
t.peers[id] = p
return nil
}
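// exampleAddPeers is an illustrative sketch (ids and addresses are assumed
// values, not from the original code) of the semantics documented on AddPeer:
// re-adding the same id with the same address is a no-op, while a different
// address is rejected and UpdatePeer must be used instead.
func exampleAddPeers(t *Transport) error {
if err := t.AddPeer(2, "10.0.0.2:2377"); err != nil {
return err
}
// same id and same address: returns nil without creating a new connection
if err := t.AddPeer(2, "10.0.0.2:2377"); err != nil {
return err
}
// same id, different address: AddPeer returns an error, so fall back to UpdatePeer
if err := t.AddPeer(2, "10.0.0.3:2377"); err != nil {
return t.UpdatePeer(2, "10.0.0.3:2377")
}
return nil
}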
// RemovePeer removes a peer from the Transport and waits for it to stop.
func (t *Transport) RemovePeer(id uint64) error {
t.mu.Lock()
defer t.mu.Unlock()
if t.stopped {
return errors.New("transport stopped")
}
p, ok := t.peers[id]
if !ok {
return ErrIsNotFound
}
delete(t.peers, id)
cc := p.conn()
p.stop()
timer := time.AfterFunc(8*time.Second, func() {
t.mu.Lock()
if !t.stopped {
delete(t.deferredConns, cc)
cc.Close()
}
t.mu.Unlock()
})
// store connection and timer for cleaning up on stop
t.deferredConns[cc] = timer
return nil
}
// UpdatePeer updates a peer with a new address. It replaces the connection immediately.
func (t *Transport) UpdatePeer(id uint64, addr string) error {
t.mu.Lock()
defer t.mu.Unlock()
if t.stopped {
return errors.New("transport stopped")
}
p, ok := t.peers[id]
if !ok {
return ErrIsNotFound
}
if err := p.update(addr); err != nil {
return err
}
log.G(t.ctx).Debugf("peer %x updated to address %s", id, addr)
return nil
}
// UpdatePeerAddr updates a peer with a new address, but delays connection creation.
// The new address won't be used until the first failure on the old address.
func (t *Transport) UpdatePeerAddr(id uint64, addr string) error {
t.mu.Lock()
defer t.mu.Unlock()
if t.stopped {
return errors.New("transport stopped")
}
p, ok := t.peers[id]
if !ok {
return ErrIsNotFound
}
if err := p.updateAddr(addr); err != nil {
return err
}
return nil
}
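// exampleUpdateAddress is an illustrative sketch, with an assumed address,
// contrasting the two update paths: UpdatePeer replaces the connection right
// away, while UpdatePeerAddr only records the new address, which the peer
// dials lazily after its next send failure (see handleAddressChange in peer.go).
func exampleUpdateAddress(t *Transport, id uint64, confirmed bool) error {
if confirmed {
// address change confirmed (e.g. committed through raft): reconnect immediately
return t.UpdatePeer(id, "10.0.0.4:2377")
}
// address merely observed from the remote side: defer the reconnect
return t.UpdatePeerAddr(id, "10.0.0.4:2377")
}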
// PeerConn returns raw grpc connection to peer.
func (t *Transport) PeerConn(id uint64) (*grpc.ClientConn, error) {
t.mu.Lock()
defer t.mu.Unlock()
p, ok := t.peers[id]
if !ok {
return nil, ErrIsNotFound
}
p.mu.Lock()
active := p.active
p.mu.Unlock()
if !active {
return nil, errors.New("peer is inactive")
}
return p.conn(), nil
}
// PeerAddr returns address of peer with id.
func (t *Transport) PeerAddr(id uint64) (string, error) {
t.mu.Lock()
defer t.mu.Unlock()
p, ok := t.peers[id]
if !ok {
return "", ErrIsNotFound
}
return p.address(), nil
}
// HealthCheck checks health of particular peer.
func (t *Transport) HealthCheck(ctx context.Context, id uint64) error {
t.mu.Lock()
p, ok := t.peers[id]
t.mu.Unlock()
if !ok {
return ErrIsNotFound
}
ctx, cancel := t.withContext(ctx)
defer cancel()
return p.healthCheck(ctx)
}
// Active returns true if the peer was recently active and false otherwise.
func (t *Transport) Active(id uint64) bool {
t.mu.Lock()
defer t.mu.Unlock()
p, ok := t.peers[id]
if !ok {
return false
}
p.mu.Lock()
active := p.active
p.mu.Unlock()
return active
}
func (t *Transport) longestActive() (*peer, error) {
var longest *peer
var longestTime time.Time
t.mu.Lock()
defer t.mu.Unlock()
for _, p := range t.peers {
becameActive := p.activeTime()
if becameActive.IsZero() {
continue
}
if longest == nil {
longest = p
longestTime = becameActive
continue
}
if becameActive.Before(longestTime) {
longest = p
longestTime = becameActive
}
}
if longest == nil {
return nil, errors.New("failed to find longest active peer")
}
return longest, nil
}
func (t *Transport) dial(addr string) (*grpc.ClientConn, error) {
grpcOptions := []grpc.DialOption{
grpc.WithBackoffMaxDelay(8 * time.Second),
}
if t.config.Credentials != nil {
grpcOptions = append(grpcOptions, grpc.WithTransportCredentials(t.config.Credentials))
} else {
grpcOptions = append(grpcOptions, grpc.WithInsecure())
}
if t.config.SendTimeout > 0 {
grpcOptions = append(grpcOptions, grpc.WithTimeout(t.config.SendTimeout))
}
cc, err := grpc.Dial(addr, grpcOptions...)
if err != nil {
return nil, err
}
return cc, nil
}
func (t *Transport) withContext(ctx context.Context) (context.Context, context.CancelFunc) {
ctx, cancel := context.WithCancel(ctx)
go func() {
select {
case <-ctx.Done():
case <-t.ctx.Done():
cancel()
}
}()
return ctx, cancel
}
func (t *Transport) resolvePeer(ctx context.Context, id uint64) (*peer, error) {
longestActive, err := t.longestActive()
if err != nil {
return nil, err
}
ctx, cancel := context.WithTimeout(ctx, t.config.SendTimeout)
defer cancel()
addr, err := longestActive.resolveAddr(ctx, id)
if err != nil {
return nil, err
}
return newPeer(id, addr, t)
}
func (t *Transport) sendUnknownMessage(ctx context.Context, m raftpb.Message) error {
p, err := t.resolvePeer(ctx, m.To)
if err != nil {
return errors.Wrapf(err, "failed to resolve peer")
}
defer p.cancel()
if err := p.sendProcessMessage(ctx, m); err != nil {
return errors.Wrapf(err, "failed to send message")
}
return nil
}

View file

@ -14,6 +14,7 @@ import (
"github.com/Sirupsen/logrus"
"github.com/boltdb/bolt"
"github.com/docker/docker/pkg/plugingetter"
"github.com/docker/swarmkit/agent"
"github.com/docker/swarmkit/agent/exec"
"github.com/docker/swarmkit/api"
@ -98,6 +99,9 @@ type Config struct {
// Availability allows a user to control the current scheduling status of a node
Availability api.NodeSpec_Availability
// PluginGetter provides access to docker's plugin inventory.
PluginGetter plugingetter.PluginGetter
}
// Node implements the primary node functionality for a member of a swarm
@ -683,6 +687,7 @@ func (n *Node) runManager(ctx context.Context, securityConfig *ca.SecurityConfig
AutoLockManagers: n.config.AutoLockManagers,
UnlockKey: n.unlockKey,
Availability: n.config.Availability,
PluginGetter: n.config.PluginGetter,
})
if err != nil {
return err

View file

@ -91,9 +91,9 @@ func (mwr *remotesWeightedRandom) Select(excludes ...string) (api.Peer, error) {
// https://github.com/LK4D4/sample
//
// The first link applies exponential distribution weight choice reservior
// The first link applies exponential distribution weight choice reservoir
// sampling. This may be relevant if we view the master selection as a
// distributed reservior sampling problem.
// distributed reservoir sampling problem.
// bias so that zero-weighted remotes still have the same probability. otherwise, we
// would always select the first entry when all weights are zero.
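// As a rough sketch of that idea (illustrative only, not this function's
// actual implementation; weights, bias and the math/rand imports are assumed):
// draw key_i = -ln(U)/w_i for each candidate and pick the smallest key, which
// selects candidate i with probability w_i / sum(w):
//
// best, bestKey := "", math.Inf(1)
// for addr, weight := range weights {
// key := -math.Log(1 - rand.Float64()) / (float64(weight) + bias)
// if key < bestKey {
// best, bestKey = addr, key
// }
// }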