Ver código fonte

Merge pull request #1735 from fcrisciani/init_race_cond

Fix for swarm init/leave race condition
Madhu Venugopal 8 anos atrás
pai
commit
e969e6ef61

+ 26 - 35
libnetwork/agent.go

@@ -13,6 +13,7 @@ import (
 	"github.com/Sirupsen/logrus"
 	"github.com/Sirupsen/logrus"
 	"github.com/docker/docker/pkg/stringid"
 	"github.com/docker/docker/pkg/stringid"
 	"github.com/docker/go-events"
 	"github.com/docker/go-events"
+	"github.com/docker/libnetwork/cluster"
 	"github.com/docker/libnetwork/datastore"
 	"github.com/docker/libnetwork/datastore"
 	"github.com/docker/libnetwork/discoverapi"
 	"github.com/docker/libnetwork/discoverapi"
 	"github.com/docker/libnetwork/driverapi"
 	"github.com/docker/libnetwork/driverapi"
@@ -40,7 +41,7 @@ type agent struct {
 	bindAddr          string
 	bindAddr          string
 	advertiseAddr     string
 	advertiseAddr     string
 	dataPathAddr      string
 	dataPathAddr      string
-	epTblCancel       func()
+	coreCancelFuncs   []func()
 	driverCancelFuncs map[string][]func()
 	driverCancelFuncs map[string][]func()
 	sync.Mutex
 	sync.Mutex
 }
 }
@@ -192,16 +193,12 @@ func (c *controller) handleKeyChange(keys []*types.EncryptionKey) error {
 	return nil
 	return nil
 }
 }
 
 
-func (c *controller) agentSetup() error {
-	c.Lock()
-	clusterProvider := c.cfg.Daemon.ClusterProvider
-	agent := c.agent
-	c.Unlock()
+func (c *controller) agentSetup(clusterProvider cluster.Provider) error {
+	agent := c.getAgent()
 
 
-	if clusterProvider == nil {
-		msg := "Aborting initialization of Libnetwork Agent because cluster provider is now unset"
-		logrus.Errorf(msg)
-		return fmt.Errorf(msg)
+	// If the agent is already present there is no need to try to initilize it again
+	if agent != nil {
+		return nil
 	}
 	}
 
 
 	bindAddr := clusterProvider.GetLocalAddress()
 	bindAddr := clusterProvider.GetLocalAddress()
@@ -221,15 +218,15 @@ func (c *controller) agentSetup() error {
 		listenAddr, bindAddr, advAddr, dataAddr, remoteAddrList)
 		listenAddr, bindAddr, advAddr, dataAddr, remoteAddrList)
 	if advAddr != "" && agent == nil {
 	if advAddr != "" && agent == nil {
 		if err := c.agentInit(listenAddr, bindAddr, advAddr, dataAddr); err != nil {
 		if err := c.agentInit(listenAddr, bindAddr, advAddr, dataAddr); err != nil {
-			logrus.Errorf("Error in agentInit : %v", err)
-		} else {
-			c.drvRegistry.WalkDrivers(func(name string, driver driverapi.Driver, capability driverapi.Capability) bool {
-				if capability.DataScope == datastore.GlobalScope {
-					c.agentDriverNotify(driver)
-				}
-				return false
-			})
+			logrus.Errorf("error in agentInit: %v", err)
+			return err
 		}
 		}
+		c.drvRegistry.WalkDrivers(func(name string, driver driverapi.Driver, capability driverapi.Capability) bool {
+			if capability.DataScope == datastore.GlobalScope {
+				c.agentDriverNotify(driver)
+			}
+			return false
+		})
 	}
 	}
 
 
 	if len(remoteAddrList) > 0 {
 	if len(remoteAddrList) > 0 {
@@ -238,14 +235,6 @@ func (c *controller) agentSetup() error {
 		}
 		}
 	}
 	}
 
 
-	c.Lock()
-	if c.agent != nil && c.agentInitDone != nil {
-		close(c.agentInitDone)
-		c.agentInitDone = nil
-		c.agentStopDone = make(chan struct{})
-	}
-	c.Unlock()
-
 	return nil
 	return nil
 }
 }
 
 
