Merge pull request #32906 from fcrisciani/init_race_cond
Race condition fix for swarm join/leave
Commit 77d5a0996f
12 changed files with 183 additions and 102 deletions
@@ -300,6 +300,11 @@ func (cli *DaemonCli) start(opts daemonOptions) (err error) {
 	if err != nil {
 		logrus.Fatalf("Error creating cluster component: %v", err)
 	}
+	d.SetCluster(c)
+	err = c.Start()
+	if err != nil {
+		logrus.Fatalf("Error starting cluster component: %v", err)
+	}
 
 	// Restart all autostart containers which has a swarm endpoint
 	// and is not yet running now that we have successfully
@@ -316,7 +321,6 @@ func (cli *DaemonCli) start(opts daemonOptions) (err error) {
 
 	cli.d = d
 
-	d.SetCluster(c)
 	initRouter(api, d, c)
 
 	cli.setupConfigReloadTrap()
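The two hunks above are the core of the fix: `d.SetCluster(c)` now runs before `c.Start()`, so the daemon is registered as the consumer of cluster events before the cluster component can emit any. A minimal sketch of that ordering constraint (the `Daemon`/`Cluster` types and the event payload here are illustrative stand-ins, not the real moby types):

```go
package main

import "fmt"

// Cluster emits events; it must not start emitting before a consumer exists.
type Cluster struct{ events chan string }

func NewCluster() *Cluster { return &Cluster{events: make(chan string, 10)} }

// Start begins producing events; safe only once the daemon holds the cluster.
func (c *Cluster) Start() { c.events <- "node-ready" }

type Daemon struct{ cluster *Cluster }

func (d *Daemon) SetCluster(c *Cluster) { d.cluster = c }

func main() {
	d := &Daemon{}
	c := NewCluster()
	d.SetCluster(c) // wire the consumer first...
	c.Start()       // ...then let the producer run
	fmt.Println(<-d.cluster.events)
}
```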
@@ -2,12 +2,14 @@ package daemon
 
 import (
 	apitypes "github.com/docker/docker/api/types"
+	lncluster "github.com/docker/libnetwork/cluster"
 )
 
 // Cluster is the interface for github.com/docker/docker/daemon/cluster.(*Cluster).
 type Cluster interface {
 	ClusterStatus
 	NetworkManager
+	SendClusterEvent(event lncluster.ConfigEventType)
 }
 
 // ClusterStatus interface provides information about the Swarm status of the Cluster
@@ -51,6 +51,7 @@ import (
 	types "github.com/docker/docker/api/types/swarm"
 	executorpkg "github.com/docker/docker/daemon/cluster/executor"
 	"github.com/docker/docker/pkg/signal"
+	lncluster "github.com/docker/libnetwork/cluster"
 	swarmapi "github.com/docker/swarmkit/api"
 	swarmnode "github.com/docker/swarmkit/node"
 	"github.com/pkg/errors"
@@ -115,7 +116,7 @@ type Cluster struct {
 	root        string
 	runtimeRoot string
 	config      Config
-	configEvent chan struct{} // todo: make this array and goroutine safe
+	configEvent chan lncluster.ConfigEventType // todo: make this array and goroutine safe
 	attachers   map[string]*attacher
 }
 
@@ -147,22 +148,30 @@ func New(config Config) (*Cluster, error) {
 	c := &Cluster{
 		root:        root,
 		config:      config,
-		configEvent: make(chan struct{}, 10),
+		configEvent: make(chan lncluster.ConfigEventType, 10),
 		runtimeRoot: config.RuntimeRoot,
 		attachers:   make(map[string]*attacher),
 	}
+	return c, nil
+}
+
+// Start the Cluster instance
+// TODO The split between New and Start can be join again when the SendClusterEvent
+// method is no longer required
+func (c *Cluster) Start() error {
+	root := filepath.Join(c.config.Root, swarmDirName)
+
 	nodeConfig, err := loadPersistentState(root)
 	if err != nil {
 		if os.IsNotExist(err) {
-			return c, nil
+			return nil
 		}
-		return nil, err
+		return err
 	}
 
 	nr, err := c.newNodeRunner(*nodeConfig)
 	if err != nil {
-		return nil, err
+		return err
 	}
 	c.nr = nr
 
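This hunk splits the old constructor in two: `New` now only allocates the `Cluster`, while the side effects (loading persisted swarm state, starting the node runner) move into `Start`, which the caller invokes only after the daemon is wired up. A reduced sketch of the two-phase pattern, with a hypothetical component name and state path:

```go
package main

import (
	"errors"
	"fmt"
	"os"
)

type Component struct{ stateDir string }

// New only allocates; nothing observable happens yet.
func New(stateDir string) *Component { return &Component{stateDir: stateDir} }

// Start performs the side effects that used to live in the constructor.
// A missing state file means "never joined", which is not an error.
func (c *Component) Start() error {
	_, err := os.Stat(c.stateDir)
	if errors.Is(err, os.ErrNotExist) {
		return nil // fresh node: nothing to restore
	}
	return err
}

func main() {
	c := New("/var/lib/example/swarm") // hypothetical path
	if err := c.Start(); err != nil {
		fmt.Println("start failed:", err)
	}
}
```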
@@ -172,10 +181,10 @@ func New(config Config) (*Cluster, error) {
 	case err := <-nr.Ready():
 		if err != nil {
 			logrus.WithError(err).Error("swarm component could not be started")
-			return c, nil
+			return nil
 		}
 	}
-	return c, nil
+	return nil
 }
 
 func (c *Cluster) newNodeRunner(conf nodeStartConfig) (*nodeRunner, error) {
@@ -308,7 +317,7 @@ func (c *Cluster) getRemoteAddressList() []string {
 // ListenClusterEvents returns a channel that receives messages on cluster
 // participation changes.
 // todo: make cancelable and accessible to multiple callers
-func (c *Cluster) ListenClusterEvents() <-chan struct{} {
+func (c *Cluster) ListenClusterEvents() <-chan lncluster.ConfigEventType {
 	return c.configEvent
 }
 
@@ -413,3 +422,13 @@ func (c *Cluster) lockedManagerAction(fn func(ctx context.Context, state nodeSta
 
 	return fn(ctx, state)
 }
+
+// SendClusterEvent allows to send cluster events on the configEvent channel
+// TODO This method should not be exposed.
+// Currently it is used to notify the network controller that the keys are
+// available
+func (c *Cluster) SendClusterEvent(event lncluster.ConfigEventType) {
+	c.mu.RLock()
+	defer c.mu.RUnlock()
+	c.configEvent <- event
+}
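`SendClusterEvent` replaces the bare `configEvent <- struct{}{}` sends that were previously scattered across callers, and the channel now carries a typed value. A self-contained sketch of the same shape (buffer size 10 as in the diff; the surrounding locking is simplified):

```go
package main

import (
	"fmt"
	"sync"
)

type ConfigEventType uint8

const (
	EventSocketChange ConfigEventType = iota
	EventNodeReady
	EventNodeLeave
)

type Cluster struct {
	mu          sync.RWMutex
	configEvent chan ConfigEventType
}

// SendClusterEvent mirrors the diff: take the read lock, then push the
// typed event onto the buffered channel.
func (c *Cluster) SendClusterEvent(e ConfigEventType) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	c.configEvent <- e
}

func main() {
	c := &Cluster{configEvent: make(chan ConfigEventType, 10)}
	c.SendClusterEvent(EventNodeReady)
	fmt.Println(<-c.configEvent) // prints 1
}
```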
@@ -11,6 +11,7 @@ import (
 	"github.com/Sirupsen/logrus"
 	types "github.com/docker/docker/api/types/swarm"
 	"github.com/docker/docker/daemon/cluster/executor/container"
+	lncluster "github.com/docker/libnetwork/cluster"
 	swarmapi "github.com/docker/swarmkit/api"
 	swarmnode "github.com/docker/swarmkit/node"
 	"github.com/pkg/errors"
@@ -162,7 +163,7 @@ func (n *nodeRunner) handleControlSocketChange(ctx context.Context, node *swarmn
 		}
 		n.grpcConn = conn
 		n.mu.Unlock()
-		n.cluster.configEvent <- struct{}{}
+		n.cluster.SendClusterEvent(lncluster.EventSocketChange)
 	}
 }
 
@@ -175,7 +176,7 @@ func (n *nodeRunner) handleReadyEvent(ctx context.Context, node *swarmnode.Node,
 		close(ready)
 	case <-ctx.Done():
 	}
-	n.cluster.configEvent <- struct{}{}
+	n.cluster.SendClusterEvent(lncluster.EventNodeReady)
 }
 
 func (n *nodeRunner) handleNodeExit(node *swarmnode.Node) {
@@ -217,6 +218,7 @@ func (n *nodeRunner) Stop() error {
 	if err := n.swarmNode.Stop(ctx); err != nil && !strings.Contains(err.Error(), "context canceled") {
 		return err
 	}
+	n.cluster.SendClusterEvent(lncluster.EventNodeLeave)
 	<-n.done
 	return nil
 }
@@ -388,7 +388,6 @@ func (c *Cluster) Leave(force bool) error {
 		}
 	}
 
-	c.configEvent <- struct{}{}
 	// todo: cleanup optional?
 	if err := clearPersistentState(c.root); err != nil {
 		return err
@@ -16,6 +16,7 @@ import (
 	"github.com/docker/docker/pkg/plugingetter"
 	"github.com/docker/docker/runconfig"
 	"github.com/docker/libnetwork"
+	lncluster "github.com/docker/libnetwork/cluster"
 	"github.com/docker/libnetwork/driverapi"
 	"github.com/docker/libnetwork/ipamapi"
 	networktypes "github.com/docker/libnetwork/types"
@@ -207,7 +208,6 @@ func (daemon *Daemon) setupIngress(create *clustertypes.NetworkCreateRequest, ip
 
 func (daemon *Daemon) releaseIngress(id string) {
 	controller := daemon.netController
-
 	if err := controller.SandboxDestroy("ingress-sbox"); err != nil {
 		logrus.Errorf("Failed to delete ingress sandbox: %v", err)
 	}
@@ -233,13 +233,17 @@ func (daemon *Daemon) releaseIngress(id string) {
 		logrus.Errorf("Failed to delete ingress network %s: %v", n.ID(), err)
 		return
 	}
-
-	return
 }
 
 // SetNetworkBootstrapKeys sets the bootstrap keys.
 func (daemon *Daemon) SetNetworkBootstrapKeys(keys []*networktypes.EncryptionKey) error {
-	return daemon.netController.SetKeys(keys)
+	err := daemon.netController.SetKeys(keys)
+	if err == nil {
+		// Upon successful key setting dispatch the keys available event
+		daemon.cluster.SendClusterEvent(lncluster.EventNetworkKeysAvailable)
+	}
+	return err
 }
 
 // UpdateAttachment notifies the attacher about the attachment config.
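The change gives `SetNetworkBootstrapKeys` a dispatch-on-success shape: `EventNetworkKeysAvailable` is only emitted once `SetKeys` has actually succeeded, so the listener can never start agent initialization with keys that failed to install. A minimal sketch of that shape (`notifier` and `setKeys` are illustrative stand-ins, not the real moby APIs):

```go
package main

import (
	"errors"
	"fmt"
)

type notifier interface{ notifyKeysAvailable() }

// setKeys stands in for netController.SetKeys; it may fail.
func setKeys(keys [][]byte) error {
	if len(keys) == 0 {
		return errors.New("no keys")
	}
	return nil
}

// setBootstrapKeys follows the diff's shape: dispatch the "keys available"
// event only when the keys were actually accepted.
func setBootstrapKeys(n notifier, keys [][]byte) error {
	err := setKeys(keys)
	if err == nil {
		n.notifyKeysAvailable()
	}
	return err
}

type printNotifier struct{}

func (printNotifier) notifyKeysAvailable() { fmt.Println("keys available") }

func main() {
	_ = setBootstrapKeys(printNotifier{}, [][]byte{{1, 2, 3}})
}
```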
@@ -1980,3 +1980,24 @@ func (s *DockerSwarmSuite) TestSwarmInitUnspecifiedDataPathAddr(c *check.C) {
 	c.Assert(err, checker.NotNil)
 	c.Assert(out, checker.Contains, "data path address must be a non-zero IP")
 }
+
+func (s *DockerSwarmSuite) TestSwarmJoinLeave(c *check.C) {
+	d := s.AddDaemon(c, true, true)
+
+	out, err := d.Cmd("swarm", "join-token", "-q", "worker")
+	c.Assert(err, checker.IsNil)
+	c.Assert(strings.TrimSpace(out), checker.Not(checker.Equals), "")
+
+	token := strings.TrimSpace(out)
+
+	// Verify that back to back join/leave does not cause panics
+	d1 := s.AddDaemon(c, false, false)
+	for i := 0; i < 10; i++ {
+		out, err = d1.Cmd("swarm", "join", "--token", token, d.ListenAddr)
+		c.Assert(err, checker.IsNil)
+		c.Assert(strings.TrimSpace(out), checker.Not(checker.Equals), "")
+
+		_, err = d1.Cmd("swarm", "leave")
+		c.Assert(err, checker.IsNil)
+	}
+}
@@ -25,7 +25,7 @@ github.com/imdario/mergo 0.2.1
 golang.org/x/sync de49d9dcd27d4f764488181bea099dfe6179bcf0
 
 #get libnetwork packages
-github.com/docker/libnetwork b015d4b1bcf4e666d8950651c8cc825a02842e7a
+github.com/docker/libnetwork 6786135bf7de08ec26a72a6f7e4291d27d113a3f
 github.com/docker/go-events 18b43f1bc85d9cdd42c05a6cd2d444c7a200a894
 github.com/armon/go-radix e39d623f12e8e41c7b5529e9a9dd67a1e2261f80
 github.com/armon/go-metrics eb0af217e5e9747e41dd5303755356b62d28e3ec
vendor/github.com/docker/libnetwork/agent.go (generated, vendored; 61 changes)
@@ -13,6 +13,7 @@ import (
 	"github.com/Sirupsen/logrus"
 	"github.com/docker/docker/pkg/stringid"
 	"github.com/docker/go-events"
+	"github.com/docker/libnetwork/cluster"
 	"github.com/docker/libnetwork/datastore"
 	"github.com/docker/libnetwork/discoverapi"
 	"github.com/docker/libnetwork/driverapi"
@@ -40,7 +41,7 @@ type agent struct {
 	bindAddr          string
 	advertiseAddr     string
 	dataPathAddr      string
-	epTblCancel       func()
+	coreCancelFuncs   []func()
 	driverCancelFuncs map[string][]func()
 	sync.Mutex
 }
@@ -192,16 +193,12 @@ func (c *controller) handleKeyChange(keys []*types.EncryptionKey) error {
 	return nil
 }
 
-func (c *controller) agentSetup() error {
-	c.Lock()
-	clusterProvider := c.cfg.Daemon.ClusterProvider
-	agent := c.agent
-	c.Unlock()
+func (c *controller) agentSetup(clusterProvider cluster.Provider) error {
+	agent := c.getAgent()
 
-	if clusterProvider == nil {
-		msg := "Aborting initialization of Libnetwork Agent because cluster provider is now unset"
-		logrus.Errorf(msg)
-		return fmt.Errorf(msg)
+	// If the agent is already present there is no need to try to initilize it again
+	if agent != nil {
+		return nil
 	}
 
 	bindAddr := clusterProvider.GetLocalAddress()
@@ -221,15 +218,15 @@ func (c *controller) agentSetup() error {
 		listenAddr, bindAddr, advAddr, dataAddr, remoteAddrList)
 	if advAddr != "" && agent == nil {
 		if err := c.agentInit(listenAddr, bindAddr, advAddr, dataAddr); err != nil {
-			logrus.Errorf("Error in agentInit : %v", err)
-		} else {
-			c.drvRegistry.WalkDrivers(func(name string, driver driverapi.Driver, capability driverapi.Capability) bool {
-				if capability.DataScope == datastore.GlobalScope {
-					c.agentDriverNotify(driver)
-				}
-				return false
-			})
+			logrus.Errorf("error in agentInit: %v", err)
+			return err
 		}
+		c.drvRegistry.WalkDrivers(func(name string, driver driverapi.Driver, capability driverapi.Capability) bool {
+			if capability.DataScope == datastore.GlobalScope {
+				c.agentDriverNotify(driver)
+			}
+			return false
+		})
 	}
 
 	if len(remoteAddrList) > 0 {
@@ -238,14 +235,6 @@ func (c *controller) agentSetup() error {
 		}
 	}
 
-	c.Lock()
-	if c.agent != nil && c.agentInitDone != nil {
-		close(c.agentInitDone)
-		c.agentInitDone = nil
-		c.agentStopDone = make(chan struct{})
-	}
-	c.Unlock()
-
 	return nil
 }
 
@@ -287,16 +276,12 @@ func (c *controller) getPrimaryKeyTag(subsys string) ([]byte, uint64, error) {
 }
 
 func (c *controller) agentInit(listenAddr, bindAddrOrInterface, advertiseAddr, dataPathAddr string) error {
-	if !c.isAgent() {
-		return nil
-	}
-
 	bindAddr, err := resolveAddr(bindAddrOrInterface)
 	if err != nil {
 		return err
 	}
 
-	keys, tags := c.getKeys(subsysGossip)
+	keys, _ := c.getKeys(subsysGossip)
 	hostname, _ := os.Hostname()
 	nodeName := hostname + "-" + stringid.TruncateID(stringid.GenerateRandomID())
 	logrus.Info("Gossip cluster hostname ", nodeName)
@@ -312,8 +297,11 @@ func (c *controller) agentInit(listenAddr, bindAddrOrInterface, advertiseAddr, d
 		return err
 	}
 
+	var cancelList []func()
 	ch, cancel := nDB.Watch(libnetworkEPTable, "", "")
+	cancelList = append(cancelList, cancel)
 	nodeCh, cancel := nDB.Watch(networkdb.NodeTable, "", "")
+	cancelList = append(cancelList, cancel)
 
 	c.Lock()
 	c.agent = &agent{
@@ -321,7 +309,7 @@ func (c *controller) agentInit(listenAddr, bindAddrOrInterface, advertiseAddr, d
 		bindAddr:          bindAddr,
 		advertiseAddr:     advertiseAddr,
 		dataPathAddr:      dataPathAddr,
-		epTblCancel:       cancel,
+		coreCancelFuncs:   cancelList,
 		driverCancelFuncs: make(map[string][]func()),
 	}
 	c.Unlock()
@@ -330,7 +318,7 @@ func (c *controller) agentInit(listenAddr, bindAddrOrInterface, advertiseAddr, d
 	go c.handleTableEvents(nodeCh, c.handleNodeTableEvent)
 
 	drvEnc := discoverapi.DriverEncryptionConfig{}
-	keys, tags = c.getKeys(subsysIPSec)
+	keys, tags := c.getKeys(subsysIPSec)
 	drvEnc.Keys = keys
 	drvEnc.Tags = tags
 
@@ -399,14 +387,17 @@ func (c *controller) agentClose() {
 			cancelList = append(cancelList, cancel)
 		}
 	}
+
+	// Add also the cancel functions for the network db
+	for _, cancel := range agent.coreCancelFuncs {
+		cancelList = append(cancelList, cancel)
+	}
 	agent.Unlock()
 
 	for _, cancel := range cancelList {
 		cancel()
 	}
-
-	agent.epTblCancel()
 
 	agent.networkDB.Close()
 }
|
16
vendor/github.com/docker/libnetwork/cluster/provider.go
generated
vendored
16
vendor/github.com/docker/libnetwork/cluster/provider.go
generated
vendored
|
@@ -5,6 +5,20 @@ import (
 	"golang.org/x/net/context"
 )
 
+const (
+	// EventSocketChange control socket changed
+	EventSocketChange = iota
+	// EventNodeReady cluster node in ready state
+	EventNodeReady
+	// EventNodeLeave node is leaving the cluster
+	EventNodeLeave
+	// EventNetworkKeysAvailable network keys correctly configured in the networking layer
+	EventNetworkKeysAvailable
+)
+
+// ConfigEventType type of the event produced by the cluster
+type ConfigEventType uint8
+
 // Provider provides clustering config details
 type Provider interface {
 	IsManager() bool
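Note that the constants are a plain `iota` enumeration while `ConfigEventType` is declared separately; the untyped constants convert implicitly wherever a `ConfigEventType` is expected. A small standalone illustration (the `describe` helper is hypothetical):

```go
package main

import "fmt"

type ConfigEventType uint8

const (
	EventSocketChange = iota // 0
	EventNodeReady           // 1
	EventNodeLeave           // 2
	EventNetworkKeysAvailable
)

func describe(e ConfigEventType) string {
	switch e {
	case EventSocketChange:
		return "control socket changed"
	case EventNodeReady:
		return "node ready"
	case EventNodeLeave:
		return "node leaving"
	case EventNetworkKeysAvailable:
		return "network keys available"
	}
	return "unknown"
}

func main() {
	// The untyped iota constants convert implicitly to ConfigEventType.
	fmt.Println(describe(EventNodeLeave))
}
```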
@@ -14,7 +28,7 @@ type Provider interface {
 	GetAdvertiseAddress() string
 	GetDataPathAddress() string
 	GetRemoteAddressList() []string
-	ListenClusterEvents() <-chan struct{}
+	ListenClusterEvents() <-chan ConfigEventType
 	AttachNetwork(string, string, []string) (*network.NetworkingConfig, error)
 	DetachNetwork(string, string) error
 	UpdateAttachment(string, string, *network.NetworkingConfig) error
vendor/github.com/docker/libnetwork/config/config.go (generated, vendored; 4 changes)
@@ -34,7 +34,6 @@ type DaemonCfg struct {
 	Labels          []string
 	DriverCfg       map[string]interface{}
 	ClusterProvider cluster.Provider
-	DisableProvider chan struct{}
 }
 
 // ClusterCfg represents cluster configuration
@@ -74,8 +73,7 @@ func ParseConfig(tomlCfgFile string) (*Config, error) {
 func ParseConfigOptions(cfgOptions ...Option) *Config {
 	cfg := &Config{
 		Daemon: DaemonCfg{
-			DriverCfg:       make(map[string]interface{}),
-			DisableProvider: make(chan struct{}, 10),
+			DriverCfg: make(map[string]interface{}),
 		},
 		Scopes: make(map[string]*datastore.ScopeCfg),
 	}
|
121
vendor/github.com/docker/libnetwork/controller.go
generated
vendored
121
vendor/github.com/docker/libnetwork/controller.go
generated
vendored
|
@@ -244,15 +244,24 @@ func New(cfgOptions ...config.Option) (NetworkController, error) {
 }
 
 func (c *controller) SetClusterProvider(provider cluster.Provider) {
+	var sameProvider bool
 	c.Lock()
-	c.cfg.Daemon.ClusterProvider = provider
-	disableProviderCh := c.cfg.Daemon.DisableProvider
-	c.Unlock()
-	if provider != nil {
-		go c.clusterAgentInit()
+	// Avoids to spawn multiple goroutine for the same cluster provider
+	if c.cfg.Daemon.ClusterProvider == provider {
+		// If the cluster provider is already set, there is already a go routine spawned
+		// that is listening for events, so nothing to do here
+		sameProvider = true
 	} else {
-		disableProviderCh <- struct{}{}
+		c.cfg.Daemon.ClusterProvider = provider
 	}
+	c.Unlock()
+
+	if provider == nil || sameProvider {
+		return
+	}
+	// We don't want to spawn a new go routine if the previous one did not exit yet
+	c.AgentStopWait()
+	go c.clusterAgentInit()
 }
 
 func isValidClusteringIP(addr string) bool {
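`SetClusterProvider` now guards against two races: setting the same provider twice no longer spawns a duplicate event-loop goroutine, and a genuinely new provider first waits (`AgentStopWait`) for the previous loop to exit. A reduced sketch of that guard, using a string in place of `cluster.Provider` and a done-channel in place of the real stop handshake:

```go
package main

import (
	"fmt"
	"sync"
)

type controller struct {
	mu       sync.Mutex
	provider string // stand-in for cluster.Provider
	loopDone chan struct{}
}

// setProvider mirrors the diff's guard: the same provider means a loop is
// already running; a new provider waits for the old loop before spawning.
func (c *controller) setProvider(p string) {
	c.mu.Lock()
	same := c.provider == p
	if !same {
		c.provider = p
	}
	c.mu.Unlock()

	if p == "" || same {
		return
	}
	if c.loopDone != nil {
		<-c.loopDone // don't overlap with the previous loop
	}
	c.loopDone = make(chan struct{})
	go func(done chan struct{}) {
		defer close(done)
		fmt.Println("event loop for", p)
	}(c.loopDone)
}

func main() {
	c := &controller{}
	c.setProvider("swarm")
	c.setProvider("swarm") // no second goroutine
	<-c.loopDone
}
```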
@@ -262,12 +271,6 @@ func isValidClusteringIP(addr string) bool {
 // libnetwork side of agent depends on the keys. On the first receipt of
 // keys setup the agent. For subsequent key set handle the key change
 func (c *controller) SetKeys(keys []*types.EncryptionKey) error {
-	c.Lock()
-	existingKeys := c.keys
-	clusterConfigAvailable := c.clusterConfigAvailable
-	agent := c.agent
-	c.Unlock()
-
 	subsysKeys := make(map[string]int)
 	for _, key := range keys {
 		if key.Subsystem != subsysGossip &&
@@ -282,19 +285,8 @@ func (c *controller) SetKeys(keys []*types.EncryptionKey) error {
 		}
 	}
 
-	if len(existingKeys) == 0 {
-		c.Lock()
-		c.keys = keys
-		c.Unlock()
-		if agent != nil {
-			return (fmt.Errorf("libnetwork agent setup without keys"))
-		}
-		if clusterConfigAvailable {
-			return c.agentSetup()
-		}
-		logrus.Debug("received encryption keys before cluster config")
-		return nil
-	}
+	agent := c.getAgent()
+
 	if agent == nil {
 		c.Lock()
 		c.keys = keys
@@ -312,24 +304,32 @@ func (c *controller) getAgent() *agent {
 
 func (c *controller) clusterAgentInit() {
 	clusterProvider := c.cfg.Daemon.ClusterProvider
+	var keysAvailable bool
 	for {
-		select {
-		case <-clusterProvider.ListenClusterEvents():
-			if !c.isDistributedControl() {
-				c.Lock()
-				c.clusterConfigAvailable = true
-				keys := c.keys
-				c.Unlock()
-				// agent initialization needs encryption keys and bind/remote IP which
-				// comes from the daemon cluster events
-				if len(keys) > 0 {
-					c.agentSetup()
+		eventType := <-clusterProvider.ListenClusterEvents()
+		// The events: EventSocketChange, EventNodeReady and EventNetworkKeysAvailable are not ordered
+		// when all the condition for the agent initialization are met then proceed with it
+		switch eventType {
+		case cluster.EventNetworkKeysAvailable:
+			// Validates that the keys are actually available before starting the initialization
+			// This will handle old spurious messages left on the channel
+			c.Lock()
+			keysAvailable = c.keys != nil
+			c.Unlock()
+			fallthrough
+		case cluster.EventSocketChange, cluster.EventNodeReady:
+			if keysAvailable && !c.isDistributedControl() {
+				c.agentOperationStart()
+				if err := c.agentSetup(clusterProvider); err != nil {
+					c.agentStopComplete()
+				} else {
+					c.agentInitComplete()
 				}
 			}
-		case <-c.cfg.Daemon.DisableProvider:
+		case cluster.EventNodeLeave:
+			keysAvailable = false
+			c.agentOperationStart()
 			c.Lock()
-			c.clusterConfigAvailable = false
 			c.agentInitDone = make(chan struct{})
 			c.keys = nil
 			c.Unlock()
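The rewritten loop consumes one typed event per iteration and leans on Go's `fallthrough`: `EventNetworkKeysAvailable` first re-validates that keys are really present (defusing stale messages buffered in the channel), then falls through into the common initialization path shared with `EventSocketChange` and `EventNodeReady`. A compact illustration of that switch shape, with simplified state:

```go
package main

import "fmt"

type eventType int

const (
	keysAvailable eventType = iota
	socketChange
	nodeReady
	nodeLeave
)

func main() {
	var haveKeys bool
	events := []eventType{socketChange, keysAvailable, nodeLeave}

	for _, ev := range events {
		switch ev {
		case keysAvailable:
			haveKeys = true // in the real code: c.keys != nil, under lock
			fallthrough     // share the init path with the cases below
		case socketChange, nodeReady:
			if haveKeys {
				fmt.Println("init agent")
			} else {
				fmt.Println("waiting for keys")
			}
		case nodeLeave:
			haveKeys = false
			fmt.Println("stop agent")
		}
	}
}
```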
@@ -343,20 +343,14 @@ func (c *controller) clusterAgentInit() {
 			c.agentClose()
 			c.cleanupServiceBindings("")
 
-			c.Lock()
-			if c.agentStopDone != nil {
-				close(c.agentStopDone)
-				c.agentStopDone = nil
-			}
-			c.Unlock()
+			c.agentStopComplete()
 
 			return
 		}
 	}
 }
 
-// AgentInitWait waits for agent initialization to be completed in the
-// controller.
+// AgentInitWait waits for agent initialization to be completed in the controller.
 func (c *controller) AgentInitWait() {
 	c.Lock()
 	agentInitDone := c.agentInitDone
@@ -367,6 +361,7 @@ func (c *controller) AgentInitWait() {
 	}
 }
 
+// AgentStopWait waits for the Agent stop to be completed in the controller
 func (c *controller) AgentStopWait() {
 	c.Lock()
 	agentStopDone := c.agentStopDone
@@ -376,6 +371,38 @@ func (c *controller) AgentStopWait() {
 	}
 }
 
+// agentOperationStart marks the start of an Agent Init or Agent Stop
+func (c *controller) agentOperationStart() {
+	c.Lock()
+	if c.agentInitDone == nil {
+		c.agentInitDone = make(chan struct{})
+	}
+	if c.agentStopDone == nil {
+		c.agentStopDone = make(chan struct{})
+	}
+	c.Unlock()
+}
+
+// agentInitComplete notifies the successful completion of the Agent initialization
+func (c *controller) agentInitComplete() {
+	c.Lock()
+	if c.agentInitDone != nil {
+		close(c.agentInitDone)
+		c.agentInitDone = nil
+	}
+	c.Unlock()
+}
+
+// agentStopComplete notifies the successful completion of the Agent stop
+func (c *controller) agentStopComplete() {
+	c.Lock()
+	if c.agentStopDone != nil {
+		close(c.agentStopDone)
+		c.agentStopDone = nil
+	}
+	c.Unlock()
+}
+
 func (c *controller) makeDriverConfig(ntype string) map[string]interface{} {
 	if c.cfg == nil {
 		return nil
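These three helpers form the handshake that serializes agent init and stop: `agentOperationStart` lazily (re)creates both done-channels, and each *Complete helper closes its channel exactly once (the nil-check makes completion idempotent), releasing anyone blocked in `AgentInitWait`/`AgentStopWait`. A sketch of a waiter against this pattern, reduced to the init side:

```go
package main

import (
	"fmt"
	"sync"
)

type ctrl struct {
	mu       sync.Mutex
	initDone chan struct{}
}

func (c *ctrl) operationStart() {
	c.mu.Lock()
	if c.initDone == nil {
		c.initDone = make(chan struct{})
	}
	c.mu.Unlock()
}

// initComplete closes the channel once; the nil-check makes it idempotent.
func (c *ctrl) initComplete() {
	c.mu.Lock()
	if c.initDone != nil {
		close(c.initDone)
		c.initDone = nil
	}
	c.mu.Unlock()
}

// initWait snapshots the channel under the lock, then blocks outside it.
func (c *ctrl) initWait() {
	c.mu.Lock()
	done := c.initDone
	c.mu.Unlock()
	if done != nil {
		<-done
	}
}

func main() {
	c := &ctrl{}
	c.operationStart()
	go c.initComplete()
	c.initWait()
	fmt.Println("agent init observed")
}
```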