datastore.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601
  1. package datastore
  2. import (
  3. "fmt"
  4. "log"
  5. "reflect"
  6. "strings"
  7. "sync"
  8. "github.com/docker/libkv"
  9. "github.com/docker/libkv/store"
  10. "github.com/docker/libkv/store/boltdb"
  11. "github.com/docker/libkv/store/consul"
  12. "github.com/docker/libkv/store/etcd"
  13. "github.com/docker/libkv/store/zookeeper"
  14. "github.com/docker/libnetwork/discoverapi"
  15. "github.com/docker/libnetwork/types"
  16. )
  17. //DataStore exported
  18. type DataStore interface {
  19. // GetObject gets data from datastore and unmarshals to the specified object
  20. GetObject(key string, o KVObject) error
  21. // PutObject adds a new Record based on an object into the datastore
  22. PutObject(kvObject KVObject) error
  23. // PutObjectAtomic provides an atomic add and update operation for a Record
  24. PutObjectAtomic(kvObject KVObject) error
  25. // DeleteObject deletes a record
  26. DeleteObject(kvObject KVObject) error
  27. // DeleteObjectAtomic performs an atomic delete operation
  28. DeleteObjectAtomic(kvObject KVObject) error
  29. // DeleteTree deletes a record
  30. DeleteTree(kvObject KVObject) error
  31. // Watchable returns whether the store is watchable or not
  32. Watchable() bool
  33. // Watch for changes on a KVObject
  34. Watch(kvObject KVObject, stopCh <-chan struct{}) (<-chan KVObject, error)
  35. // RestartWatch retriggers stopped Watches
  36. RestartWatch()
  37. // Active returns if the store is active
  38. Active() bool
  39. // List returns of a list of KVObjects belonging to the parent
  40. // key. The caller must pass a KVObject of the same type as
  41. // the objects that need to be listed
  42. List(string, KVObject) ([]KVObject, error)
  43. // Scope returns the scope of the store
  44. Scope() string
  45. // KVStore returns access to the KV Store
  46. KVStore() store.Store
  47. // Close closes the data store
  48. Close()
  49. }
  50. // ErrKeyModified is raised for an atomic update when the update is working on a stale state
  51. var (
  52. ErrKeyModified = store.ErrKeyModified
  53. ErrKeyNotFound = store.ErrKeyNotFound
  54. )
  55. type datastore struct {
  56. scope string
  57. store store.Store
  58. cache *cache
  59. watchCh chan struct{}
  60. active bool
  61. sync.Mutex
  62. }
  63. // KVObject is Key/Value interface used by objects to be part of the DataStore
  64. type KVObject interface {
  65. // Key method lets an object to provide the Key to be used in KV Store
  66. Key() []string
  67. // KeyPrefix method lets an object to return immediate parent key that can be used for tree walk
  68. KeyPrefix() []string
  69. // Value method lets an object to marshal its content to be stored in the KV store
  70. Value() []byte
  71. // SetValue is used by the datastore to set the object's value when loaded from the data store.
  72. SetValue([]byte) error
  73. // Index method returns the latest DB Index as seen by the object
  74. Index() uint64
  75. // SetIndex method allows the datastore to store the latest DB Index into the object
  76. SetIndex(uint64)
  77. // True if the object exists in the datastore, false if it hasn't been stored yet.
  78. // When SetIndex() is called, the object has been stored.
  79. Exists() bool
  80. // DataScope indicates the storage scope of the KV object
  81. DataScope() string
  82. // Skip provides a way for a KV Object to avoid persisting it in the KV Store
  83. Skip() bool
  84. }
  85. // KVConstructor interface defines methods which can construct a KVObject from another.
  86. type KVConstructor interface {
  87. // New returns a new object which is created based on the
  88. // source object
  89. New() KVObject
  90. // CopyTo deep copies the contents of the implementing object
  91. // to the passed destination object
  92. CopyTo(KVObject) error
  93. }
  94. // ScopeCfg represents Datastore configuration.
  95. type ScopeCfg struct {
  96. Client ScopeClientCfg
  97. }
  98. // ScopeClientCfg represents Datastore Client-only mode configuration
  99. type ScopeClientCfg struct {
  100. Provider string
  101. Address string
  102. Config *store.Config
  103. }
  104. const (
  105. // LocalScope indicates to store the KV object in local datastore such as boltdb
  106. LocalScope = "local"
  107. // GlobalScope indicates to store the KV object in global datastore such as consul/etcd/zookeeper
  108. GlobalScope = "global"
  109. defaultPrefix = "/var/lib/docker/network/files"
  110. )
  111. const (
  112. // NetworkKeyPrefix is the prefix for network key in the kv store
  113. NetworkKeyPrefix = "network"
  114. // EndpointKeyPrefix is the prefix for endpoint key in the kv store
  115. EndpointKeyPrefix = "endpoint"
  116. )
  117. var (
  118. defaultScopes = makeDefaultScopes()
  119. )
  120. func makeDefaultScopes() map[string]*ScopeCfg {
  121. def := make(map[string]*ScopeCfg)
  122. def[LocalScope] = &ScopeCfg{
  123. Client: ScopeClientCfg{
  124. Provider: string(store.BOLTDB),
  125. Address: defaultPrefix + "/local-kv.db",
  126. Config: &store.Config{
  127. Bucket: "libnetwork",
  128. },
  129. },
  130. }
  131. return def
  132. }
  133. var defaultRootChain = []string{"docker", "network", "v1.0"}
  134. var rootChain = defaultRootChain
  135. func init() {
  136. consul.Register()
  137. zookeeper.Register()
  138. etcd.Register()
  139. boltdb.Register()
  140. }
  141. // DefaultScopes returns a map of default scopes and it's config for clients to use.
  142. func DefaultScopes(dataDir string) map[string]*ScopeCfg {
  143. if dataDir != "" {
  144. defaultScopes[LocalScope].Client.Address = dataDir + "/network/files/local-kv.db"
  145. return defaultScopes
  146. }
  147. defaultScopes[LocalScope].Client.Address = defaultPrefix + "/local-kv.db"
  148. return defaultScopes
  149. }
  150. // IsValid checks if the scope config has valid configuration.
  151. func (cfg *ScopeCfg) IsValid() bool {
  152. if cfg == nil ||
  153. strings.TrimSpace(cfg.Client.Provider) == "" ||
  154. strings.TrimSpace(cfg.Client.Address) == "" {
  155. return false
  156. }
  157. return true
  158. }
  159. //Key provides convenient method to create a Key
  160. func Key(key ...string) string {
  161. keychain := append(rootChain, key...)
  162. str := strings.Join(keychain, "/")
  163. return str + "/"
  164. }
  165. //ParseKey provides convenient method to unpack the key to complement the Key function
  166. func ParseKey(key string) ([]string, error) {
  167. chain := strings.Split(strings.Trim(key, "/"), "/")
  168. // The key must atleast be equal to the rootChain in order to be considered as valid
  169. if len(chain) <= len(rootChain) || !reflect.DeepEqual(chain[0:len(rootChain)], rootChain) {
  170. return nil, types.BadRequestErrorf("invalid Key : %s", key)
  171. }
  172. return chain[len(rootChain):], nil
  173. }
  174. // newClient used to connect to KV Store
  175. func newClient(scope string, kv string, addr string, config *store.Config, cached bool) (DataStore, error) {
  176. if cached && scope != LocalScope {
  177. return nil, fmt.Errorf("caching supported only for scope %s", LocalScope)
  178. }
  179. if config == nil {
  180. config = &store.Config{}
  181. }
  182. var addrs []string
  183. if kv == string(store.BOLTDB) {
  184. // Parse file path
  185. addrs = strings.Split(addr, ",")
  186. } else {
  187. // Parse URI
  188. parts := strings.SplitN(addr, "/", 2)
  189. addrs = strings.Split(parts[0], ",")
  190. // Add the custom prefix to the root chain
  191. if len(parts) == 2 {
  192. rootChain = append([]string{parts[1]}, defaultRootChain...)
  193. }
  194. }
  195. store, err := libkv.NewStore(store.Backend(kv), addrs, config)
  196. if err != nil {
  197. return nil, err
  198. }
  199. ds := &datastore{scope: scope, store: store, active: true, watchCh: make(chan struct{})}
  200. if cached {
  201. ds.cache = newCache(ds)
  202. }
  203. return ds, nil
  204. }
  205. // NewDataStore creates a new instance of LibKV data store
  206. func NewDataStore(scope string, cfg *ScopeCfg) (DataStore, error) {
  207. if cfg == nil || cfg.Client.Provider == "" || cfg.Client.Address == "" {
  208. c, ok := defaultScopes[scope]
  209. if !ok || c.Client.Provider == "" || c.Client.Address == "" {
  210. return nil, fmt.Errorf("unexpected scope %s without configuration passed", scope)
  211. }
  212. cfg = c
  213. }
  214. var cached bool
  215. if scope == LocalScope {
  216. cached = true
  217. }
  218. return newClient(scope, cfg.Client.Provider, cfg.Client.Address, cfg.Client.Config, cached)
  219. }
  220. // NewDataStoreFromConfig creates a new instance of LibKV data store starting from the datastore config data
  221. func NewDataStoreFromConfig(dsc discoverapi.DatastoreConfigData) (DataStore, error) {
  222. var (
  223. ok bool
  224. sCfgP *store.Config
  225. )
  226. sCfgP, ok = dsc.Config.(*store.Config)
  227. if !ok && dsc.Config != nil {
  228. return nil, fmt.Errorf("cannot parse store configuration: %v", dsc.Config)
  229. }
  230. scopeCfg := &ScopeCfg{
  231. Client: ScopeClientCfg{
  232. Address: dsc.Address,
  233. Provider: dsc.Provider,
  234. Config: sCfgP,
  235. },
  236. }
  237. ds, err := NewDataStore(dsc.Scope, scopeCfg)
  238. if err != nil {
  239. return nil, fmt.Errorf("failed to construct datastore client from datastore configuration %v: %v", dsc, err)
  240. }
  241. return ds, err
  242. }
  243. func (ds *datastore) Close() {
  244. ds.store.Close()
  245. }
  246. func (ds *datastore) Scope() string {
  247. return ds.scope
  248. }
  249. func (ds *datastore) Active() bool {
  250. return ds.active
  251. }
  252. func (ds *datastore) Watchable() bool {
  253. return ds.scope != LocalScope
  254. }
  255. func (ds *datastore) Watch(kvObject KVObject, stopCh <-chan struct{}) (<-chan KVObject, error) {
  256. sCh := make(chan struct{})
  257. ctor, ok := kvObject.(KVConstructor)
  258. if !ok {
  259. return nil, fmt.Errorf("error watching object type %T, object does not implement KVConstructor interface", kvObject)
  260. }
  261. kvpCh, err := ds.store.Watch(Key(kvObject.Key()...), sCh)
  262. if err != nil {
  263. return nil, err
  264. }
  265. kvoCh := make(chan KVObject)
  266. go func() {
  267. retry_watch:
  268. var err error
  269. // Make sure to get a new instance of watch channel
  270. ds.Lock()
  271. watchCh := ds.watchCh
  272. ds.Unlock()
  273. loop:
  274. for {
  275. select {
  276. case <-stopCh:
  277. close(sCh)
  278. return
  279. case kvPair := <-kvpCh:
  280. // If the backend KV store gets reset libkv's go routine
  281. // for the watch can exit resulting in a nil value in
  282. // channel.
  283. if kvPair == nil {
  284. ds.Lock()
  285. ds.active = false
  286. ds.Unlock()
  287. break loop
  288. }
  289. dstO := ctor.New()
  290. if err = dstO.SetValue(kvPair.Value); err != nil {
  291. log.Printf("Could not unmarshal kvpair value = %s", string(kvPair.Value))
  292. break
  293. }
  294. dstO.SetIndex(kvPair.LastIndex)
  295. kvoCh <- dstO
  296. }
  297. }
  298. // Wait on watch channel for a re-trigger when datastore becomes active
  299. <-watchCh
  300. kvpCh, err = ds.store.Watch(Key(kvObject.Key()...), sCh)
  301. if err != nil {
  302. log.Printf("Could not watch the key %s in store: %v", Key(kvObject.Key()...), err)
  303. }
  304. goto retry_watch
  305. }()
  306. return kvoCh, nil
  307. }
  308. func (ds *datastore) RestartWatch() {
  309. ds.Lock()
  310. defer ds.Unlock()
  311. ds.active = true
  312. watchCh := ds.watchCh
  313. ds.watchCh = make(chan struct{})
  314. close(watchCh)
  315. }
  316. func (ds *datastore) KVStore() store.Store {
  317. return ds.store
  318. }
  319. // PutObjectAtomic adds a new Record based on an object into the datastore
  320. func (ds *datastore) PutObjectAtomic(kvObject KVObject) error {
  321. var (
  322. previous *store.KVPair
  323. pair *store.KVPair
  324. err error
  325. )
  326. ds.Lock()
  327. defer ds.Unlock()
  328. if kvObject == nil {
  329. return types.BadRequestErrorf("invalid KV Object : nil")
  330. }
  331. kvObjValue := kvObject.Value()
  332. if kvObjValue == nil {
  333. return types.BadRequestErrorf("invalid KV Object with a nil Value for key %s", Key(kvObject.Key()...))
  334. }
  335. if kvObject.Skip() {
  336. goto add_cache
  337. }
  338. if kvObject.Exists() {
  339. previous = &store.KVPair{Key: Key(kvObject.Key()...), LastIndex: kvObject.Index()}
  340. } else {
  341. previous = nil
  342. }
  343. _, pair, err = ds.store.AtomicPut(Key(kvObject.Key()...), kvObjValue, previous, nil)
  344. if err != nil {
  345. return err
  346. }
  347. kvObject.SetIndex(pair.LastIndex)
  348. add_cache:
  349. if ds.cache != nil {
  350. return ds.cache.add(kvObject)
  351. }
  352. return nil
  353. }
  354. // PutObject adds a new Record based on an object into the datastore
  355. func (ds *datastore) PutObject(kvObject KVObject) error {
  356. ds.Lock()
  357. defer ds.Unlock()
  358. if kvObject == nil {
  359. return types.BadRequestErrorf("invalid KV Object : nil")
  360. }
  361. if kvObject.Skip() {
  362. goto add_cache
  363. }
  364. if err := ds.putObjectWithKey(kvObject, kvObject.Key()...); err != nil {
  365. return err
  366. }
  367. add_cache:
  368. if ds.cache != nil {
  369. return ds.cache.add(kvObject)
  370. }
  371. return nil
  372. }
  373. func (ds *datastore) putObjectWithKey(kvObject KVObject, key ...string) error {
  374. kvObjValue := kvObject.Value()
  375. if kvObjValue == nil {
  376. return types.BadRequestErrorf("invalid KV Object with a nil Value for key %s", Key(kvObject.Key()...))
  377. }
  378. return ds.store.Put(Key(key...), kvObjValue, nil)
  379. }
  380. // GetObject returns a record matching the key
  381. func (ds *datastore) GetObject(key string, o KVObject) error {
  382. ds.Lock()
  383. defer ds.Unlock()
  384. if ds.cache != nil {
  385. return ds.cache.get(key, o)
  386. }
  387. kvPair, err := ds.store.Get(key)
  388. if err != nil {
  389. return err
  390. }
  391. if err := o.SetValue(kvPair.Value); err != nil {
  392. return err
  393. }
  394. // Make sure the object has a correct view of the DB index in
  395. // case we need to modify it and update the DB.
  396. o.SetIndex(kvPair.LastIndex)
  397. return nil
  398. }
  399. func (ds *datastore) ensureParent(parent string) error {
  400. exists, err := ds.store.Exists(parent)
  401. if err != nil {
  402. return err
  403. }
  404. if exists {
  405. return nil
  406. }
  407. return ds.store.Put(parent, []byte{}, &store.WriteOptions{IsDir: true})
  408. }
  409. func (ds *datastore) List(key string, kvObject KVObject) ([]KVObject, error) {
  410. ds.Lock()
  411. defer ds.Unlock()
  412. if ds.cache != nil {
  413. return ds.cache.list(kvObject)
  414. }
  415. // Bail out right away if the kvObject does not implement KVConstructor
  416. ctor, ok := kvObject.(KVConstructor)
  417. if !ok {
  418. return nil, fmt.Errorf("error listing objects, object does not implement KVConstructor interface")
  419. }
  420. // Make sure the parent key exists
  421. if err := ds.ensureParent(key); err != nil {
  422. return nil, err
  423. }
  424. kvList, err := ds.store.List(key)
  425. if err != nil {
  426. return nil, err
  427. }
  428. var kvol []KVObject
  429. for _, kvPair := range kvList {
  430. if len(kvPair.Value) == 0 {
  431. continue
  432. }
  433. dstO := ctor.New()
  434. if err := dstO.SetValue(kvPair.Value); err != nil {
  435. return nil, err
  436. }
  437. // Make sure the object has a correct view of the DB index in
  438. // case we need to modify it and update the DB.
  439. dstO.SetIndex(kvPair.LastIndex)
  440. kvol = append(kvol, dstO)
  441. }
  442. return kvol, nil
  443. }
  444. // DeleteObject unconditionally deletes a record from the store
  445. func (ds *datastore) DeleteObject(kvObject KVObject) error {
  446. ds.Lock()
  447. defer ds.Unlock()
  448. // cleaup the cache first
  449. if ds.cache != nil {
  450. ds.cache.del(kvObject)
  451. }
  452. if kvObject.Skip() {
  453. return nil
  454. }
  455. return ds.store.Delete(Key(kvObject.Key()...))
  456. }
  457. // DeleteObjectAtomic performs atomic delete on a record
  458. func (ds *datastore) DeleteObjectAtomic(kvObject KVObject) error {
  459. ds.Lock()
  460. defer ds.Unlock()
  461. if kvObject == nil {
  462. return types.BadRequestErrorf("invalid KV Object : nil")
  463. }
  464. previous := &store.KVPair{Key: Key(kvObject.Key()...), LastIndex: kvObject.Index()}
  465. if kvObject.Skip() {
  466. goto del_cache
  467. }
  468. if _, err := ds.store.AtomicDelete(Key(kvObject.Key()...), previous); err != nil {
  469. return err
  470. }
  471. del_cache:
  472. // cleanup the cache only if AtomicDelete went through successfully
  473. if ds.cache != nil {
  474. return ds.cache.del(kvObject)
  475. }
  476. return nil
  477. }
  478. // DeleteTree unconditionally deletes a record from the store
  479. func (ds *datastore) DeleteTree(kvObject KVObject) error {
  480. ds.Lock()
  481. defer ds.Unlock()
  482. // cleaup the cache first
  483. if ds.cache != nil {
  484. ds.cache.del(kvObject)
  485. }
  486. if kvObject.Skip() {
  487. return nil
  488. }
  489. return ds.store.DeleteTree(Key(kvObject.KeyPrefix()...))
  490. }