etcd.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606
  1. package etcd
  2. import (
  3. "crypto/tls"
  4. "errors"
  5. "log"
  6. "net"
  7. "net/http"
  8. "strings"
  9. "time"
  10. "golang.org/x/net/context"
  11. etcd "github.com/coreos/etcd/client"
  12. "github.com/docker/libkv"
  13. "github.com/docker/libkv/store"
  14. )
  15. var (
  16. // ErrAbortTryLock is thrown when a user stops trying to seek the lock
  17. // by sending a signal to the stop chan, this is used to verify if the
  18. // operation succeeded
  19. ErrAbortTryLock = errors.New("lock operation aborted")
  20. )
  21. // Etcd is the receiver type for the
  22. // Store interface
  23. type Etcd struct {
  24. client etcd.KeysAPI
  25. }
  26. type etcdLock struct {
  27. client etcd.KeysAPI
  28. stopLock chan struct{}
  29. stopRenew chan struct{}
  30. key string
  31. value string
  32. last *etcd.Response
  33. ttl time.Duration
  34. }
  35. const (
  36. periodicSync = 5 * time.Minute
  37. defaultLockTTL = 20 * time.Second
  38. defaultUpdateTime = 5 * time.Second
  39. )
  40. // Register registers etcd to libkv
  41. func Register() {
  42. libkv.AddStore(store.ETCD, New)
  43. }
  44. // New creates a new Etcd client given a list
  45. // of endpoints and an optional tls config
  46. func New(addrs []string, options *store.Config) (store.Store, error) {
  47. s := &Etcd{}
  48. var (
  49. entries []string
  50. err error
  51. )
  52. entries = store.CreateEndpoints(addrs, "http")
  53. cfg := &etcd.Config{
  54. Endpoints: entries,
  55. Transport: etcd.DefaultTransport,
  56. HeaderTimeoutPerRequest: 3 * time.Second,
  57. }
  58. // Set options
  59. if options != nil {
  60. if options.TLS != nil {
  61. setTLS(cfg, options.TLS, addrs)
  62. }
  63. if options.ConnectionTimeout != 0 {
  64. setTimeout(cfg, options.ConnectionTimeout)
  65. }
  66. if options.Username != "" {
  67. setCredentials(cfg, options.Username, options.Password)
  68. }
  69. }
  70. c, err := etcd.New(*cfg)
  71. if err != nil {
  72. log.Fatal(err)
  73. }
  74. s.client = etcd.NewKeysAPI(c)
  75. // Periodic Cluster Sync
  76. go func() {
  77. for {
  78. if err := c.AutoSync(context.Background(), periodicSync); err != nil {
  79. return
  80. }
  81. }
  82. }()
  83. return s, nil
  84. }
  85. // SetTLS sets the tls configuration given a tls.Config scheme
  86. func setTLS(cfg *etcd.Config, tls *tls.Config, addrs []string) {
  87. entries := store.CreateEndpoints(addrs, "https")
  88. cfg.Endpoints = entries
  89. // Set transport
  90. t := http.Transport{
  91. Dial: (&net.Dialer{
  92. Timeout: 30 * time.Second,
  93. KeepAlive: 30 * time.Second,
  94. }).Dial,
  95. TLSHandshakeTimeout: 10 * time.Second,
  96. TLSClientConfig: tls,
  97. }
  98. cfg.Transport = &t
  99. }
  100. // setTimeout sets the timeout used for connecting to the store
  101. func setTimeout(cfg *etcd.Config, time time.Duration) {
  102. cfg.HeaderTimeoutPerRequest = time
  103. }
  104. // setCredentials sets the username/password credentials for connecting to Etcd
  105. func setCredentials(cfg *etcd.Config, username, password string) {
  106. cfg.Username = username
  107. cfg.Password = password
  108. }
  109. // Normalize the key for usage in Etcd
  110. func (s *Etcd) normalize(key string) string {
  111. key = store.Normalize(key)
  112. return strings.TrimPrefix(key, "/")
  113. }
  114. // keyNotFound checks on the error returned by the KeysAPI
  115. // to verify if the key exists in the store or not
  116. func keyNotFound(err error) bool {
  117. if err != nil {
  118. if etcdError, ok := err.(etcd.Error); ok {
  119. if etcdError.Code == etcd.ErrorCodeKeyNotFound ||
  120. etcdError.Code == etcd.ErrorCodeNotFile ||
  121. etcdError.Code == etcd.ErrorCodeNotDir {
  122. return true
  123. }
  124. }
  125. }
  126. return false
  127. }
  128. // Get the value at "key", returns the last modified
  129. // index to use in conjunction to Atomic calls
  130. func (s *Etcd) Get(key string) (pair *store.KVPair, err error) {
  131. getOpts := &etcd.GetOptions{
  132. Quorum: true,
  133. }
  134. result, err := s.client.Get(context.Background(), s.normalize(key), getOpts)
  135. if err != nil {
  136. if keyNotFound(err) {
  137. return nil, store.ErrKeyNotFound
  138. }
  139. return nil, err
  140. }
  141. pair = &store.KVPair{
  142. Key: key,
  143. Value: []byte(result.Node.Value),
  144. LastIndex: result.Node.ModifiedIndex,
  145. }
  146. return pair, nil
  147. }
  148. // Put a value at "key"
  149. func (s *Etcd) Put(key string, value []byte, opts *store.WriteOptions) error {
  150. setOpts := &etcd.SetOptions{}
  151. // Set options
  152. if opts != nil {
  153. setOpts.Dir = opts.IsDir
  154. setOpts.TTL = opts.TTL
  155. }
  156. _, err := s.client.Set(context.Background(), s.normalize(key), string(value), setOpts)
  157. return err
  158. }
  159. // Delete a value at "key"
  160. func (s *Etcd) Delete(key string) error {
  161. opts := &etcd.DeleteOptions{
  162. Recursive: false,
  163. }
  164. _, err := s.client.Delete(context.Background(), s.normalize(key), opts)
  165. if keyNotFound(err) {
  166. return store.ErrKeyNotFound
  167. }
  168. return err
  169. }
  170. // Exists checks if the key exists inside the store
  171. func (s *Etcd) Exists(key string) (bool, error) {
  172. _, err := s.Get(key)
  173. if err != nil {
  174. if err == store.ErrKeyNotFound {
  175. return false, nil
  176. }
  177. return false, err
  178. }
  179. return true, nil
  180. }
  181. // Watch for changes on a "key"
  182. // It returns a channel that will receive changes or pass
  183. // on errors. Upon creation, the current value will first
  184. // be sent to the channel. Providing a non-nil stopCh can
  185. // be used to stop watching.
  186. func (s *Etcd) Watch(key string, stopCh <-chan struct{}) (<-chan *store.KVPair, error) {
  187. opts := &etcd.WatcherOptions{Recursive: false}
  188. watcher := s.client.Watcher(s.normalize(key), opts)
  189. // watchCh is sending back events to the caller
  190. watchCh := make(chan *store.KVPair)
  191. go func() {
  192. defer close(watchCh)
  193. // Get the current value
  194. pair, err := s.Get(key)
  195. if err != nil {
  196. return
  197. }
  198. // Push the current value through the channel.
  199. watchCh <- pair
  200. for {
  201. // Check if the watch was stopped by the caller
  202. select {
  203. case <-stopCh:
  204. return
  205. default:
  206. }
  207. result, err := watcher.Next(context.Background())
  208. if err != nil {
  209. return
  210. }
  211. watchCh <- &store.KVPair{
  212. Key: key,
  213. Value: []byte(result.Node.Value),
  214. LastIndex: result.Node.ModifiedIndex,
  215. }
  216. }
  217. }()
  218. return watchCh, nil
  219. }
  220. // WatchTree watches for changes on a "directory"
  221. // It returns a channel that will receive changes or pass
  222. // on errors. Upon creating a watch, the current childs values
  223. // will be sent to the channel. Providing a non-nil stopCh can
  224. // be used to stop watching.
  225. func (s *Etcd) WatchTree(directory string, stopCh <-chan struct{}) (<-chan []*store.KVPair, error) {
  226. watchOpts := &etcd.WatcherOptions{Recursive: true}
  227. watcher := s.client.Watcher(s.normalize(directory), watchOpts)
  228. // watchCh is sending back events to the caller
  229. watchCh := make(chan []*store.KVPair)
  230. go func() {
  231. defer close(watchCh)
  232. // Get child values
  233. list, err := s.List(directory)
  234. if err != nil {
  235. return
  236. }
  237. // Push the current value through the channel.
  238. watchCh <- list
  239. for {
  240. // Check if the watch was stopped by the caller
  241. select {
  242. case <-stopCh:
  243. return
  244. default:
  245. }
  246. _, err := watcher.Next(context.Background())
  247. if err != nil {
  248. return
  249. }
  250. list, err = s.List(directory)
  251. if err != nil {
  252. return
  253. }
  254. watchCh <- list
  255. }
  256. }()
  257. return watchCh, nil
  258. }
  259. // AtomicPut puts a value at "key" if the key has not been
  260. // modified in the meantime, throws an error if this is the case
  261. func (s *Etcd) AtomicPut(key string, value []byte, previous *store.KVPair, opts *store.WriteOptions) (bool, *store.KVPair, error) {
  262. var (
  263. meta *etcd.Response
  264. err error
  265. )
  266. setOpts := &etcd.SetOptions{}
  267. if previous != nil {
  268. setOpts.PrevExist = etcd.PrevExist
  269. setOpts.PrevIndex = previous.LastIndex
  270. if previous.Value != nil {
  271. setOpts.PrevValue = string(previous.Value)
  272. }
  273. } else {
  274. setOpts.PrevExist = etcd.PrevNoExist
  275. }
  276. if opts != nil {
  277. if opts.TTL > 0 {
  278. setOpts.TTL = opts.TTL
  279. }
  280. }
  281. meta, err = s.client.Set(context.Background(), s.normalize(key), string(value), setOpts)
  282. if err != nil {
  283. if etcdError, ok := err.(etcd.Error); ok {
  284. // Compare failed
  285. if etcdError.Code == etcd.ErrorCodeTestFailed {
  286. return false, nil, store.ErrKeyModified
  287. }
  288. // Node exists error (when PrevNoExist)
  289. if etcdError.Code == etcd.ErrorCodeNodeExist {
  290. return false, nil, store.ErrKeyExists
  291. }
  292. }
  293. return false, nil, err
  294. }
  295. updated := &store.KVPair{
  296. Key: key,
  297. Value: value,
  298. LastIndex: meta.Node.ModifiedIndex,
  299. }
  300. return true, updated, nil
  301. }
  302. // AtomicDelete deletes a value at "key" if the key
  303. // has not been modified in the meantime, throws an
  304. // error if this is the case
  305. func (s *Etcd) AtomicDelete(key string, previous *store.KVPair) (bool, error) {
  306. if previous == nil {
  307. return false, store.ErrPreviousNotSpecified
  308. }
  309. delOpts := &etcd.DeleteOptions{}
  310. if previous != nil {
  311. delOpts.PrevIndex = previous.LastIndex
  312. if previous.Value != nil {
  313. delOpts.PrevValue = string(previous.Value)
  314. }
  315. }
  316. _, err := s.client.Delete(context.Background(), s.normalize(key), delOpts)
  317. if err != nil {
  318. if etcdError, ok := err.(etcd.Error); ok {
  319. // Key Not Found
  320. if etcdError.Code == etcd.ErrorCodeKeyNotFound {
  321. return false, store.ErrKeyNotFound
  322. }
  323. // Compare failed
  324. if etcdError.Code == etcd.ErrorCodeTestFailed {
  325. return false, store.ErrKeyModified
  326. }
  327. }
  328. return false, err
  329. }
  330. return true, nil
  331. }
  332. // List child nodes of a given directory
  333. func (s *Etcd) List(directory string) ([]*store.KVPair, error) {
  334. getOpts := &etcd.GetOptions{
  335. Quorum: true,
  336. Recursive: true,
  337. Sort: true,
  338. }
  339. resp, err := s.client.Get(context.Background(), s.normalize(directory), getOpts)
  340. if err != nil {
  341. if keyNotFound(err) {
  342. return nil, store.ErrKeyNotFound
  343. }
  344. return nil, err
  345. }
  346. kv := []*store.KVPair{}
  347. for _, n := range resp.Node.Nodes {
  348. kv = append(kv, &store.KVPair{
  349. Key: n.Key,
  350. Value: []byte(n.Value),
  351. LastIndex: n.ModifiedIndex,
  352. })
  353. }
  354. return kv, nil
  355. }
  356. // DeleteTree deletes a range of keys under a given directory
  357. func (s *Etcd) DeleteTree(directory string) error {
  358. delOpts := &etcd.DeleteOptions{
  359. Recursive: true,
  360. }
  361. _, err := s.client.Delete(context.Background(), s.normalize(directory), delOpts)
  362. if keyNotFound(err) {
  363. return store.ErrKeyNotFound
  364. }
  365. return err
  366. }
  367. // NewLock returns a handle to a lock struct which can
  368. // be used to provide mutual exclusion on a key
  369. func (s *Etcd) NewLock(key string, options *store.LockOptions) (lock store.Locker, err error) {
  370. var value string
  371. ttl := defaultLockTTL
  372. renewCh := make(chan struct{})
  373. // Apply options on Lock
  374. if options != nil {
  375. if options.Value != nil {
  376. value = string(options.Value)
  377. }
  378. if options.TTL != 0 {
  379. ttl = options.TTL
  380. }
  381. if options.RenewLock != nil {
  382. renewCh = options.RenewLock
  383. }
  384. }
  385. // Create lock object
  386. lock = &etcdLock{
  387. client: s.client,
  388. stopRenew: renewCh,
  389. key: s.normalize(key),
  390. value: value,
  391. ttl: ttl,
  392. }
  393. return lock, nil
  394. }
  395. // Lock attempts to acquire the lock and blocks while
  396. // doing so. It returns a channel that is closed if our
  397. // lock is lost or if an error occurs
  398. func (l *etcdLock) Lock(stopChan chan struct{}) (<-chan struct{}, error) {
  399. // Lock holder channel
  400. lockHeld := make(chan struct{})
  401. stopLocking := l.stopRenew
  402. setOpts := &etcd.SetOptions{
  403. TTL: l.ttl,
  404. }
  405. for {
  406. setOpts.PrevExist = etcd.PrevNoExist
  407. resp, err := l.client.Set(context.Background(), l.key, l.value, setOpts)
  408. if err != nil {
  409. if etcdError, ok := err.(etcd.Error); ok {
  410. if etcdError.Code != etcd.ErrorCodeNodeExist {
  411. return nil, err
  412. }
  413. setOpts.PrevIndex = ^uint64(0)
  414. }
  415. } else {
  416. setOpts.PrevIndex = resp.Node.ModifiedIndex
  417. }
  418. setOpts.PrevExist = etcd.PrevExist
  419. l.last, err = l.client.Set(context.Background(), l.key, l.value, setOpts)
  420. if err == nil {
  421. // Leader section
  422. l.stopLock = stopLocking
  423. go l.holdLock(l.key, lockHeld, stopLocking)
  424. break
  425. } else {
  426. // If this is a legitimate error, return
  427. if etcdError, ok := err.(etcd.Error); ok {
  428. if etcdError.Code != etcd.ErrorCodeTestFailed {
  429. return nil, err
  430. }
  431. }
  432. // Seeker section
  433. errorCh := make(chan error)
  434. chWStop := make(chan bool)
  435. free := make(chan bool)
  436. go l.waitLock(l.key, errorCh, chWStop, free)
  437. // Wait for the key to be available or for
  438. // a signal to stop trying to lock the key
  439. select {
  440. case <-free:
  441. break
  442. case err := <-errorCh:
  443. return nil, err
  444. case <-stopChan:
  445. return nil, ErrAbortTryLock
  446. }
  447. // Delete or Expire event occurred
  448. // Retry
  449. }
  450. }
  451. return lockHeld, nil
  452. }
  453. // Hold the lock as long as we can
  454. // Updates the key ttl periodically until we receive
  455. // an explicit stop signal from the Unlock method
  456. func (l *etcdLock) holdLock(key string, lockHeld chan struct{}, stopLocking <-chan struct{}) {
  457. defer close(lockHeld)
  458. update := time.NewTicker(l.ttl / 3)
  459. defer update.Stop()
  460. var err error
  461. setOpts := &etcd.SetOptions{TTL: l.ttl}
  462. for {
  463. select {
  464. case <-update.C:
  465. setOpts.PrevIndex = l.last.Node.ModifiedIndex
  466. l.last, err = l.client.Set(context.Background(), key, l.value, setOpts)
  467. if err != nil {
  468. return
  469. }
  470. case <-stopLocking:
  471. return
  472. }
  473. }
  474. }
  475. // WaitLock simply waits for the key to be available for creation
  476. func (l *etcdLock) waitLock(key string, errorCh chan error, stopWatchCh chan bool, free chan<- bool) {
  477. opts := &etcd.WatcherOptions{Recursive: false}
  478. watcher := l.client.Watcher(key, opts)
  479. for {
  480. event, err := watcher.Next(context.Background())
  481. if err != nil {
  482. errorCh <- err
  483. return
  484. }
  485. if event.Action == "delete" || event.Action == "expire" {
  486. free <- true
  487. return
  488. }
  489. }
  490. }
  491. // Unlock the "key". Calling unlock while
  492. // not holding the lock will throw an error
  493. func (l *etcdLock) Unlock() error {
  494. if l.stopLock != nil {
  495. l.stopLock <- struct{}{}
  496. }
  497. if l.last != nil {
  498. delOpts := &etcd.DeleteOptions{
  499. PrevIndex: l.last.Node.ModifiedIndex,
  500. }
  501. _, err := l.client.Delete(context.Background(), l.key, delOpts)
  502. if err != nil {
  503. return err
  504. }
  505. }
  506. return nil
  507. }
  508. // Close closes the client connection
  509. func (s *Etcd) Close() {
  510. return
  511. }