discovery.go 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202
  1. package discovery
  2. import (
  3. "errors"
  4. "fmt"
  5. "strconv"
  6. "time"
  7. "github.com/Sirupsen/logrus"
  8. "github.com/docker/docker/pkg/discovery"
  9. // Register the libkv backends for discovery.
  10. _ "github.com/docker/docker/pkg/discovery/kv"
  11. )
  12. const (
  13. // defaultDiscoveryHeartbeat is the default value for discovery heartbeat interval.
  14. defaultDiscoveryHeartbeat = 20 * time.Second
  15. // defaultDiscoveryTTLFactor is the default TTL factor for discovery
  16. defaultDiscoveryTTLFactor = 3
  17. )
  18. // ErrDiscoveryDisabled is an error returned if the discovery is disabled
  19. var ErrDiscoveryDisabled = errors.New("discovery is disabled")
  20. // Reloader is the discovery reloader of the daemon
  21. type Reloader interface {
  22. discovery.Watcher
  23. Stop()
  24. Reload(backend, address string, clusterOpts map[string]string) error
  25. ReadyCh() <-chan struct{}
  26. }
  27. type daemonDiscoveryReloader struct {
  28. backend discovery.Backend
  29. ticker *time.Ticker
  30. term chan bool
  31. readyCh chan struct{}
  32. }
  33. func (d *daemonDiscoveryReloader) Watch(stopCh <-chan struct{}) (<-chan discovery.Entries, <-chan error) {
  34. return d.backend.Watch(stopCh)
  35. }
  36. func (d *daemonDiscoveryReloader) ReadyCh() <-chan struct{} {
  37. return d.readyCh
  38. }
  39. func discoveryOpts(clusterOpts map[string]string) (time.Duration, time.Duration, error) {
  40. var (
  41. heartbeat = defaultDiscoveryHeartbeat
  42. ttl = defaultDiscoveryTTLFactor * defaultDiscoveryHeartbeat
  43. )
  44. if hb, ok := clusterOpts["discovery.heartbeat"]; ok {
  45. h, err := strconv.Atoi(hb)
  46. if err != nil {
  47. return time.Duration(0), time.Duration(0), err
  48. }
  49. if h <= 0 {
  50. return time.Duration(0), time.Duration(0),
  51. fmt.Errorf("discovery.heartbeat must be positive")
  52. }
  53. heartbeat = time.Duration(h) * time.Second
  54. ttl = defaultDiscoveryTTLFactor * heartbeat
  55. }
  56. if tstr, ok := clusterOpts["discovery.ttl"]; ok {
  57. t, err := strconv.Atoi(tstr)
  58. if err != nil {
  59. return time.Duration(0), time.Duration(0), err
  60. }
  61. if t <= 0 {
  62. return time.Duration(0), time.Duration(0),
  63. fmt.Errorf("discovery.ttl must be positive")
  64. }
  65. ttl = time.Duration(t) * time.Second
  66. if _, ok := clusterOpts["discovery.heartbeat"]; !ok {
  67. heartbeat = time.Duration(t) * time.Second / time.Duration(defaultDiscoveryTTLFactor)
  68. }
  69. if ttl <= heartbeat {
  70. return time.Duration(0), time.Duration(0),
  71. fmt.Errorf("discovery.ttl timer must be greater than discovery.heartbeat")
  72. }
  73. }
  74. return heartbeat, ttl, nil
  75. }
  76. // Init initializes the nodes discovery subsystem by connecting to the specified backend
  77. // and starts a registration loop to advertise the current node under the specified address.
  78. func Init(backendAddress, advertiseAddress string, clusterOpts map[string]string) (Reloader, error) {
  79. heartbeat, backend, err := parseDiscoveryOptions(backendAddress, clusterOpts)
  80. if err != nil {
  81. return nil, err
  82. }
  83. reloader := &daemonDiscoveryReloader{
  84. backend: backend,
  85. ticker: time.NewTicker(heartbeat),
  86. term: make(chan bool),
  87. readyCh: make(chan struct{}),
  88. }
  89. // We call Register() on the discovery backend in a loop for the whole lifetime of the daemon,
  90. // but we never actually Watch() for nodes appearing and disappearing for the moment.
  91. go reloader.advertiseHeartbeat(advertiseAddress)
  92. return reloader, nil
  93. }
  94. // advertiseHeartbeat registers the current node against the discovery backend using the specified
  95. // address. The function never returns, as registration against the backend comes with a TTL and
  96. // requires regular heartbeats.
  97. func (d *daemonDiscoveryReloader) advertiseHeartbeat(address string) {
  98. var ready bool
  99. if err := d.initHeartbeat(address); err == nil {
  100. ready = true
  101. close(d.readyCh)
  102. } else {
  103. logrus.WithError(err).Debug("First discovery heartbeat failed")
  104. }
  105. for {
  106. select {
  107. case <-d.ticker.C:
  108. if err := d.backend.Register(address); err != nil {
  109. logrus.Warnf("Registering as %q in discovery failed: %v", address, err)
  110. } else {
  111. if !ready {
  112. close(d.readyCh)
  113. ready = true
  114. }
  115. }
  116. case <-d.term:
  117. return
  118. }
  119. }
  120. }
  121. // initHeartbeat is used to do the first heartbeat. It uses a tight loop until
  122. // either the timeout period is reached or the heartbeat is successful and returns.
  123. func (d *daemonDiscoveryReloader) initHeartbeat(address string) error {
  124. // Setup a short ticker until the first heartbeat has succeeded
  125. t := time.NewTicker(500 * time.Millisecond)
  126. defer t.Stop()
  127. // timeout makes sure that after a period of time we stop being so aggressive trying to reach the discovery service
  128. timeout := time.After(60 * time.Second)
  129. for {
  130. select {
  131. case <-timeout:
  132. return errors.New("timeout waiting for initial discovery")
  133. case <-d.term:
  134. return errors.New("terminated")
  135. case <-t.C:
  136. if err := d.backend.Register(address); err == nil {
  137. return nil
  138. }
  139. }
  140. }
  141. }
  142. // Reload makes the watcher to stop advertising and reconfigures it to advertise in a new address.
  143. func (d *daemonDiscoveryReloader) Reload(backendAddress, advertiseAddress string, clusterOpts map[string]string) error {
  144. d.Stop()
  145. heartbeat, backend, err := parseDiscoveryOptions(backendAddress, clusterOpts)
  146. if err != nil {
  147. return err
  148. }
  149. d.backend = backend
  150. d.ticker = time.NewTicker(heartbeat)
  151. d.readyCh = make(chan struct{})
  152. go d.advertiseHeartbeat(advertiseAddress)
  153. return nil
  154. }
  155. // Stop terminates the discovery advertising.
  156. func (d *daemonDiscoveryReloader) Stop() {
  157. d.ticker.Stop()
  158. d.term <- true
  159. }
  160. func parseDiscoveryOptions(backendAddress string, clusterOpts map[string]string) (time.Duration, discovery.Backend, error) {
  161. heartbeat, ttl, err := discoveryOpts(clusterOpts)
  162. if err != nil {
  163. return 0, nil, err
  164. }
  165. backend, err := discovery.New(backendAddress, heartbeat, ttl, clusterOpts)
  166. if err != nil {
  167. return 0, nil, err
  168. }
  169. return heartbeat, backend, nil
  170. }