Ver código fonte

Wait for discovery on container start error

This gives discovery a chance to initialize, particularly if the K/V
store being used is in a container.

Signed-off-by: Brian Goff <cpuguy83@gmail.com>
Brian Goff 9 anos atrás
pai
commit
2dce79e05a
3 arquivos alterados com 103 adições e 14 exclusões
  1. 30 0
      daemon/daemon.go
  2. 26 0
      daemon/daemon_test.go
  3. 47 14
      daemon/discovery.go

+ 30 - 0
daemon/daemon.go

@@ -380,6 +380,9 @@ func (daemon *Daemon) restore() error {
 					}
 					}
 				}
 				}
 			}
 			}
+
+			// Make sure networks are available before starting
+			daemon.waitForNetworks(c)
 			if err := daemon.containerStart(c); err != nil {
 			if err := daemon.containerStart(c); err != nil {
 				logrus.Errorf("Failed to start container %s: %s", c.ID, err)
 				logrus.Errorf("Failed to start container %s: %s", c.ID, err)
 			}
 			}
@@ -423,6 +426,33 @@ func (daemon *Daemon) restore() error {
 	return nil
 	return nil
 }
 }
 
 
+// waitForNetworks is used during daemon initialization when starting up containers
+// It ensures that all of a container's networks are available before the daemon tries to start the container.
+// In practice it just makes sure the discovery service is available for containers which use a network that require discovery.
+func (daemon *Daemon) waitForNetworks(c *container.Container) {
+	if daemon.discoveryWatcher == nil {
+		return
+	}
+	// Make sure if the container has a network that requires discovery that the discovery service is available before starting
+	for netName := range c.NetworkSettings.Networks {
+		// If we get `ErrNoSuchNetwork` here, it can assumed that it is due to discovery not being ready
+		// Most likely this is because the K/V store used for discovery is in a container and needs to be started
+		if _, err := daemon.netController.NetworkByName(netName); err != nil {
+			if _, ok := err.(libnetwork.ErrNoSuchNetwork); !ok {
+				continue
+			}
+			// use a longish timeout here due to some slowdowns in libnetwork if the k/v store is on anything other than --net=host
+			// FIXME: why is this slow???
+			logrus.Debugf("Container %s waiting for network to be ready", c.Name)
+			select {
+			case <-daemon.discoveryWatcher.ReadyCh():
+			case <-time.After(60 * time.Second):
+			}
+			return
+		}
+	}
+}
+
 func (daemon *Daemon) mergeAndVerifyConfig(config *containertypes.Config, img *image.Image) error {
 func (daemon *Daemon) mergeAndVerifyConfig(config *containertypes.Config, img *image.Image) error {
 	if img != nil && img.Config != nil {
 	if img != nil && img.Config != nil {
 		if err := merge(config, img.Config); err != nil {
 		if err := merge(config, img.Config); err != nil {

+ 26 - 0
daemon/daemon_test.go

@@ -381,6 +381,12 @@ func TestDaemonDiscoveryReload(t *testing.T) {
 		&discovery.Entry{Host: "127.0.0.1", Port: "3333"},
 		&discovery.Entry{Host: "127.0.0.1", Port: "3333"},
 	}
 	}
 
 
+	select {
+	case <-time.After(10 * time.Second):
+		t.Fatal("timeout waiting for discovery")
+	case <-daemon.discoveryWatcher.ReadyCh():
+	}
+
 	stopCh := make(chan struct{})
 	stopCh := make(chan struct{})
 	defer close(stopCh)
 	defer close(stopCh)
 	ch, errCh := daemon.discoveryWatcher.Watch(stopCh)
 	ch, errCh := daemon.discoveryWatcher.Watch(stopCh)
@@ -414,6 +420,13 @@ func TestDaemonDiscoveryReload(t *testing.T) {
 	if err := daemon.Reload(newConfig); err != nil {
 	if err := daemon.Reload(newConfig); err != nil {
 		t.Fatal(err)
 		t.Fatal(err)
 	}
 	}
+
+	select {
+	case <-time.After(10 * time.Second):
+		t.Fatal("timeout waiting for discovery")
+	case <-daemon.discoveryWatcher.ReadyCh():
+	}
+
 	ch, errCh = daemon.discoveryWatcher.Watch(stopCh)
 	ch, errCh = daemon.discoveryWatcher.Watch(stopCh)
 
 
 	select {
 	select {
@@ -450,6 +463,13 @@ func TestDaemonDiscoveryReloadFromEmptyDiscovery(t *testing.T) {
 	if err := daemon.Reload(newConfig); err != nil {
 	if err := daemon.Reload(newConfig); err != nil {
 		t.Fatal(err)
 		t.Fatal(err)
 	}
 	}
+
+	select {
+	case <-time.After(10 * time.Second):
+		t.Fatal("timeout waiting for discovery")
+	case <-daemon.discoveryWatcher.ReadyCh():
+	}
+
 	stopCh := make(chan struct{})
 	stopCh := make(chan struct{})
 	defer close(stopCh)
 	defer close(stopCh)
 	ch, errCh := daemon.discoveryWatcher.Watch(stopCh)
 	ch, errCh := daemon.discoveryWatcher.Watch(stopCh)
@@ -488,6 +508,12 @@ func TestDaemonDiscoveryReloadOnlyClusterAdvertise(t *testing.T) {
 	if err := daemon.Reload(newConfig); err != nil {
 	if err := daemon.Reload(newConfig); err != nil {
 		t.Fatal(err)
 		t.Fatal(err)
 	}
 	}
+
+	select {
+	case <-daemon.discoveryWatcher.ReadyCh():
+	case <-time.After(10 * time.Second):
+		t.Fatal("Timeout waiting for discovery")
+	}
 	stopCh := make(chan struct{})
 	stopCh := make(chan struct{})
 	defer close(stopCh)
 	defer close(stopCh)
 	ch, errCh := daemon.discoveryWatcher.Watch(stopCh)
 	ch, errCh := daemon.discoveryWatcher.Watch(stopCh)

+ 47 - 14
daemon/discovery.go

@@ -27,18 +27,24 @@ type discoveryReloader interface {
 	discovery.Watcher
 	discovery.Watcher
 	Stop()
 	Stop()
 	Reload(backend, address string, clusterOpts map[string]string) error
 	Reload(backend, address string, clusterOpts map[string]string) error
+	ReadyCh() <-chan struct{}
 }
 }
 
 
 type daemonDiscoveryReloader struct {
 type daemonDiscoveryReloader struct {
 	backend discovery.Backend
 	backend discovery.Backend
 	ticker  *time.Ticker
 	ticker  *time.Ticker
 	term    chan bool
 	term    chan bool
+	readyCh chan struct{}
 }
 }
 
 
 func (d *daemonDiscoveryReloader) Watch(stopCh <-chan struct{}) (<-chan discovery.Entries, <-chan error) {
 func (d *daemonDiscoveryReloader) Watch(stopCh <-chan struct{}) (<-chan discovery.Entries, <-chan error) {
 	return d.backend.Watch(stopCh)
 	return d.backend.Watch(stopCh)
 }
 }
 
 
+func (d *daemonDiscoveryReloader) ReadyCh() <-chan struct{} {
+	return d.readyCh
+}
+
 func discoveryOpts(clusterOpts map[string]string) (time.Duration, time.Duration, error) {
 func discoveryOpts(clusterOpts map[string]string) (time.Duration, time.Duration, error) {
 	var (
 	var (
 		heartbeat = defaultDiscoveryHeartbeat
 		heartbeat = defaultDiscoveryHeartbeat
@@ -87,38 +93,64 @@ func initDiscovery(backendAddress, advertiseAddress string, clusterOpts map[stri
 		backend: backend,
 		backend: backend,
 		ticker:  time.NewTicker(heartbeat),
 		ticker:  time.NewTicker(heartbeat),
 		term:    make(chan bool),
 		term:    make(chan bool),
+		readyCh: make(chan struct{}),
 	}
 	}
 	// We call Register() on the discovery backend in a loop for the whole lifetime of the daemon,
 	// We call Register() on the discovery backend in a loop for the whole lifetime of the daemon,
 	// but we never actually Watch() for nodes appearing and disappearing for the moment.
 	// but we never actually Watch() for nodes appearing and disappearing for the moment.
-	reloader.advertise(advertiseAddress)
+	go reloader.advertiseHeartbeat(advertiseAddress)
 	return reloader, nil
 	return reloader, nil
 }
 }
 
 
-func (d *daemonDiscoveryReloader) advertise(address string) {
-	d.registerAddr(address)
-	go d.advertiseHeartbeat(address)
-}
-
-func (d *daemonDiscoveryReloader) registerAddr(addr string) {
-	if err := d.backend.Register(addr); err != nil {
-		log.Warnf("Registering as %q in discovery failed: %v", addr, err)
-	}
-}
-
 // advertiseHeartbeat registers the current node against the discovery backend using the specified
 // advertiseHeartbeat registers the current node against the discovery backend using the specified
 // address. The function never returns, as registration against the backend comes with a TTL and
 // address. The function never returns, as registration against the backend comes with a TTL and
 // requires regular heartbeats.
 // requires regular heartbeats.
 func (d *daemonDiscoveryReloader) advertiseHeartbeat(address string) {
 func (d *daemonDiscoveryReloader) advertiseHeartbeat(address string) {
+	var ready bool
+	if err := d.initHeartbeat(address); err == nil {
+		ready = true
+		close(d.readyCh)
+	}
+
 	for {
 	for {
 		select {
 		select {
 		case <-d.ticker.C:
 		case <-d.ticker.C:
-			d.registerAddr(address)
+			if err := d.backend.Register(address); err != nil {
+				log.Warnf("Registering as %q in discovery failed: %v", address, err)
+			} else {
+				if !ready {
+					close(d.readyCh)
+					ready = true
+				}
+			}
 		case <-d.term:
 		case <-d.term:
 			return
 			return
 		}
 		}
 	}
 	}
 }
 }
 
 
+// initHeartbeat is used to do the first heartbeat. It uses a tight loop until
+// either the timeout period is reached or the heartbeat is successful and returns.
+func (d *daemonDiscoveryReloader) initHeartbeat(address string) error {
+	// Setup a short ticker until the first heartbeat has succeeded
+	t := time.NewTicker(500 * time.Millisecond)
+	defer t.Stop()
+	// timeout makes sure that after a period of time we stop being so aggressive trying to reach the discovery service
+	timeout := time.After(60 * time.Second)
+
+	for {
+		select {
+		case <-timeout:
+			return errors.New("timeout waiting for initial discovery")
+		case <-d.term:
+			return errors.New("terminated")
+		case <-t.C:
+			if err := d.backend.Register(address); err == nil {
+				return nil
+			}
+		}
+	}
+}
+
 // Reload makes the watcher to stop advertising and reconfigures it to advertise in a new address.
 // Reload makes the watcher to stop advertising and reconfigures it to advertise in a new address.
 func (d *daemonDiscoveryReloader) Reload(backendAddress, advertiseAddress string, clusterOpts map[string]string) error {
 func (d *daemonDiscoveryReloader) Reload(backendAddress, advertiseAddress string, clusterOpts map[string]string) error {
 	d.Stop()
 	d.Stop()
@@ -130,8 +162,9 @@ func (d *daemonDiscoveryReloader) Reload(backendAddress, advertiseAddress string
 
 
 	d.backend = backend
 	d.backend = backend
 	d.ticker = time.NewTicker(heartbeat)
 	d.ticker = time.NewTicker(heartbeat)
+	d.readyCh = make(chan struct{})
 
 
-	d.advertise(advertiseAddress)
+	go d.advertiseHeartbeat(advertiseAddress)
 	return nil
 	return nil
 }
 }