|
@@ -1,204 +0,0 @@
|
|
|
-package discovery // import "github.com/docker/docker/daemon/discovery"
|
|
|
-
|
|
|
-import (
|
|
|
- "errors"
|
|
|
- "fmt"
|
|
|
- "strconv"
|
|
|
- "time"
|
|
|
-
|
|
|
- "github.com/docker/docker/pkg/discovery"
|
|
|
- "github.com/sirupsen/logrus"
|
|
|
-
|
|
|
- // Register the libkv backends for discovery.
|
|
|
- _ "github.com/docker/docker/pkg/discovery/kv"
|
|
|
-)
|
|
|
-
|
|
|
-const (
|
|
|
- // defaultDiscoveryHeartbeat is the default value for discovery heartbeat interval.
|
|
|
- defaultDiscoveryHeartbeat = 20 * time.Second
|
|
|
- // defaultDiscoveryTTLFactor is the default TTL factor for discovery
|
|
|
- defaultDiscoveryTTLFactor = 3
|
|
|
-)
|
|
|
-
|
|
|
-// ErrDiscoveryDisabled is an error returned if the discovery is disabled
|
|
|
-var ErrDiscoveryDisabled = errors.New("discovery is disabled")
|
|
|
-
|
|
|
-// Reloader is the discovery reloader of the daemon
|
|
|
-type Reloader interface {
|
|
|
- discovery.Watcher
|
|
|
- Stop()
|
|
|
- Reload(backend, address string, clusterOpts map[string]string) error
|
|
|
- ReadyCh() <-chan struct{}
|
|
|
-}
|
|
|
-
|
|
|
-type daemonDiscoveryReloader struct {
|
|
|
- backend discovery.Backend
|
|
|
- ticker *time.Ticker
|
|
|
- term chan bool
|
|
|
- readyCh chan struct{}
|
|
|
-}
|
|
|
-
|
|
|
-func (d *daemonDiscoveryReloader) Watch(stopCh <-chan struct{}) (<-chan discovery.Entries, <-chan error) {
|
|
|
- return d.backend.Watch(stopCh)
|
|
|
-}
|
|
|
-
|
|
|
-func (d *daemonDiscoveryReloader) ReadyCh() <-chan struct{} {
|
|
|
- return d.readyCh
|
|
|
-}
|
|
|
-
|
|
|
-func discoveryOpts(clusterOpts map[string]string) (time.Duration, time.Duration, error) {
|
|
|
- var (
|
|
|
- heartbeat = defaultDiscoveryHeartbeat
|
|
|
- ttl = defaultDiscoveryTTLFactor * defaultDiscoveryHeartbeat
|
|
|
- )
|
|
|
-
|
|
|
- if hb, ok := clusterOpts["discovery.heartbeat"]; ok {
|
|
|
- h, err := strconv.Atoi(hb)
|
|
|
- if err != nil {
|
|
|
- return time.Duration(0), time.Duration(0), err
|
|
|
- }
|
|
|
-
|
|
|
- if h <= 0 {
|
|
|
- return time.Duration(0), time.Duration(0),
|
|
|
- fmt.Errorf("discovery.heartbeat must be positive")
|
|
|
- }
|
|
|
-
|
|
|
- heartbeat = time.Duration(h) * time.Second
|
|
|
- ttl = defaultDiscoveryTTLFactor * heartbeat
|
|
|
- }
|
|
|
-
|
|
|
- if tstr, ok := clusterOpts["discovery.ttl"]; ok {
|
|
|
- t, err := strconv.Atoi(tstr)
|
|
|
- if err != nil {
|
|
|
- return time.Duration(0), time.Duration(0), err
|
|
|
- }
|
|
|
-
|
|
|
- if t <= 0 {
|
|
|
- return time.Duration(0), time.Duration(0),
|
|
|
- fmt.Errorf("discovery.ttl must be positive")
|
|
|
- }
|
|
|
-
|
|
|
- ttl = time.Duration(t) * time.Second
|
|
|
-
|
|
|
- if _, ok := clusterOpts["discovery.heartbeat"]; !ok {
|
|
|
- heartbeat = time.Duration(t) * time.Second / time.Duration(defaultDiscoveryTTLFactor)
|
|
|
- }
|
|
|
-
|
|
|
- if ttl <= heartbeat {
|
|
|
- return time.Duration(0), time.Duration(0),
|
|
|
- fmt.Errorf("discovery.ttl timer must be greater than discovery.heartbeat")
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- return heartbeat, ttl, nil
|
|
|
-}
|
|
|
-
|
|
|
-// Init initializes the nodes discovery subsystem by connecting to the specified backend
|
|
|
-// and starts a registration loop to advertise the current node under the specified address.
|
|
|
-func Init(backendAddress, advertiseAddress string, clusterOpts map[string]string) (Reloader, error) {
|
|
|
- heartbeat, backend, err := parseDiscoveryOptions(backendAddress, clusterOpts)
|
|
|
- if err != nil {
|
|
|
- return nil, err
|
|
|
- }
|
|
|
-
|
|
|
- reloader := &daemonDiscoveryReloader{
|
|
|
- backend: backend,
|
|
|
- ticker: time.NewTicker(heartbeat),
|
|
|
- term: make(chan bool),
|
|
|
- readyCh: make(chan struct{}),
|
|
|
- }
|
|
|
- // We call Register() on the discovery backend in a loop for the whole lifetime of the daemon,
|
|
|
- // but we never actually Watch() for nodes appearing and disappearing for the moment.
|
|
|
- go reloader.advertiseHeartbeat(advertiseAddress)
|
|
|
- return reloader, nil
|
|
|
-}
|
|
|
-
|
|
|
-// advertiseHeartbeat registers the current node against the discovery backend using the specified
|
|
|
-// address. The function never returns, as registration against the backend comes with a TTL and
|
|
|
-// requires regular heartbeats.
|
|
|
-func (d *daemonDiscoveryReloader) advertiseHeartbeat(address string) {
|
|
|
- var ready bool
|
|
|
- if err := d.initHeartbeat(address); err == nil {
|
|
|
- ready = true
|
|
|
- close(d.readyCh)
|
|
|
- } else {
|
|
|
- logrus.WithError(err).Debug("First discovery heartbeat failed")
|
|
|
- }
|
|
|
-
|
|
|
- for {
|
|
|
- select {
|
|
|
- case <-d.ticker.C:
|
|
|
- if err := d.backend.Register(address); err != nil {
|
|
|
- logrus.Warnf("Registering as %q in discovery failed: %v", address, err)
|
|
|
- } else {
|
|
|
- if !ready {
|
|
|
- close(d.readyCh)
|
|
|
- ready = true
|
|
|
- }
|
|
|
- }
|
|
|
- case <-d.term:
|
|
|
- return
|
|
|
- }
|
|
|
- }
|
|
|
-}
|
|
|
-
|
|
|
-// initHeartbeat is used to do the first heartbeat. It uses a tight loop until
|
|
|
-// either the timeout period is reached or the heartbeat is successful and returns.
|
|
|
-func (d *daemonDiscoveryReloader) initHeartbeat(address string) error {
|
|
|
- // Setup a short ticker until the first heartbeat has succeeded
|
|
|
- t := time.NewTicker(500 * time.Millisecond)
|
|
|
- defer t.Stop()
|
|
|
-
|
|
|
- // timeout makes sure that after a period of time we stop being so aggressive trying to reach the discovery service
|
|
|
- timeout := time.NewTimer(60 * time.Second)
|
|
|
- defer timeout.Stop()
|
|
|
-
|
|
|
- for {
|
|
|
- select {
|
|
|
- case <-timeout.C:
|
|
|
- return errors.New("timeout waiting for initial discovery")
|
|
|
- case <-d.term:
|
|
|
- return errors.New("terminated")
|
|
|
- case <-t.C:
|
|
|
- if err := d.backend.Register(address); err == nil {
|
|
|
- return nil
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-}
|
|
|
-
|
|
|
-// Reload makes the watcher to stop advertising and reconfigures it to advertise in a new address.
|
|
|
-func (d *daemonDiscoveryReloader) Reload(backendAddress, advertiseAddress string, clusterOpts map[string]string) error {
|
|
|
- d.Stop()
|
|
|
-
|
|
|
- heartbeat, backend, err := parseDiscoveryOptions(backendAddress, clusterOpts)
|
|
|
- if err != nil {
|
|
|
- return err
|
|
|
- }
|
|
|
-
|
|
|
- d.backend = backend
|
|
|
- d.ticker = time.NewTicker(heartbeat)
|
|
|
- d.readyCh = make(chan struct{})
|
|
|
-
|
|
|
- go d.advertiseHeartbeat(advertiseAddress)
|
|
|
- return nil
|
|
|
-}
|
|
|
-
|
|
|
-// Stop terminates the discovery advertising.
|
|
|
-func (d *daemonDiscoveryReloader) Stop() {
|
|
|
- d.ticker.Stop()
|
|
|
- d.term <- true
|
|
|
-}
|
|
|
-
|
|
|
-func parseDiscoveryOptions(backendAddress string, clusterOpts map[string]string) (time.Duration, discovery.Backend, error) {
|
|
|
- heartbeat, ttl, err := discoveryOpts(clusterOpts)
|
|
|
- if err != nil {
|
|
|
- return 0, nil, err
|
|
|
- }
|
|
|
-
|
|
|
- backend, err := discovery.New(backendAddress, heartbeat, ttl, clusterOpts)
|
|
|
- if err != nil {
|
|
|
- return 0, nil, err
|
|
|
- }
|
|
|
- return heartbeat, backend, nil
|
|
|
-}
|