datastore.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625
  1. package datastore
  2. import (
  3. "fmt"
  4. "log"
  5. "reflect"
  6. "strings"
  7. "sync"
  8. "github.com/docker/libkv"
  9. "github.com/docker/libkv/store"
  10. "github.com/docker/libnetwork/discoverapi"
  11. "github.com/docker/libnetwork/types"
  12. )
// DataStore is the object-level interface to a key/value store. It lets
// callers store, fetch, list, delete and watch KVObjects.
type DataStore interface {
	// GetObject gets data from datastore and unmarshals to the specified object
	GetObject(key string, o KVObject) error
	// PutObject adds a new Record based on an object into the datastore
	PutObject(kvObject KVObject) error
	// PutObjectAtomic provides an atomic add and update operation for a Record
	PutObjectAtomic(kvObject KVObject) error
	// DeleteObject deletes a record
	DeleteObject(kvObject KVObject) error
	// DeleteObjectAtomic performs an atomic delete operation
	DeleteObjectAtomic(kvObject KVObject) error
	// DeleteTree deletes the tree found under the object's KeyPrefix
	DeleteTree(kvObject KVObject) error
	// Watchable returns whether the store is watchable or not
	Watchable() bool
	// Watch for changes on a KVObject. Closing stopCh stops the watch.
	Watch(kvObject KVObject, stopCh <-chan struct{}) (<-chan KVObject, error)
	// RestartWatch retriggers stopped Watches
	RestartWatch()
	// Active returns if the store is active
	Active() bool
	// List returns a list of KVObjects belonging to the parent
	// key. The caller must pass a KVObject of the same type as
	// the objects that need to be listed
	List(string, KVObject) ([]KVObject, error)
	// Scope returns the scope of the store
	Scope() string
	// KVStore returns access to the KV Store
	KVStore() store.Store
	// Close closes the data store
	Close()
}
// Errors re-exported from the libkv store package so that callers do not
// need to import it directly.
var (
	// ErrKeyModified is raised for an atomic update when the update is working on a stale state
	ErrKeyModified = store.ErrKeyModified
	// ErrKeyNotFound is the store package's ErrKeyNotFound error.
	ErrKeyNotFound = store.ErrKeyNotFound
)
// datastore is the DataStore implementation backed by a libkv store.
type datastore struct {
	// scope is the scope name this datastore was created for.
	scope string
	// store is the underlying libkv store.
	store store.Store
	// cache is only set when the datastore was created with caching enabled.
	cache *cache
	// watchCh is closed and replaced by RestartWatch to wake up watchers
	// that are waiting for the store to come back.
	watchCh chan struct{}
	// active is cleared by a watcher when the backend watch channel yields
	// nil, and set again by RestartWatch.
	active bool
	// sequential, when true, makes the public operations hold the mutex for
	// their whole duration.
	sequential bool
	// Mutex guards watchCh and active, and serializes operations when
	// sequential is set.
	sync.Mutex
}
// KVObject is the Key/Value interface that objects must implement to be
// stored in the DataStore.
type KVObject interface {
	// Key method lets an object provide the Key to be used in KV Store
	Key() []string
	// KeyPrefix method lets an object return the immediate parent key that can be used for tree walk
	KeyPrefix() []string
	// Value method lets an object marshal its content to be stored in the KV store
	Value() []byte
	// SetValue is used by the datastore to set the object's value when loaded from the data store.
	SetValue([]byte) error
	// Index method returns the latest DB Index as seen by the object
	Index() uint64
	// SetIndex method allows the datastore to store the latest DB Index into the object
	SetIndex(uint64)
	// True if the object exists in the datastore, false if it hasn't been stored yet.
	// When SetIndex() is called, the object has been stored.
	Exists() bool
	// DataScope indicates the storage scope of the KV object
	DataScope() string
	// Skip provides a way for a KV Object to avoid persisting it in the KV Store
	Skip() bool
}
// KVConstructor interface defines methods which can construct a KVObject from another.
// Watch and List require the passed KVObject to implement it so that they
// can create a fresh object for every value read from the store.
type KVConstructor interface {
	// New returns a new object which is created based on the
	// source object
	New() KVObject
	// CopyTo deep copies the contents of the implementing object
	// to the passed destination object
	CopyTo(KVObject) error
}
// ScopeCfg represents Datastore configuration.
type ScopeCfg struct {
	// Client holds the client-side connection settings for the scope.
	Client ScopeClientCfg
}
// ScopeClientCfg represents Datastore Client-only mode configuration
type ScopeClientCfg struct {
	// Provider names the KV backend; it is converted to a store.Backend
	// when the client is created.
	Provider string
	// Address is a comma separated list of endpoints (or a file path for
	// boltdb). For non-boltdb providers, anything after the first "/" is
	// used as a custom key prefix.
	Address string
	// Config is the optional libkv configuration; nil means an empty one.
	Config *store.Config
}
const (
	// LocalScope indicates to store the KV object in local datastore such as boltdb
	LocalScope = "local"
	// GlobalScope indicates to store the KV object in global datastore such as consul/etcd/zookeeper
	GlobalScope = "global"
	// defaultPrefix is the directory holding the local datastore file when
	// no data directory is supplied.
	defaultPrefix = "/var/lib/docker/network/files"
)