@@ -287,16 +276,12 @@ func (c *controller) getPrimaryKeyTag(subsys string) ([]byte, uint64, error) {
 }
 }
 
 
 func (c *controller) agentInit(listenAddr, bindAddrOrInterface, advertiseAddr, dataPathAddr string) error {
 func (c *controller) agentInit(listenAddr, bindAddrOrInterface, advertiseAddr, dataPathAddr string) error {
-	if !c.isAgent() {
-		return nil
-	}
-
 	bindAddr, err := resolveAddr(bindAddrOrInterface)
 	bindAddr, err := resolveAddr(bindAddrOrInterface)
 	if err != nil {
 	if err != nil {
 		return err
 		return err
 	}
 	}
 
 
-	keys, tags := c.getKeys(subsysGossip)
+	keys, _ := c.getKeys(subsysGossip)
 	hostname, _ := os.Hostname()
 	hostname, _ := os.Hostname()
 	nodeName := hostname + "-" + stringid.TruncateID(stringid.GenerateRandomID())
 	nodeName := hostname + "-" + stringid.TruncateID(stringid.GenerateRandomID())
 	logrus.Info("Gossip cluster hostname ", nodeName)
 	logrus.Info("Gossip cluster hostname ", nodeName)
@@ -312,8 +297,11 @@ func (c *controller) agentInit(listenAddr, bindAddrOrInterface, advertiseAddr, d
 		return err
 		return err
 	}
 	}
 
 
+	var cancelList []func()
 	ch, cancel := nDB.Watch(libnetworkEPTable, "", "")
 	ch, cancel := nDB.Watch(libnetworkEPTable, "", "")
+	cancelList = append(cancelList, cancel)
 	nodeCh, cancel := nDB.Watch(networkdb.NodeTable, "", "")
 	nodeCh, cancel := nDB.Watch(networkdb.NodeTable, "", "")
+	cancelList = append(cancelList, cancel)
 
 
 	c.Lock()
 	c.Lock()
 	c.agent = &agent{
 	c.agent = &agent{
@@ -321,7 +309,7 @@ func (c *controller) agentInit(listenAddr, bindAddrOrInterface, advertiseAddr, d
 		bindAddr:          bindAddr,
 		bindAddr:          bindAddr,
 		advertiseAddr:     advertiseAddr,
 		advertiseAddr:     advertiseAddr,
 		dataPathAddr:      dataPathAddr,
 		dataPathAddr:      dataPathAddr,
-		epTblCancel:       cancel,
+		coreCancelFuncs:   cancelList,
 		driverCancelFuncs: make(map[string][]func()),
 		driverCancelFuncs: make(map[string][]func()),
 	}
 	}
 	c.Unlock()
 	c.Unlock()
@@ -330,7 +318,7 @@ func (c *controller) agentInit(listenAddr, bindAddrOrInterface, advertiseAddr, d
 	go c.handleTableEvents(nodeCh, c.handleNodeTableEvent)
 	go c.handleTableEvents(nodeCh, c.handleNodeTableEvent)
 
 
 	drvEnc := discoverapi.DriverEncryptionConfig{}
 	drvEnc := discoverapi.DriverEncryptionConfig{}
-	keys, tags = c.getKeys(subsysIPSec)
+	keys, tags := c.getKeys(subsysIPSec)
 	drvEnc.Keys = keys
 	drvEnc.Keys = keys
 	drvEnc.Tags = tags
 	drvEnc.Tags = tags
 
 
@@ -399,14 +387,17 @@ func (c *controller) agentClose() {
 			cancelList = append(cancelList, cancel)
 			cancelList = append(cancelList, cancel)
 		}
 		}
 	}
 	}
+
+	// Add also the cancel functions for the network db
+	for _, cancel := range agent.coreCancelFuncs {
+		cancelList = append(cancelList, cancel)
+	}
 	agent.Unlock()
 	agent.Unlock()
 
 
 	for _, cancel := range cancelList {
 	for _, cancel := range cancelList {
 		cancel()
 		cancel()
 	}
 	}
 
 
-	agent.epTblCancel()
-
 	agent.networkDB.Close()
 	agent.networkDB.Close()
 }
 }
 
 

+ 15 - 1
libnetwork/cluster/provider.go

@@ -5,6 +5,20 @@ import (
 	"golang.org/x/net/context"
 	"golang.org/x/net/context"
 )
 )
 
 
