datastore.go 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660
  1. package datastore
  2. import (
  3. "fmt"
  4. "log"
  5. "reflect"
  6. "strings"
  7. "sync"
  8. "time"
  9. "github.com/docker/libkv"
  10. "github.com/docker/libkv/store"
  11. "github.com/docker/libnetwork/discoverapi"
  12. "github.com/docker/libnetwork/types"
  13. )
  14. //DataStore exported
  15. type DataStore interface {
  16. // GetObject gets data from datastore and unmarshals to the specified object
  17. GetObject(key string, o KVObject) error
  18. // PutObject adds a new Record based on an object into the datastore
  19. PutObject(kvObject KVObject) error
  20. // PutObjectAtomic provides an atomic add and update operation for a Record
  21. PutObjectAtomic(kvObject KVObject) error
  22. // DeleteObject deletes a record
  23. DeleteObject(kvObject KVObject) error
  24. // DeleteObjectAtomic performs an atomic delete operation
  25. DeleteObjectAtomic(kvObject KVObject) error
  26. // DeleteTree deletes a record
  27. DeleteTree(kvObject KVObject) error
  28. // Watchable returns whether the store is watchable or not
  29. Watchable() bool
  30. // Watch for changes on a KVObject
  31. Watch(kvObject KVObject, stopCh <-chan struct{}) (<-chan KVObject, error)
  32. // RestartWatch retriggers stopped Watches
  33. RestartWatch()
  34. // Active returns if the store is active
  35. Active() bool
  36. // List returns of a list of KVObjects belonging to the parent
  37. // key. The caller must pass a KVObject of the same type as
  38. // the objects that need to be listed
  39. List(string, KVObject) ([]KVObject, error)
  40. // Map returns a Map of KVObjects
  41. Map(key string, kvObject KVObject) (map[string]KVObject, error)
  42. // Scope returns the scope of the store
  43. Scope() string
  44. // KVStore returns access to the KV Store
  45. KVStore() store.Store
  46. // Close closes the data store
  47. Close()
  48. }
  49. // ErrKeyModified is raised for an atomic update when the update is working on a stale state
  50. var (
  51. ErrKeyModified = store.ErrKeyModified
  52. ErrKeyNotFound = store.ErrKeyNotFound
  53. )
  54. type datastore struct {
  55. scope string
  56. store store.Store
  57. cache *cache
  58. watchCh chan struct{}
  59. active bool
  60. sequential bool
  61. sync.Mutex
  62. }
  63. // KVObject is Key/Value interface used by objects to be part of the DataStore
  64. type KVObject interface {
  65. // Key method lets an object provide the Key to be used in KV Store
  66. Key() []string
  67. // KeyPrefix method lets an object return immediate parent key that can be used for tree walk
  68. KeyPrefix() []string
  69. // Value method lets an object marshal its content to be stored in the KV store
  70. Value() []byte
  71. // SetValue is used by the datastore to set the object's value when loaded from the data store.
  72. SetValue([]byte) error
  73. // Index method returns the latest DB Index as seen by the object
  74. Index() uint64
  75. // SetIndex method allows the datastore to store the latest DB Index into the object
  76. SetIndex(uint64)
  77. // True if the object exists in the datastore, false if it hasn't been stored yet.
  78. // When SetIndex() is called, the object has been stored.
  79. Exists() bool
  80. // DataScope indicates the storage scope of the KV object
  81. DataScope() string
  82. // Skip provides a way for a KV Object to avoid persisting it in the KV Store
  83. Skip() bool
  84. }
  85. // KVConstructor interface defines methods which can construct a KVObject from another.
  86. type KVConstructor interface {
  87. // New returns a new object which is created based on the
  88. // source object
  89. New() KVObject
  90. // CopyTo deep copies the contents of the implementing object
  91. // to the passed destination object
  92. CopyTo(KVObject) error
  93. }
  94. // ScopeCfg represents Datastore configuration.
  95. type ScopeCfg struct {
  96. Client ScopeClientCfg
  97. }
  98. // ScopeClientCfg represents Datastore Client-only mode configuration
  99. type ScopeClientCfg struct {
  100. Provider string
  101. Address string
  102. Config *store.Config
  103. }
  104. const (
  105. // LocalScope indicates to store the KV object in local datastore such as boltdb
  106. LocalScope = "local"
  107. // GlobalScope indicates to store the KV object in global datastore such as consul/etcd/zookeeper
  108. GlobalScope = "global"
  109. // SwarmScope is not indicating a datastore location. It is defined here
  110. // along with the other two scopes just for consistency.
  111. SwarmScope = "swarm"
  112. defaultPrefix = "/var/lib/docker/network/files"
  113. )
  114. const (
  115. // NetworkKeyPrefix is the prefix for network key in the kv store
  116. NetworkKeyPrefix = "network"
  117. // EndpointKeyPrefix is the prefix for endpoint key in the kv store
  118. EndpointKeyPrefix = "endpoint"
  119. )
  120. var (
  121. defaultScopes = makeDefaultScopes()
  122. )
  123. func makeDefaultScopes() map[string]*ScopeCfg {
  124. def := make(map[string]*ScopeCfg)
  125. def[LocalScope] = &ScopeCfg{
  126. Client: ScopeClientCfg{
  127. Provider: string(store.BOLTDB),
  128. Address: defaultPrefix + "/local-kv.db",
  129. Config: &store.Config{
  130. Bucket: "libnetwork",
  131. ConnectionTimeout: time.Minute,
  132. },
  133. },
  134. }
  135. return def
  136. }
  137. var defaultRootChain = []string{"docker", "network", "v1.0"}
  138. var rootChain = defaultRootChain
  139. // DefaultScopes returns a map of default scopes and its config for clients to use.
  140. func DefaultScopes(dataDir string) map[string]*ScopeCfg {
  141. if dataDir != "" {
  142. defaultScopes[LocalScope].Client.Address = dataDir + "/network/files/local-kv.db"
  143. return defaultScopes
  144. }
  145. defaultScopes[LocalScope].Client.Address = defaultPrefix + "/local-kv.db"
  146. return defaultScopes
  147. }
  148. // IsValid checks if the scope config has valid configuration.
  149. func (cfg *ScopeCfg) IsValid() bool {
  150. if cfg == nil ||
  151. strings.TrimSpace(cfg.Client.Provider) == "" ||
  152. strings.TrimSpace(cfg.Client.Address) == "" {
  153. return false
  154. }
  155. return true
  156. }
  157. //Key provides convenient method to create a Key
  158. func Key(key ...string) string {
  159. keychain := append(rootChain, key...)
  160. str := strings.Join(keychain, "/")
  161. return str + "/"
  162. }
  163. //ParseKey provides convenient method to unpack the key to complement the Key function
  164. func ParseKey(key string) ([]string, error) {
  165. chain := strings.Split(strings.Trim(key, "/"), "/")
  166. // The key must atleast be equal to the rootChain in order to be considered as valid
  167. if len(chain) <= len(rootChain) || !reflect.DeepEqual(chain[0:len(rootChain)], rootChain) {
  168. return nil, types.BadRequestErrorf("invalid Key : %s", key)
  169. }
  170. return chain[len(rootChain):], nil
  171. }
  172. // newClient used to connect to KV Store
  173. func newClient(scope string, kv string, addr string, config *store.Config, cached bool) (DataStore, error) {
  174. if cached && scope != LocalScope {
  175. return nil, fmt.Errorf("caching supported only for scope %s", LocalScope)
  176. }
  177. sequential := false
  178. if scope == LocalScope {
  179. sequential = true
  180. }
  181. if config == nil {
  182. config = &store.Config{}
  183. }
  184. var addrs []string
  185. if kv == string(store.BOLTDB) {
  186. // Parse file path
  187. addrs = strings.Split(addr, ",")
  188. } else {
  189. // Parse URI
  190. parts := strings.SplitN(addr, "/", 2)
  191. addrs = strings.Split(parts[0], ",")
  192. // Add the custom prefix to the root chain
  193. if len(parts) == 2 {
  194. rootChain = append([]string{parts[1]}, defaultRootChain...)
  195. }
  196. }
  197. store, err := libkv.NewStore(store.Backend(kv), addrs, config)
  198. if err != nil {
  199. return nil, err
  200. }
  201. ds := &datastore{scope: scope, store: store, active: true, watchCh: make(chan struct{}), sequential: sequential}
  202. if cached {
  203. ds.cache = newCache(ds)
  204. }
  205. return ds, nil
  206. }
  207. // NewDataStore creates a new instance of LibKV data store
  208. func NewDataStore(scope string, cfg *ScopeCfg) (DataStore, error) {
  209. if cfg == nil || cfg.Client.Provider == "" || cfg.Client.Address == "" {
  210. c, ok := defaultScopes[scope]
  211. if !ok || c.Client.Provider == "" || c.Client.Address == "" {
  212. return nil, fmt.Errorf("unexpected scope %s without configuration passed", scope)
  213. }
  214. cfg = c
  215. }
  216. var cached bool
  217. if scope == LocalScope {
  218. cached = true
  219. }
  220. return newClient(scope, cfg.Client.Provider, cfg.Client.Address, cfg.Client.Config, cached)
  221. }
  222. // NewDataStoreFromConfig creates a new instance of LibKV data store starting from the datastore config data
  223. func NewDataStoreFromConfig(dsc discoverapi.DatastoreConfigData) (DataStore, error) {
  224. var (
  225. ok bool
  226. sCfgP *store.Config
  227. )
  228. sCfgP, ok = dsc.Config.(*store.Config)
  229. if !ok && dsc.Config != nil {
  230. return nil, fmt.Errorf("cannot parse store configuration: %v", dsc.Config)
  231. }
  232. scopeCfg := &ScopeCfg{
  233. Client: ScopeClientCfg{
  234. Address: dsc.Address,
  235. Provider: dsc.Provider,
  236. Config: sCfgP,
  237. },
  238. }
  239. ds, err := NewDataStore(dsc.Scope, scopeCfg)
  240. if err != nil {
  241. return nil, fmt.Errorf("failed to construct datastore client from datastore configuration %v: %v", dsc, err)
  242. }
  243. return ds, err
  244. }
  245. func (ds *datastore) Close() {
  246. ds.store.Close()
  247. }
  248. func (ds *datastore) Scope() string {
  249. return ds.scope
  250. }
  251. func (ds *datastore) Active() bool {
  252. return ds.active
  253. }
  254. func (ds *datastore) Watchable() bool {
  255. return ds.scope != LocalScope
  256. }
  257. func (ds *datastore) Watch(kvObject KVObject, stopCh <-chan struct{}) (<-chan KVObject, error) {
  258. sCh := make(chan struct{})
  259. ctor, ok := kvObject.(KVConstructor)
  260. if !ok {
  261. return nil, fmt.Errorf("error watching object type %T, object does not implement KVConstructor interface", kvObject)
  262. }
  263. kvpCh, err := ds.store.Watch(Key(kvObject.Key()...), sCh)
  264. if err != nil {
  265. return nil, err
  266. }
  267. kvoCh := make(chan KVObject)
  268. go func() {
  269. retry_watch:
  270. var err error
  271. // Make sure to get a new instance of watch channel
  272. ds.Lock()
  273. watchCh := ds.watchCh
  274. ds.Unlock()
  275. loop:
  276. for {
  277. select {
  278. case <-stopCh:
  279. close(sCh)
  280. return
  281. case kvPair := <-kvpCh:
  282. // If the backend KV store gets reset libkv's go routine
  283. // for the watch can exit resulting in a nil value in
  284. // channel.
  285. if kvPair == nil {
  286. ds.Lock()
  287. ds.active = false
  288. ds.Unlock()
  289. break loop
  290. }
  291. dstO := ctor.New()
  292. if err = dstO.SetValue(kvPair.Value); err != nil {
  293. log.Printf("Could not unmarshal kvpair value = %s", string(kvPair.Value))
  294. break
  295. }
  296. dstO.SetIndex(kvPair.LastIndex)
  297. kvoCh <- dstO
  298. }
  299. }
  300. // Wait on watch channel for a re-trigger when datastore becomes active
  301. <-watchCh
  302. kvpCh, err = ds.store.Watch(Key(kvObject.Key()...), sCh)
  303. if err != nil {
  304. log.Printf("Could not watch the key %s in store: %v", Key(kvObject.Key()...), err)
  305. }
  306. goto retry_watch
  307. }()
  308. return kvoCh, nil
  309. }
  310. func (ds *datastore) RestartWatch() {
  311. ds.Lock()
  312. defer ds.Unlock()
  313. ds.active = true
  314. watchCh := ds.watchCh
  315. ds.watchCh = make(chan struct{})
  316. close(watchCh)
  317. }
  318. func (ds *datastore) KVStore() store.Store {
  319. return ds.store
  320. }
  321. // PutObjectAtomic adds a new Record based on an object into the datastore
  322. func (ds *datastore) PutObjectAtomic(kvObject KVObject) error {
  323. var (
  324. previous *store.KVPair
  325. pair *store.KVPair
  326. err error
  327. )
  328. if ds.sequential {
  329. ds.Lock()
  330. defer ds.Unlock()
  331. }
  332. if kvObject == nil {
  333. return types.BadRequestErrorf("invalid KV Object : nil")
  334. }
  335. kvObjValue := kvObject.Value()
  336. if kvObjValue == nil {
  337. return types.BadRequestErrorf("invalid KV Object with a nil Value for key %s", Key(kvObject.Key()...))
  338. }
  339. if kvObject.Skip() {
  340. goto add_cache
  341. }
  342. if kvObject.Exists() {
  343. previous = &store.KVPair{Key: Key(kvObject.Key()...), LastIndex: kvObject.Index()}
  344. } else {
  345. previous = nil
  346. }
  347. _, pair, err = ds.store.AtomicPut(Key(kvObject.Key()...), kvObjValue, previous, nil)
  348. if err != nil {
  349. if err == store.ErrKeyExists {
  350. return ErrKeyModified
  351. }
  352. return err
  353. }
  354. kvObject.SetIndex(pair.LastIndex)
  355. add_cache:
  356. if ds.cache != nil {
  357. // If persistent store is skipped, sequencing needs to
  358. // happen in cache.
  359. return ds.cache.add(kvObject, kvObject.Skip())
  360. }
  361. return nil
  362. }
  363. // PutObject adds a new Record based on an object into the datastore
  364. func (ds *datastore) PutObject(kvObject KVObject) error {
  365. if ds.sequential {
  366. ds.Lock()
  367. defer ds.Unlock()
  368. }
  369. if kvObject == nil {
  370. return types.BadRequestErrorf("invalid KV Object : nil")
  371. }
  372. if kvObject.Skip() {
  373. goto add_cache
  374. }
  375. if err := ds.putObjectWithKey(kvObject, kvObject.Key()...); err != nil {
  376. return err
  377. }
  378. add_cache:
  379. if ds.cache != nil {
  380. // If persistent store is skipped, sequencing needs to
  381. // happen in cache.
  382. return ds.cache.add(kvObject, kvObject.Skip())
  383. }
  384. return nil
  385. }
  386. func (ds *datastore) putObjectWithKey(kvObject KVObject, key ...string) error {
  387. kvObjValue := kvObject.Value()
  388. if kvObjValue == nil {
  389. return types.BadRequestErrorf("invalid KV Object with a nil Value for key %s", Key(kvObject.Key()...))
  390. }
  391. return ds.store.Put(Key(key...), kvObjValue, nil)
  392. }
  393. // GetObject returns a record matching the key
  394. func (ds *datastore) GetObject(key string, o KVObject) error {
  395. if ds.sequential {
  396. ds.Lock()
  397. defer ds.Unlock()
  398. }
  399. if ds.cache != nil {
  400. return ds.cache.get(key, o)
  401. }
  402. kvPair, err := ds.store.Get(key)
  403. if err != nil {
  404. return err
  405. }
  406. if err := o.SetValue(kvPair.Value); err != nil {
  407. return err
  408. }
  409. // Make sure the object has a correct view of the DB index in
  410. // case we need to modify it and update the DB.
  411. o.SetIndex(kvPair.LastIndex)
  412. return nil
  413. }
  414. func (ds *datastore) ensureParent(parent string) error {
  415. exists, err := ds.store.Exists(parent)
  416. if err != nil {
  417. return err
  418. }
  419. if exists {
  420. return nil
  421. }
  422. return ds.store.Put(parent, []byte{}, &store.WriteOptions{IsDir: true})
  423. }
  424. func (ds *datastore) List(key string, kvObject KVObject) ([]KVObject, error) {
  425. if ds.sequential {
  426. ds.Lock()
  427. defer ds.Unlock()
  428. }
  429. if ds.cache != nil {
  430. return ds.cache.list(kvObject)
  431. }
  432. var kvol []KVObject
  433. cb := func(key string, val KVObject) {
  434. kvol = append(kvol, val)
  435. }
  436. err := ds.iterateKVPairsFromStore(key, kvObject, cb)
  437. if err != nil {
  438. return nil, err
  439. }
  440. return kvol, nil
  441. }
  442. func (ds *datastore) iterateKVPairsFromStore(key string, kvObject KVObject, callback func(string, KVObject)) error {
  443. // Bail out right away if the kvObject does not implement KVConstructor
  444. ctor, ok := kvObject.(KVConstructor)
  445. if !ok {
  446. return fmt.Errorf("error listing objects, object does not implement KVConstructor interface")
  447. }
  448. // Make sure the parent key exists
  449. if err := ds.ensureParent(key); err != nil {
  450. return err
  451. }
  452. kvList, err := ds.store.List(key)
  453. if err != nil {
  454. return err
  455. }
  456. for _, kvPair := range kvList {
  457. if len(kvPair.Value) == 0 {
  458. continue
  459. }
  460. dstO := ctor.New()
  461. if err := dstO.SetValue(kvPair.Value); err != nil {
  462. return err
  463. }
  464. // Make sure the object has a correct view of the DB index in
  465. // case we need to modify it and update the DB.
  466. dstO.SetIndex(kvPair.LastIndex)
  467. callback(kvPair.Key, dstO)
  468. }
  469. return nil
  470. }
  471. func (ds *datastore) Map(key string, kvObject KVObject) (map[string]KVObject, error) {
  472. if ds.sequential {
  473. ds.Lock()
  474. defer ds.Unlock()
  475. }
  476. kvol := make(map[string]KVObject)
  477. cb := func(key string, val KVObject) {
  478. // Trim the leading & trailing "/" to make it consistent across all stores
  479. kvol[strings.Trim(key, "/")] = val
  480. }
  481. err := ds.iterateKVPairsFromStore(key, kvObject, cb)
  482. if err != nil {
  483. return nil, err
  484. }
  485. return kvol, nil
  486. }
  487. // DeleteObject unconditionally deletes a record from the store
  488. func (ds *datastore) DeleteObject(kvObject KVObject) error {
  489. if ds.sequential {
  490. ds.Lock()
  491. defer ds.Unlock()
  492. }
  493. // cleaup the cache first
  494. if ds.cache != nil {
  495. // If persistent store is skipped, sequencing needs to
  496. // happen in cache.
  497. ds.cache.del(kvObject, kvObject.Skip())
  498. }
  499. if kvObject.Skip() {
  500. return nil
  501. }
  502. return ds.store.Delete(Key(kvObject.Key()...))
  503. }
  504. // DeleteObjectAtomic performs atomic delete on a record
  505. func (ds *datastore) DeleteObjectAtomic(kvObject KVObject) error {
  506. if ds.sequential {
  507. ds.Lock()
  508. defer ds.Unlock()
  509. }
  510. if kvObject == nil {
  511. return types.BadRequestErrorf("invalid KV Object : nil")
  512. }
  513. previous := &store.KVPair{Key: Key(kvObject.Key()...), LastIndex: kvObject.Index()}
  514. if kvObject.Skip() {
  515. goto del_cache
  516. }
  517. if _, err := ds.store.AtomicDelete(Key(kvObject.Key()...), previous); err != nil {
  518. if err == store.ErrKeyExists {
  519. return ErrKeyModified
  520. }
  521. return err
  522. }
  523. del_cache:
  524. // cleanup the cache only if AtomicDelete went through successfully
  525. if ds.cache != nil {
  526. // If persistent store is skipped, sequencing needs to
  527. // happen in cache.
  528. return ds.cache.del(kvObject, kvObject.Skip())
  529. }
  530. return nil
  531. }
  532. // DeleteTree unconditionally deletes a record from the store
  533. func (ds *datastore) DeleteTree(kvObject KVObject) error {
  534. if ds.sequential {
  535. ds.Lock()
  536. defer ds.Unlock()
  537. }
  538. // cleaup the cache first
  539. if ds.cache != nil {
  540. // If persistent store is skipped, sequencing needs to
  541. // happen in cache.
  542. ds.cache.del(kvObject, kvObject.Skip())
  543. }
  544. if kvObject.Skip() {
  545. return nil
  546. }
  547. return ds.store.DeleteTree(Key(kvObject.KeyPrefix()...))
  548. }