const (
	// NetworkKeyPrefix is the prefix for network key in the kv store
	NetworkKeyPrefix = "network"
	// EndpointKeyPrefix is the prefix for endpoint key in the kv store
	EndpointKeyPrefix = "endpoint"
)

var (
	// defaultScopes is the package-wide table of built-in scope
	// configurations. It is shared and mutated in place by DefaultScopes.
	defaultScopes = makeDefaultScopes()
)
  117. func makeDefaultScopes() map[string]*ScopeCfg {
  118. def := make(map[string]*ScopeCfg)
  119. def[LocalScope] = &ScopeCfg{
  120. Client: ScopeClientCfg{
  121. Provider: string(store.BOLTDB),
  122. Address: defaultPrefix + "/local-kv.db",
  123. Config: &store.Config{
  124. Bucket: "libnetwork",
  125. },
  126. },
  127. }
  128. return def
  129. }
// defaultRootChain is the fixed set of leading key components.
var defaultRootChain = []string{"docker", "network", "v1.0"}

// rootChain is the set of key components prepended by Key and expected by
// ParseKey. It starts as defaultRootChain and is replaced by newClient when
// the store address carries a custom prefix.
var rootChain = defaultRootChain
  132. // DefaultScopes returns a map of default scopes and it's config for clients to use.
  133. func DefaultScopes(dataDir string) map[string]*ScopeCfg {
  134. if dataDir != "" {
  135. defaultScopes[LocalScope].Client.Address = dataDir + "/network/files/local-kv.db"
  136. return defaultScopes
  137. }
  138. defaultScopes[LocalScope].Client.Address = defaultPrefix + "/local-kv.db"
  139. return defaultScopes
  140. }
  141. // IsValid checks if the scope config has valid configuration.
  142. func (cfg *ScopeCfg) IsValid() bool {
  143. if cfg == nil ||
  144. strings.TrimSpace(cfg.Client.Provider) == "" ||
  145. strings.TrimSpace(cfg.Client.Address) == "" {
  146. return false
  147. }
  148. return true
  149. }
  150. //Key provides convenient method to create a Key
  151. func Key(key ...string) string {
  152. keychain := append(rootChain, key...)
  153. str := strings.Join(keychain, "/")
  154. return str + "/"
  155. }
  156. //ParseKey provides convenient method to unpack the key to complement the Key function
  157. func ParseKey(key string) ([]string, error) {
  158. chain := strings.Split(strings.Trim(key, "/"), "/")
  159. // The key must atleast be equal to the rootChain in order to be considered as valid
  160. if len(chain) <= len(rootChain) || !reflect.DeepEqual(chain[0:len(rootChain)], rootChain) {
  161. return nil, types.BadRequestErrorf("invalid Key : %s", key)
  162. }
  163. return chain[len(rootChain):], nil
  164. }
  165. // newClient used to connect to KV Store
  166. func newClient(scope string, kv string, addr string, config *store.Config, cached bool) (DataStore, error) {
  167. if cached && scope != LocalScope {
  168. return nil, fmt.Errorf("caching supported only for scope %s", LocalScope)
  169. }
  170. sequential := false
  171. if scope == LocalScope {
  172. sequential = true
  173. }
  174. if config == nil {
  175. config = &store.Config{}
  176. }
  177. var addrs []string
  178. if kv == string(store.BOLTDB) {
  179. // Parse file path
  180. addrs = strings.Split(addr, ",")
  181. } else {
  182. // Parse URI
  183. parts := strings.SplitN(addr, "/", 2)
  184. addrs = strings.Split(parts[0], ",")
  185. // Add the custom prefix to the root chain
  186. if len(parts) == 2 {
  187. rootChain = append([]string{parts[1]}, defaultRootChain...)
  188. }
  189. }
  190. store, err := libkv.NewStore(store.Backend(kv), addrs, config)
  191. if err != nil {
  192. return nil, err
  193. }
  194. ds := &datastore{scope: scope, store: store, active: true, watchCh: make(chan struct{}), sequential: sequential}
  195. if cached {
  196. ds.cache = newCache(ds)
  197. }
  198. return ds, nil
  199. }
  200. // NewDataStore creates a new instance of LibKV data store
  201. func NewDataStore(scope string, cfg *ScopeCfg) (DataStore, error) {
  202. if cfg == nil || cfg.Client.Provider == "" || cfg.Client.Address == "" {
  203. c, ok := defaultScopes[scope]
  204. if !ok || c.Client.Provider == "" || c.Client.Address == "" {
  205. return nil, fmt.Errorf("unexpected scope %s without configuration passed", scope)
  206. }
  207. cfg = c
  208. }
  209. var cached bool
  210. if scope == LocalScope {
  211. cached = true
  212. }
  213. return newClient(scope, cfg.Client.Provider, cfg.Client.Address, cfg.Client.Config, cached)
  214. }
  215. // NewDataStoreFromConfig creates a new instance of LibKV data store starting from the datastore config data
  216. func NewDataStoreFromConfig(dsc discoverapi.DatastoreConfigData) (DataStore, error) {
  217. var (
  218. ok bool
  219. sCfgP *store.Config
  220. )
  221. sCfgP, ok = dsc.Config.(*store.Config)
  222. if !ok && dsc.Config != nil {
  223. return nil, fmt.Errorf("cannot parse store configuration: %v", dsc.Config)
  224. }
  225. scopeCfg := &ScopeCfg{
  226. Client: ScopeClientCfg{
  227. Address: dsc.Address,
  228. Provider: dsc.Provider,
  229. Config: sCfgP,
  230. },
  231. }
  232. ds, err := NewDataStore(dsc.Scope, scopeCfg)
  233. if err != nil {
  234. return nil, fmt.Errorf("failed to construct datastore client from datastore configuration %v: %v", dsc, err)
  235. }
  236. return ds, err
  237. }
