kv.go 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192
  1. package kv
  2. import (
  3. "fmt"
  4. "path"
  5. "strings"
  6. "time"
  7. "github.com/Sirupsen/logrus"
  8. "github.com/docker/docker/pkg/discovery"
  9. "github.com/docker/go-connections/tlsconfig"
  10. "github.com/docker/libkv"
  11. "github.com/docker/libkv/store"
  12. "github.com/docker/libkv/store/consul"
  13. "github.com/docker/libkv/store/etcd"
  14. "github.com/docker/libkv/store/zookeeper"
  15. )
  16. const (
  17. defaultDiscoveryPath = "docker/nodes"
  18. )
  19. // Discovery is exported
  20. type Discovery struct {
  21. backend store.Backend
  22. store store.Store
  23. heartbeat time.Duration
  24. ttl time.Duration
  25. prefix string
  26. path string
  27. }
  28. func init() {
  29. Init()
  30. }
  31. // Init is exported
  32. func Init() {
  33. // Register to libkv
  34. zookeeper.Register()
  35. consul.Register()
  36. etcd.Register()
  37. // Register to internal discovery service
  38. discovery.Register("zk", &Discovery{backend: store.ZK})
  39. discovery.Register("consul", &Discovery{backend: store.CONSUL})
  40. discovery.Register("etcd", &Discovery{backend: store.ETCD})
  41. }
  42. // Initialize is exported
  43. func (s *Discovery) Initialize(uris string, heartbeat time.Duration, ttl time.Duration, clusterOpts map[string]string) error {
  44. var (
  45. parts = strings.SplitN(uris, "/", 2)
  46. addrs = strings.Split(parts[0], ",")
  47. err error
  48. )
  49. // A custom prefix to the path can be optionally used.
  50. if len(parts) == 2 {
  51. s.prefix = parts[1]
  52. }
  53. s.heartbeat = heartbeat
  54. s.ttl = ttl
  55. // Use a custom path if specified in discovery options
  56. dpath := defaultDiscoveryPath
  57. if clusterOpts["kv.path"] != "" {
  58. dpath = clusterOpts["kv.path"]
  59. }
  60. s.path = path.Join(s.prefix, dpath)
  61. var config *store.Config
  62. if clusterOpts["kv.cacertfile"] != "" && clusterOpts["kv.certfile"] != "" && clusterOpts["kv.keyfile"] != "" {
  63. logrus.Info("Initializing discovery with TLS")
  64. tlsConfig, err := tlsconfig.Client(tlsconfig.Options{
  65. CAFile: clusterOpts["kv.cacertfile"],
  66. CertFile: clusterOpts["kv.certfile"],
  67. KeyFile: clusterOpts["kv.keyfile"],
  68. })
  69. if err != nil {
  70. return err
  71. }
  72. config = &store.Config{
  73. // Set ClientTLS to trigger https (bug in libkv/etcd)
  74. ClientTLS: &store.ClientTLSConfig{
  75. CACertFile: clusterOpts["kv.cacertfile"],
  76. CertFile: clusterOpts["kv.certfile"],
  77. KeyFile: clusterOpts["kv.keyfile"],
  78. },
  79. // The actual TLS config that will be used
  80. TLS: tlsConfig,
  81. }
  82. } else {
  83. logrus.Info("Initializing discovery without TLS")
  84. }
  85. // Creates a new store, will ignore options given
  86. // if not supported by the chosen store
  87. s.store, err = libkv.NewStore(s.backend, addrs, config)
  88. return err
  89. }
  90. // Watch the store until either there's a store error or we receive a stop request.
  91. // Returns false if we shouldn't attempt watching the store anymore (stop request received).
  92. func (s *Discovery) watchOnce(stopCh <-chan struct{}, watchCh <-chan []*store.KVPair, discoveryCh chan discovery.Entries, errCh chan error) bool {
  93. for {
  94. select {
  95. case pairs := <-watchCh:
  96. if pairs == nil {
  97. return true
  98. }
  99. logrus.WithField("discovery", s.backend).Debugf("Watch triggered with %d nodes", len(pairs))
  100. // Convert `KVPair` into `discovery.Entry`.
  101. addrs := make([]string, len(pairs))
  102. for _, pair := range pairs {
  103. addrs = append(addrs, string(pair.Value))
  104. }
  105. entries, err := discovery.CreateEntries(addrs)
  106. if err != nil {
  107. errCh <- err
  108. } else {
  109. discoveryCh <- entries
  110. }
  111. case <-stopCh:
  112. // We were requested to stop watching.
  113. return false
  114. }
  115. }
  116. }
  117. // Watch is exported
  118. func (s *Discovery) Watch(stopCh <-chan struct{}) (<-chan discovery.Entries, <-chan error) {
  119. ch := make(chan discovery.Entries)
  120. errCh := make(chan error)
  121. go func() {
  122. defer close(ch)
  123. defer close(errCh)
  124. // Forever: Create a store watch, watch until we get an error and then try again.
  125. // Will only stop if we receive a stopCh request.
  126. for {
  127. // Create the path to watch if it does not exist yet
  128. exists, err := s.store.Exists(s.path)
  129. if err != nil {
  130. errCh <- err
  131. }
  132. if !exists {
  133. if err := s.store.Put(s.path, []byte(""), &store.WriteOptions{IsDir: true}); err != nil {
  134. errCh <- err
  135. }
  136. }
  137. // Set up a watch.
  138. watchCh, err := s.store.WatchTree(s.path, stopCh)
  139. if err != nil {
  140. errCh <- err
  141. } else {
  142. if !s.watchOnce(stopCh, watchCh, ch, errCh) {
  143. return
  144. }
  145. }
  146. // If we get here it means the store watch channel was closed. This
  147. // is unexpected so let's retry later.
  148. errCh <- fmt.Errorf("Unexpected watch error")
  149. time.Sleep(s.heartbeat)
  150. }
  151. }()
  152. return ch, errCh
  153. }
  154. // Register is exported
  155. func (s *Discovery) Register(addr string) error {
  156. opts := &store.WriteOptions{TTL: s.ttl}
  157. return s.store.Put(path.Join(s.path, addr), []byte(addr), opts)
  158. }
  159. // Store returns the underlying store used by KV discovery.
  160. func (s *Discovery) Store() store.Store {
  161. return s.store
  162. }
  163. // Prefix returns the store prefix
  164. func (s *Discovery) Prefix() string {
  165. return s.prefix
  166. }