Browse Source

Return membership status on join without timeout

Signed-off-by: Tonis Tiigi <tonistiigi@gmail.com>
Tonis Tiigi 9 years ago
parent
commit
1973cee0cd
3 changed files with 84 additions and 56 deletions
  1. 43 26
      daemon/cluster/cluster.go
  2. 16 7
      integration-cli/daemon_swarm.go
  3. 25 23
      integration-cli/docker_api_swarm_test.go

+ 43 - 26
daemon/cluster/cluster.go

@@ -28,25 +28,28 @@ import (
 
 const swarmDirName = "swarm"
 const controlSocket = "control.sock"
-const swarmConnectTimeout = 10 * time.Second
+const swarmConnectTimeout = 20 * time.Second
 const stateFile = "docker-state.json"
 
 const (
 	initialReconnectDelay = 100 * time.Millisecond
-	maxReconnectDelay     = 10 * time.Second
+	maxReconnectDelay     = 30 * time.Second
 )
 
 // ErrNoManager is returned then a manager-only function is called on non-manager
-var ErrNoManager = fmt.Errorf("this node is not participating as a Swarm manager")
+var ErrNoManager = fmt.Errorf("This node is not participating as a Swarm manager")
 
 // ErrNoSwarm is returned on leaving a cluster that was never initialized
-var ErrNoSwarm = fmt.Errorf("this node is not part of Swarm")
+var ErrNoSwarm = fmt.Errorf("This node is not part of Swarm")
 
 // ErrSwarmExists is returned on initialize or join request for a cluster that has already been activated
-var ErrSwarmExists = fmt.Errorf("this node is already part of a Swarm")
+var ErrSwarmExists = fmt.Errorf("This node is already part of a Swarm cluster. Use \"docker swarm leave\" to leave this cluster and join another one.")
+
+// ErrPendingSwarmExists is returned on initialize or join request for a cluster that is already processing a similar request but has not succeeded yet.
+var ErrPendingSwarmExists = fmt.Errorf("This node is processing an existing join request that has not succeeded yet. Use \"docker swarm leave\" to cancel the current request.")
 
 // ErrSwarmJoinTimeoutReached is returned when cluster join could not complete before timeout was reached.
-var ErrSwarmJoinTimeoutReached = fmt.Errorf("timeout reached before node was joined")
+var ErrSwarmJoinTimeoutReached = fmt.Errorf("Timeout was reached before node was joined. Attempt to join the cluster will continue in the background. Use \"docker info\" command to see the current Swarm status of your node.")
 
 type state struct {
 	ListenAddr string
@@ -249,13 +252,14 @@ func (c *Cluster) startNewNode(forceNewCluster bool, listenAddr, joinAddr, secre
 // Init initializes new cluster from user provided request.
 func (c *Cluster) Init(req types.InitRequest) (string, error) {
 	c.Lock()
-	if c.node != nil {
+	if node := c.node; node != nil {
 		c.Unlock()
 		if !req.ForceNewCluster {
-			return "", ErrSwarmExists
+			return "", errSwarmExists(node)
 		}
 		ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
 		defer cancel()
+		c.cancelReconnect()
 		if err := c.node.Stop(ctx); err != nil && !strings.Contains(err.Error(), "context canceled") {
 			return "", err
 		}
@@ -297,9 +301,9 @@ func (c *Cluster) Init(req types.InitRequest) (string, error) {
 // Join makes current Cluster part of an existing swarm cluster.
 func (c *Cluster) Join(req types.JoinRequest) error {
 	c.Lock()
-	if c.node != nil {
+	if node := c.node; node != nil {
 		c.Unlock()
-		return ErrSwarmExists
+		return errSwarmExists(node)
 	}
 	// todo: check current state existing
 	if len(req.RemoteAddrs) == 0 {
@@ -312,23 +316,29 @@ func (c *Cluster) Join(req types.JoinRequest) error {
 	}
 	c.Unlock()
 
-	select {
-	case <-time.After(swarmConnectTimeout):
-		go c.reconnectOnFailure(ctx)
-		if nodeid := n.NodeID(); nodeid != "" {
-			return fmt.Errorf("Timeout reached before node was joined. Your cluster settings may be preventing this node from automatically joining. To accept this node into cluster run `docker node accept %v` in an existing cluster manager", nodeid)
-		}
-		return ErrSwarmJoinTimeoutReached
-	case <-n.Ready():
-		go c.reconnectOnFailure(ctx)
-		return nil
-	case <-ctx.Done():
-		c.RLock()
-		defer c.RUnlock()
-		if c.err != nil {
-			return c.err
+	certificateRequested := n.CertificateRequested()
+	for {
+		select {
+		case <-certificateRequested:
+			if n.NodeMembership() == swarmapi.NodeMembershipPending {
+				return fmt.Errorf("Your node is in the process of joining the cluster but needs to be accepted by existing cluster member.\nTo accept this node into cluster run \"docker node accept %v\" in an existing cluster manager. Use \"docker info\" command to see the current Swarm status of your node.", n.NodeID())
+			}
+			certificateRequested = nil
+		case <-time.After(swarmConnectTimeout):
+			// attempt to connect will continue in background, also reconnecting
+			go c.reconnectOnFailure(ctx)
+			return ErrSwarmJoinTimeoutReached
+		case <-n.Ready():
+			go c.reconnectOnFailure(ctx)
+			return nil
+		case <-ctx.Done():
+			c.RLock()
+			defer c.RUnlock()
+			if c.err != nil {
+				return c.err
+			}
+			return ctx.Err()
 		}
-		return ctx.Err()
 	}
 }
 
@@ -1004,6 +1014,13 @@ func (c *Cluster) managerStats() (current bool, reachable int, unreachable int,
 	return
 }
 
+func errSwarmExists(node *swarmagent.Node) error {
+	if node.NodeMembership() != swarmapi.NodeMembershipAccepted {
+		return ErrPendingSwarmExists
+	}
+	return ErrSwarmExists
+}
+
 func initAcceptancePolicy(node *swarmagent.Node, acceptancePolicy types.AcceptancePolicy) error {
 	ctx, _ := context.WithTimeout(context.Background(), 5*time.Second)
 	for conn := range node.ListenControlSocket(ctx) {

+ 16 - 7
integration-cli/daemon_swarm.go

@@ -5,6 +5,7 @@ import (
 	"fmt"
 	"net/http"
 	"strings"
+	"time"
 
 	"github.com/docker/docker/pkg/integration/checker"
 	"github.com/docker/engine-api/types"
@@ -167,14 +168,22 @@ func (d *SwarmDaemon) getNode(c *check.C, id string) *swarm.Node {
 	return &node
 }
 
-func (d *SwarmDaemon) updateNode(c *check.C, node *swarm.Node, f ...nodeConstructor) {
-	for _, fn := range f {
-		fn(node)
+func (d *SwarmDaemon) updateNode(c *check.C, id string, f ...nodeConstructor) {
+	for i := 0; ; i++ {
+		node := d.getNode(c, id)
+		for _, fn := range f {
+			fn(node)
+		}
+		url := fmt.Sprintf("/nodes/%s/update?version=%d", node.ID, node.Version.Index)
+		status, out, err := d.SockRequest("POST", url, node.Spec)
+		if i < 10 && strings.Contains(string(out), "update out of sequence") {
+			time.Sleep(100 * time.Millisecond)
+			continue
+		}
+		c.Assert(err, checker.IsNil)
+		c.Assert(status, checker.Equals, http.StatusOK, check.Commentf("output: %q", string(out)))
+		return
 	}
-	url := fmt.Sprintf("/nodes/%s/update?version=%d", node.ID, node.Version.Index)
-	status, out, err := d.SockRequest("POST", url, node.Spec)
-	c.Assert(err, checker.IsNil)
-	c.Assert(status, checker.Equals, http.StatusOK, check.Commentf("output: %q", string(out)))
 }
 
 func (d *SwarmDaemon) listNodes(c *check.C) []swarm.Node {

+ 25 - 23
integration-cli/docker_api_swarm_test.go

@@ -82,7 +82,7 @@ func (s *DockerSwarmSuite) testAPISwarmManualAcceptance(c *check.C, secret strin
 	err := d2.Join(d1.listenAddr, "", "", false)
 	c.Assert(err, checker.NotNil)
 	if secret == "" {
-		c.Assert(err.Error(), checker.Contains, "Timeout reached")
+		c.Assert(err.Error(), checker.Contains, "needs to be accepted")
 		info, err := d2.info()
 		c.Assert(err, checker.IsNil)
 		c.Assert(info.LocalNodeState, checker.Equals, swarm.LocalNodeStatePending)
@@ -97,23 +97,25 @@ func (s *DockerSwarmSuite) testAPISwarmManualAcceptance(c *check.C, secret strin
 		c.Assert(info.LocalNodeState, checker.Equals, swarm.LocalNodeStateInactive)
 	}
 	d3 := s.AddDaemon(c, false, false)
-	go func() {
-		for i := 0; ; i++ {
-			info, err := d3.info()
-			c.Assert(err, checker.IsNil)
-			if info.NodeID != "" {
-				d1.updateNode(c, d1.getNode(c, info.NodeID), func(n *swarm.Node) {
-					n.Spec.Membership = swarm.NodeMembershipAccepted
-				})
-				return
-			}
-			if i >= 10 {
-				c.Errorf("could not find nodeID")
-			}
-			time.Sleep(300 * time.Millisecond)
+	c.Assert(d3.Join(d1.listenAddr, secret, "", false), checker.NotNil)
+	info, err := d3.info()
+	c.Assert(err, checker.IsNil)
+	c.Assert(info.LocalNodeState, checker.Equals, swarm.LocalNodeStatePending)
+	c.Assert(len(info.NodeID), checker.GreaterThan, 5)
+	d1.updateNode(c, info.NodeID, func(n *swarm.Node) {
+		n.Spec.Membership = swarm.NodeMembershipAccepted
+	})
+	for i := 0; ; i++ {
+		info, err := d3.info()
+		c.Assert(err, checker.IsNil)
+		if info.LocalNodeState == swarm.LocalNodeStateActive {
+			break
 		}
-	}()
-	c.Assert(d3.Join(d1.listenAddr, secret, "", false), checker.IsNil)
+		if i > 10 {
+			c.Errorf("node did not become active")
+		}
+		time.Sleep(200 * time.Millisecond)
+	}
 }
 
 func (s *DockerSwarmSuite) TestApiSwarmSecretAcceptance(c *check.C) {
@@ -236,7 +238,7 @@ func (s *DockerSwarmSuite) TestApiSwarmPromoteDemote(c *check.C) {
 	c.Assert(info.ControlAvailable, checker.Equals, false)
 	c.Assert(info.LocalNodeState, checker.Equals, swarm.LocalNodeStateActive)
 
-	d1.updateNode(c, d1.getNode(c, d2.NodeID), func(n *swarm.Node) {
+	d1.updateNode(c, d2.NodeID, func(n *swarm.Node) {
 		n.Spec.Role = swarm.NodeRoleManager
 	})
 
@@ -255,7 +257,7 @@ func (s *DockerSwarmSuite) TestApiSwarmPromoteDemote(c *check.C) {
 		time.Sleep(100 * time.Millisecond)
 	}
 
-	d1.updateNode(c, d1.getNode(c, d2.NodeID), func(n *swarm.Node) {
+	d1.updateNode(c, d2.NodeID, func(n *swarm.Node) {
 		n.Spec.Role = swarm.NodeRoleWorker
 	})
 
@@ -466,7 +468,7 @@ func (s *DockerSwarmSuite) TestApiSwarmNodeUpdate(c *check.C) {
 
 	nodes := d.listNodes(c)
 
-	d.updateNode(c, d.getNode(c, nodes[0].ID), func(n *swarm.Node) {
+	d.updateNode(c, nodes[0].ID, func(n *swarm.Node) {
 		n.Spec.Availability = swarm.NodeAvailabilityPause
 	})
 
@@ -489,14 +491,14 @@ func (s *DockerSwarmSuite) TestApiSwarmNodeDrainPause(c *check.C) {
 	waitAndAssert(c, defaultReconciliationTimeout, reducedCheck(sumAsIntegers, d1.checkActiveContainerCount, d2.checkActiveContainerCount), checker.Equals, instances)
 
 	// drain d2, all containers should move to d1
-	d1.updateNode(c, d1.getNode(c, d2.NodeID), func(n *swarm.Node) {
+	d1.updateNode(c, d2.NodeID, func(n *swarm.Node) {
 		n.Spec.Availability = swarm.NodeAvailabilityDrain
 	})
 	waitAndAssert(c, defaultReconciliationTimeout, d1.checkActiveContainerCount, checker.Equals, instances)
 	waitAndAssert(c, defaultReconciliationTimeout, d2.checkActiveContainerCount, checker.Equals, 0)
 
 	// set d2 back to active
-	d1.updateNode(c, d1.getNode(c, d2.NodeID), func(n *swarm.Node) {
+	d1.updateNode(c, d2.NodeID, func(n *swarm.Node) {
 		n.Spec.Availability = swarm.NodeAvailabilityActive
 	})
 
@@ -516,7 +518,7 @@ func (s *DockerSwarmSuite) TestApiSwarmNodeDrainPause(c *check.C) {
 	d2ContainerCount := len(d2.activeContainers())
 
 	// set d2 to paused, scale service up, only d1 gets new tasks
-	d1.updateNode(c, d1.getNode(c, d2.NodeID), func(n *swarm.Node) {
+	d1.updateNode(c, d2.NodeID, func(n *swarm.Node) {
 		n.Spec.Availability = swarm.NodeAvailabilityPause
 	})