zookeeper.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429
  1. package zookeeper
  2. import (
  3. "strings"
  4. "time"
  5. "github.com/docker/libkv"
  6. "github.com/docker/libkv/store"
  7. zk "github.com/samuel/go-zookeeper/zk"
  8. )
  9. const (
  10. // SOH control character
  11. SOH = "\x01"
  12. defaultTimeout = 10 * time.Second
  13. )
  14. // Zookeeper is the receiver type for
  15. // the Store interface
  16. type Zookeeper struct {
  17. timeout time.Duration
  18. client *zk.Conn
  19. }
  20. type zookeeperLock struct {
  21. client *zk.Conn
  22. lock *zk.Lock
  23. key string
  24. value []byte
  25. }
  26. // Register registers zookeeper to libkv
  27. func Register() {
  28. libkv.AddStore(store.ZK, New)
  29. }
  30. // New creates a new Zookeeper client given a
  31. // list of endpoints and an optional tls config
  32. func New(endpoints []string, options *store.Config) (store.Store, error) {
  33. s := &Zookeeper{}
  34. s.timeout = defaultTimeout
  35. // Set options
  36. if options != nil {
  37. if options.ConnectionTimeout != 0 {
  38. s.setTimeout(options.ConnectionTimeout)
  39. }
  40. }
  41. // Connect to Zookeeper
  42. conn, _, err := zk.Connect(endpoints, s.timeout)
  43. if err != nil {
  44. return nil, err
  45. }
  46. s.client = conn
  47. return s, nil
  48. }
  49. // setTimeout sets the timeout for connecting to Zookeeper
  50. func (s *Zookeeper) setTimeout(time time.Duration) {
  51. s.timeout = time
  52. }
  53. // Get the value at "key", returns the last modified index
  54. // to use in conjunction to Atomic calls
  55. func (s *Zookeeper) Get(key string) (pair *store.KVPair, err error) {
  56. resp, meta, err := s.client.Get(s.normalize(key))
  57. if err != nil {
  58. if err == zk.ErrNoNode {
  59. return nil, store.ErrKeyNotFound
  60. }
  61. return nil, err
  62. }
  63. // FIXME handle very rare cases where Get returns the
  64. // SOH control character instead of the actual value
  65. if string(resp) == SOH {
  66. return s.Get(store.Normalize(key))
  67. }
  68. pair = &store.KVPair{
  69. Key: key,
  70. Value: resp,
  71. LastIndex: uint64(meta.Version),
  72. }
  73. return pair, nil
  74. }
  75. // createFullPath creates the entire path for a directory
  76. // that does not exist
  77. func (s *Zookeeper) createFullPath(path []string, ephemeral bool) error {
  78. for i := 1; i <= len(path); i++ {
  79. newpath := "/" + strings.Join(path[:i], "/")
  80. if i == len(path) && ephemeral {
  81. _, err := s.client.Create(newpath, []byte{}, zk.FlagEphemeral, zk.WorldACL(zk.PermAll))
  82. return err
  83. }
  84. _, err := s.client.Create(newpath, []byte{}, 0, zk.WorldACL(zk.PermAll))
  85. if err != nil {
  86. // Skip if node already exists
  87. if err != zk.ErrNodeExists {
  88. return err
  89. }
  90. }
  91. }
  92. return nil
  93. }
  94. // Put a value at "key"
  95. func (s *Zookeeper) Put(key string, value []byte, opts *store.WriteOptions) error {
  96. fkey := s.normalize(key)
  97. exists, err := s.Exists(key)
  98. if err != nil {
  99. return err
  100. }
  101. if !exists {
  102. if opts != nil && opts.TTL > 0 {
  103. s.createFullPath(store.SplitKey(strings.TrimSuffix(key, "/")), true)
  104. } else {
  105. s.createFullPath(store.SplitKey(strings.TrimSuffix(key, "/")), false)
  106. }
  107. }
  108. _, err = s.client.Set(fkey, value, -1)
  109. return err
  110. }
  111. // Delete a value at "key"
  112. func (s *Zookeeper) Delete(key string) error {
  113. err := s.client.Delete(s.normalize(key), -1)
  114. if err == zk.ErrNoNode {
  115. return store.ErrKeyNotFound
  116. }
  117. return err
  118. }
  119. // Exists checks if the key exists inside the store
  120. func (s *Zookeeper) Exists(key string) (bool, error) {
  121. exists, _, err := s.client.Exists(s.normalize(key))
  122. if err != nil {
  123. return false, err
  124. }
  125. return exists, nil
  126. }
  127. // Watch for changes on a "key"
  128. // It returns a channel that will receive changes or pass
  129. // on errors. Upon creation, the current value will first
  130. // be sent to the channel. Providing a non-nil stopCh can
  131. // be used to stop watching.
  132. func (s *Zookeeper) Watch(key string, stopCh <-chan struct{}) (<-chan *store.KVPair, error) {
  133. // Get the key first
  134. pair, err := s.Get(key)
  135. if err != nil {
  136. return nil, err
  137. }
  138. // Catch zk notifications and fire changes into the channel.
  139. watchCh := make(chan *store.KVPair)
  140. go func() {
  141. defer close(watchCh)
  142. // Get returns the current value to the channel prior
  143. // to listening to any event that may occur on that key
  144. watchCh <- pair
  145. for {
  146. _, _, eventCh, err := s.client.GetW(s.normalize(key))
  147. if err != nil {
  148. return
  149. }
  150. select {
  151. case e := <-eventCh:
  152. if e.Type == zk.EventNodeDataChanged {
  153. if entry, err := s.Get(key); err == nil {
  154. watchCh <- entry
  155. }
  156. }
  157. case <-stopCh:
  158. // There is no way to stop GetW so just quit
  159. return
  160. }
  161. }
  162. }()
  163. return watchCh, nil
  164. }
  165. // WatchTree watches for changes on a "directory"
  166. // It returns a channel that will receive changes or pass
  167. // on errors. Upon creating a watch, the current childs values
  168. // will be sent to the channel .Providing a non-nil stopCh can
  169. // be used to stop watching.
  170. func (s *Zookeeper) WatchTree(directory string, stopCh <-chan struct{}) (<-chan []*store.KVPair, error) {
  171. // List the childrens first
  172. entries, err := s.List(directory)
  173. if err != nil {
  174. return nil, err
  175. }
  176. // Catch zk notifications and fire changes into the channel.
  177. watchCh := make(chan []*store.KVPair)
  178. go func() {
  179. defer close(watchCh)
  180. // List returns the children values to the channel
  181. // prior to listening to any events that may occur
  182. // on those keys
  183. watchCh <- entries
  184. for {
  185. _, _, eventCh, err := s.client.ChildrenW(s.normalize(directory))
  186. if err != nil {
  187. return
  188. }
  189. select {
  190. case e := <-eventCh:
  191. if e.Type == zk.EventNodeChildrenChanged {
  192. if kv, err := s.List(directory); err == nil {
  193. watchCh <- kv
  194. }
  195. }
  196. case <-stopCh:
  197. // There is no way to stop GetW so just quit
  198. return
  199. }
  200. }
  201. }()
  202. return watchCh, nil
  203. }
  204. // List child nodes of a given directory
  205. func (s *Zookeeper) List(directory string) ([]*store.KVPair, error) {
  206. keys, stat, err := s.client.Children(s.normalize(directory))
  207. if err != nil {
  208. if err == zk.ErrNoNode {
  209. return nil, store.ErrKeyNotFound
  210. }
  211. return nil, err
  212. }
  213. kv := []*store.KVPair{}
  214. // FIXME Costly Get request for each child key..
  215. for _, key := range keys {
  216. pair, err := s.Get(strings.TrimSuffix(directory, "/") + s.normalize(key))
  217. if err != nil {
  218. // If node is not found: List is out of date, retry
  219. if err == store.ErrKeyNotFound {
  220. return s.List(directory)
  221. }
  222. return nil, err
  223. }
  224. kv = append(kv, &store.KVPair{
  225. Key: key,
  226. Value: []byte(pair.Value),
  227. LastIndex: uint64(stat.Version),
  228. })
  229. }
  230. return kv, nil
  231. }
  232. // DeleteTree deletes a range of keys under a given directory
  233. func (s *Zookeeper) DeleteTree(directory string) error {
  234. pairs, err := s.List(directory)
  235. if err != nil {
  236. return err
  237. }
  238. var reqs []interface{}
  239. for _, pair := range pairs {
  240. reqs = append(reqs, &zk.DeleteRequest{
  241. Path: s.normalize(directory + "/" + pair.Key),
  242. Version: -1,
  243. })
  244. }
  245. _, err = s.client.Multi(reqs...)
  246. return err
  247. }
  248. // AtomicPut put a value at "key" if the key has not been
  249. // modified in the meantime, throws an error if this is the case
  250. func (s *Zookeeper) AtomicPut(key string, value []byte, previous *store.KVPair, _ *store.WriteOptions) (bool, *store.KVPair, error) {
  251. var lastIndex uint64
  252. if previous != nil {
  253. meta, err := s.client.Set(s.normalize(key), value, int32(previous.LastIndex))
  254. if err != nil {
  255. // Compare Failed
  256. if err == zk.ErrBadVersion {
  257. return false, nil, store.ErrKeyModified
  258. }
  259. return false, nil, err
  260. }
  261. lastIndex = uint64(meta.Version)
  262. } else {
  263. // Interpret previous == nil as create operation.
  264. _, err := s.client.Create(s.normalize(key), value, 0, zk.WorldACL(zk.PermAll))
  265. if err != nil {
  266. // Directory does not exist
  267. if err == zk.ErrNoNode {
  268. // Create the directory
  269. parts := store.SplitKey(strings.TrimSuffix(key, "/"))
  270. parts = parts[:len(parts)-1]
  271. if err = s.createFullPath(parts, false); err != nil {
  272. // Failed to create the directory.
  273. return false, nil, err
  274. }
  275. // Create the node
  276. if _, err := s.client.Create(s.normalize(key), value, 0, zk.WorldACL(zk.PermAll)); err != nil {
  277. // Node exist error (when previous nil)
  278. if err == zk.ErrNodeExists {
  279. return false, nil, store.ErrKeyExists
  280. }
  281. return false, nil, err
  282. }
  283. } else {
  284. // Node Exists error (when previous nil)
  285. if err == zk.ErrNodeExists {
  286. return false, nil, store.ErrKeyExists
  287. }
  288. // Unhandled error
  289. return false, nil, err
  290. }
  291. }
  292. lastIndex = 0 // Newly created nodes have version 0.
  293. }
  294. pair := &store.KVPair{
  295. Key: key,
  296. Value: value,
  297. LastIndex: lastIndex,
  298. }
  299. return true, pair, nil
  300. }
  301. // AtomicDelete deletes a value at "key" if the key
  302. // has not been modified in the meantime, throws an
  303. // error if this is the case
  304. func (s *Zookeeper) AtomicDelete(key string, previous *store.KVPair) (bool, error) {
  305. if previous == nil {
  306. return false, store.ErrPreviousNotSpecified
  307. }
  308. err := s.client.Delete(s.normalize(key), int32(previous.LastIndex))
  309. if err != nil {
  310. // Key not found
  311. if err == zk.ErrNoNode {
  312. return false, store.ErrKeyNotFound
  313. }
  314. // Compare failed
  315. if err == zk.ErrBadVersion {
  316. return false, store.ErrKeyModified
  317. }
  318. // General store error
  319. return false, err
  320. }
  321. return true, nil
  322. }
  323. // NewLock returns a handle to a lock struct which can
  324. // be used to provide mutual exclusion on a key
  325. func (s *Zookeeper) NewLock(key string, options *store.LockOptions) (lock store.Locker, err error) {
  326. value := []byte("")
  327. // Apply options
  328. if options != nil {
  329. if options.Value != nil {
  330. value = options.Value
  331. }
  332. }
  333. lock = &zookeeperLock{
  334. client: s.client,
  335. key: s.normalize(key),
  336. value: value,
  337. lock: zk.NewLock(s.client, s.normalize(key), zk.WorldACL(zk.PermAll)),
  338. }
  339. return lock, err
  340. }
  341. // Lock attempts to acquire the lock and blocks while
  342. // doing so. It returns a channel that is closed if our
  343. // lock is lost or if an error occurs
  344. func (l *zookeeperLock) Lock(stopChan chan struct{}) (<-chan struct{}, error) {
  345. err := l.lock.Lock()
  346. if err == nil {
  347. // We hold the lock, we can set our value
  348. // FIXME: The value is left behind
  349. // (problematic for leader election)
  350. _, err = l.client.Set(l.key, l.value, -1)
  351. }
  352. return make(chan struct{}), err
  353. }
  354. // Unlock the "key". Calling unlock while
  355. // not holding the lock will throw an error
  356. func (l *zookeeperLock) Unlock() error {
  357. return l.lock.Unlock()
  358. }
  359. // Close closes the client connection
  360. func (s *Zookeeper) Close() {
  361. s.client.Close()
  362. }
  363. // Normalize the key for usage in Zookeeper
  364. func (s *Zookeeper) normalize(key string) string {
  365. key = store.Normalize(key)
  366. return strings.TrimSuffix(key, "/")
  367. }