From e2ec006797fa14f59bcf7b9c23505ccdf1d3ded3 Mon Sep 17 00:00:00 2001 From: Flavio Crisciani Date: Sun, 30 Apr 2017 14:51:43 -0700 Subject: [PATCH 1/2] Fix race condition between swarm and libnetwork This commit in conjunction with a libnetwork side commit, cleans up the libnetwork SetClusterProvider logic interaction. The previous code was inducing libnetwork to spawn several go routines that were racing between each other during the agent init and close. A test got added to verify that back to back swarm init and leave are properly processed and not raise crashes Signed-off-by: Flavio Crisciani --- cmd/dockerd/daemon.go | 6 +++- daemon/cluster.go | 2 ++ daemon/cluster/cluster.go | 35 ++++++++++++++++++------ daemon/cluster/noderunner.go | 6 ++-- daemon/cluster/swarm.go | 1 - daemon/network.go | 10 +++++-- integration-cli/docker_cli_swarm_test.go | 21 ++++++++++++++ 7 files changed, 66 insertions(+), 15 deletions(-) diff --git a/cmd/dockerd/daemon.go b/cmd/dockerd/daemon.go index 2e3f11f84e..3b6d0f014c 100644 --- a/cmd/dockerd/daemon.go +++ b/cmd/dockerd/daemon.go @@ -300,6 +300,11 @@ func (cli *DaemonCli) start(opts daemonOptions) (err error) { if err != nil { logrus.Fatalf("Error creating cluster component: %v", err) } + d.SetCluster(c) + err = c.Start() + if err != nil { + logrus.Fatalf("Error starting cluster component: %v", err) + } // Restart all autostart containers which has a swarm endpoint // and is not yet running now that we have successfully @@ -316,7 +321,6 @@ func (cli *DaemonCli) start(opts daemonOptions) (err error) { cli.d = d - d.SetCluster(c) initRouter(api, d, c) cli.setupConfigReloadTrap() diff --git a/daemon/cluster.go b/daemon/cluster.go index b7970edbb5..d22970bcd7 100644 --- a/daemon/cluster.go +++ b/daemon/cluster.go @@ -2,12 +2,14 @@ package daemon import ( apitypes "github.com/docker/docker/api/types" + lncluster "github.com/docker/libnetwork/cluster" ) // Cluster is the interface for github.com/docker/docker/daemon/cluster.(*Cluster). type Cluster interface { ClusterStatus NetworkManager + SendClusterEvent(event lncluster.ConfigEventType) } // ClusterStatus interface provides information about the Swarm status of the Cluster diff --git a/daemon/cluster/cluster.go b/daemon/cluster/cluster.go index 6a00a4d7b8..aeefa360fd 100644 --- a/daemon/cluster/cluster.go +++ b/daemon/cluster/cluster.go @@ -51,6 +51,7 @@ import ( types "github.com/docker/docker/api/types/swarm" executorpkg "github.com/docker/docker/daemon/cluster/executor" "github.com/docker/docker/pkg/signal" + lncluster "github.com/docker/libnetwork/cluster" swarmapi "github.com/docker/swarmkit/api" swarmnode "github.com/docker/swarmkit/node" "github.com/pkg/errors" @@ -115,7 +116,7 @@ type Cluster struct { root string runtimeRoot string config Config - configEvent chan struct{} // todo: make this array and goroutine safe + configEvent chan lncluster.ConfigEventType // todo: make this array and goroutine safe attachers map[string]*attacher } @@ -147,22 +148,30 @@ func New(config Config) (*Cluster, error) { c := &Cluster{ root: root, config: config, - configEvent: make(chan struct{}, 10), + configEvent: make(chan lncluster.ConfigEventType, 10), runtimeRoot: config.RuntimeRoot, attachers: make(map[string]*attacher), } + return c, nil +} + +// Start the Cluster instance +// TODO The split between New and Start can be join again when the SendClusterEvent +// method is no longer required +func (c *Cluster) Start() error { + root := filepath.Join(c.config.Root, swarmDirName) nodeConfig, err := loadPersistentState(root) if err != nil { if os.IsNotExist(err) { - return c, nil + return nil } - return nil, err + return err } nr, err := c.newNodeRunner(*nodeConfig) if err != nil { - return nil, err + return err } c.nr = nr @@ -172,10 +181,10 @@ func New(config Config) (*Cluster, error) { case err := <-nr.Ready(): if err != nil { logrus.WithError(err).Error("swarm component could not be started") - return c, nil + return nil } } - return c, nil + return nil } func (c *Cluster) newNodeRunner(conf nodeStartConfig) (*nodeRunner, error) { @@ -308,7 +317,7 @@ func (c *Cluster) getRemoteAddressList() []string { // ListenClusterEvents returns a channel that receives messages on cluster // participation changes. // todo: make cancelable and accessible to multiple callers -func (c *Cluster) ListenClusterEvents() <-chan struct{} { +func (c *Cluster) ListenClusterEvents() <-chan lncluster.ConfigEventType { return c.configEvent } @@ -413,3 +422,13 @@ func (c *Cluster) lockedManagerAction(fn func(ctx context.Context, state nodeSta return fn(ctx, state) } + +// SendClusterEvent allows to send cluster events on the configEvent channel +// TODO This method should not be exposed. +// Currently it is used to notify the network controller that the keys are +// available +func (c *Cluster) SendClusterEvent(event lncluster.ConfigEventType) { + c.mu.RLock() + defer c.mu.RUnlock() + c.configEvent <- event +} diff --git a/daemon/cluster/noderunner.go b/daemon/cluster/noderunner.go index 13947fa7b0..2b18d99da1 100644 --- a/daemon/cluster/noderunner.go +++ b/daemon/cluster/noderunner.go @@ -11,6 +11,7 @@ import ( "github.com/Sirupsen/logrus" types "github.com/docker/docker/api/types/swarm" "github.com/docker/docker/daemon/cluster/executor/container" + lncluster "github.com/docker/libnetwork/cluster" swarmapi "github.com/docker/swarmkit/api" swarmnode "github.com/docker/swarmkit/node" "github.com/pkg/errors" @@ -162,7 +163,7 @@ func (n *nodeRunner) handleControlSocketChange(ctx context.Context, node *swarmn } n.grpcConn = conn n.mu.Unlock() - n.cluster.configEvent <- struct{}{} + n.cluster.SendClusterEvent(lncluster.EventSocketChange) } } @@ -175,7 +176,7 @@ func (n *nodeRunner) handleReadyEvent(ctx context.Context, node *swarmnode.Node, close(ready) case <-ctx.Done(): } - n.cluster.configEvent <- struct{}{} + n.cluster.SendClusterEvent(lncluster.EventNodeReady) } func (n *nodeRunner) handleNodeExit(node *swarmnode.Node) { @@ -217,6 +218,7 @@ func (n *nodeRunner) Stop() error { if err := n.swarmNode.Stop(ctx); err != nil && !strings.Contains(err.Error(), "context canceled") { return err } + n.cluster.SendClusterEvent(lncluster.EventNodeLeave) <-n.done return nil } diff --git a/daemon/cluster/swarm.go b/daemon/cluster/swarm.go index 8db4f3621f..8a8d5bf808 100644 --- a/daemon/cluster/swarm.go +++ b/daemon/cluster/swarm.go @@ -388,7 +388,6 @@ func (c *Cluster) Leave(force bool) error { } } - c.configEvent <- struct{}{} // todo: cleanup optional? if err := clearPersistentState(c.root); err != nil { return err diff --git a/daemon/network.go b/daemon/network.go index df39c7c96e..781953686a 100644 --- a/daemon/network.go +++ b/daemon/network.go @@ -16,6 +16,7 @@ import ( "github.com/docker/docker/pkg/plugingetter" "github.com/docker/docker/runconfig" "github.com/docker/libnetwork" + lncluster "github.com/docker/libnetwork/cluster" "github.com/docker/libnetwork/driverapi" "github.com/docker/libnetwork/ipamapi" networktypes "github.com/docker/libnetwork/types" @@ -207,7 +208,6 @@ func (daemon *Daemon) setupIngress(create *clustertypes.NetworkCreateRequest, ip func (daemon *Daemon) releaseIngress(id string) { controller := daemon.netController - if err := controller.SandboxDestroy("ingress-sbox"); err != nil { logrus.Errorf("Failed to delete ingress sandbox: %v", err) } @@ -233,13 +233,17 @@ func (daemon *Daemon) releaseIngress(id string) { logrus.Errorf("Failed to delete ingress network %s: %v", n.ID(), err) return } - return } // SetNetworkBootstrapKeys sets the bootstrap keys. func (daemon *Daemon) SetNetworkBootstrapKeys(keys []*networktypes.EncryptionKey) error { - return daemon.netController.SetKeys(keys) + err := daemon.netController.SetKeys(keys) + if err == nil { + // Upon successful key setting dispatch the keys available event + daemon.cluster.SendClusterEvent(lncluster.EventNetworkKeysAvailable) + } + return err } // UpdateAttachment notifies the attacher about the attachment config. diff --git a/integration-cli/docker_cli_swarm_test.go b/integration-cli/docker_cli_swarm_test.go index 722dbabf4c..9e45167304 100644 --- a/integration-cli/docker_cli_swarm_test.go +++ b/integration-cli/docker_cli_swarm_test.go @@ -1980,3 +1980,24 @@ func (s *DockerSwarmSuite) TestSwarmInitUnspecifiedDataPathAddr(c *check.C) { c.Assert(err, checker.NotNil) c.Assert(out, checker.Contains, "data path address must be a non-zero IP") } + +func (s *DockerSwarmSuite) TestSwarmJoinLeave(c *check.C) { + d := s.AddDaemon(c, true, true) + + out, err := d.Cmd("swarm", "join-token", "-q", "worker") + c.Assert(err, checker.IsNil) + c.Assert(strings.TrimSpace(out), checker.Not(checker.Equals), "") + + token := strings.TrimSpace(out) + + // Verify that back to back join/leave does not cause panics + d1 := s.AddDaemon(c, false, false) + for i := 0; i < 10; i++ { + out, err = d1.Cmd("swarm", "join", "--token", token, d.ListenAddr) + c.Assert(err, checker.IsNil) + c.Assert(strings.TrimSpace(out), checker.Not(checker.Equals), "") + + _, err = d1.Cmd("swarm", "leave") + c.Assert(err, checker.IsNil) + } +} From 385176980ef6c6f322c78d76bd2f98875e6aee69 Mon Sep 17 00:00:00 2001 From: Flavio Crisciani Date: Tue, 9 May 2017 18:27:34 -0700 Subject: [PATCH 2/2] Libnetwork vendoring Signed-off-by: Flavio Crisciani --- vendor.conf | 2 +- vendor/github.com/docker/libnetwork/agent.go | 61 ++++----- .../docker/libnetwork/cluster/provider.go | 16 ++- .../docker/libnetwork/config/config.go | 4 +- .../docker/libnetwork/controller.go | 121 +++++++++++------- 5 files changed, 117 insertions(+), 87 deletions(-) diff --git a/vendor.conf b/vendor.conf index 1dfcb16d3b..adfa5982cd 100644 --- a/vendor.conf +++ b/vendor.conf @@ -25,7 +25,7 @@ github.com/imdario/mergo 0.2.1 golang.org/x/sync de49d9dcd27d4f764488181bea099dfe6179bcf0 #get libnetwork packages -github.com/docker/libnetwork b015d4b1bcf4e666d8950651c8cc825a02842e7a +github.com/docker/libnetwork 6786135bf7de08ec26a72a6f7e4291d27d113a3f github.com/docker/go-events 18b43f1bc85d9cdd42c05a6cd2d444c7a200a894 github.com/armon/go-radix e39d623f12e8e41c7b5529e9a9dd67a1e2261f80 github.com/armon/go-metrics eb0af217e5e9747e41dd5303755356b62d28e3ec diff --git a/vendor/github.com/docker/libnetwork/agent.go b/vendor/github.com/docker/libnetwork/agent.go index f81f6c322a..b4b7bdf693 100644 --- a/vendor/github.com/docker/libnetwork/agent.go +++ b/vendor/github.com/docker/libnetwork/agent.go @@ -13,6 +13,7 @@ import ( "github.com/Sirupsen/logrus" "github.com/docker/docker/pkg/stringid" "github.com/docker/go-events" + "github.com/docker/libnetwork/cluster" "github.com/docker/libnetwork/datastore" "github.com/docker/libnetwork/discoverapi" "github.com/docker/libnetwork/driverapi" @@ -40,7 +41,7 @@ type agent struct { bindAddr string advertiseAddr string dataPathAddr string - epTblCancel func() + coreCancelFuncs []func() driverCancelFuncs map[string][]func() sync.Mutex } @@ -192,16 +193,12 @@ func (c *controller) handleKeyChange(keys []*types.EncryptionKey) error { return nil } -func (c *controller) agentSetup() error { - c.Lock() - clusterProvider := c.cfg.Daemon.ClusterProvider - agent := c.agent - c.Unlock() +func (c *controller) agentSetup(clusterProvider cluster.Provider) error { + agent := c.getAgent() - if clusterProvider == nil { - msg := "Aborting initialization of Libnetwork Agent because cluster provider is now unset" - logrus.Errorf(msg) - return fmt.Errorf(msg) + // If the agent is already present there is no need to try to initilize it again + if agent != nil { + return nil } bindAddr := clusterProvider.GetLocalAddress() @@ -221,15 +218,15 @@ func (c *controller) agentSetup() error { listenAddr, bindAddr, advAddr, dataAddr, remoteAddrList) if advAddr != "" && agent == nil { if err := c.agentInit(listenAddr, bindAddr, advAddr, dataAddr); err != nil { - logrus.Errorf("Error in agentInit : %v", err) - } else { - c.drvRegistry.WalkDrivers(func(name string, driver driverapi.Driver, capability driverapi.Capability) bool { - if capability.DataScope == datastore.GlobalScope { - c.agentDriverNotify(driver) - } - return false - }) + logrus.Errorf("error in agentInit: %v", err) + return err } + c.drvRegistry.WalkDrivers(func(name string, driver driverapi.Driver, capability driverapi.Capability) bool { + if capability.DataScope == datastore.GlobalScope { + c.agentDriverNotify(driver) + } + return false + }) } if len(remoteAddrList) > 0 { @@ -238,14 +235,6 @@ func (c *controller) agentSetup() error { } } - c.Lock() - if c.agent != nil && c.agentInitDone != nil { - close(c.agentInitDone) - c.agentInitDone = nil - c.agentStopDone = make(chan struct{}) - } - c.Unlock() - return nil } @@ -287,16 +276,12 @@ func (c *controller) getPrimaryKeyTag(subsys string) ([]byte, uint64, error) { } func (c *controller) agentInit(listenAddr, bindAddrOrInterface, advertiseAddr, dataPathAddr string) error { - if !c.isAgent() { - return nil - } - bindAddr, err := resolveAddr(bindAddrOrInterface) if err != nil { return err } - keys, tags := c.getKeys(subsysGossip) + keys, _ := c.getKeys(subsysGossip) hostname, _ := os.Hostname() nodeName := hostname + "-" + stringid.TruncateID(stringid.GenerateRandomID()) logrus.Info("Gossip cluster hostname ", nodeName) @@ -312,8 +297,11 @@ func (c *controller) agentInit(listenAddr, bindAddrOrInterface, advertiseAddr, d return err } + var cancelList []func() ch, cancel := nDB.Watch(libnetworkEPTable, "", "") + cancelList = append(cancelList, cancel) nodeCh, cancel := nDB.Watch(networkdb.NodeTable, "", "") + cancelList = append(cancelList, cancel) c.Lock() c.agent = &agent{ @@ -321,7 +309,7 @@ func (c *controller) agentInit(listenAddr, bindAddrOrInterface, advertiseAddr, d bindAddr: bindAddr, advertiseAddr: advertiseAddr, dataPathAddr: dataPathAddr, - epTblCancel: cancel, + coreCancelFuncs: cancelList, driverCancelFuncs: make(map[string][]func()), } c.Unlock() @@ -330,7 +318,7 @@ func (c *controller) agentInit(listenAddr, bindAddrOrInterface, advertiseAddr, d go c.handleTableEvents(nodeCh, c.handleNodeTableEvent) drvEnc := discoverapi.DriverEncryptionConfig{} - keys, tags = c.getKeys(subsysIPSec) + keys, tags := c.getKeys(subsysIPSec) drvEnc.Keys = keys drvEnc.Tags = tags @@ -399,14 +387,17 @@ func (c *controller) agentClose() { cancelList = append(cancelList, cancel) } } + + // Add also the cancel functions for the network db + for _, cancel := range agent.coreCancelFuncs { + cancelList = append(cancelList, cancel) + } agent.Unlock() for _, cancel := range cancelList { cancel() } - agent.epTblCancel() - agent.networkDB.Close() } diff --git a/vendor/github.com/docker/libnetwork/cluster/provider.go b/vendor/github.com/docker/libnetwork/cluster/provider.go index 36563080b3..491ccfd4b8 100644 --- a/vendor/github.com/docker/libnetwork/cluster/provider.go +++ b/vendor/github.com/docker/libnetwork/cluster/provider.go @@ -5,6 +5,20 @@ import ( "golang.org/x/net/context" ) +const ( + // EventSocketChange control socket changed + EventSocketChange = iota + // EventNodeReady cluster node in ready state + EventNodeReady + // EventNodeLeave node is leaving the cluster + EventNodeLeave + // EventNetworkKeysAvailable network keys correctly configured in the networking layer + EventNetworkKeysAvailable +) + +// ConfigEventType type of the event produced by the cluster +type ConfigEventType uint8 + // Provider provides clustering config details type Provider interface { IsManager() bool @@ -14,7 +28,7 @@ type Provider interface { GetAdvertiseAddress() string GetDataPathAddress() string GetRemoteAddressList() []string - ListenClusterEvents() <-chan struct{} + ListenClusterEvents() <-chan ConfigEventType AttachNetwork(string, string, []string) (*network.NetworkingConfig, error) DetachNetwork(string, string) error UpdateAttachment(string, string, *network.NetworkingConfig) error diff --git a/vendor/github.com/docker/libnetwork/config/config.go b/vendor/github.com/docker/libnetwork/config/config.go index ca87e3ac4d..3acb4320c4 100644 --- a/vendor/github.com/docker/libnetwork/config/config.go +++ b/vendor/github.com/docker/libnetwork/config/config.go @@ -34,7 +34,6 @@ type DaemonCfg struct { Labels []string DriverCfg map[string]interface{} ClusterProvider cluster.Provider - DisableProvider chan struct{} } // ClusterCfg represents cluster configuration @@ -74,8 +73,7 @@ func ParseConfig(tomlCfgFile string) (*Config, error) { func ParseConfigOptions(cfgOptions ...Option) *Config { cfg := &Config{ Daemon: DaemonCfg{ - DriverCfg: make(map[string]interface{}), - DisableProvider: make(chan struct{}, 10), + DriverCfg: make(map[string]interface{}), }, Scopes: make(map[string]*datastore.ScopeCfg), } diff --git a/vendor/github.com/docker/libnetwork/controller.go b/vendor/github.com/docker/libnetwork/controller.go index 6a5eda04de..8b2a983d51 100644 --- a/vendor/github.com/docker/libnetwork/controller.go +++ b/vendor/github.com/docker/libnetwork/controller.go @@ -244,15 +244,24 @@ func New(cfgOptions ...config.Option) (NetworkController, error) { } func (c *controller) SetClusterProvider(provider cluster.Provider) { + var sameProvider bool c.Lock() - c.cfg.Daemon.ClusterProvider = provider - disableProviderCh := c.cfg.Daemon.DisableProvider - c.Unlock() - if provider != nil { - go c.clusterAgentInit() + // Avoids to spawn multiple goroutine for the same cluster provider + if c.cfg.Daemon.ClusterProvider == provider { + // If the cluster provider is already set, there is already a go routine spawned + // that is listening for events, so nothing to do here + sameProvider = true } else { - disableProviderCh <- struct{}{} + c.cfg.Daemon.ClusterProvider = provider } + c.Unlock() + + if provider == nil || sameProvider { + return + } + // We don't want to spawn a new go routine if the previous one did not exit yet + c.AgentStopWait() + go c.clusterAgentInit() } func isValidClusteringIP(addr string) bool { @@ -262,12 +271,6 @@ func isValidClusteringIP(addr string) bool { // libnetwork side of agent depends on the keys. On the first receipt of // keys setup the agent. For subsequent key set handle the key change func (c *controller) SetKeys(keys []*types.EncryptionKey) error { - c.Lock() - existingKeys := c.keys - clusterConfigAvailable := c.clusterConfigAvailable - agent := c.agent - c.Unlock() - subsysKeys := make(map[string]int) for _, key := range keys { if key.Subsystem != subsysGossip && @@ -282,19 +285,8 @@ func (c *controller) SetKeys(keys []*types.EncryptionKey) error { } } - if len(existingKeys) == 0 { - c.Lock() - c.keys = keys - c.Unlock() - if agent != nil { - return (fmt.Errorf("libnetwork agent setup without keys")) - } - if clusterConfigAvailable { - return c.agentSetup() - } - logrus.Debug("received encryption keys before cluster config") - return nil - } + agent := c.getAgent() + if agent == nil { c.Lock() c.keys = keys @@ -312,24 +304,32 @@ func (c *controller) getAgent() *agent { func (c *controller) clusterAgentInit() { clusterProvider := c.cfg.Daemon.ClusterProvider + var keysAvailable bool for { - select { - case <-clusterProvider.ListenClusterEvents(): - if !c.isDistributedControl() { - c.Lock() - c.clusterConfigAvailable = true - keys := c.keys - c.Unlock() - // agent initialization needs encryption keys and bind/remote IP which - // comes from the daemon cluster events - if len(keys) > 0 { - c.agentSetup() + eventType := <-clusterProvider.ListenClusterEvents() + // The events: EventSocketChange, EventNodeReady and EventNetworkKeysAvailable are not ordered + // when all the condition for the agent initialization are met then proceed with it + switch eventType { + case cluster.EventNetworkKeysAvailable: + // Validates that the keys are actually available before starting the initialization + // This will handle old spurious messages left on the channel + c.Lock() + keysAvailable = c.keys != nil + c.Unlock() + fallthrough + case cluster.EventSocketChange, cluster.EventNodeReady: + if keysAvailable && !c.isDistributedControl() { + c.agentOperationStart() + if err := c.agentSetup(clusterProvider); err != nil { + c.agentStopComplete() + } else { + c.agentInitComplete() } } - case <-c.cfg.Daemon.DisableProvider: + case cluster.EventNodeLeave: + keysAvailable = false + c.agentOperationStart() c.Lock() - c.clusterConfigAvailable = false - c.agentInitDone = make(chan struct{}) c.keys = nil c.Unlock() @@ -343,20 +343,14 @@ func (c *controller) clusterAgentInit() { c.agentClose() c.cleanupServiceBindings("") - c.Lock() - if c.agentStopDone != nil { - close(c.agentStopDone) - c.agentStopDone = nil - } - c.Unlock() + c.agentStopComplete() return } } } -// AgentInitWait waits for agent initialization to be completed in the -// controller. +// AgentInitWait waits for agent initialization to be completed in the controller. func (c *controller) AgentInitWait() { c.Lock() agentInitDone := c.agentInitDone @@ -367,6 +361,7 @@ func (c *controller) AgentInitWait() { } } +// AgentStopWait waits for the Agent stop to be completed in the controller func (c *controller) AgentStopWait() { c.Lock() agentStopDone := c.agentStopDone @@ -376,6 +371,38 @@ func (c *controller) AgentStopWait() { } } +// agentOperationStart marks the start of an Agent Init or Agent Stop +func (c *controller) agentOperationStart() { + c.Lock() + if c.agentInitDone == nil { + c.agentInitDone = make(chan struct{}) + } + if c.agentStopDone == nil { + c.agentStopDone = make(chan struct{}) + } + c.Unlock() +} + +// agentInitComplete notifies the successful completion of the Agent initialization +func (c *controller) agentInitComplete() { + c.Lock() + if c.agentInitDone != nil { + close(c.agentInitDone) + c.agentInitDone = nil + } + c.Unlock() +} + +// agentStopComplete notifies the successful completion of the Agent stop +func (c *controller) agentStopComplete() { + c.Lock() + if c.agentStopDone != nil { + close(c.agentStopDone) + c.agentStopDone = nil + } + c.Unlock() +} + func (c *controller) makeDriverConfig(ntype string) map[string]interface{} { if c.cfg == nil { return nil