
Fix race on clearing swarm nodes on stop

On stop there were multiple places that set
`cluster.node` to nil. Now stop waits for the node to
set itself to nil.

Signed-off-by: Tonis Tiigi <tonistiigi@gmail.com>
(cherry picked from commit 1a8a473017299c5e999d55d14634874826062fce)
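
In essence, the fix makes the node's watcher goroutine the single owner of teardown: it alone clears `cluster.node`, and it closes a `done` channel afterwards, so every stop path waits on that channel instead of clearing state itself. Below is a minimal, self-contained sketch of that pattern with hypothetical names (`startNode`, `stopNode`, a bare `done` channel); it is not the daemon code itself:

```go
// A sketch of the single-owner teardown pattern (assumed names, not the
// actual daemon code). The watcher goroutine is the only writer that
// clears cluster.node; it closes done afterwards so stopNode can wait.
package main

import "sync"

type node struct {
	done chan struct{} // closed after the node has exited and been cleared
}

type cluster struct {
	sync.Mutex
	node *node
}

func (c *cluster) startNode() *node {
	n := &node{done: make(chan struct{})}
	c.Lock()
	c.node = n
	c.Unlock()

	// Watcher goroutine: the single place where c.node is cleared.
	go func() {
		// ... the node runs here until it exits or is asked to stop ...
		c.Lock()
		c.node = nil
		c.Unlock()
		close(n.done) // teardown is fully complete only now
	}()
	return n
}

func (c *cluster) stopNode() {
	c.Lock()
	n := c.node
	c.Unlock()
	if n == nil {
		return // nothing to stop
	}
	// Do not race to clear c.node here; wait for the watcher to do it.
	<-n.done
}

func main() {
	c := &cluster{}
	c.startNode()
	c.stopNode() // returns only after c.node is nil and done is closed
}
```

In the actual change, `stopNode` additionally calls `node.Stop` with a 15-second timeout and temporarily releases the cluster lock (stopping calls back into the network layer), but the ordering guarantee is the same: `done` closes only after `c.node` has been set to nil.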
Tonis Tiigi 9 years ago
parent commit 1f4e2f33ef
1 changed file with 99 additions and 115 deletions

daemon/cluster/cluster.go

@@ -86,18 +86,23 @@ type Config struct {
 // manager.
 type Cluster struct {
 	sync.RWMutex
-	root           string
-	config         Config
-	configEvent    chan struct{} // todo: make this array and goroutine safe
-	node           *swarmagent.Node
+	*node
+	root        string
+	config      Config
+	configEvent chan struct{} // todo: make this array and goroutine safe
+	listenAddr  string
+	stop        bool
+	err         error
+	cancelDelay func()
+}
+
+type node struct {
+	*swarmagent.Node
+	done           chan struct{}
+	ready          bool
 	conn           *grpc.ClientConn
 	client         swarmapi.ControlClient
-	ready          bool
-	listenAddr     string
-	err            error
 	reconnectDelay time.Duration
-	stop           bool
-	cancelDelay    func()
 }
 
 // New creates a new Cluster instance using provided config.
@@ -107,10 +112,9 @@ func New(config Config) (*Cluster, error) {
 		return nil, err
 	}
 	c := &Cluster{
-		root:           root,
-		config:         config,
-		configEvent:    make(chan struct{}, 10),
-		reconnectDelay: initialReconnectDelay,
+		root:        root,
+		config:      config,
+		configEvent: make(chan struct{}, 10),
 	}
 
 	st, err := c.loadState()
@@ -121,7 +125,7 @@ func New(config Config) (*Cluster, error) {
 		return nil, err
 	}
 
-	n, ctx, err := c.startNewNode(false, st.ListenAddr, "", "", "", false)
+	n, err := c.startNewNode(false, st.ListenAddr, "", "", "", false)
 	if err != nil {
 		return nil, err
 	}
@@ -130,12 +134,10 @@ func New(config Config) (*Cluster, error) {
 	case <-time.After(swarmConnectTimeout):
 		logrus.Errorf("swarm component could not be started before timeout was reached")
 	case <-n.Ready():
-	case <-ctx.Done():
-	}
-	if ctx.Err() != nil {
-		return nil, fmt.Errorf("swarm component could not be started")
+	case <-n.done:
+		return nil, fmt.Errorf("swarm component could not be started: %v", c.err)
 	}
-	go c.reconnectOnFailure(ctx)
+	go c.reconnectOnFailure(n)
 	return c, nil
 }
 
@@ -166,20 +168,20 @@ func (c *Cluster) saveState() error {
 	return ioutils.AtomicWriteFile(filepath.Join(c.root, stateFile), dt, 0600)
 }
 
-func (c *Cluster) reconnectOnFailure(ctx context.Context) {
+func (c *Cluster) reconnectOnFailure(n *node) {
 	for {
-		<-ctx.Done()
+		<-n.done
 		c.Lock()
 		if c.stop || c.node != nil {
 			c.Unlock()
 			return
 		}
-		c.reconnectDelay *= 2
-		if c.reconnectDelay > maxReconnectDelay {
-			c.reconnectDelay = maxReconnectDelay
+		n.reconnectDelay *= 2
+		if n.reconnectDelay > maxReconnectDelay {
+			n.reconnectDelay = maxReconnectDelay
 		}
-		logrus.Warnf("Restarting swarm in %.2f seconds", c.reconnectDelay.Seconds())
-		delayCtx, cancel := context.WithTimeout(context.Background(), c.reconnectDelay)
+		logrus.Warnf("Restarting swarm in %.2f seconds", n.reconnectDelay.Seconds())
+		delayCtx, cancel := context.WithTimeout(context.Background(), n.reconnectDelay)
 		c.cancelDelay = cancel
 		c.Unlock()
 		<-delayCtx.Done()
@@ -192,22 +194,23 @@ func (c *Cluster) reconnectOnFailure(ctx context.Context) {
 			return
 		}
 		var err error
-		_, ctx, err = c.startNewNode(false, c.listenAddr, c.getRemoteAddress(), "", "", false)
+		n, err = c.startNewNode(false, c.listenAddr, c.getRemoteAddress(), "", "", false)
 		if err != nil {
 			c.err = err
-			ctx = delayCtx
+			close(n.done)
 		}
 		c.Unlock()
 	}
 }
 
-func (c *Cluster) startNewNode(forceNewCluster bool, listenAddr, joinAddr, secret, cahash string, ismanager bool) (*swarmagent.Node, context.Context, error) {
+func (c *Cluster) startNewNode(forceNewCluster bool, listenAddr, joinAddr, secret, cahash string, ismanager bool) (*node, error) {
 	if err := c.config.Backend.IsSwarmCompatible(); err != nil {
-		return nil, nil, err
+		return nil, err
 	}
 	c.node = nil
 	c.cancelDelay = nil
-	node, err := swarmagent.NewNode(&swarmagent.NodeConfig{
+	c.stop = false
+	n, err := swarmagent.NewNode(&swarmagent.NodeConfig{
 		Hostname:         c.config.Name,
 		ForceNewCluster:  forceNewCluster,
 		ListenControlAPI: filepath.Join(c.root, controlSocket),
@@ -222,85 +225,76 @@ func (c *Cluster) startNewNode(forceNewCluster bool, listenAddr, joinAddr, secre
 		IsManager:        ismanager,
 	})
 	if err != nil {
-		return nil, nil, err
+		return nil, err
 	}
-	ctx, cancel := context.WithCancel(context.Background())
-	if err := node.Start(ctx); err != nil {
-		return nil, nil, err
+	ctx := context.Background()
+	if err := n.Start(ctx); err != nil {
+		return nil, err
+	}
+	node := &node{
+		Node:           n,
+		done:           make(chan struct{}),
+		reconnectDelay: initialReconnectDelay,
 	}
-
 	c.node = node
 	c.listenAddr = listenAddr
 	c.saveState()
 	c.config.Backend.SetClusterProvider(c)
 	go func() {
-		err := node.Err(ctx)
+		err := n.Err(ctx)
 		if err != nil {
 			logrus.Errorf("cluster exited with error: %v", err)
 		}
 		c.Lock()
-		c.conn = nil
-		c.client = nil
 		c.node = nil
-		c.ready = false
 		c.err = err
 		c.Unlock()
-		cancel()
+		close(node.done)
 	}()
 
 	go func() {
 		select {
-		case <-node.Ready():
-			c.Lock()
-			c.reconnectDelay = initialReconnectDelay
-			c.Unlock()
-		case <-ctx.Done():
-		}
-		if ctx.Err() == nil {
+		case <-n.Ready():
 			c.Lock()
-			c.ready = true
+			node.ready = true
 			c.err = nil
 			c.Unlock()
+		case <-ctx.Done():
 		}
 		c.configEvent <- struct{}{}
 	}()
 
 	go func() {
-		for conn := range node.ListenControlSocket(ctx) {
+		for conn := range n.ListenControlSocket(ctx) {
 			c.Lock()
-			if c.conn != conn {
-				c.client = swarmapi.NewControlClient(conn)
-			}
-			if c.conn != nil {
-				c.client = nil
+			if node.conn != conn {
+				if conn == nil {
+					node.client = nil
+				} else {
+					node.client = swarmapi.NewControlClient(conn)
+				}
 			}
-			c.conn = conn
+			node.conn = conn
 			c.Unlock()
 			c.configEvent <- struct{}{}
 		}
 	}()
 
-	return node, ctx, nil
+	return node, nil
 }
 
 // Init initializes new cluster from user provided request.
 func (c *Cluster) Init(req types.InitRequest) (string, error) {
 	c.Lock()
 	if node := c.node; node != nil {
-		c.Unlock()
 		if !req.ForceNewCluster {
+			c.Unlock()
 			return "", errSwarmExists(node)
 		}
-		ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
-		defer cancel()
-		c.cancelReconnect()
-		if err := c.node.Stop(ctx); err != nil && !strings.Contains(err.Error(), "context canceled") {
+		if err := c.stopNode(); err != nil {
+			c.Unlock()
 			return "", err
 		}
-		c.Lock()
-		c.node = nil
-		c.conn = nil
-		c.ready = false
 	}
 
 	if err := validateAndSanitizeInitRequest(&req); err != nil {
@@ -309,7 +303,7 @@ func (c *Cluster) Init(req types.InitRequest) (string, error) {
 	}
 
 	// todo: check current state existing
-	n, ctx, err := c.startNewNode(req.ForceNewCluster, req.ListenAddr, "", "", "", false)
+	n, err := c.startNewNode(req.ForceNewCluster, req.ListenAddr, "", "", "", false)
 	if err != nil {
 		c.Unlock()
 		return "", err
@@ -321,20 +315,17 @@ func (c *Cluster) Init(req types.InitRequest) (string, error) {
 		if err := initClusterSpec(n, req.Spec); err != nil {
 			return "", err
 		}
-		go c.reconnectOnFailure(ctx)
+		go c.reconnectOnFailure(n)
 		return n.NodeID(), nil
-	case <-ctx.Done():
+	case <-n.done:
 		c.RLock()
 		defer c.RUnlock()
-		if c.err != nil {
-			if !req.ForceNewCluster { // if failure on first attempt don't keep state
-				if err := c.clearState(); err != nil {
-					return "", err
-				}
+		if !req.ForceNewCluster { // if failure on first attempt don't keep state
+			if err := c.clearState(); err != nil {
+				return "", err
 			}
-			return "", c.err
 		}
-		return "", ctx.Err()
+		return "", c.err
 	}
 }
 
@@ -350,7 +341,7 @@ func (c *Cluster) Join(req types.JoinRequest) error {
 		return err
 	}
 	// todo: check current state existing
-	n, ctx, err := c.startNewNode(false, req.ListenAddr, req.RemoteAddrs[0], req.Secret, req.CACertHash, req.Manager)
+	n, err := c.startNewNode(false, req.ListenAddr, req.RemoteAddrs[0], req.Secret, req.CACertHash, req.Manager)
 	if err != nil {
 		c.Unlock()
 		return err
@@ -367,28 +358,41 @@ func (c *Cluster) Join(req types.JoinRequest) error {
 			certificateRequested = nil
 		case <-time.After(swarmConnectTimeout):
 			// attempt to connect will continue in background, also reconnecting
-			go c.reconnectOnFailure(ctx)
+			go c.reconnectOnFailure(n)
 			return ErrSwarmJoinTimeoutReached
 		case <-n.Ready():
-			go c.reconnectOnFailure(ctx)
+			go c.reconnectOnFailure(n)
 			return nil
-		case <-ctx.Done():
+		case <-n.done:
 			c.RLock()
 			defer c.RUnlock()
-			if c.err != nil {
-				return c.err
-			}
-			return ctx.Err()
+			return c.err
 		}
 	}
 }
 
-func (c *Cluster) cancelReconnect() {
+// stopNode is a helper that stops the active c.node and waits until it has
+// shut down. Call while keeping the cluster lock.
+func (c *Cluster) stopNode() error {
+	if c.node == nil {
+		return nil
+	}
 	c.stop = true
 	if c.cancelDelay != nil {
 		c.cancelDelay()
 		c.cancelDelay = nil
 	}
+	node := c.node
+	ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
+	defer cancel()
+	// TODO: can't hold lock on stop because it calls back to network
+	c.Unlock()
+	defer c.Lock()
+	if err := node.Stop(ctx); err != nil && !strings.Contains(err.Error(), "context canceled") {
+		return err
+	}
+	<-node.done
+	return nil
 }
 
 // Leave shuts down Cluster and removes current state.
@@ -422,14 +426,11 @@ func (c *Cluster) Leave(force bool) error {
 		c.Unlock()
 		return fmt.Errorf(msg)
 	}
-	c.cancelReconnect()
-	c.Unlock()
-
-	ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
-	defer cancel()
-	if err := node.Stop(ctx); err != nil && !strings.Contains(err.Error(), "context canceled") {
+	if err := c.stopNode(); err != nil {
+		c.Unlock()
 		return err
 	}
+	c.Unlock()
 	if nodeID := node.NodeID(); nodeID != "" {
 		for _, id := range c.config.Backend.ListContainersForNode(nodeID) {
 			if err := c.config.Backend.ContainerRm(id, &apitypes.ContainerRmConfig{ForceRemove: true}); err != nil {
@@ -437,11 +438,6 @@ func (c *Cluster) Leave(force bool) error {
 			}
 		}
 	}
-	c.Lock()
-	defer c.Unlock()
-	c.node = nil
-	c.conn = nil
-	c.ready = false
 	c.configEvent <- struct{}{}
 	// todo: cleanup optional?
 	if err := c.clearState(); err != nil {
@@ -531,7 +527,7 @@ func (c *Cluster) IsManager() bool {
 func (c *Cluster) IsAgent() bool {
 	c.RLock()
 	defer c.RUnlock()
-	return c.ready
+	return c.node != nil && c.ready
 }
 
 // GetListenAddress returns the listening address for current manager's
@@ -539,7 +535,7 @@ func (c *Cluster) IsAgent() bool {
 func (c *Cluster) GetListenAddress() string {
 	c.RLock()
 	defer c.RUnlock()
-	if c.conn != nil {
+	if c.isActiveManager() {
 		return c.listenAddr
 	}
 	return ""
@@ -594,7 +590,6 @@ func (c *Cluster) Info() types.Info {
 	if c.err != nil {
 		info.Error = c.err.Error()
 	}
-
 	if c.isActiveManager() {
 		info.ControlAvailable = true
 		if r, err := c.client.ListNodes(c.getRequestContext(), &swarmapi.ListNodesRequest{}); err == nil {
@@ -623,7 +618,7 @@ func (c *Cluster) Info() types.Info {
 
 // isActiveManager should not be called without a read lock
 func (c *Cluster) isActiveManager() bool {
-	return c.conn != nil
+	return c.node != nil && c.conn != nil
 }
 
 // errNoManager returns error describing why manager commands can't be used.
@@ -1023,7 +1018,7 @@ func (c *Cluster) Cleanup() {
 		c.Unlock()
 		return
 	}
-
+	defer c.Unlock()
 	if c.isActiveManager() {
 		active, reachable, unreachable, err := c.managerStats()
 		if err == nil {
@@ -1033,18 +1028,7 @@ func (c *Cluster) Cleanup() {
 			}
 		}
 	}
-	c.cancelReconnect()
-	c.Unlock()
-	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
-	defer cancel()
-	if err := node.Stop(ctx); err != nil {
-		logrus.Errorf("error cleaning up cluster: %v", err)
-	}
-	c.Lock()
-	c.node = nil
-	c.ready = false
-	c.conn = nil
-	c.Unlock()
+	c.stopNode()
 }
 
 func (c *Cluster) managerStats() (current bool, reachable int, unreachable int, err error) {
@@ -1139,14 +1123,14 @@ func validateAddr(addr string) (string, error) {
 	return strings.TrimPrefix(newaddr, "tcp://"), nil
 }
 
-func errSwarmExists(node *swarmagent.Node) error {
+func errSwarmExists(node *node) error {
 	if node.NodeMembership() != swarmapi.NodeMembershipAccepted {
 		return ErrPendingSwarmExists
 	}
 	return ErrSwarmExists
 }
 
-func initClusterSpec(node *swarmagent.Node, spec types.Spec) error {
+func initClusterSpec(node *node, spec types.Spec) error {
 	ctx, _ := context.WithTimeout(context.Background(), 5*time.Second)
 	for conn := range node.ListenControlSocket(ctx) {
 		if ctx.Err() != nil {