+const (
+	// EventSocketChange control socket changed
+	EventSocketChange = iota
+	// EventNodeReady cluster node in ready state
+	EventNodeReady
+	// EventNodeLeave node is leaving the cluster
+	EventNodeLeave
+	// EventNetworkKeysAvailable network keys correctly configured in the networking layer
+	EventNetworkKeysAvailable
+)
+
+// ConfigEventType type of the event produced by the cluster
+type ConfigEventType uint8
+
 // Provider provides clustering config details
 // Provider provides clustering config details
 type Provider interface {
 type Provider interface {
 	IsManager() bool
 	IsManager() bool
@@ -14,7 +28,7 @@ type Provider interface {
 	GetAdvertiseAddress() string
 	GetAdvertiseAddress() string
 	GetDataPathAddress() string
 	GetDataPathAddress() string
 	GetRemoteAddressList() []string
 	GetRemoteAddressList() []string
-	ListenClusterEvents() <-chan struct{}
+	ListenClusterEvents() <-chan ConfigEventType
 	AttachNetwork(string, string, []string) (*network.NetworkingConfig, error)
 	AttachNetwork(string, string, []string) (*network.NetworkingConfig, error)
 	DetachNetwork(string, string) error
 	DetachNetwork(string, string) error
 	UpdateAttachment(string, string, *network.NetworkingConfig) error
 	UpdateAttachment(string, string, *network.NetworkingConfig) error

+ 5 - 4
libnetwork/cmd/dnet/dnet.go

@@ -27,6 +27,7 @@ import (
 	"github.com/docker/docker/pkg/term"
 	"github.com/docker/docker/pkg/term"
 	"github.com/docker/libnetwork"
 	"github.com/docker/libnetwork"
 	"github.com/docker/libnetwork/api"
 	"github.com/docker/libnetwork/api"
+	"github.com/docker/libnetwork/cluster"
 	"github.com/docker/libnetwork/config"
 	"github.com/docker/libnetwork/config"
 	"github.com/docker/libnetwork/datastore"
 	"github.com/docker/libnetwork/datastore"
 	"github.com/docker/libnetwork/driverapi"
 	"github.com/docker/libnetwork/driverapi"
@@ -234,7 +235,7 @@ type dnetConnection struct {
 	// addr holds the client address.
 	// addr holds the client address.
 	addr          string
 	addr          string
 	Orchestration *NetworkOrchestration
 	Orchestration *NetworkOrchestration
-	configEvent   chan struct{}
+	configEvent   chan cluster.ConfigEventType
 }
 }
 
 
 // NetworkOrchestration exported
 // NetworkOrchestration exported
@@ -275,7 +276,7 @@ func (d *dnetConnection) dnetDaemon(cfgFile string) error {
 	controller.SetClusterProvider(d)
 	controller.SetClusterProvider(d)
 
 
 	if d.Orchestration.Agent || d.Orchestration.Manager {
 	if d.Orchestration.Agent || d.Orchestration.Manager {
-		d.configEvent <- struct{}{}
+		d.configEvent <- cluster.EventNodeReady
 	}
 	}
 
 
 	createDefaultNetwork(controller)
 	createDefaultNetwork(controller)
@@ -335,7 +336,7 @@ func (d *dnetConnection) GetNetworkKeys() []*types.EncryptionKey {
 func (d *dnetConnection) SetNetworkKeys([]*types.EncryptionKey) {
 func (d *dnetConnection) SetNetworkKeys([]*types.EncryptionKey) {
 }
 }
 
 
-func (d *dnetConnection) ListenClusterEvents() <-chan struct{} {
+func (d *dnetConnection) ListenClusterEvents() <-chan cluster.ConfigEventType {
 	return d.configEvent
 	return d.configEvent
 }
 }
 
 
@@ -438,7 +439,7 @@ func newDnetConnection(val string) (*dnetConnection, error) {
 		return nil, errors.New("dnet currently only supports tcp transport")
 		return nil, errors.New("dnet currently only supports tcp transport")
 	}
 	}
 
 
-	return &dnetConnection{protoAddrParts[0], protoAddrParts[1], &NetworkOrchestration{}, make(chan struct{}, 10)}, nil
+	return &dnetConnection{protoAddrParts[0], protoAddrParts[1], &NetworkOrchestration{}, make(chan cluster.ConfigEventType, 10)}, nil
 }
 }
 
 
 func (d *dnetConnection) httpCall(method, path string, data interface{}, headers map[string][]string) (io.ReadCloser, http.Header, int, error) {
 func (d *dnetConnection) httpCall(method, path string, data interface{}, headers map[string][]string) (io.ReadCloser, http.Header, int, error) {

+ 1 - 3
libnetwork/config/config.go

@@ -34,7 +34,6 @@ type DaemonCfg struct {
 	Labels          []string
 	Labels          []string
 	DriverCfg       map[string]interface{}
 	DriverCfg       map[string]interface{}
 	ClusterProvider cluster.Provider
 	ClusterProvider cluster.Provider
-	DisableProvider chan struct{}
 }
 }
 
 
 // ClusterCfg represents cluster configuration
 // ClusterCfg represents cluster configuration
@@ -74,8 +73,7 @@ func ParseConfig(tomlCfgFile string) (*Config, error) {
 func ParseConfigOptions(cfgOptions ...Option) *Config {
 func ParseConfigOptions(cfgOptions ...Option) *Config {
 	cfg := &Config{
 	cfg := &Config{
 		Daemon: DaemonCfg{
 		Daemon: DaemonCfg{
-			DriverCfg:       make(map[string]interface{}),
-			DisableProvider: make(chan struct{}, 10),
+			DriverCfg: make(map[string]interface{}),
 		},
 		},
 		Scopes: make(map[string]*datastore.ScopeCfg),
 		Scopes: make(map[string]*datastore.ScopeCfg),
 	}
 	}

+ 74 - 47
libnetwork/controller.go

@@ -244,15 +244,24 @@ func New(cfgOptions ...config.Option) (NetworkController, error) {
 }
 }
 
 
 func (c *controller) SetClusterProvider(provider cluster.Provider) {
 func (c *controller) SetClusterProvider(provider cluster.Provider) {
+	var sameProvider bool
 	c.Lock()
 	c.Lock()
-	c.cfg.Daemon.ClusterProvider = provider
-	disableProviderCh := c.cfg.Daemon.DisableProvider
-	c.Unlock()
-	if provider != nil {
-		go c.clusterAgentInit()
+	// Avoids to spawn multiple goroutine for the same cluster provider
+	if c.cfg.Daemon.ClusterProvider == provider {
+		// If the cluster provider is already set, there is already a go routine spawned
+		// that is listening for events, so nothing to do here
+		sameProvider = true
 	} else {
 	} else {
-		disableProviderCh <- struct{}{}
+		c.cfg.Daemon.ClusterProvider = provider
+	}
+	c.Unlock()
+
+	if provider == nil || sameProvider {
+		return
 	}
 	}
+	// We don't want to spawn a new go routine if the previous one did not exit yet
+	c.AgentStopWait()
+	go c.clusterAgentInit()
 }
 }
 
 
 func isValidClusteringIP(addr string) bool {
 func isValidClusteringIP(addr string) bool {
@@ -262,12 +271,6 @@ func isValidClusteringIP(addr string) bool {
 // libnetwork side of agent depends on the keys. On the first receipt of
 // libnetwork side of agent depends on the keys. On the first receipt of
 // keys setup the agent. For subsequent key set handle the key change
 // keys setup the agent. For subsequent key set handle the key change
 func (c *controller) SetKeys(keys []*types.EncryptionKey) error {
 func (c *controller) SetKeys(keys []*types.EncryptionKey) error {
-	c.Lock()
-	existingKeys := c.keys
-	clusterConfigAvailable := c.clusterConfigAvailable
-	agent := c.agent
-	c.Unlock()
-
 	subsysKeys := make(map[string]int)
 	subsysKeys := make(map[string]int)
 	for _, key := range keys {
 	for _, key := range keys {
 		if key.Subsystem != subsysGossip &&
 		if key.Subsystem != subsysGossip &&
@@ -282,19 +285,8 @@ func (c *controller) SetKeys(keys []*types.EncryptionKey) error {
 		}
 		}
 	}
 	}
 
 
-	if len(existingKeys) == 0 {
-		c.Lock()
-		c.keys = keys
-		c.Unlock()
-		if agent != nil {
-			return (fmt.Errorf("libnetwork agent setup without keys"))
-		}
-		if clusterConfigAvailable {
-			return c.agentSetup()
-		}
-		logrus.Debug("received encryption keys before cluster config")
-		return nil
-	}
+	agent := c.getAgent()
+
 	if agent == nil {
 	if agent == nil {
 		c.Lock()
 		c.Lock()
 		c.keys = keys
 		c.keys = keys
@@ -312,24 +304,32 @@ func (c *controller) getAgent() *agent {
 
 
 func (c *controller) clusterAgentInit() {
 func (c *controller) clusterAgentInit() {
 	clusterProvider := c.cfg.Daemon.ClusterProvider
 	clusterProvider := c.cfg.Daemon.ClusterProvider
+	var keysAvailable bool
 	for {
 	for {
-		select {
-		case <-clusterProvider.ListenClusterEvents():
-			if !c.isDistributedControl() {
-				c.Lock()
-				c.clusterConfigAvailable = true
-				keys := c.keys
-				c.Unlock()
-				// agent initialization needs encryption keys and bind/remote IP which
-				// comes from the daemon cluster events
-				if len(keys) > 0 {
-					c.agentSetup()
+		eventType := <-clusterProvider.ListenClusterEvents()
+		// The events: EventSocketChange, EventNodeReady and EventNetworkKeysAvailable are not ordered
+		// when all the condition for the agent initialization are met then proceed with it
+		switch eventType {
+		case cluster.EventNetworkKeysAvailable:
+			// Validates that the keys are actually available before starting the initialization
+			// This will handle old spurious messages left on the channel
+			c.Lock()
+			keysAvailable = c.keys != nil
+			c.Unlock()
+			fallthrough
+		case cluster.EventSocketChange, cluster.EventNodeReady:
+			if keysAvailable && !c.isDistributedControl() {
+				c.agentOperationStart()
+				if err := c.agentSetup(clusterProvider); err != nil {
+					c.agentStopComplete()
+				} else {
+					c.agentInitComplete()
 				}
 				}
 			}
 			}
-		case <-c.cfg.Daemon.DisableProvider:
+		case cluster.EventNodeLeave:
+			keysAvailable = false
+			c.agentOperationStart()
 			c.Lock()
 			c.Lock()
-			c.clusterConfigAvailable = false
-			c.agentInitDone = make(chan struct{})
 			c.keys = nil
 			c.keys = nil
 			c.Unlock()
 			c.Unlock()
 
 
@@ -343,20 +343,14 @@ func (c *controller) clusterAgentInit() {
 			c.agentClose()
 			c.agentClose()
 			c.cleanupServiceBindings("")
 			c.cleanupServiceBindings("")
 
 
-			c.Lock()
-			if c.agentStopDone != nil {
-				close(c.agentStopDone)
-				c.agentStopDone = nil
-			}
-			c.Unlock()
+			c.agentStopComplete()
 
 
 			return
 			return
 		}
 		}
 	}
 	}
 }
 }
 
 
-// AgentInitWait waits for agent initialization to be completed in the
-// controller.
+// AgentInitWait waits for agent initialization to be completed in the controller.
 func (c *controller) AgentInitWait() {
 func (c *controller) AgentInitWait() {
 	c.Lock()
 	c.Lock()
 	agentInitDone := c.agentInitDone
 	agentInitDone := c.agentInitDone
@@ -367,6 +361,7 @@ func (c *controller) AgentInitWait() {
 	}
 	}
 }
 }
 
 
+// AgentStopWait waits for the Agent stop to be completed in the controller
 func (c *controller) AgentStopWait() {
 func (c *controller) AgentStopWait() {
 	c.Lock()
 	c.Lock()
 	agentStopDone := c.agentStopDone
 	agentStopDone := c.agentStopDone
@@ -376,6 +371,38 @@ func (c *controller) AgentStopWait() {
 	}
 	}
 }
 }
 
 
+// agentOperationStart marks the start of an Agent Init or Agent Stop
+func (c *controller) agentOperationStart() {
+	c.Lock()
+	if c.agentInitDone == nil {
+		c.agentInitDone = make(chan struct{})
+	}
+	if c.agentStopDone == nil {
+		c.agentStopDone = make(chan struct{})
+	}
+	c.Unlock()
+}
+
+// agentInitComplete notifies the successful completion of the Agent initialization
+func (c *controller) agentInitComplete() {
+	c.Lock()
+	if c.agentInitDone != nil {
+		close(c.agentInitDone)
+		c.agentInitDone = nil
+	}
+	c.Unlock()
+}
+
+// agentStopComplete notifies the successful completion of the Agent stop
+func (c *controller) agentStopComplete() {
+	c.Lock()
+	if c.agentStopDone != nil {
+		close(c.agentStopDone)
+		c.agentStopDone = nil
+	}
+	c.Unlock()
+}
+
 func (c *controller) makeDriverConfig(ntype string) map[string]interface{} {
 func (c *controller) makeDriverConfig(ntype string) map[string]interface{} {
 	if c.cfg == nil {
 	if c.cfg == nil {
 		return nil
 		return nil