discovery.go 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215
  1. package daemon
  2. import (
  3. "errors"
  4. "fmt"
  5. "reflect"
  6. "strconv"
  7. "time"
  8. "github.com/Sirupsen/logrus"
  9. "github.com/docker/docker/pkg/discovery"
  10. // Register the libkv backends for discovery.
  11. _ "github.com/docker/docker/pkg/discovery/kv"
  12. )
  const (
  	// defaultDiscoveryHeartbeat is the heartbeat interval used when the
  	// cluster options do not specify "discovery.heartbeat".
  	defaultDiscoveryHeartbeat = 20 * time.Second

  	// defaultDiscoveryTTLFactor is the multiplier that relates the TTL to the
  	// heartbeat interval when only one of the two is configured.
  	defaultDiscoveryTTLFactor = 3
  )

  // errDiscoveryDisabled is the sentinel error reporting that discovery is
  // disabled.
  var errDiscoveryDisabled = errors.New("discovery is disabled")
  20. type discoveryReloader interface {
  21. discovery.Watcher
  22. Stop()
  23. Reload(backend, address string, clusterOpts map[string]string) error
  24. ReadyCh() <-chan struct{}
  25. }
  26. type daemonDiscoveryReloader struct {
  27. backend discovery.Backend
  28. ticker *time.Ticker
  29. term chan bool
  30. readyCh chan struct{}
  31. }
  32. func (d *daemonDiscoveryReloader) Watch(stopCh <-chan struct{}) (<-chan discovery.Entries, <-chan error) {
  33. return d.backend.Watch(stopCh)
  34. }
  35. func (d *daemonDiscoveryReloader) ReadyCh() <-chan struct{} {
  36. return d.readyCh
  37. }
  38. func discoveryOpts(clusterOpts map[string]string) (time.Duration, time.Duration, error) {
  39. var (
  40. heartbeat = defaultDiscoveryHeartbeat
  41. ttl = defaultDiscoveryTTLFactor * defaultDiscoveryHeartbeat
  42. )
  43. if hb, ok := clusterOpts["discovery.heartbeat"]; ok {
  44. h, err := strconv.Atoi(hb)
  45. if err != nil {
  46. return time.Duration(0), time.Duration(0), err
  47. }
  48. if h <= 0 {
  49. return time.Duration(0), time.Duration(0),
  50. fmt.Errorf("discovery.heartbeat must be positive")
  51. }
  52. heartbeat = time.Duration(h) * time.Second
  53. ttl = defaultDiscoveryTTLFactor * heartbeat
  54. }
  55. if tstr, ok := clusterOpts["discovery.ttl"]; ok {
  56. t, err := strconv.Atoi(tstr)
  57. if err != nil {
  58. return time.Duration(0), time.Duration(0), err
  59. }
  60. if t <= 0 {
  61. return time.Duration(0), time.Duration(0),
  62. fmt.Errorf("discovery.ttl must be positive")
  63. }
  64. ttl = time.Duration(t) * time.Second
  65. if _, ok := clusterOpts["discovery.heartbeat"]; !ok {
  66. h := int(t / defaultDiscoveryTTLFactor)
  67. heartbeat = time.Duration(h) * time.Second
  68. }
  69. if ttl <= heartbeat {
  70. return time.Duration(0), time.Duration(0),
  71. fmt.Errorf("discovery.ttl timer must be greater than discovery.heartbeat")
  72. }
  73. }
  74. return heartbeat, ttl, nil
  75. }
  76. // initDiscovery initializes the nodes discovery subsystem by connecting to the specified backend
  77. // and starts a registration loop to advertise the current node under the specified address.
  78. func initDiscovery(backendAddress, advertiseAddress string, clusterOpts map[string]string) (discoveryReloader, error) {
  79. heartbeat, backend, err := parseDiscoveryOptions(backendAddress, clusterOpts)
  80. if err != nil {
  81. return nil, err
  82. }
  83. reloader := &daemonDiscoveryReloader{
  84. backend: backend,
  85. ticker: time.NewTicker(heartbeat),
  86. term: make(chan bool),
  87. readyCh: make(chan struct{}),
  88. }
  89. // We call Register() on the discovery backend in a loop for the whole lifetime of the daemon,
  90. // but we never actually Watch() for nodes appearing and disappearing for the moment.
  91. go reloader.advertiseHeartbeat(advertiseAddress)
  92. return reloader, nil
  93. }
  94. // advertiseHeartbeat registers the current node against the discovery backend using the specified
  95. // address. The function never returns, as registration against the backend comes with a TTL and
  96. // requires regular heartbeats.
  97. func (d *daemonDiscoveryReloader) advertiseHeartbeat(address string) {
  98. var ready bool
  99. if err := d.initHeartbeat(address); err == nil {
  100. ready = true
  101. close(d.readyCh)
  102. }
  103. for {
  104. select {
  105. case <-d.ticker.C:
  106. if err := d.backend.Register(address); err != nil {
  107. logrus.Warnf("Registering as %q in discovery failed: %v", address, err)
  108. } else {
  109. if !ready {
  110. close(d.readyCh)
  111. ready = true
  112. }
  113. }
  114. case <-d.term:
  115. return
  116. }
  117. }
  118. }
  119. // initHeartbeat is used to do the first heartbeat. It uses a tight loop until
  120. // either the timeout period is reached or the heartbeat is successful and returns.
  121. func (d *daemonDiscoveryReloader) initHeartbeat(address string) error {
  122. // Setup a short ticker until the first heartbeat has succeeded
  123. t := time.NewTicker(500 * time.Millisecond)
  124. defer t.Stop()
  125. // timeout makes sure that after a period of time we stop being so aggressive trying to reach the discovery service
  126. timeout := time.After(60 * time.Second)
  127. for {
  128. select {
  129. case <-timeout:
  130. return errors.New("timeout waiting for initial discovery")
  131. case <-d.term:
  132. return errors.New("terminated")
  133. case <-t.C:
  134. if err := d.backend.Register(address); err == nil {
  135. return nil
  136. }
  137. }
  138. }
  139. }
  140. // Reload makes the watcher to stop advertising and reconfigures it to advertise in a new address.
  141. func (d *daemonDiscoveryReloader) Reload(backendAddress, advertiseAddress string, clusterOpts map[string]string) error {
  142. d.Stop()
  143. heartbeat, backend, err := parseDiscoveryOptions(backendAddress, clusterOpts)
  144. if err != nil {
  145. return err
  146. }
  147. d.backend = backend
  148. d.ticker = time.NewTicker(heartbeat)
  149. d.readyCh = make(chan struct{})
  150. go d.advertiseHeartbeat(advertiseAddress)
  151. return nil
  152. }
  153. // Stop terminates the discovery advertising.
  154. func (d *daemonDiscoveryReloader) Stop() {
  155. d.ticker.Stop()
  156. d.term <- true
  157. }
  158. func parseDiscoveryOptions(backendAddress string, clusterOpts map[string]string) (time.Duration, discovery.Backend, error) {
  159. heartbeat, ttl, err := discoveryOpts(clusterOpts)
  160. if err != nil {
  161. return 0, nil, err
  162. }
  163. backend, err := discovery.New(backendAddress, heartbeat, ttl, clusterOpts)
  164. if err != nil {
  165. return 0, nil, err
  166. }
  167. return heartbeat, backend, nil
  168. }
  169. // modifiedDiscoverySettings returns whether the discovery configuration has been modified or not.
  170. func modifiedDiscoverySettings(config *Config, backendType, advertise string, clusterOpts map[string]string) bool {
  171. if config.ClusterStore != backendType || config.ClusterAdvertise != advertise {
  172. return true
  173. }
  174. if (config.ClusterOpts == nil && clusterOpts == nil) ||
  175. (config.ClusterOpts == nil && len(clusterOpts) == 0) ||
  176. (len(config.ClusterOpts) == 0 && clusterOpts == nil) {
  177. return false
  178. }
  179. return !reflect.DeepEqual(config.ClusterOpts, clusterOpts)
  180. }