Browse Source

Return membership status on join without timeout

Signed-off-by: Tonis Tiigi <tonistiigi@gmail.com>
(cherry picked from commit 1973cee0cd86da3c7b7002a3fed7af2557df538c)
Tonis Tiigi 9 years ago
parent
commit
0cac3c4c23
3 changed files with 84 additions and 56 deletions
  1. 43 26
      daemon/cluster/cluster.go
  2. 16 7
      integration-cli/daemon_swarm.go
  3. 25 23
      integration-cli/docker_api_swarm_test.go

+ 43 - 26
daemon/cluster/cluster.go

@@ -28,25 +28,28 @@ import (
 
 
 const swarmDirName = "swarm"
 const swarmDirName = "swarm"
 const controlSocket = "control.sock"
 const controlSocket = "control.sock"
-const swarmConnectTimeout = 10 * time.Second
+const swarmConnectTimeout = 20 * time.Second
 const stateFile = "docker-state.json"
 const stateFile = "docker-state.json"
 
 
 const (
 const (
 	initialReconnectDelay = 100 * time.Millisecond
 	initialReconnectDelay = 100 * time.Millisecond
-	maxReconnectDelay     = 10 * time.Second
+	maxReconnectDelay     = 30 * time.Second
 )
 )
 
 
 // ErrNoManager is returned then a manager-only function is called on non-manager
 // ErrNoManager is returned then a manager-only function is called on non-manager
-var ErrNoManager = fmt.Errorf("this node is not participating as a Swarm manager")
+var ErrNoManager = fmt.Errorf("This node is not participating as a Swarm manager")
 
 
 // ErrNoSwarm is returned on leaving a cluster that was never initialized
 // ErrNoSwarm is returned on leaving a cluster that was never initialized
-var ErrNoSwarm = fmt.Errorf("this node is not part of Swarm")
+var ErrNoSwarm = fmt.Errorf("This node is not part of Swarm")
 
 
 // ErrSwarmExists is returned on initialize or join request for a cluster that has already been activated
 // ErrSwarmExists is returned on initialize or join request for a cluster that has already been activated
-var ErrSwarmExists = fmt.Errorf("this node is already part of a Swarm")
+var ErrSwarmExists = fmt.Errorf("This node is already part of a Swarm cluster. Use \"docker swarm leave\" to leave this cluster and join another one.")
+
+// ErrPendingSwarmExists is returned on initialize or join request for a cluster that is already processing a similar request but has not succeeded yet.
+var ErrPendingSwarmExists = fmt.Errorf("This node is processing an existing join request that has not succeeded yet. Use \"docker swarm leave\" to cancel the current request.")
 
 
 // ErrSwarmJoinTimeoutReached is returned when cluster join could not complete before timeout was reached.
 // ErrSwarmJoinTimeoutReached is returned when cluster join could not complete before timeout was reached.
-var ErrSwarmJoinTimeoutReached = fmt.Errorf("timeout reached before node was joined")
+var ErrSwarmJoinTimeoutReached = fmt.Errorf("Timeout was reached before node was joined. Attempt to join the cluster will continue in the background. Use \"docker info\" command to see the current Swarm status of your node.")
 
 
 type state struct {
 type state struct {
 	ListenAddr string
 	ListenAddr string
@@ -249,13 +252,14 @@ func (c *Cluster) startNewNode(forceNewCluster bool, listenAddr, joinAddr, secre
 // Init initializes new cluster from user provided request.
 // Init initializes new cluster from user provided request.
 func (c *Cluster) Init(req types.InitRequest) (string, error) {
 func (c *Cluster) Init(req types.InitRequest) (string, error) {
 	c.Lock()
 	c.Lock()
-	if c.node != nil {
+	if node := c.node; node != nil {
 		c.Unlock()
 		c.Unlock()
 		if !req.ForceNewCluster {
 		if !req.ForceNewCluster {
-			return "", ErrSwarmExists
+			return "", errSwarmExists(node)
 		}
 		}
 		ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
 		ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
 		defer cancel()
 		defer cancel()
+		c.cancelReconnect()
 		if err := c.node.Stop(ctx); err != nil && !strings.Contains(err.Error(), "context canceled") {
 		if err := c.node.Stop(ctx); err != nil && !strings.Contains(err.Error(), "context canceled") {
 			return "", err
 			return "", err
 		}
 		}
@@ -297,9 +301,9 @@ func (c *Cluster) Init(req types.InitRequest) (string, error) {
 // Join makes current Cluster part of an existing swarm cluster.
 // Join makes current Cluster part of an existing swarm cluster.
 func (c *Cluster) Join(req types.JoinRequest) error {
 func (c *Cluster) Join(req types.JoinRequest) error {
 	c.Lock()
 	c.Lock()
-	if c.node != nil {
+	if node := c.node; node != nil {
 		c.Unlock()
 		c.Unlock()
-		return ErrSwarmExists
+		return errSwarmExists(node)
 	}
 	}
 	// todo: check current state existing
 	// todo: check current state existing
 	if len(req.RemoteAddrs) == 0 {
 	if len(req.RemoteAddrs) == 0 {
@@ -312,23 +316,29 @@ func (c *Cluster) Join(req types.JoinRequest) error {
 	}
 	}
 	c.Unlock()
 	c.Unlock()
 
 
-	select {
-	case <-time.After(swarmConnectTimeout):
-		go c.reconnectOnFailure(ctx)
-		if nodeid := n.NodeID(); nodeid != "" {
-			return fmt.Errorf("Timeout reached before node was joined. Your cluster settings may be preventing this node from automatically joining. To accept this node into cluster run `docker node accept %v` in an existing cluster manager", nodeid)
-		}
-		return ErrSwarmJoinTimeoutReached
-	case <-n.Ready():
-		go c.reconnectOnFailure(ctx)
-		return nil
-	case <-ctx.Done():
-		c.RLock()
-		defer c.RUnlock()
-		if c.err != nil {
-			return c.err
+	certificateRequested := n.CertificateRequested()
+	for {
+		select {
+		case <-certificateRequested:
+			if n.NodeMembership() == swarmapi.NodeMembershipPending {
+				return fmt.Errorf("Your node is in the process of joining the cluster but needs to be accepted by existing cluster member.\nTo accept this node into cluster run \"docker node accept %v\" in an existing cluster manager. Use \"docker info\" command to see the current Swarm status of your node.", n.NodeID())
+			}
+			certificateRequested = nil
+		case <-time.After(swarmConnectTimeout):
+			// attempt to connect will continue in background, also reconnecting
+			go c.reconnectOnFailure(ctx)
+			return ErrSwarmJoinTimeoutReached
+		case <-n.Ready():
+			go c.reconnectOnFailure(ctx)
+			return nil
+		case <-ctx.Done():
+			c.RLock()
+			defer c.RUnlock()
+			if c.err != nil {
+				return c.err
+			}
+			return ctx.Err()
 		}
 		}
-		return ctx.Err()
 	}
 	}
 }
 }
 
 
@@ -1004,6 +1014,13 @@ func (c *Cluster) managerStats() (current bool, reachable int, unreachable int,
 	return
 	return
 }
 }
 
 
+func errSwarmExists(node *swarmagent.Node) error {
+	if node.NodeMembership() != swarmapi.NodeMembershipAccepted {
+		return ErrPendingSwarmExists
+	}
+	return ErrSwarmExists
+}
+
 func initAcceptancePolicy(node *swarmagent.Node, acceptancePolicy types.AcceptancePolicy) error {
 func initAcceptancePolicy(node *swarmagent.Node, acceptancePolicy types.AcceptancePolicy) error {
 	ctx, _ := context.WithTimeout(context.Background(), 5*time.Second)
 	ctx, _ := context.WithTimeout(context.Background(), 5*time.Second)
 	for conn := range node.ListenControlSocket(ctx) {
 	for conn := range node.ListenControlSocket(ctx) {

+ 16 - 7
integration-cli/daemon_swarm.go

@@ -5,6 +5,7 @@ import (
 	"fmt"
 	"fmt"
 	"net/http"
 	"net/http"
 	"strings"
 	"strings"
+	"time"
 
 
 	"github.com/docker/docker/pkg/integration/checker"
 	"github.com/docker/docker/pkg/integration/checker"
 	"github.com/docker/engine-api/types"
 	"github.com/docker/engine-api/types"
@@ -167,14 +168,22 @@ func (d *SwarmDaemon) getNode(c *check.C, id string) *swarm.Node {
 	return &node
 	return &node
 }
 }
 
 
-func (d *SwarmDaemon) updateNode(c *check.C, node *swarm.Node, f ...nodeConstructor) {
-	for _, fn := range f {
-		fn(node)
+func (d *SwarmDaemon) updateNode(c *check.C, id string, f ...nodeConstructor) {
+	for i := 0; ; i++ {
+		node := d.getNode(c, id)
+		for _, fn := range f {
+			fn(node)
+		}
+		url := fmt.Sprintf("/nodes/%s/update?version=%d", node.ID, node.Version.Index)
+		status, out, err := d.SockRequest("POST", url, node.Spec)
+		if i < 10 && strings.Contains(string(out), "update out of sequence") {
+			time.Sleep(100 * time.Millisecond)
+			continue
+		}
+		c.Assert(err, checker.IsNil)
+		c.Assert(status, checker.Equals, http.StatusOK, check.Commentf("output: %q", string(out)))
+		return
 	}
 	}
-	url := fmt.Sprintf("/nodes/%s/update?version=%d", node.ID, node.Version.Index)
-	status, out, err := d.SockRequest("POST", url, node.Spec)
-	c.Assert(err, checker.IsNil)
-	c.Assert(status, checker.Equals, http.StatusOK, check.Commentf("output: %q", string(out)))
 }
 }
 
 
 func (d *SwarmDaemon) listNodes(c *check.C) []swarm.Node {
 func (d *SwarmDaemon) listNodes(c *check.C) []swarm.Node {

+ 25 - 23
integration-cli/docker_api_swarm_test.go

@@ -82,7 +82,7 @@ func (s *DockerSwarmSuite) testAPISwarmManualAcceptance(c *check.C, secret strin
 	err := d2.Join(d1.listenAddr, "", "", false)
 	err := d2.Join(d1.listenAddr, "", "", false)
 	c.Assert(err, checker.NotNil)
 	c.Assert(err, checker.NotNil)
 	if secret == "" {
 	if secret == "" {
-		c.Assert(err.Error(), checker.Contains, "Timeout reached")
+		c.Assert(err.Error(), checker.Contains, "needs to be accepted")
 		info, err := d2.info()
 		info, err := d2.info()
 		c.Assert(err, checker.IsNil)
 		c.Assert(err, checker.IsNil)
 		c.Assert(info.LocalNodeState, checker.Equals, swarm.LocalNodeStatePending)
 		c.Assert(info.LocalNodeState, checker.Equals, swarm.LocalNodeStatePending)
@@ -97,23 +97,25 @@ func (s *DockerSwarmSuite) testAPISwarmManualAcceptance(c *check.C, secret strin
 		c.Assert(info.LocalNodeState, checker.Equals, swarm.LocalNodeStateInactive)
 		c.Assert(info.LocalNodeState, checker.Equals, swarm.LocalNodeStateInactive)
 	}
 	}
 	d3 := s.AddDaemon(c, false, false)
 	d3 := s.AddDaemon(c, false, false)
-	go func() {
-		for i := 0; ; i++ {
-			info, err := d3.info()
-			c.Assert(err, checker.IsNil)
-			if info.NodeID != "" {
-				d1.updateNode(c, d1.getNode(c, info.NodeID), func(n *swarm.Node) {
-					n.Spec.Membership = swarm.NodeMembershipAccepted
-				})
-				return
-			}
-			if i >= 10 {
-				c.Errorf("could not find nodeID")
-			}
-			time.Sleep(300 * time.Millisecond)
+	c.Assert(d3.Join(d1.listenAddr, secret, "", false), checker.NotNil)
+	info, err := d3.info()
+	c.Assert(err, checker.IsNil)
+	c.Assert(info.LocalNodeState, checker.Equals, swarm.LocalNodeStatePending)
+	c.Assert(len(info.NodeID), checker.GreaterThan, 5)
+	d1.updateNode(c, info.NodeID, func(n *swarm.Node) {
+		n.Spec.Membership = swarm.NodeMembershipAccepted
+	})
+	for i := 0; ; i++ {
+		info, err := d3.info()
+		c.Assert(err, checker.IsNil)
+		if info.LocalNodeState == swarm.LocalNodeStateActive {
+			break
 		}
 		}
-	}()
-	c.Assert(d3.Join(d1.listenAddr, secret, "", false), checker.IsNil)
+		if i > 10 {
+			c.Errorf("node did not become active")
+		}
+		time.Sleep(200 * time.Millisecond)
+	}
 }
 }
 
 
 func (s *DockerSwarmSuite) TestApiSwarmSecretAcceptance(c *check.C) {
 func (s *DockerSwarmSuite) TestApiSwarmSecretAcceptance(c *check.C) {
@@ -236,7 +238,7 @@ func (s *DockerSwarmSuite) TestApiSwarmPromoteDemote(c *check.C) {
 	c.Assert(info.ControlAvailable, checker.Equals, false)
 	c.Assert(info.ControlAvailable, checker.Equals, false)
 	c.Assert(info.LocalNodeState, checker.Equals, swarm.LocalNodeStateActive)
 	c.Assert(info.LocalNodeState, checker.Equals, swarm.LocalNodeStateActive)
 
 
-	d1.updateNode(c, d1.getNode(c, d2.NodeID), func(n *swarm.Node) {
+	d1.updateNode(c, d2.NodeID, func(n *swarm.Node) {
 		n.Spec.Role = swarm.NodeRoleManager
 		n.Spec.Role = swarm.NodeRoleManager
 	})
 	})
 
 
@@ -255,7 +257,7 @@ func (s *DockerSwarmSuite) TestApiSwarmPromoteDemote(c *check.C) {
 		time.Sleep(100 * time.Millisecond)
 		time.Sleep(100 * time.Millisecond)
 	}
 	}
 
 
-	d1.updateNode(c, d1.getNode(c, d2.NodeID), func(n *swarm.Node) {
+	d1.updateNode(c, d2.NodeID, func(n *swarm.Node) {
 		n.Spec.Role = swarm.NodeRoleWorker
 		n.Spec.Role = swarm.NodeRoleWorker
 	})
 	})
 
 
@@ -466,7 +468,7 @@ func (s *DockerSwarmSuite) TestApiSwarmNodeUpdate(c *check.C) {
 
 
 	nodes := d.listNodes(c)
 	nodes := d.listNodes(c)
 
 
-	d.updateNode(c, d.getNode(c, nodes[0].ID), func(n *swarm.Node) {
+	d.updateNode(c, nodes[0].ID, func(n *swarm.Node) {
 		n.Spec.Availability = swarm.NodeAvailabilityPause
 		n.Spec.Availability = swarm.NodeAvailabilityPause
 	})
 	})
 
 
@@ -489,14 +491,14 @@ func (s *DockerSwarmSuite) TestApiSwarmNodeDrainPause(c *check.C) {
 	waitAndAssert(c, defaultReconciliationTimeout, reducedCheck(sumAsIntegers, d1.checkActiveContainerCount, d2.checkActiveContainerCount), checker.Equals, instances)
 	waitAndAssert(c, defaultReconciliationTimeout, reducedCheck(sumAsIntegers, d1.checkActiveContainerCount, d2.checkActiveContainerCount), checker.Equals, instances)
 
 
 	// drain d2, all containers should move to d1
 	// drain d2, all containers should move to d1
-	d1.updateNode(c, d1.getNode(c, d2.NodeID), func(n *swarm.Node) {
+	d1.updateNode(c, d2.NodeID, func(n *swarm.Node) {
 		n.Spec.Availability = swarm.NodeAvailabilityDrain
 		n.Spec.Availability = swarm.NodeAvailabilityDrain
 	})
 	})
 	waitAndAssert(c, defaultReconciliationTimeout, d1.checkActiveContainerCount, checker.Equals, instances)
 	waitAndAssert(c, defaultReconciliationTimeout, d1.checkActiveContainerCount, checker.Equals, instances)
 	waitAndAssert(c, defaultReconciliationTimeout, d2.checkActiveContainerCount, checker.Equals, 0)
 	waitAndAssert(c, defaultReconciliationTimeout, d2.checkActiveContainerCount, checker.Equals, 0)
 
 
 	// set d2 back to active
 	// set d2 back to active
-	d1.updateNode(c, d1.getNode(c, d2.NodeID), func(n *swarm.Node) {
+	d1.updateNode(c, d2.NodeID, func(n *swarm.Node) {
 		n.Spec.Availability = swarm.NodeAvailabilityActive
 		n.Spec.Availability = swarm.NodeAvailabilityActive
 	})
 	})
 
 
@@ -516,7 +518,7 @@ func (s *DockerSwarmSuite) TestApiSwarmNodeDrainPause(c *check.C) {
 	d2ContainerCount := len(d2.activeContainers())
 	d2ContainerCount := len(d2.activeContainers())
 
 
 	// set d2 to paused, scale service up, only d1 gets new tasks
 	// set d2 to paused, scale service up, only d1 gets new tasks
-	d1.updateNode(c, d1.getNode(c, d2.NodeID), func(n *swarm.Node) {
+	d1.updateNode(c, d2.NodeID, func(n *swarm.Node) {
 		n.Spec.Availability = swarm.NodeAvailabilityPause
 		n.Spec.Availability = swarm.NodeAvailabilityPause
 	})
 	})