// Close closes the underlying KV store.
func (ds *datastore) Close() {
	ds.store.Close()
}
// Scope returns the scope this datastore was created for.
func (ds *datastore) Scope() string {
	return ds.scope
}
  244. func (ds *datastore) Active() bool {
  245. return ds.active
  246. }
// Watchable reports whether the store can be watched, which is the case for
// every scope other than LocalScope.
func (ds *datastore) Watchable() bool {
	return ds.scope != LocalScope
}
  250. func (ds *datastore) Watch(kvObject KVObject, stopCh <-chan struct{}) (<-chan KVObject, error) {
  251. sCh := make(chan struct{})
  252. ctor, ok := kvObject.(KVConstructor)
  253. if !ok {
  254. return nil, fmt.Errorf("error watching object type %T, object does not implement KVConstructor interface", kvObject)
  255. }
  256. kvpCh, err := ds.store.Watch(Key(kvObject.Key()...), sCh)
  257. if err != nil {
  258. return nil, err
  259. }
  260. kvoCh := make(chan KVObject)
  261. go func() {
  262. retry_watch:
  263. var err error
  264. // Make sure to get a new instance of watch channel
  265. ds.Lock()
  266. watchCh := ds.watchCh
  267. ds.Unlock()
  268. loop:
  269. for {
  270. select {
  271. case <-stopCh:
  272. close(sCh)
  273. return
  274. case kvPair := <-kvpCh:
  275. // If the backend KV store gets reset libkv's go routine
  276. // for the watch can exit resulting in a nil value in
  277. // channel.
  278. if kvPair == nil {
  279. ds.Lock()
  280. ds.active = false
  281. ds.Unlock()
  282. break loop
  283. }
  284. dstO := ctor.New()
  285. if err = dstO.SetValue(kvPair.Value); err != nil {
  286. log.Printf("Could not unmarshal kvpair value = %s", string(kvPair.Value))
  287. break
  288. }
  289. dstO.SetIndex(kvPair.LastIndex)
  290. kvoCh <- dstO
  291. }
  292. }
  293. // Wait on watch channel for a re-trigger when datastore becomes active
  294. <-watchCh
  295. kvpCh, err = ds.store.Watch(Key(kvObject.Key()...), sCh)
  296. if err != nil {
  297. log.Printf("Could not watch the key %s in store: %v", Key(kvObject.Key()...), err)
  298. }
  299. goto retry_watch
  300. }()
  301. return kvoCh, nil
  302. }
  303. func (ds *datastore) RestartWatch() {
  304. ds.Lock()
  305. defer ds.Unlock()
  306. ds.active = true
  307. watchCh := ds.watchCh
  308. ds.watchCh = make(chan struct{})
  309. close(watchCh)
  310. }
