From 22b34d64496c9b6ebae5e2b4a98ecd9a172cc557 Mon Sep 17 00:00:00 2001 From: Tonis Tiigi Date: Sun, 19 Jun 2016 19:00:45 -0700 Subject: [PATCH 1/3] Switch node management tests to api types Signed-off-by: Tonis Tiigi --- integration-cli/check_test.go | 14 +++-- integration-cli/daemon_swarm.go | 41 +++++--------- integration-cli/docker_api_swarm_test.go | 70 +++++++++++++++++------- 3 files changed, 74 insertions(+), 51 deletions(-) diff --git a/integration-cli/check_test.go b/integration-cli/check_test.go index c9f57b7450..9aae2ce595 100644 --- a/integration-cli/check_test.go +++ b/integration-cli/check_test.go @@ -8,6 +8,7 @@ import ( "github.com/docker/docker/cliconfig" "github.com/docker/docker/pkg/reexec" + "github.com/docker/engine-api/types/swarm" "github.com/go-check/check" ) @@ -213,12 +214,15 @@ func (s *DockerSwarmSuite) AddDaemon(c *check.C, joinSwarm, manager bool) *Swarm if joinSwarm == true { if len(s.daemons) > 0 { - c.Assert(d.Join(s.daemons[0].listenAddr, "", "", manager), check.IsNil) + c.Assert(d.Join(swarm.JoinRequest{ + RemoteAddrs: []string{s.daemons[0].listenAddr}, + Manager: manager}), check.IsNil) } else { - aa := make(map[string]bool) - aa["worker"] = true - aa["manager"] = true - c.Assert(d.Init(aa, ""), check.IsNil) + c.Assert(d.Init(swarm.InitRequest{ + Spec: swarm.Spec{ + AcceptancePolicy: autoAcceptPolicy, + }, + }), check.IsNil) } } diff --git a/integration-cli/daemon_swarm.go b/integration-cli/daemon_swarm.go index 6c18daee19..47ca4422e8 100644 --- a/integration-cli/daemon_swarm.go +++ b/integration-cli/daemon_swarm.go @@ -21,22 +21,18 @@ type SwarmDaemon struct { listenAddr string } +// default policy in tests is allow-all +var autoAcceptPolicy = swarm.AcceptancePolicy{ + Policies: []swarm.Policy{ + {Role: swarm.NodeRoleWorker, Autoaccept: true}, + {Role: swarm.NodeRoleManager, Autoaccept: true}, + }, +} + // Init initializes a new swarm cluster. -func (d *SwarmDaemon) Init(autoAccept map[string]bool, secret string) error { - req := swarm.InitRequest{ - ListenAddr: d.listenAddr, - } - for _, role := range []swarm.NodeRole{swarm.NodeRoleManager, swarm.NodeRoleWorker} { - policy := swarm.Policy{ - Role: role, - Autoaccept: autoAccept[strings.ToLower(string(role))], - } - - if secret != "" { - policy.Secret = &secret - } - - req.Spec.AcceptancePolicy.Policies = append(req.Spec.AcceptancePolicy.Policies, policy) +func (d *SwarmDaemon) Init(req swarm.InitRequest) error { + if req.ListenAddr == "" { + req.ListenAddr = d.listenAddr } status, out, err := d.SockRequest("POST", "/swarm/init", req) if status != http.StatusOK { @@ -53,17 +49,10 @@ func (d *SwarmDaemon) Init(autoAccept map[string]bool, secret string) error { return nil } -// Join joins a current daemon with existing cluster. -func (d *SwarmDaemon) Join(remoteAddr, secret, cahash string, manager bool) error { - req := swarm.JoinRequest{ - ListenAddr: d.listenAddr, - RemoteAddrs: []string{remoteAddr}, - Manager: manager, - CACertHash: cahash, - } - - if secret != "" { - req.Secret = secret +// Join joins a daemon to an existing cluster. 
+func (d *SwarmDaemon) Join(req swarm.JoinRequest) error { + if req.ListenAddr == "" { + req.ListenAddr = d.listenAddr } status, out, err := d.SockRequest("POST", "/swarm/join", req) if status != http.StatusOK { diff --git a/integration-cli/docker_api_swarm_test.go b/integration-cli/docker_api_swarm_test.go index bb21c7e44f..107930bfb3 100644 --- a/integration-cli/docker_api_swarm_test.go +++ b/integration-cli/docker_api_swarm_test.go @@ -38,7 +38,7 @@ func (s *DockerSwarmSuite) TestApiSwarmInit(c *check.C) { c.Assert(info.ControlAvailable, checker.Equals, false) c.Assert(info.LocalNodeState, checker.Equals, swarm.LocalNodeStateInactive) - c.Assert(d2.Join(d1.listenAddr, "", "", false), checker.IsNil) + c.Assert(d2.Join(swarm.JoinRequest{RemoteAddrs: []string{d1.listenAddr}}), checker.IsNil) info, err = d2.info() c.Assert(err, checker.IsNil) @@ -76,10 +76,19 @@ func (s *DockerSwarmSuite) TestApiSwarmManualAcceptanceSecret(c *check.C) { func (s *DockerSwarmSuite) testAPISwarmManualAcceptance(c *check.C, secret string) { d1 := s.AddDaemon(c, false, false) - c.Assert(d1.Init(map[string]bool{}, secret), checker.IsNil) + c.Assert(d1.Init(swarm.InitRequest{ + Spec: swarm.Spec{ + AcceptancePolicy: swarm.AcceptancePolicy{ + Policies: []swarm.Policy{ + {Role: swarm.NodeRoleWorker, Secret: &secret}, + {Role: swarm.NodeRoleManager, Secret: &secret}, + }, + }, + }, + }), checker.IsNil) d2 := s.AddDaemon(c, false, false) - err := d2.Join(d1.listenAddr, "", "", false) + err := d2.Join(swarm.JoinRequest{RemoteAddrs: []string{d1.listenAddr}}) c.Assert(err, checker.NotNil) if secret == "" { c.Assert(err.Error(), checker.Contains, "needs to be accepted") @@ -97,7 +106,7 @@ func (s *DockerSwarmSuite) testAPISwarmManualAcceptance(c *check.C, secret strin c.Assert(info.LocalNodeState, checker.Equals, swarm.LocalNodeStateInactive) } d3 := s.AddDaemon(c, false, false) - c.Assert(d3.Join(d1.listenAddr, secret, "", false), checker.NotNil) + c.Assert(d3.Join(swarm.JoinRequest{Secret: secret, RemoteAddrs: []string{d1.listenAddr}}), checker.NotNil) info, err := d3.info() c.Assert(err, checker.IsNil) c.Assert(info.LocalNodeState, checker.Equals, swarm.LocalNodeStatePending) @@ -120,26 +129,34 @@ func (s *DockerSwarmSuite) testAPISwarmManualAcceptance(c *check.C, secret strin func (s *DockerSwarmSuite) TestApiSwarmSecretAcceptance(c *check.C) { d1 := s.AddDaemon(c, false, false) - aa := make(map[string]bool) - aa["worker"] = true - c.Assert(d1.Init(aa, "foobar"), checker.IsNil) + secret := "foobar" + c.Assert(d1.Init(swarm.InitRequest{ + Spec: swarm.Spec{ + AcceptancePolicy: swarm.AcceptancePolicy{ + Policies: []swarm.Policy{ + {Role: swarm.NodeRoleWorker, Autoaccept: true, Secret: &secret}, + {Role: swarm.NodeRoleManager, Secret: &secret}, + }, + }, + }, + }), checker.IsNil) d2 := s.AddDaemon(c, false, false) - err := d2.Join(d1.listenAddr, "", "", false) + err := d2.Join(swarm.JoinRequest{RemoteAddrs: []string{d1.listenAddr}}) c.Assert(err, checker.NotNil) c.Assert(err.Error(), checker.Contains, "secret token is necessary") info, err := d2.info() c.Assert(err, checker.IsNil) c.Assert(info.LocalNodeState, checker.Equals, swarm.LocalNodeStateInactive) - err = d2.Join(d1.listenAddr, "foobaz", "", false) + err = d2.Join(swarm.JoinRequest{Secret: "foobaz", RemoteAddrs: []string{d1.listenAddr}}) c.Assert(err, checker.NotNil) c.Assert(err.Error(), checker.Contains, "secret token is necessary") info, err = d2.info() c.Assert(err, checker.IsNil) c.Assert(info.LocalNodeState, checker.Equals, swarm.LocalNodeStateInactive) - 
c.Assert(d2.Join(d1.listenAddr, "foobar", "", false), checker.IsNil) + c.Assert(d2.Join(swarm.JoinRequest{Secret: "foobar", RemoteAddrs: []string{d1.listenAddr}}), checker.IsNil) info, err = d2.info() c.Assert(err, checker.IsNil) c.Assert(info.LocalNodeState, checker.Equals, swarm.LocalNodeStateActive) @@ -156,14 +173,14 @@ func (s *DockerSwarmSuite) TestApiSwarmSecretAcceptance(c *check.C) { } }) - err = d2.Join(d1.listenAddr, "foobar", "", false) + err = d2.Join(swarm.JoinRequest{Secret: "foobar", RemoteAddrs: []string{d1.listenAddr}}) c.Assert(err, checker.NotNil) c.Assert(err.Error(), checker.Contains, "secret token is necessary") info, err = d2.info() c.Assert(err, checker.IsNil) c.Assert(info.LocalNodeState, checker.Equals, swarm.LocalNodeStateInactive) - c.Assert(d2.Join(d1.listenAddr, "foobaz", "", false), checker.IsNil) + c.Assert(d2.Join(swarm.JoinRequest{Secret: "foobaz", RemoteAddrs: []string{d1.listenAddr}}), checker.IsNil) info, err = d2.info() c.Assert(err, checker.IsNil) c.Assert(info.LocalNodeState, checker.Equals, swarm.LocalNodeStateActive) @@ -182,14 +199,14 @@ func (s *DockerSwarmSuite) TestApiSwarmSecretAcceptance(c *check.C) { } }) - err = d2.Join(d1.listenAddr, "", "", false) + err = d2.Join(swarm.JoinRequest{RemoteAddrs: []string{d1.listenAddr}}) c.Assert(err, checker.NotNil) c.Assert(err.Error(), checker.Contains, "secret token is necessary") info, err = d2.info() c.Assert(err, checker.IsNil) c.Assert(info.LocalNodeState, checker.Equals, swarm.LocalNodeStateInactive) - c.Assert(d2.Join(d1.listenAddr, "foobaz", "", false), checker.IsNil) + c.Assert(d2.Join(swarm.JoinRequest{Secret: "foobaz", RemoteAddrs: []string{d1.listenAddr}}), checker.IsNil) info, err = d2.info() c.Assert(err, checker.IsNil) c.Assert(info.LocalNodeState, checker.Equals, swarm.LocalNodeStateActive) @@ -206,7 +223,7 @@ func (s *DockerSwarmSuite) TestApiSwarmSecretAcceptance(c *check.C) { } }) - c.Assert(d2.Join(d1.listenAddr, "", "", false), checker.IsNil) + c.Assert(d2.Join(swarm.JoinRequest{RemoteAddrs: []string{d1.listenAddr}}), checker.IsNil) info, err = d2.info() c.Assert(err, checker.IsNil) c.Assert(info.LocalNodeState, checker.Equals, swarm.LocalNodeStateActive) @@ -220,17 +237,26 @@ func (s *DockerSwarmSuite) TestApiSwarmSecretAcceptance(c *check.C) { func (s *DockerSwarmSuite) TestApiSwarmCAHash(c *check.C) { d1 := s.AddDaemon(c, true, true) d2 := s.AddDaemon(c, false, false) - err := d2.Join(d1.listenAddr, "", "foobar", false) + err := d2.Join(swarm.JoinRequest{CACertHash: "foobar", RemoteAddrs: []string{d1.listenAddr}}) c.Assert(err, checker.NotNil) c.Assert(err.Error(), checker.Contains, "invalid checksum digest format") c.Assert(len(d1.CACertHash), checker.GreaterThan, 0) - c.Assert(d2.Join(d1.listenAddr, "", d1.CACertHash, false), checker.IsNil) + c.Assert(d2.Join(swarm.JoinRequest{CACertHash: d1.CACertHash, RemoteAddrs: []string{d1.listenAddr}}), checker.IsNil) } func (s *DockerSwarmSuite) TestApiSwarmPromoteDemote(c *check.C) { d1 := s.AddDaemon(c, false, false) - c.Assert(d1.Init(map[string]bool{"worker": true}, ""), checker.IsNil) + c.Assert(d1.Init(swarm.InitRequest{ + Spec: swarm.Spec{ + AcceptancePolicy: swarm.AcceptancePolicy{ + Policies: []swarm.Policy{ + {Role: swarm.NodeRoleWorker, Autoaccept: true}, + {Role: swarm.NodeRoleManager}, + }, + }, + }, + }), checker.IsNil) d2 := s.AddDaemon(c, true, false) info, err := d2.info() @@ -561,7 +587,9 @@ func (s *DockerSwarmSuite) TestApiSwarmLeaveOnPendingJoin(c *check.C) { c.Assert(err, checker.IsNil) id = strings.TrimSpace(id) - 
go d2.Join("nosuchhost:1234", "", "", false) // will block on pending state + go d2.Join(swarm.JoinRequest{ + RemoteAddrs: []string{"nosuchhost:1234"}, + }) // will block on pending state for i := 0; ; i++ { info, err := d2.info() @@ -587,7 +615,9 @@ func (s *DockerSwarmSuite) TestApiSwarmLeaveOnPendingJoin(c *check.C) { // #23705 func (s *DockerSwarmSuite) TestApiSwarmRestoreOnPendingJoin(c *check.C) { d := s.AddDaemon(c, false, false) - go d.Join("nosuchhost:1234", "", "", false) // will block on pending state + go d.Join(swarm.JoinRequest{ + RemoteAddrs: []string{"nosuchhost:1234"}, + }) // will block on pending state for i := 0; ; i++ { info, err := d.info() From 1a8a473017299c5e999d55d14634874826062fce Mon Sep 17 00:00:00 2001 From: Tonis Tiigi Date: Mon, 20 Jun 2016 16:35:33 -0700 Subject: [PATCH 2/3] Fix race on clearing swarm nodes on stop On stop there were multiple places that marked `cluster.node` nil. Now stop waits for the node to set itself nil. Signed-off-by: Tonis Tiigi --- daemon/cluster/cluster.go | 214 ++++++++++++++++++-------------------- 1 file changed, 99 insertions(+), 115 deletions(-) diff --git a/daemon/cluster/cluster.go b/daemon/cluster/cluster.go index f8b7a85e1a..8f5cad7b99 100644 --- a/daemon/cluster/cluster.go +++ b/daemon/cluster/cluster.go @@ -89,18 +89,23 @@ type Config struct { // manager and a worker. type Cluster struct { sync.RWMutex - root string - config Config - configEvent chan struct{} // todo: make this array and goroutine safe - node *swarmagent.Node + *node + root string + config Config + configEvent chan struct{} // todo: make this array and goroutine safe + listenAddr string + stop bool + err error + cancelDelay func() +} + +type node struct { + *swarmagent.Node + done chan struct{} + ready bool conn *grpc.ClientConn client swarmapi.ControlClient - ready bool - listenAddr string - err error reconnectDelay time.Duration - stop bool - cancelDelay func() } // New creates a new Cluster instance using provided config. 
@@ -110,10 +115,9 @@ func New(config Config) (*Cluster, error) { return nil, err } c := &Cluster{ - root: root, - config: config, - configEvent: make(chan struct{}, 10), - reconnectDelay: initialReconnectDelay, + root: root, + config: config, + configEvent: make(chan struct{}, 10), } st, err := c.loadState() @@ -124,7 +128,7 @@ func New(config Config) (*Cluster, error) { return nil, err } - n, ctx, err := c.startNewNode(false, st.ListenAddr, "", "", "", false) + n, err := c.startNewNode(false, st.ListenAddr, "", "", "", false) if err != nil { return nil, err } @@ -133,12 +137,10 @@ func New(config Config) (*Cluster, error) { case <-time.After(swarmConnectTimeout): logrus.Errorf("swarm component could not be started before timeout was reached") case <-n.Ready(): - case <-ctx.Done(): + case <-n.done: + return nil, fmt.Errorf("swarm component could not be started: %v", c.err) } - if ctx.Err() != nil { - return nil, fmt.Errorf("swarm component could not be started") - } - go c.reconnectOnFailure(ctx) + go c.reconnectOnFailure(n) return c, nil } @@ -169,20 +171,20 @@ func (c *Cluster) saveState() error { return ioutils.AtomicWriteFile(filepath.Join(c.root, stateFile), dt, 0600) } -func (c *Cluster) reconnectOnFailure(ctx context.Context) { +func (c *Cluster) reconnectOnFailure(n *node) { for { - <-ctx.Done() + <-n.done c.Lock() if c.stop || c.node != nil { c.Unlock() return } - c.reconnectDelay *= 2 - if c.reconnectDelay > maxReconnectDelay { - c.reconnectDelay = maxReconnectDelay + n.reconnectDelay *= 2 + if n.reconnectDelay > maxReconnectDelay { + n.reconnectDelay = maxReconnectDelay } - logrus.Warnf("Restarting swarm in %.2f seconds", c.reconnectDelay.Seconds()) - delayCtx, cancel := context.WithTimeout(context.Background(), c.reconnectDelay) + logrus.Warnf("Restarting swarm in %.2f seconds", n.reconnectDelay.Seconds()) + delayCtx, cancel := context.WithTimeout(context.Background(), n.reconnectDelay) c.cancelDelay = cancel c.Unlock() <-delayCtx.Done() @@ -195,22 +197,23 @@ func (c *Cluster) reconnectOnFailure(ctx context.Context) { return } var err error - _, ctx, err = c.startNewNode(false, c.listenAddr, c.getRemoteAddress(), "", "", false) + n, err = c.startNewNode(false, c.listenAddr, c.getRemoteAddress(), "", "", false) if err != nil { c.err = err - ctx = delayCtx + close(n.done) } c.Unlock() } } -func (c *Cluster) startNewNode(forceNewCluster bool, listenAddr, joinAddr, secret, cahash string, ismanager bool) (*swarmagent.Node, context.Context, error) { +func (c *Cluster) startNewNode(forceNewCluster bool, listenAddr, joinAddr, secret, cahash string, ismanager bool) (*node, error) { if err := c.config.Backend.IsSwarmCompatible(); err != nil { - return nil, nil, err + return nil, err } c.node = nil c.cancelDelay = nil - node, err := swarmagent.NewNode(&swarmagent.NodeConfig{ + c.stop = false + n, err := swarmagent.NewNode(&swarmagent.NodeConfig{ Hostname: c.config.Name, ForceNewCluster: forceNewCluster, ListenControlAPI: filepath.Join(c.root, controlSocket), @@ -225,85 +228,76 @@ func (c *Cluster) startNewNode(forceNewCluster bool, listenAddr, joinAddr, secre IsManager: ismanager, }) if err != nil { - return nil, nil, err + return nil, err } - ctx, cancel := context.WithCancel(context.Background()) - if err := node.Start(ctx); err != nil { - return nil, nil, err + ctx := context.Background() + if err := n.Start(ctx); err != nil { + return nil, err + } + node := &node{ + Node: n, + done: make(chan struct{}), + reconnectDelay: initialReconnectDelay, } - c.node = node c.listenAddr = 
listenAddr c.saveState() c.config.Backend.SetClusterProvider(c) go func() { - err := node.Err(ctx) + err := n.Err(ctx) if err != nil { logrus.Errorf("cluster exited with error: %v", err) } c.Lock() - c.conn = nil - c.client = nil c.node = nil - c.ready = false c.err = err c.Unlock() - cancel() + close(node.done) }() go func() { select { - case <-node.Ready(): + case <-n.Ready(): c.Lock() - c.reconnectDelay = initialReconnectDelay - c.Unlock() - case <-ctx.Done(): - } - if ctx.Err() == nil { - c.Lock() - c.ready = true + node.ready = true c.err = nil c.Unlock() + case <-ctx.Done(): } c.configEvent <- struct{}{} }() go func() { - for conn := range node.ListenControlSocket(ctx) { + for conn := range n.ListenControlSocket(ctx) { c.Lock() - if c.conn != conn { - c.client = swarmapi.NewControlClient(conn) + if node.conn != conn { + if conn == nil { + node.client = nil + } else { + node.client = swarmapi.NewControlClient(conn) + } } - if c.conn != nil { - c.client = nil - } - c.conn = conn + node.conn = conn c.Unlock() c.configEvent <- struct{}{} } }() - return node, ctx, nil + return node, nil } // Init initializes new cluster from user provided request. func (c *Cluster) Init(req types.InitRequest) (string, error) { c.Lock() if node := c.node; node != nil { - c.Unlock() if !req.ForceNewCluster { + c.Unlock() return "", errSwarmExists(node) } - ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) - defer cancel() - c.cancelReconnect() - if err := c.node.Stop(ctx); err != nil && !strings.Contains(err.Error(), "context canceled") { + if err := c.stopNode(); err != nil { + c.Unlock() return "", err } - c.Lock() - c.node = nil - c.conn = nil - c.ready = false } if err := validateAndSanitizeInitRequest(&req); err != nil { @@ -312,7 +306,7 @@ func (c *Cluster) Init(req types.InitRequest) (string, error) { } // todo: check current state existing - n, ctx, err := c.startNewNode(req.ForceNewCluster, req.ListenAddr, "", "", "", false) + n, err := c.startNewNode(req.ForceNewCluster, req.ListenAddr, "", "", "", false) if err != nil { c.Unlock() return "", err @@ -324,20 +318,17 @@ func (c *Cluster) Init(req types.InitRequest) (string, error) { if err := initClusterSpec(n, req.Spec); err != nil { return "", err } - go c.reconnectOnFailure(ctx) + go c.reconnectOnFailure(n) return n.NodeID(), nil - case <-ctx.Done(): + case <-n.done: c.RLock() defer c.RUnlock() - if c.err != nil { - if !req.ForceNewCluster { // if failure on first attempt don't keep state - if err := c.clearState(); err != nil { - return "", err - } + if !req.ForceNewCluster { // if failure on first attempt don't keep state + if err := c.clearState(); err != nil { + return "", err } - return "", c.err } - return "", ctx.Err() + return "", c.err } } @@ -353,7 +344,7 @@ func (c *Cluster) Join(req types.JoinRequest) error { return err } // todo: check current state existing - n, ctx, err := c.startNewNode(false, req.ListenAddr, req.RemoteAddrs[0], req.Secret, req.CACertHash, req.Manager) + n, err := c.startNewNode(false, req.ListenAddr, req.RemoteAddrs[0], req.Secret, req.CACertHash, req.Manager) if err != nil { c.Unlock() return err @@ -370,28 +361,41 @@ func (c *Cluster) Join(req types.JoinRequest) error { certificateRequested = nil case <-time.After(swarmConnectTimeout): // attempt to connect will continue in background, also reconnecting - go c.reconnectOnFailure(ctx) + go c.reconnectOnFailure(n) return ErrSwarmJoinTimeoutReached case <-n.Ready(): - go c.reconnectOnFailure(ctx) + go c.reconnectOnFailure(n) return nil - 
case <-ctx.Done(): + case <-n.done: c.RLock() defer c.RUnlock() - if c.err != nil { - return c.err - } - return ctx.Err() + return c.err } } } -func (c *Cluster) cancelReconnect() { +// stopNode is a helper that stops the active c.node and waits until it has +// shut down. Call while keeping the cluster lock. +func (c *Cluster) stopNode() error { + if c.node == nil { + return nil + } c.stop = true if c.cancelDelay != nil { c.cancelDelay() c.cancelDelay = nil } + node := c.node + ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) + defer cancel() + // TODO: can't hold lock on stop because it calls back to network + c.Unlock() + defer c.Lock() + if err := node.Stop(ctx); err != nil && !strings.Contains(err.Error(), "context canceled") { + return err + } + <-node.done + return nil } // Leave shuts down Cluster and removes current state. @@ -425,14 +429,11 @@ func (c *Cluster) Leave(force bool) error { c.Unlock() return fmt.Errorf(msg) } - c.cancelReconnect() - c.Unlock() - - ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) - defer cancel() - if err := node.Stop(ctx); err != nil && !strings.Contains(err.Error(), "context canceled") { + if err := c.stopNode(); err != nil { + c.Unlock() return err } + c.Unlock() if nodeID := node.NodeID(); nodeID != "" { for _, id := range c.config.Backend.ListContainersForNode(nodeID) { if err := c.config.Backend.ContainerRm(id, &apitypes.ContainerRmConfig{ForceRemove: true}); err != nil { @@ -440,11 +441,6 @@ func (c *Cluster) Leave(force bool) error { } } } - c.Lock() - defer c.Unlock() - c.node = nil - c.conn = nil - c.ready = false c.configEvent <- struct{}{} // todo: cleanup optional? if err := c.clearState(); err != nil { @@ -534,7 +530,7 @@ func (c *Cluster) IsManager() bool { func (c *Cluster) IsAgent() bool { c.RLock() defer c.RUnlock() - return c.ready + return c.node != nil && c.ready } // GetListenAddress returns the listening address for current maanger's @@ -542,7 +538,7 @@ func (c *Cluster) IsAgent() bool { func (c *Cluster) GetListenAddress() string { c.RLock() defer c.RUnlock() - if c.conn != nil { + if c.isActiveManager() { return c.listenAddr } return "" @@ -597,7 +593,6 @@ func (c *Cluster) Info() types.Info { if c.err != nil { info.Error = c.err.Error() } - if c.isActiveManager() { info.ControlAvailable = true if r, err := c.client.ListNodes(c.getRequestContext(), &swarmapi.ListNodesRequest{}); err == nil { @@ -626,7 +621,7 @@ func (c *Cluster) Info() types.Info { // isActiveManager should not be called without a read lock func (c *Cluster) isActiveManager() bool { - return c.conn != nil + return c.node != nil && c.conn != nil } // GetServices returns all services of a managed swarm cluster. 
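The stopNode helper added above is documented as requiring the cluster lock, yet it has to drop that lock around node.Stop because shutdown calls back into the daemon's network code. A hedged sketch of that unlock-around-a-blocking-call shape, using a hypothetical helper name that is not in the patch:

package sketch

import "sync"

// callWithLockReleased assumes mu is already locked by the caller. It drops
// the lock around the blocking call and re-acquires it before returning, so
// the caller keeps its "lock held" invariant on both sides of the call.
func callWithLockReleased(mu *sync.Mutex, blocking func() error) error {
	mu.Unlock()
	defer mu.Lock()
	return blocking() // may block for a long time and may itself take mu
}

While the lock is released, other goroutines can run against the cluster; that is why stopNode sets c.stop before unlocking, so the reconnect loop sees the flag and backs off instead of restarting a node that is being torn down.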
@@ -1014,7 +1009,7 @@ func (c *Cluster) Cleanup() { c.Unlock() return } - + defer c.Unlock() if c.isActiveManager() { active, reachable, unreachable, err := c.managerStats() if err == nil { @@ -1024,18 +1019,7 @@ func (c *Cluster) Cleanup() { } } } - c.cancelReconnect() - c.Unlock() - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - if err := node.Stop(ctx); err != nil { - logrus.Errorf("error cleaning up cluster: %v", err) - } - c.Lock() - c.node = nil - c.ready = false - c.conn = nil - c.Unlock() + c.stopNode() } func (c *Cluster) managerStats() (current bool, reachable int, unreachable int, err error) { @@ -1130,14 +1114,14 @@ func validateAddr(addr string) (string, error) { return strings.TrimPrefix(newaddr, "tcp://"), nil } -func errSwarmExists(node *swarmagent.Node) error { +func errSwarmExists(node *node) error { if node.NodeMembership() != swarmapi.NodeMembershipAccepted { return ErrPendingSwarmExists } return ErrSwarmExists } -func initClusterSpec(node *swarmagent.Node, spec types.Spec) error { +func initClusterSpec(node *node, spec types.Spec) error { ctx, _ := context.WithTimeout(context.Background(), 5*time.Second) for conn := range node.ListenControlSocket(ctx) { if ctx.Err() != nil { From 1acb8ef82572c52994b1ff00fe5c86aac53be4b8 Mon Sep 17 00:00:00 2001 From: Tonis Tiigi Date: Mon, 20 Jun 2016 16:36:13 -0700 Subject: [PATCH 3/3] Add test for force-new-cluster Signed-off-by: Tonis Tiigi --- integration-cli/docker_api_swarm_test.go | 35 +++++++++++++++++++++++- 1 file changed, 34 insertions(+), 1 deletion(-) diff --git a/integration-cli/docker_api_swarm_test.go b/integration-cli/docker_api_swarm_test.go index 107930bfb3..7b8d0431ff 100644 --- a/integration-cli/docker_api_swarm_test.go +++ b/integration-cli/docker_api_swarm_test.go @@ -295,7 +295,7 @@ func (s *DockerSwarmSuite) TestApiSwarmPromoteDemote(c *check.C) { break } if i > 100 { - c.Errorf("node did not turn into manager") + c.Errorf("node did not turn into worker") } else { break } @@ -710,6 +710,39 @@ func (s *DockerSwarmSuite) TestApiSwarmInvalidAddress(c *check.C) { c.Assert(status, checker.Equals, http.StatusInternalServerError) } +func (s *DockerSwarmSuite) TestApiSwarmForceNewCluster(c *check.C) { + d1 := s.AddDaemon(c, true, true) + d2 := s.AddDaemon(c, true, true) + + instances := 2 + id := d1.createService(c, simpleTestService, setInstances(instances)) + waitAndAssert(c, defaultReconciliationTimeout, reducedCheck(sumAsIntegers, d1.checkActiveContainerCount, d2.checkActiveContainerCount), checker.Equals, instances) + + c.Assert(d2.Stop(), checker.IsNil) + + time.Sleep(5 * time.Second) + + c.Assert(d1.Init(swarm.InitRequest{ + ForceNewCluster: true, + Spec: swarm.Spec{ + AcceptancePolicy: autoAcceptPolicy, + }, + }), checker.IsNil) + + waitAndAssert(c, defaultReconciliationTimeout, d1.checkActiveContainerCount, checker.Equals, instances) + + d3 := s.AddDaemon(c, true, true) + info, err := d3.info() + c.Assert(err, checker.IsNil) + c.Assert(info.ControlAvailable, checker.Equals, true) + c.Assert(info.LocalNodeState, checker.Equals, swarm.LocalNodeStateActive) + + instances = 4 + d3.updateService(c, d3.getService(c, id), setInstances(instances)) + + waitAndAssert(c, defaultReconciliationTimeout, reducedCheck(sumAsIntegers, d1.checkActiveContainerCount, d3.checkActiveContainerCount), checker.Equals, instances) +} + func simpleTestService(s *swarm.Service) { var ureplicas uint64 ureplicas = 1