discovery.go 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203
  1. package daemon
  2. import (
  3. "errors"
  4. "fmt"
  5. "reflect"
  6. "strconv"
  7. "time"
  8. "github.com/Sirupsen/logrus"
  9. "github.com/docker/docker/pkg/discovery"
  10. // Register the libkv backends for discovery.
  11. _ "github.com/docker/docker/pkg/discovery/kv"
  12. )
const (
	// defaultDiscoveryHeartbeat is the heartbeat interval used when the
	// "discovery.heartbeat" cluster option is not set.
	defaultDiscoveryHeartbeat = 20 * time.Second
	// defaultDiscoveryTTLFactor relates the TTL to the heartbeat: the TTL
	// defaults to this factor times the heartbeat, and when only
	// "discovery.ttl" is set the heartbeat is derived by dividing by it.
	defaultDiscoveryTTLFactor = 3
)

// errDiscoveryDisabled reports that discovery is disabled. It is not
// referenced in this file; presumably it is returned elsewhere in the package.
var errDiscoveryDisabled = errors.New("discovery is disabled")
// discoveryReloader is a discovery.Watcher whose advertising can be stopped
// and reconfigured at runtime.
type discoveryReloader interface {
	discovery.Watcher
	// Stop terminates the discovery advertising.
	Stop()
	// Reload reconfigures the advertising to use the given discovery backend,
	// advertise address and cluster options.
	Reload(backend, address string, clusterOpts map[string]string) error
	// ReadyCh returns a channel used to signal readiness. In
	// daemonDiscoveryReloader it is closed after the first successful
	// registration against the backend.
	ReadyCh() <-chan struct{}
}
// daemonDiscoveryReloader advertises the current node against a discovery
// backend from a background goroutine and allows that advertising to be
// stopped or reloaded with a new configuration.
type daemonDiscoveryReloader struct {
	// backend is the discovery backend the node registers against.
	backend discovery.Backend
	// ticker paces the periodic re-registration (heartbeat).
	ticker *time.Ticker
	// term carries the stop signal from Stop to the advertising goroutine.
	term chan bool
	// readyCh is closed once a registration has succeeded.
	readyCh chan struct{}
}
  32. func (d *daemonDiscoveryReloader) Watch(stopCh <-chan struct{}) (<-chan discovery.Entries, <-chan error) {
  33. return d.backend.Watch(stopCh)
  34. }
  35. func (d *daemonDiscoveryReloader) ReadyCh() <-chan struct{} {
  36. return d.readyCh
  37. }
  38. func discoveryOpts(clusterOpts map[string]string) (time.Duration, time.Duration, error) {
  39. var (
  40. heartbeat = defaultDiscoveryHeartbeat
  41. ttl = defaultDiscoveryTTLFactor * defaultDiscoveryHeartbeat
  42. )
  43. if hb, ok := clusterOpts["discovery.heartbeat"]; ok {
  44. h, err := strconv.Atoi(hb)
  45. if err != nil {
  46. return time.Duration(0), time.Duration(0), err
  47. }
  48. heartbeat = time.Duration(h) * time.Second
  49. ttl = defaultDiscoveryTTLFactor * heartbeat
  50. }
  51. if tstr, ok := clusterOpts["discovery.ttl"]; ok {
  52. t, err := strconv.Atoi(tstr)
  53. if err != nil {
  54. return time.Duration(0), time.Duration(0), err
  55. }
  56. ttl = time.Duration(t) * time.Second
  57. if _, ok := clusterOpts["discovery.heartbeat"]; !ok {
  58. h := int(t / defaultDiscoveryTTLFactor)
  59. heartbeat = time.Duration(h) * time.Second
  60. }
  61. if ttl <= heartbeat {
  62. return time.Duration(0), time.Duration(0),
  63. fmt.Errorf("discovery.ttl timer must be greater than discovery.heartbeat")
  64. }
  65. }
  66. return heartbeat, ttl, nil
  67. }
  68. // initDiscovery initializes the nodes discovery subsystem by connecting to the specified backend
  69. // and starts a registration loop to advertise the current node under the specified address.
  70. func initDiscovery(backendAddress, advertiseAddress string, clusterOpts map[string]string) (discoveryReloader, error) {
  71. heartbeat, backend, err := parseDiscoveryOptions(backendAddress, clusterOpts)
  72. if err != nil {
  73. return nil, err
  74. }
  75. reloader := &daemonDiscoveryReloader{
  76. backend: backend,
  77. ticker: time.NewTicker(heartbeat),
  78. term: make(chan bool),
  79. readyCh: make(chan struct{}),
  80. }
  81. // We call Register() on the discovery backend in a loop for the whole lifetime of the daemon,
  82. // but we never actually Watch() for nodes appearing and disappearing for the moment.
  83. go reloader.advertiseHeartbeat(advertiseAddress)
  84. return reloader, nil
  85. }
  86. // advertiseHeartbeat registers the current node against the discovery backend using the specified
  87. // address. The function never returns, as registration against the backend comes with a TTL and
  88. // requires regular heartbeats.
  89. func (d *daemonDiscoveryReloader) advertiseHeartbeat(address string) {
  90. var ready bool
  91. if err := d.initHeartbeat(address); err == nil {
  92. ready = true
  93. close(d.readyCh)
  94. }
  95. for {
  96. select {
  97. case <-d.ticker.C:
  98. if err := d.backend.Register(address); err != nil {
  99. logrus.Warnf("Registering as %q in discovery failed: %v", address, err)
  100. } else {
  101. if !ready {
  102. close(d.readyCh)
  103. ready = true
  104. }
  105. }
  106. case <-d.term:
  107. return
  108. }
  109. }
  110. }
  111. // initHeartbeat is used to do the first heartbeat. It uses a tight loop until
  112. // either the timeout period is reached or the heartbeat is successful and returns.
  113. func (d *daemonDiscoveryReloader) initHeartbeat(address string) error {
  114. // Setup a short ticker until the first heartbeat has succeeded
  115. t := time.NewTicker(500 * time.Millisecond)
  116. defer t.Stop()
  117. // timeout makes sure that after a period of time we stop being so aggressive trying to reach the discovery service
  118. timeout := time.After(60 * time.Second)
  119. for {
  120. select {
  121. case <-timeout:
  122. return errors.New("timeout waiting for initial discovery")
  123. case <-d.term:
  124. return errors.New("terminated")
  125. case <-t.C:
  126. if err := d.backend.Register(address); err == nil {
  127. return nil
  128. }
  129. }
  130. }
  131. }
  132. // Reload makes the watcher to stop advertising and reconfigures it to advertise in a new address.
  133. func (d *daemonDiscoveryReloader) Reload(backendAddress, advertiseAddress string, clusterOpts map[string]string) error {
  134. d.Stop()
  135. heartbeat, backend, err := parseDiscoveryOptions(backendAddress, clusterOpts)
  136. if err != nil {
  137. return err
  138. }
  139. d.backend = backend
  140. d.ticker = time.NewTicker(heartbeat)
  141. d.readyCh = make(chan struct{})
  142. go d.advertiseHeartbeat(advertiseAddress)
  143. return nil
  144. }
  145. // Stop terminates the discovery advertising.
  146. func (d *daemonDiscoveryReloader) Stop() {
  147. d.ticker.Stop()
  148. d.term <- true
  149. }
  150. func parseDiscoveryOptions(backendAddress string, clusterOpts map[string]string) (time.Duration, discovery.Backend, error) {
  151. heartbeat, ttl, err := discoveryOpts(clusterOpts)
  152. if err != nil {
  153. return 0, nil, err
  154. }
  155. backend, err := discovery.New(backendAddress, heartbeat, ttl, clusterOpts)
  156. if err != nil {
  157. return 0, nil, err
  158. }
  159. return heartbeat, backend, nil
  160. }
  161. // modifiedDiscoverySettings returns whether the discovery configuration has been modified or not.
  162. func modifiedDiscoverySettings(config *Config, backendType, advertise string, clusterOpts map[string]string) bool {
  163. if config.ClusterStore != backendType || config.ClusterAdvertise != advertise {
  164. return true
  165. }
  166. if (config.ClusterOpts == nil && clusterOpts == nil) ||
  167. (config.ClusterOpts == nil && len(clusterOpts) == 0) ||
  168. (len(config.ClusterOpts) == 0 && clusterOpts == nil) {
  169. return false
  170. }
  171. return !reflect.DeepEqual(config.ClusterOpts, clusterOpts)
  172. }