
Merge pull request #28907 from tonistiigi/cluster-refactor

Switch cluster locking strategy
Tõnis Tiigi 8 years ago
commit eefbf1ddd3

File diff suppressed because it is too large
+ 248 - 461
daemon/cluster/cluster.go


+ 296 - 0
daemon/cluster/noderunner.go

@@ -0,0 +1,296 @@
+package cluster
+
+import (
+	"path/filepath"
+	"runtime"
+	"strings"
+	"sync"
+	"time"
+
+	"github.com/Sirupsen/logrus"
+	types "github.com/docker/docker/api/types/swarm"
+	"github.com/docker/docker/daemon/cluster/executor/container"
+	swarmapi "github.com/docker/swarmkit/api"
+	swarmnode "github.com/docker/swarmkit/node"
+	"github.com/pkg/errors"
+	"golang.org/x/net/context"
+	"google.golang.org/grpc"
+)
+
+// nodeRunner implements a manager for a continuously running swarmkit node, restarting it with backoff delays if needed.
+type nodeRunner struct {
+	nodeState
+	mu             sync.RWMutex
+	done           chan struct{} // closed when swarmNode exits
+	ready          chan struct{} // closed when swarmNode becomes active
+	reconnectDelay time.Duration
+	config         nodeStartConfig
+
+	repeatedRun     bool
+	cancelReconnect func()
+	stopping        bool
+	cluster         *Cluster // only for accessing config helpers, never call any methods. TODO: change to config struct
+}
+
+// nodeStartConfig holds configuration needed to start a new node. Exported
+// fields of this structure are saved to disk in json. Unexported fields
+// contain data that shouldn't be persisted between daemon reloads.
+type nodeStartConfig struct {
+	// LocalAddr is this machine's local IP or hostname, if specified.
+	LocalAddr string
+	// RemoteAddr is the address that was given to "swarm join". It is used
+	// to find LocalAddr if necessary.
+	RemoteAddr string
+	// ListenAddr is the address we bind to, including a port.
+	ListenAddr string
+	// AdvertiseAddr is the address other nodes should connect to,
+	// including a port.
+	AdvertiseAddr   string
+	joinAddr        string
+	forceNewCluster bool
+	joinToken       string
+	lockKey         []byte
+	autolock        bool
+}
+
+func (n *nodeRunner) Ready() chan error {
+	c := make(chan error, 1)
+	n.mu.RLock()
+	ready, done := n.ready, n.done
+	n.mu.RUnlock()
+	go func() {
+		select {
+		case <-ready:
+		case <-done:
+		}
+		select {
+		case <-ready:
+		default:
+			n.mu.RLock()
+			c <- n.err
+			n.mu.RUnlock()
+		}
+		close(c)
+	}()
+	return c
+}
+
+func (n *nodeRunner) Start(conf nodeStartConfig) error {
+	n.mu.Lock()
+	defer n.mu.Unlock()
+
+	n.reconnectDelay = initialReconnectDelay
+
+	return n.start(conf)
+}
+
+func (n *nodeRunner) start(conf nodeStartConfig) error {
+	var control string
+	if runtime.GOOS == "windows" {
+		control = `\\.\pipe\` + controlSocket
+	} else {
+		control = filepath.Join(n.cluster.runtimeRoot, controlSocket)
+	}
+
+	node, err := swarmnode.New(&swarmnode.Config{
+		Hostname:           n.cluster.config.Name,
+		ForceNewCluster:    conf.forceNewCluster,
+		ListenControlAPI:   control,
+		ListenRemoteAPI:    conf.ListenAddr,
+		AdvertiseRemoteAPI: conf.AdvertiseAddr,
+		JoinAddr:           conf.joinAddr,
+		StateDir:           n.cluster.root,
+		JoinToken:          conf.joinToken,
+		Executor:           container.NewExecutor(n.cluster.config.Backend),
+		HeartbeatTick:      1,
+		ElectionTick:       3,
+		UnlockKey:          conf.lockKey,
+		AutoLockManagers:   conf.autolock,
+	})
+	if err != nil {
+		return err
+	}
+	if err := node.Start(context.Background()); err != nil {
+		return err
+	}
+
+	n.done = make(chan struct{})
+	n.ready = make(chan struct{})
+	n.swarmNode = node
+	n.config = conf
+	savePersistentState(n.cluster.root, conf)
+
+	ctx, cancel := context.WithCancel(context.Background())
+
+	go func() {
+		n.handleNodeExit(node)
+		cancel()
+	}()
+
+	go n.handleReadyEvent(ctx, node, n.ready)
+	go n.handleControlSocketChange(ctx, node)
+
+	return nil
+}
+
+func (n *nodeRunner) handleControlSocketChange(ctx context.Context, node *swarmnode.Node) {
+	for conn := range node.ListenControlSocket(ctx) {
+		n.mu.Lock()
+		if n.grpcConn != conn {
+			if conn == nil {
+				n.controlClient = nil
+				n.logsClient = nil
+			} else {
+				n.controlClient = swarmapi.NewControlClient(conn)
+				n.logsClient = swarmapi.NewLogsClient(conn)
+			}
+		}
+		n.grpcConn = conn
+		n.mu.Unlock()
+		n.cluster.configEvent <- struct{}{}
+	}
+}
+
+func (n *nodeRunner) handleReadyEvent(ctx context.Context, node *swarmnode.Node, ready chan struct{}) {
+	select {
+	case <-node.Ready():
+		n.mu.Lock()
+		n.err = nil
+		n.mu.Unlock()
+		close(ready)
+	case <-ctx.Done():
+	}
+	n.cluster.configEvent <- struct{}{}
+}
+
+func (n *nodeRunner) handleNodeExit(node *swarmnode.Node) {
+	err := detectLockedError(node.Err(context.Background()))
+	if err != nil {
+		logrus.Errorf("cluster exited with error: %v", err)
+	}
+	n.mu.Lock()
+	n.swarmNode = nil
+	n.err = err
+	close(n.done)
+	select {
+	case <-n.ready:
+		n.enableReconnectWatcher()
+	default:
+		if n.repeatedRun {
+			n.enableReconnectWatcher()
+		}
+	}
+	n.repeatedRun = true
+	n.mu.Unlock()
+}
+
+// Stop stops the current swarm node if it is running.
+func (n *nodeRunner) Stop() error {
+	n.mu.Lock()
+	if n.cancelReconnect != nil { // between restarts
+		n.cancelReconnect()
+		n.cancelReconnect = nil
+	}
+	if n.swarmNode == nil {
+		n.mu.Unlock()
+		return nil
+	}
+	n.stopping = true
+	ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
+	defer cancel()
+	if err := n.swarmNode.Stop(ctx); err != nil && !strings.Contains(err.Error(), "context canceled") {
+		n.mu.Unlock()
+		return err
+	}
+	n.mu.Unlock()
+	<-n.done
+	return nil
+}
+
+func (n *nodeRunner) State() nodeState {
+	if n == nil {
+		return nodeState{status: types.LocalNodeStateInactive}
+	}
+	n.mu.RLock()
+	defer n.mu.RUnlock()
+
+	ns := n.nodeState
+
+	if ns.err != nil || n.cancelReconnect != nil {
+		if errors.Cause(ns.err) == ErrSwarmLocked {
+			ns.status = types.LocalNodeStateLocked
+		} else {
+			ns.status = types.LocalNodeStateError
+		}
+	} else {
+		select {
+		case <-n.ready:
+			ns.status = types.LocalNodeStateActive
+		default:
+			ns.status = types.LocalNodeStatePending
+		}
+	}
+
+	return ns
+}
+
+func (n *nodeRunner) enableReconnectWatcher() {
+	if n.stopping {
+		return
+	}
+	n.reconnectDelay *= 2
+	if n.reconnectDelay > maxReconnectDelay {
+		n.reconnectDelay = maxReconnectDelay
+	}
+	logrus.Warnf("Restarting swarm in %.2f seconds", n.reconnectDelay.Seconds())
+	delayCtx, cancel := context.WithTimeout(context.Background(), n.reconnectDelay)
+	n.cancelReconnect = cancel
+
+	config := n.config
+	go func() {
+		<-delayCtx.Done()
+		if delayCtx.Err() != context.DeadlineExceeded {
+			return
+		}
+		n.mu.Lock()
+		defer n.mu.Unlock()
+		if n.stopping {
+			return
+		}
+		config.RemoteAddr = n.cluster.getRemoteAddress()
+		config.joinAddr = config.RemoteAddr
+		if err := n.start(config); err != nil {
+			n.err = err
+		}
+	}()
+}
+
+// nodeState represents information about the current state of the cluster and
+// provides access to the grpc clients.
+type nodeState struct {
+	swarmNode       *swarmnode.Node
+	grpcConn        *grpc.ClientConn
+	controlClient   swarmapi.ControlClient
+	logsClient      swarmapi.LogsClient
+	status          types.LocalNodeState
+	actualLocalAddr string
+	err             error
+}
+
+// IsActiveManager returns true if node is a manager ready to accept control requests. It is safe to access the client properties if this returns true.
+func (ns nodeState) IsActiveManager() bool {
+	return ns.controlClient != nil
+}
+
+// IsManager returns true if node is a manager.
+func (ns nodeState) IsManager() bool {
+	return ns.swarmNode != nil && ns.swarmNode.Manager() != nil
+}
+
+// NodeID returns node's ID or empty string if node is inactive.
+func (ns nodeState) NodeID() string {
+	if ns.swarmNode != nil {
+		return ns.swarmNode.NodeID()
+	}
+	return ""
+}
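
Note: enableReconnectWatcher doubles reconnectDelay up to maxReconnectDelay and schedules the restart through a timeout context that Stop can abort via cancelReconnect. The standalone sketch below only illustrates that capped-backoff restart pattern; every identifier in it (runner, scheduleRestart, initialDelay, maxDelay) is a stand-in and not part of this PR.

// backoff_sketch.go: a minimal, hypothetical illustration of the
// capped exponential-backoff restart used by nodeRunner.enableReconnectWatcher.
// Only the shape of the logic matches the PR; the names are invented.
package main

import (
	"context"
	"fmt"
	"time"
)

const (
	initialDelay = 2 * time.Second
	maxDelay     = 30 * time.Second
)

type runner struct {
	delay  time.Duration
	cancel func() // lets a Stop() abort a pending restart
}

// scheduleRestart doubles the delay (capped at maxDelay), then restarts
// once the delay elapses, unless the timeout context was cancelled first.
func (r *runner) scheduleRestart(start func() error) {
	r.delay *= 2
	if r.delay > maxDelay {
		r.delay = maxDelay
	}
	delayCtx, cancel := context.WithTimeout(context.Background(), r.delay)
	r.cancel = cancel
	go func() {
		<-delayCtx.Done()
		if delayCtx.Err() != context.DeadlineExceeded {
			return // cancelled by Stop(): do not restart
		}
		if err := start(); err != nil {
			fmt.Println("restart failed:", err)
		}
	}()
}

func main() {
	r := &runner{delay: initialDelay}
	r.scheduleRestart(func() error {
		fmt.Println("swarm node restarted")
		return nil
	})
	time.Sleep(5 * time.Second) // long enough for the doubled delay (4s) to fire
}

The real code additionally takes the runner's mutex before restarting and refreshes the join address from the cluster, as enableReconnectWatcher above shows.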

+ 30 - 25
daemon/cluster/secrets.go

@@ -9,17 +9,18 @@ import (

 // GetSecret returns a secret from a managed swarm cluster
 func (c *Cluster) GetSecret(id string) (types.Secret, error) {
-	c.RLock()
-	defer c.RUnlock()
+	c.mu.RLock()
+	defer c.mu.RUnlock()

-	if !c.isActiveManager() {
-		return types.Secret{}, c.errNoManager()
+	state := c.currentNodeState()
+	if !state.IsActiveManager() {
+		return types.Secret{}, c.errNoManager(state)
 	}

 	ctx, cancel := c.getRequestContext()
 	defer cancel()

-	r, err := c.node.client.GetSecret(ctx, &swarmapi.GetSecretRequest{SecretID: id})
+	r, err := state.controlClient.GetSecret(ctx, &swarmapi.GetSecretRequest{SecretID: id})
 	if err != nil {
 		return types.Secret{}, err
 	}
@@ -29,11 +30,12 @@ func (c *Cluster) GetSecret(id string) (types.Secret, error) {

 // GetSecrets returns all secrets of a managed swarm cluster.
 func (c *Cluster) GetSecrets(options apitypes.SecretListOptions) ([]types.Secret, error) {
-	c.RLock()
-	defer c.RUnlock()
+	c.mu.RLock()
+	defer c.mu.RUnlock()

-	if !c.isActiveManager() {
-		return nil, c.errNoManager()
+	state := c.currentNodeState()
+	if !state.IsActiveManager() {
+		return nil, c.errNoManager(state)
 	}

 	filters, err := newListSecretsFilters(options.Filters)
@@ -43,7 +45,7 @@ func (c *Cluster) GetSecrets(options apitypes.SecretListOptions) ([]types.Secret
 	ctx, cancel := c.getRequestContext()
 	defer cancel()

-	r, err := c.node.client.ListSecrets(ctx,
+	r, err := state.controlClient.ListSecrets(ctx,
 		&swarmapi.ListSecretsRequest{Filters: filters})
 	if err != nil {
 		return nil, err
@@ -60,11 +62,12 @@ func (c *Cluster) GetSecrets(options apitypes.SecretListOptions) ([]types.Secret

 // CreateSecret creates a new secret in a managed swarm cluster.
 func (c *Cluster) CreateSecret(s types.SecretSpec) (string, error) {
-	c.RLock()
-	defer c.RUnlock()
+	c.mu.RLock()
+	defer c.mu.RUnlock()

-	if !c.isActiveManager() {
-		return "", c.errNoManager()
+	state := c.currentNodeState()
+	if !state.IsActiveManager() {
+		return "", c.errNoManager(state)
 	}

 	ctx, cancel := c.getRequestContext()
@@ -72,7 +75,7 @@ func (c *Cluster) CreateSecret(s types.SecretSpec) (string, error) {

 	secretSpec := convert.SecretSpecToGRPC(s)

-	r, err := c.node.client.CreateSecret(ctx,
+	r, err := state.controlClient.CreateSecret(ctx,
 		&swarmapi.CreateSecretRequest{Spec: &secretSpec})
 	if err != nil {
 		return "", err
@@ -83,11 +86,12 @@ func (c *Cluster) CreateSecret(s types.SecretSpec) (string, error) {

 // RemoveSecret removes a secret from a managed swarm cluster.
 func (c *Cluster) RemoveSecret(id string) error {
-	c.RLock()
-	defer c.RUnlock()
+	c.mu.RLock()
+	defer c.mu.RUnlock()

-	if !c.isActiveManager() {
-		return c.errNoManager()
+	state := c.currentNodeState()
+	if !state.IsActiveManager() {
+		return c.errNoManager(state)
 	}

 	ctx, cancel := c.getRequestContext()
@@ -97,7 +101,7 @@ func (c *Cluster) RemoveSecret(id string) error {
 		SecretID: id,
 	}

-	if _, err := c.node.client.RemoveSecret(ctx, req); err != nil {
+	if _, err := state.controlClient.RemoveSecret(ctx, req); err != nil {
 		return err
 	}
 	return nil
@@ -106,11 +110,12 @@ func (c *Cluster) RemoveSecret(id string) error {
 // UpdateSecret updates a secret in a managed swarm cluster.
 // Note: this is not exposed to the CLI but is available from the API only
 func (c *Cluster) UpdateSecret(id string, version uint64, spec types.SecretSpec) error {
-	c.RLock()
-	defer c.RUnlock()
+	c.mu.RLock()
+	defer c.mu.RUnlock()

-	if !c.isActiveManager() {
-		return c.errNoManager()
+	state := c.currentNodeState()
+	if !state.IsActiveManager() {
+		return c.errNoManager(state)
 	}

 	ctx, cancel := c.getRequestContext()
@@ -118,7 +123,7 @@ func (c *Cluster) UpdateSecret(id string, version uint64, spec types.SecretSpec)

 	secretSpec := convert.SecretSpecToGRPC(spec)

-	if _, err := c.client.UpdateSecret(ctx,
+	if _, err := state.controlClient.UpdateSecret(ctx,
 		&swarmapi.UpdateSecretRequest{
 			SecretID: id,
 			SecretVersion: &swarmapi.Version{

+ 56 - 0
daemon/cluster/utils.go

@@ -0,0 +1,56 @@
+package cluster
+
+import (
+	"encoding/json"
+	"io/ioutil"
+	"os"
+	"path/filepath"
+
+	"github.com/docker/docker/pkg/ioutils"
+)
+
+func loadPersistentState(root string) (*nodeStartConfig, error) {
+	dt, err := ioutil.ReadFile(filepath.Join(root, stateFile))
+	if err != nil {
+		return nil, err
+	}
+	// missing certificate means no actual state to restore from
+	if _, err := os.Stat(filepath.Join(root, "certificates/swarm-node.crt")); err != nil {
+		if os.IsNotExist(err) {
+			clearPersistentState(root)
+		}
+		return nil, err
+	}
+	var st nodeStartConfig
+	if err := json.Unmarshal(dt, &st); err != nil {
+		return nil, err
+	}
+	return &st, nil
+}
+
+func savePersistentState(root string, config nodeStartConfig) error {
+	dt, err := json.Marshal(config)
+	if err != nil {
+		return err
+	}
+	return ioutils.AtomicWriteFile(filepath.Join(root, stateFile), dt, 0600)
+}
+
+func clearPersistentState(root string) error {
+	// todo: backup this data instead of removing?
+	if err := os.RemoveAll(root); err != nil {
+		return err
+	}
+	if err := os.MkdirAll(root, 0700); err != nil {
+		return err
+	}
+	return nil
+}
+
+func removingManagerCausesLossOfQuorum(reachable, unreachable int) bool {
+	return reachable-2 <= unreachable
+}
+
+func isLastManager(reachable, unreachable int) bool {
+	return reachable == 1 && unreachable == 0
+}
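
Note: the two quorum helpers encode the raft majority rule: removing one reachable manager is only safe if the remaining reachable managers still form a majority of the shrunken manager set, which reduces to the reachable-2 <= unreachable check above. A small, hypothetical table-driven illustration of that arithmetic (the program is not part of the PR; only the predicate is copied from utils.go):

// quorum_sketch.go: a standalone check of the quorum arithmetic behind
// removingManagerCausesLossOfQuorum.
package main

import "fmt"

// Same predicate as in daemon/cluster/utils.go.
func removingManagerCausesLossOfQuorum(reachable, unreachable int) bool {
	return reachable-2 <= unreachable
}

func main() {
	cases := []struct{ reachable, unreachable int }{
		{3, 0}, // 3 healthy managers: removing one leaves 2 of 2, still quorate
		{2, 1}, // 3 managers, 1 down: removing a healthy one leaves 1 of 2, quorum lost
		{4, 1}, // 5 managers, 1 down: removing one leaves 3 of 4, still quorate
		{3, 2}, // 5 managers, 2 down: removing one leaves 2 of 4, quorum lost
	}
	for _, tc := range cases {
		fmt.Printf("reachable=%d unreachable=%d -> loses quorum: %v\n",
			tc.reachable, tc.unreachable,
			removingManagerCausesLossOfQuorum(tc.reachable, tc.unreachable))
	}
}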

+ 3 - 1
integration-cli/docker_api_swarm_test.go

@@ -75,6 +75,8 @@ func (s *DockerSwarmSuite) TestAPISwarmJoinToken(c *check.C) {
 	d1 := s.AddDaemon(c, false, false)
 	c.Assert(d1.Init(swarm.InitRequest{}), checker.IsNil)

+	// todo: error message differs depending if some components of token are valid
+
 	d2 := s.AddDaemon(c, false, false)
 	err := d2.Join(swarm.JoinRequest{RemoteAddrs: []string{d1.listenAddr}})
 	c.Assert(err, checker.NotNil)
@@ -85,7 +87,7 @@ func (s *DockerSwarmSuite) TestAPISwarmJoinToken(c *check.C) {

 	err = d2.Join(swarm.JoinRequest{JoinToken: "foobaz", RemoteAddrs: []string{d1.listenAddr}})
 	c.Assert(err, checker.NotNil)
-	c.Assert(err.Error(), checker.Contains, "join token is necessary")
+	c.Assert(err.Error(), checker.Contains, "invalid join token")
 	info, err = d2.info()
 	c.Assert(err, checker.IsNil)
 	c.Assert(info.LocalNodeState, checker.Equals, swarm.LocalNodeStateInactive)

Some files were not shown because too many files changed in this diff