datastore.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458
  1. package datastore
  2. import (
  3. "fmt"
  4. "reflect"
  5. "strings"
  6. "sync"
  7. "time"
  8. "github.com/docker/docker/libnetwork/discoverapi"
  9. store "github.com/docker/docker/libnetwork/internal/kvstore"
  10. "github.com/docker/docker/libnetwork/types"
  11. )
  12. // DataStore exported
  13. type DataStore interface {
  14. // GetObject gets data from datastore and unmarshals to the specified object
  15. GetObject(key string, o KVObject) error
  16. // PutObjectAtomic provides an atomic add and update operation for a Record
  17. PutObjectAtomic(kvObject KVObject) error
  18. // DeleteObjectAtomic performs an atomic delete operation
  19. DeleteObjectAtomic(kvObject KVObject) error
  20. // DeleteTree deletes a record
  21. DeleteTree(kvObject KVObject) error
  22. // List returns of a list of KVObjects belonging to the parent
  23. // key. The caller must pass a KVObject of the same type as
  24. // the objects that need to be listed
  25. List(string, KVObject) ([]KVObject, error)
  26. // Map returns a Map of KVObjects
  27. Map(key string, kvObject KVObject) (map[string]KVObject, error)
  28. // Scope returns the scope of the store
  29. Scope() string
  30. // KVStore returns access to the KV Store
  31. KVStore() store.Store
  32. // Close closes the data store
  33. Close()
  34. }
  35. // ErrKeyModified is raised for an atomic update when the update is working on a stale state
  36. var (
  37. ErrKeyModified = store.ErrKeyModified
  38. ErrKeyNotFound = store.ErrKeyNotFound
  39. )
  40. type datastore struct {
  41. scope string
  42. store store.Store
  43. cache *cache
  44. sync.Mutex
  45. }
  46. // KVObject is Key/Value interface used by objects to be part of the DataStore
  47. type KVObject interface {
  48. // Key method lets an object provide the Key to be used in KV Store
  49. Key() []string
  50. // KeyPrefix method lets an object return immediate parent key that can be used for tree walk
  51. KeyPrefix() []string
  52. // Value method lets an object marshal its content to be stored in the KV store
  53. Value() []byte
  54. // SetValue is used by the datastore to set the object's value when loaded from the data store.
  55. SetValue([]byte) error
  56. // Index method returns the latest DB Index as seen by the object
  57. Index() uint64
  58. // SetIndex method allows the datastore to store the latest DB Index into the object
  59. SetIndex(uint64)
  60. // True if the object exists in the datastore, false if it hasn't been stored yet.
  61. // When SetIndex() is called, the object has been stored.
  62. Exists() bool
  63. // DataScope indicates the storage scope of the KV object
  64. DataScope() string
  65. // Skip provides a way for a KV Object to avoid persisting it in the KV Store
  66. Skip() bool
  67. }
  68. // KVConstructor interface defines methods which can construct a KVObject from another.
  69. type KVConstructor interface {
  70. // New returns a new object which is created based on the
  71. // source object
  72. New() KVObject
  73. // CopyTo deep copies the contents of the implementing object
  74. // to the passed destination object
  75. CopyTo(KVObject) error
  76. }
  77. // ScopeCfg represents Datastore configuration.
  78. type ScopeCfg struct {
  79. Client ScopeClientCfg
  80. }
  81. // ScopeClientCfg represents Datastore Client-only mode configuration
  82. type ScopeClientCfg struct {
  83. Provider string
  84. Address string
  85. Config *store.Config
  86. }
  87. const (
  88. // LocalScope indicates to store the KV object in local datastore such as boltdb
  89. LocalScope = "local"
  90. // GlobalScope indicates to store the KV object in global datastore
  91. GlobalScope = "global"
  92. // SwarmScope is not indicating a datastore location. It is defined here
  93. // along with the other two scopes just for consistency.
  94. SwarmScope = "swarm"
  95. defaultPrefix = "/var/lib/docker/network/files"
  96. )
  97. const (
  98. // NetworkKeyPrefix is the prefix for network key in the kv store
  99. NetworkKeyPrefix = "network"
  100. // EndpointKeyPrefix is the prefix for endpoint key in the kv store
  101. EndpointKeyPrefix = "endpoint"
  102. )
  103. var (
  104. defaultRootChain = []string{"docker", "network", "v1.0"}
  105. rootChain = defaultRootChain
  106. )
  107. // DefaultScope returns a default scope config for clients to use.
  108. func DefaultScope(dataDir string) ScopeCfg {
  109. var dbpath string
  110. if dataDir == "" {
  111. dbpath = defaultPrefix + "/local-kv.db"
  112. } else {
  113. dbpath = dataDir + "/network/files/local-kv.db"
  114. }
  115. return ScopeCfg{
  116. Client: ScopeClientCfg{
  117. Provider: string(store.BOLTDB),
  118. Address: dbpath,
  119. Config: &store.Config{
  120. Bucket: "libnetwork",
  121. ConnectionTimeout: time.Minute,
  122. },
  123. },
  124. }
  125. }
  126. // IsValid checks if the scope config has valid configuration.
  127. func (cfg *ScopeCfg) IsValid() bool {
  128. if cfg == nil ||
  129. strings.TrimSpace(cfg.Client.Provider) == "" ||
  130. strings.TrimSpace(cfg.Client.Address) == "" {
  131. return false
  132. }
  133. return true
  134. }
  135. // Key provides convenient method to create a Key
  136. func Key(key ...string) string {
  137. keychain := append(rootChain, key...)
  138. str := strings.Join(keychain, "/")
  139. return str + "/"
  140. }
  141. // ParseKey provides convenient method to unpack the key to complement the Key function
  142. func ParseKey(key string) ([]string, error) {
  143. chain := strings.Split(strings.Trim(key, "/"), "/")
  144. // The key must at least be equal to the rootChain in order to be considered as valid
  145. if len(chain) <= len(rootChain) || !reflect.DeepEqual(chain[0:len(rootChain)], rootChain) {
  146. return nil, types.BadRequestErrorf("invalid Key : %s", key)
  147. }
  148. return chain[len(rootChain):], nil
  149. }
  150. // newClient used to connect to KV Store
  151. func newClient(kv string, addr string, config *store.Config) (DataStore, error) {
  152. if config == nil {
  153. config = &store.Config{}
  154. }
  155. var addrs []string
  156. if kv == string(store.BOLTDB) {
  157. // Parse file path
  158. addrs = strings.Split(addr, ",")
  159. } else {
  160. // Parse URI
  161. parts := strings.SplitN(addr, "/", 2)
  162. addrs = strings.Split(parts[0], ",")
  163. // Add the custom prefix to the root chain
  164. if len(parts) == 2 {
  165. rootChain = append([]string{parts[1]}, defaultRootChain...)
  166. }
  167. }
  168. s, err := store.New(store.Backend(kv), addrs, config)
  169. if err != nil {
  170. return nil, err
  171. }
  172. ds := &datastore{scope: LocalScope, store: s}
  173. ds.cache = newCache(ds)
  174. return ds, nil
  175. }
  176. // NewDataStore creates a new instance of LibKV data store
  177. func NewDataStore(cfg ScopeCfg) (DataStore, error) {
  178. if cfg.Client.Provider == "" || cfg.Client.Address == "" {
  179. cfg = DefaultScope("")
  180. }
  181. return newClient(cfg.Client.Provider, cfg.Client.Address, cfg.Client.Config)
  182. }
  183. // NewDataStoreFromConfig creates a new instance of LibKV data store starting from the datastore config data
  184. func NewDataStoreFromConfig(dsc discoverapi.DatastoreConfigData) (DataStore, error) {
  185. var (
  186. ok bool
  187. sCfgP *store.Config
  188. )
  189. sCfgP, ok = dsc.Config.(*store.Config)
  190. if !ok && dsc.Config != nil {
  191. return nil, fmt.Errorf("cannot parse store configuration: %v", dsc.Config)
  192. }
  193. scopeCfg := ScopeCfg{
  194. Client: ScopeClientCfg{
  195. Address: dsc.Address,
  196. Provider: dsc.Provider,
  197. Config: sCfgP,
  198. },
  199. }
  200. ds, err := NewDataStore(scopeCfg)
  201. if err != nil {
  202. return nil, fmt.Errorf("failed to construct datastore client from datastore configuration %v: %v", dsc, err)
  203. }
  204. return ds, err
  205. }
  206. func (ds *datastore) Close() {
  207. ds.store.Close()
  208. }
  209. func (ds *datastore) Scope() string {
  210. return ds.scope
  211. }
  212. func (ds *datastore) KVStore() store.Store {
  213. return ds.store
  214. }
  215. // PutObjectAtomic adds a new Record based on an object into the datastore
  216. func (ds *datastore) PutObjectAtomic(kvObject KVObject) error {
  217. var (
  218. previous *store.KVPair
  219. pair *store.KVPair
  220. err error
  221. )
  222. ds.Lock()
  223. defer ds.Unlock()
  224. if kvObject == nil {
  225. return types.BadRequestErrorf("invalid KV Object : nil")
  226. }
  227. kvObjValue := kvObject.Value()
  228. if kvObjValue == nil {
  229. return types.BadRequestErrorf("invalid KV Object with a nil Value for key %s", Key(kvObject.Key()...))
  230. }
  231. if kvObject.Skip() {
  232. goto add_cache
  233. }
  234. if kvObject.Exists() {
  235. previous = &store.KVPair{Key: Key(kvObject.Key()...), LastIndex: kvObject.Index()}
  236. } else {
  237. previous = nil
  238. }
  239. pair, err = ds.store.AtomicPut(Key(kvObject.Key()...), kvObjValue, previous)
  240. if err != nil {
  241. if err == store.ErrKeyExists {
  242. return ErrKeyModified
  243. }
  244. return err
  245. }
  246. kvObject.SetIndex(pair.LastIndex)
  247. add_cache:
  248. if ds.cache != nil {
  249. // If persistent store is skipped, sequencing needs to
  250. // happen in cache.
  251. return ds.cache.add(kvObject, kvObject.Skip())
  252. }
  253. return nil
  254. }
  255. // GetObject returns a record matching the key
  256. func (ds *datastore) GetObject(key string, o KVObject) error {
  257. ds.Lock()
  258. defer ds.Unlock()
  259. if ds.cache != nil {
  260. return ds.cache.get(o)
  261. }
  262. kvPair, err := ds.store.Get(key)
  263. if err != nil {
  264. return err
  265. }
  266. if err := o.SetValue(kvPair.Value); err != nil {
  267. return err
  268. }
  269. // Make sure the object has a correct view of the DB index in
  270. // case we need to modify it and update the DB.
  271. o.SetIndex(kvPair.LastIndex)
  272. return nil
  273. }
  274. func (ds *datastore) ensureParent(parent string) error {
  275. exists, err := ds.store.Exists(parent)
  276. if err != nil {
  277. return err
  278. }
  279. if exists {
  280. return nil
  281. }
  282. return ds.store.Put(parent, []byte{})
  283. }
  284. func (ds *datastore) List(key string, kvObject KVObject) ([]KVObject, error) {
  285. ds.Lock()
  286. defer ds.Unlock()
  287. if ds.cache != nil {
  288. return ds.cache.list(kvObject)
  289. }
  290. var kvol []KVObject
  291. cb := func(key string, val KVObject) {
  292. kvol = append(kvol, val)
  293. }
  294. err := ds.iterateKVPairsFromStore(key, kvObject, cb)
  295. if err != nil {
  296. return nil, err
  297. }
  298. return kvol, nil
  299. }
  300. func (ds *datastore) iterateKVPairsFromStore(key string, kvObject KVObject, callback func(string, KVObject)) error {
  301. // Bail out right away if the kvObject does not implement KVConstructor
  302. ctor, ok := kvObject.(KVConstructor)
  303. if !ok {
  304. return fmt.Errorf("error listing objects, object does not implement KVConstructor interface")
  305. }
  306. // Make sure the parent key exists
  307. if err := ds.ensureParent(key); err != nil {
  308. return err
  309. }
  310. kvList, err := ds.store.List(key)
  311. if err != nil {
  312. return err
  313. }
  314. for _, kvPair := range kvList {
  315. if len(kvPair.Value) == 0 {
  316. continue
  317. }
  318. dstO := ctor.New()
  319. if err := dstO.SetValue(kvPair.Value); err != nil {
  320. return err
  321. }
  322. // Make sure the object has a correct view of the DB index in
  323. // case we need to modify it and update the DB.
  324. dstO.SetIndex(kvPair.LastIndex)
  325. callback(kvPair.Key, dstO)
  326. }
  327. return nil
  328. }
  329. func (ds *datastore) Map(key string, kvObject KVObject) (map[string]KVObject, error) {
  330. ds.Lock()
  331. defer ds.Unlock()
  332. kvol := make(map[string]KVObject)
  333. cb := func(key string, val KVObject) {
  334. // Trim the leading & trailing "/" to make it consistent across all stores
  335. kvol[strings.Trim(key, "/")] = val
  336. }
  337. err := ds.iterateKVPairsFromStore(key, kvObject, cb)
  338. if err != nil {
  339. return nil, err
  340. }
  341. return kvol, nil
  342. }
  343. // DeleteObjectAtomic performs atomic delete on a record
  344. func (ds *datastore) DeleteObjectAtomic(kvObject KVObject) error {
  345. ds.Lock()
  346. defer ds.Unlock()
  347. if kvObject == nil {
  348. return types.BadRequestErrorf("invalid KV Object : nil")
  349. }
  350. previous := &store.KVPair{Key: Key(kvObject.Key()...), LastIndex: kvObject.Index()}
  351. if kvObject.Skip() {
  352. goto del_cache
  353. }
  354. if err := ds.store.AtomicDelete(Key(kvObject.Key()...), previous); err != nil {
  355. if err == store.ErrKeyExists {
  356. return ErrKeyModified
  357. }
  358. return err
  359. }
  360. del_cache:
  361. // cleanup the cache only if AtomicDelete went through successfully
  362. if ds.cache != nil {
  363. // If persistent store is skipped, sequencing needs to
  364. // happen in cache.
  365. return ds.cache.del(kvObject, kvObject.Skip())
  366. }
  367. return nil
  368. }
  369. // DeleteTree unconditionally deletes a record from the store
  370. func (ds *datastore) DeleteTree(kvObject KVObject) error {
  371. ds.Lock()
  372. defer ds.Unlock()
  373. // cleanup the cache first
  374. if ds.cache != nil {
  375. // If persistent store is skipped, sequencing needs to
  376. // happen in cache.
  377. ds.cache.del(kvObject, kvObject.Skip())
  378. }
  379. if kvObject.Skip() {
  380. return nil
  381. }
  382. return ds.store.DeleteTree(Key(kvObject.KeyPrefix()...))
  383. }