// KVStore returns the underlying libkv store.
func (ds *datastore) KVStore() store.Store {
	return ds.store
}
  314. // PutObjectAtomic adds a new Record based on an object into the datastore
  315. func (ds *datastore) PutObjectAtomic(kvObject KVObject) error {
  316. var (
  317. previous *store.KVPair
  318. pair *store.KVPair
  319. err error
  320. )
  321. if ds.sequential {
  322. ds.Lock()
  323. defer ds.Unlock()
  324. }
  325. if kvObject == nil {
  326. return types.BadRequestErrorf("invalid KV Object : nil")
  327. }
  328. kvObjValue := kvObject.Value()
  329. if kvObjValue == nil {
  330. return types.BadRequestErrorf("invalid KV Object with a nil Value for key %s", Key(kvObject.Key()...))
  331. }
  332. if kvObject.Skip() {
  333. goto add_cache
  334. }
  335. if kvObject.Exists() {
  336. previous = &store.KVPair{Key: Key(kvObject.Key()...), LastIndex: kvObject.Index()}
  337. } else {
  338. previous = nil
  339. }
  340. _, pair, err = ds.store.AtomicPut(Key(kvObject.Key()...), kvObjValue, previous, nil)
  341. if err != nil {
  342. if err == store.ErrKeyExists {
  343. return ErrKeyModified
  344. }
  345. return err
  346. }
  347. kvObject.SetIndex(pair.LastIndex)
  348. add_cache:
  349. if ds.cache != nil {
  350. // If persistent store is skipped, sequencing needs to
  351. // happen in cache.
  352. return ds.cache.add(kvObject, kvObject.Skip())
  353. }
  354. return nil
  355. }
  356. // PutObject adds a new Record based on an object into the datastore
  357. func (ds *datastore) PutObject(kvObject KVObject) error {
  358. if ds.sequential {
  359. ds.Lock()
  360. defer ds.Unlock()
  361. }
  362. if kvObject == nil {
  363. return types.BadRequestErrorf("invalid KV Object : nil")
  364. }
  365. if kvObject.Skip() {
  366. goto add_cache
  367. }
  368. if err := ds.putObjectWithKey(kvObject, kvObject.Key()...); err != nil {
  369. return err
  370. }
  371. add_cache:
  372. if ds.cache != nil {
  373. // If persistent store is skipped, sequencing needs to
  374. // happen in cache.
  375. return ds.cache.add(kvObject, kvObject.Skip())
  376. }
  377. return nil
  378. }
  379. func (ds *datastore) putObjectWithKey(kvObject KVObject, key ...string) error {
  380. kvObjValue := kvObject.Value()
  381. if kvObjValue == nil {
  382. return types.BadRequestErrorf("invalid KV Object with a nil Value for key %s", Key(kvObject.Key()...))
  383. }
  384. return ds.store.Put(Key(key...), kvObjValue, nil)
  385. }
  386. // GetObject returns a record matching the key
  387. func (ds *datastore) GetObject(key string, o KVObject) error {
  388. if ds.sequential {
  389. ds.Lock()
  390. defer ds.Unlock()
  391. }
  392. if ds.cache != nil {
  393. return ds.cache.get(key, o)
  394. }
  395. kvPair, err := ds.store.Get(key)
  396. if err != nil {
  397. return err
  398. }
  399. if err := o.SetValue(kvPair.Value); err != nil {
  400. return err
  401. }
  402. // Make sure the object has a correct view of the DB index in
  403. // case we need to modify it and update the DB.
  404. o.SetIndex(kvPair.LastIndex)
  405. return nil
  406. }
  407. func (ds *datastore) ensureParent(parent string) error {
  408. exists, err := ds.store.Exists(parent)
  409. if err != nil {
  410. return err
  411. }
  412. if exists {
  413. return nil
  414. }
  415. return ds.store.Put(parent, []byte{}, &store.WriteOptions{IsDir: true})
  416. }
  417. func (ds *datastore) List(key string, kvObject KVObject) ([]KVObject, error) {
  418. if ds.sequential {
  419. ds.Lock()
  420. defer ds.Unlock()
  421. }
  422. if ds.cache != nil {
  423. return ds.cache.list(kvObject)
  424. }
  425. // Bail out right away if the kvObject does not implement KVConstructor
  426. ctor, ok := kvObject.(KVConstructor)
  427. if !ok {
  428. return nil, fmt.Errorf("error listing objects, object does not implement KVConstructor interface")
  429. }
  430. // Make sure the parent key exists
  431. if err := ds.ensureParent(key); err != nil {
  432. return nil, err
  433. }
  434. kvList, err := ds.store.List(key)
  435. if err != nil {
  436. return nil, err
  437. }
  438. var kvol []KVObject
  439. for _, kvPair := range kvList {
  440. if len(kvPair.Value) == 0 {
  441. continue
  442. }
  443. dstO := ctor.New()
  444. if err := dstO.SetValue(kvPair.Value); err != nil {
  445. return nil, err
  446. }
  447. // Make sure the object has a correct view of the DB index in
  448. // case we need to modify it and update the DB.
  449. dstO.SetIndex(kvPair.LastIndex)
  450. kvol = append(kvol, dstO)
  451. }
  452. return kvol, nil
  453. }
  454. // DeleteObject unconditionally deletes a record from the store
  455. func (ds *datastore) DeleteObject(kvObject KVObject) error {
  456. if ds.sequential {
  457. ds.Lock()
  458. defer ds.Unlock()
  459. }
  460. // cleaup the cache first
  461. if ds.cache != nil {
  462. // If persistent store is skipped, sequencing needs to
  463. // happen in cache.
  464. ds.cache.del(kvObject, kvObject.Skip())
  465. }
  466. if kvObject.Skip() {
  467. return nil
  468. }
  469. return ds.store.Delete(Key(kvObject.Key()...))
  470. }
  471. // DeleteObjectAtomic performs atomic delete on a record
  472. func (ds *datastore) DeleteObjectAtomic(kvObject KVObject) error {
  473. if ds.sequential {
  474. ds.Lock()
  475. defer ds.Unlock()
  476. }
  477. if kvObject == nil {
  478. return types.BadRequestErrorf("invalid KV Object : nil")
  479. }
  480. previous := &store.KVPair{Key: Key(kvObject.Key()...), LastIndex: kvObject.Index()}
  481. if kvObject.Skip() {
  482. goto del_cache
  483. }
  484. if _, err := ds.store.AtomicDelete(Key(kvObject.Key()...), previous); err != nil {
  485. if err == store.ErrKeyExists {
  486. return ErrKeyModified
  487. }
  488. return err
  489. }
  490. del_cache:
  491. // cleanup the cache only if AtomicDelete went through successfully
  492. if ds.cache != nil {
  493. // If persistent store is skipped, sequencing needs to
  494. // happen in cache.
  495. return ds.cache.del(kvObject, kvObject.Skip())
  496. }
  497. return nil
  498. }
  499. // DeleteTree unconditionally deletes a record from the store
  500. func (ds *datastore) DeleteTree(kvObject KVObject) error {
  501. if ds.sequential {
  502. ds.Lock()
  503. defer ds.Unlock()
  504. }
  505. // cleaup the cache first
  506. if ds.cache != nil {
  507. // If persistent store is skipped, sequencing needs to
  508. // happen in cache.
  509. ds.cache.del(kvObject, kvObject.Skip())
  510. }
  511. if kvObject.Skip() {
  512. return nil
  513. }
  514. return ds.store.DeleteTree(Key(kvObject.KeyPrefix()...))
  515. }