datastore.go 9.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395
  1. package datastore
  2. import (
  3. "fmt"
  4. "strings"
  5. "sync"
  6. "time"
  7. "github.com/docker/docker/libnetwork/discoverapi"
  8. store "github.com/docker/docker/libnetwork/internal/kvstore"
  9. "github.com/docker/docker/libnetwork/internal/kvstore/boltdb"
  10. "github.com/docker/docker/libnetwork/types"
  11. )
// Sentinel errors surfaced by the datastore. Both alias the values declared
// by the underlying kvstore package.
var (
	// ErrKeyModified is raised for an atomic update when the update is
	// working on a stale state. PutObjectAtomic and DeleteObjectAtomic return
	// it when the backing store reports store.ErrKeyExists.
	ErrKeyModified = store.ErrKeyModified
	// ErrKeyNotFound aliases store.ErrKeyNotFound.
	ErrKeyNotFound = store.ErrKeyNotFound
)
// Store couples a key/value store backend with a cache of KVObjects and
// serializes access to them through a mutex.
type Store struct {
	// mu is held for the whole duration of PutObjectAtomic, GetObject, List,
	// Map and DeleteObjectAtomic.
	mu sync.Mutex
	// scope is the value reported by Scope; newClient sets it to LocalScope.
	scope string
	// store is the backing key/value store.
	store store.Store
	// cache, when non-nil, is consulted by GetObject and List instead of the
	// backing store, and is kept up to date by PutObjectAtomic and
	// DeleteObjectAtomic.
	cache *cache
}
// KVObject is the Key/Value interface that objects must implement in order to
// be persisted in, and loaded from, the Store.
type KVObject interface {
	// Key method lets an object provide the Key to be used in KV Store.
	// The returned parts are joined (after the root chain) by the Key function.
	Key() []string
	// KeyPrefix method lets an object return immediate parent key that can be used for tree walk
	KeyPrefix() []string
	// Value method lets an object marshal its content to be stored in the KV store.
	// PutObjectAtomic rejects objects whose Value is nil.
	Value() []byte
	// SetValue is used by the datastore to set the object's value when loaded from the data store.
	SetValue([]byte) error
	// Index method returns the latest DB Index as seen by the object
	Index() uint64
	// SetIndex method allows the datastore to store the latest DB Index into the object
	SetIndex(uint64)
	// Exists returns true if the object exists in the datastore, false if it hasn't been stored yet.
	// When SetIndex() is called, the object has been stored.
	Exists() bool
	// DataScope indicates the storage scope of the KV object
	DataScope() string
	// Skip provides a way for a KV Object to avoid persisting it in the KV Store.
	// Skipped objects are still handed to the Store's cache.
	Skip() bool
}
// KVConstructor interface defines methods which can construct a KVObject from
// another. Listing objects straight from the backing store requires the
// template object to implement it, so that a fresh object can be created for
// every stored entry.
type KVConstructor interface {
	// New returns a new object which is created based on the
	// source object
	New() KVObject
	// CopyTo deep copies the contents of the implementing object
	// to the passed destination object
	CopyTo(KVObject) error
}
// ScopeCfg represents Datastore configuration.
type ScopeCfg struct {
	// Client holds the settings used to connect to the key/value store.
	Client ScopeClientCfg
}
// ScopeClientCfg represents Datastore Client-only mode configuration
type ScopeClientCfg struct {
	// Provider names the key/value backend. newClient only accepts
	// string(store.BOLTDB).
	Provider string
	// Address locates the store. newClient splits it on "," before handing
	// the parts to the boltdb constructor.
	Address string
	// Config carries backend options. newClient substitutes an empty
	// store.Config when it is nil.
	Config *store.Config
}
const (
	// LocalScope indicates to store the KV object in local datastore such as boltdb
	LocalScope = "local"
	// GlobalScope indicates to store the KV object in global datastore
	GlobalScope = "global"
	// SwarmScope is not indicating a datastore location. It is defined here
	// along with the other two scopes just for consistency.
	SwarmScope = "swarm"
	// defaultPrefix is the directory in which DefaultScope places the local
	// database file when it is called with an empty data directory.
	defaultPrefix = "/var/lib/docker/network/files"
)
// Key prefixes for the object kinds kept in the kv store.
const (
	// NetworkKeyPrefix is the prefix for network key in the kv store
	NetworkKeyPrefix = "network"
	// EndpointKeyPrefix is the prefix for endpoint key in the kv store
	EndpointKeyPrefix = "endpoint"
)
var (
	// defaultRootChain is the initial value of rootChain.
	defaultRootChain = []string{"docker", "network", "v1.0"}
	// rootChain holds the leading path components that Key writes in front
	// of every key it builds. It shares its backing array with
	// defaultRootChain.
	rootChain = defaultRootChain
)
  84. // DefaultScope returns a default scope config for clients to use.
  85. func DefaultScope(dataDir string) ScopeCfg {
  86. var dbpath string
  87. if dataDir == "" {
  88. dbpath = defaultPrefix + "/local-kv.db"
  89. } else {
  90. dbpath = dataDir + "/network/files/local-kv.db"
  91. }
  92. return ScopeCfg{
  93. Client: ScopeClientCfg{
  94. Provider: string(store.BOLTDB),
  95. Address: dbpath,
  96. Config: &store.Config{
  97. Bucket: "libnetwork",
  98. ConnectionTimeout: time.Minute,
  99. },
  100. },
  101. }
  102. }
  103. // IsValid checks if the scope config has valid configuration.
  104. func (cfg *ScopeCfg) IsValid() bool {
  105. if cfg == nil || strings.TrimSpace(cfg.Client.Provider) == "" || strings.TrimSpace(cfg.Client.Address) == "" {
  106. return false
  107. }
  108. return true
  109. }
  110. // Key provides convenient method to create a Key
  111. func Key(key ...string) string {
  112. var b strings.Builder
  113. for _, parts := range [][]string{rootChain, key} {
  114. for _, part := range parts {
  115. b.WriteString(part)
  116. b.WriteString("/")
  117. }
  118. }
  119. return b.String()
  120. }
  121. // newClient used to connect to KV Store
  122. func newClient(kv string, addr string, config *store.Config) (*Store, error) {
  123. if kv != string(store.BOLTDB) {
  124. return nil, fmt.Errorf("unsupported KV store")
  125. }
  126. if config == nil {
  127. config = &store.Config{}
  128. }
  129. // Parse file path
  130. s, err := boltdb.New(strings.Split(addr, ","), config)
  131. if err != nil {
  132. return nil, err
  133. }
  134. ds := &Store{scope: LocalScope, store: s}
  135. ds.cache = newCache(ds)
  136. return ds, nil
  137. }
  138. // New creates a new Store instance.
  139. func New(cfg ScopeCfg) (*Store, error) {
  140. if cfg.Client.Provider == "" || cfg.Client.Address == "" {
  141. cfg = DefaultScope("")
  142. }
  143. return newClient(cfg.Client.Provider, cfg.Client.Address, cfg.Client.Config)
  144. }
  145. // FromConfig creates a new instance of LibKV data store starting from the datastore config data.
  146. func FromConfig(dsc discoverapi.DatastoreConfigData) (*Store, error) {
  147. var (
  148. ok bool
  149. sCfgP *store.Config
  150. )
  151. sCfgP, ok = dsc.Config.(*store.Config)
  152. if !ok && dsc.Config != nil {
  153. return nil, fmt.Errorf("cannot parse store configuration: %v", dsc.Config)
  154. }
  155. ds, err := New(ScopeCfg{
  156. Client: ScopeClientCfg{
  157. Address: dsc.Address,
  158. Provider: dsc.Provider,
  159. Config: sCfgP,
  160. },
  161. })
  162. if err != nil {
  163. return nil, fmt.Errorf("failed to construct datastore client from datastore configuration %v: %v", dsc, err)
  164. }
  165. return ds, err
  166. }
// Close closes the data store by closing the backing key/value store.
// It does not acquire the Store's mutex.
func (ds *Store) Close() {
	ds.store.Close()
}
// Scope returns the scope of the store, as recorded when the Store was
// constructed.
func (ds *Store) Scope() string {
	return ds.scope
}
  175. // PutObjectAtomic provides an atomic add and update operation for a Record.
  176. func (ds *Store) PutObjectAtomic(kvObject KVObject) error {
  177. var (
  178. previous *store.KVPair
  179. pair *store.KVPair
  180. err error
  181. )
  182. ds.mu.Lock()
  183. defer ds.mu.Unlock()
  184. if kvObject == nil {
  185. return types.BadRequestErrorf("invalid KV Object : nil")
  186. }
  187. kvObjValue := kvObject.Value()
  188. if kvObjValue == nil {
  189. return types.BadRequestErrorf("invalid KV Object with a nil Value for key %s", Key(kvObject.Key()...))
  190. }
  191. if kvObject.Skip() {
  192. goto add_cache
  193. }
  194. if kvObject.Exists() {
  195. previous = &store.KVPair{Key: Key(kvObject.Key()...), LastIndex: kvObject.Index()}
  196. } else {
  197. previous = nil
  198. }
  199. pair, err = ds.store.AtomicPut(Key(kvObject.Key()...), kvObjValue, previous)
  200. if err != nil {
  201. if err == store.ErrKeyExists {
  202. return ErrKeyModified
  203. }
  204. return err
  205. }
  206. kvObject.SetIndex(pair.LastIndex)
  207. add_cache:
  208. if ds.cache != nil {
  209. // If persistent store is skipped, sequencing needs to
  210. // happen in cache.
  211. return ds.cache.add(kvObject, kvObject.Skip())
  212. }
  213. return nil
  214. }
  215. // GetObject gets data from the store and unmarshals to the specified object.
  216. func (ds *Store) GetObject(key string, o KVObject) error {
  217. ds.mu.Lock()
  218. defer ds.mu.Unlock()
  219. if ds.cache != nil {
  220. return ds.cache.get(o)
  221. }
  222. kvPair, err := ds.store.Get(key)
  223. if err != nil {
  224. return err
  225. }
  226. if err := o.SetValue(kvPair.Value); err != nil {
  227. return err
  228. }
  229. // Make sure the object has a correct view of the DB index in
  230. // case we need to modify it and update the DB.
  231. o.SetIndex(kvPair.LastIndex)
  232. return nil
  233. }
  234. func (ds *Store) ensureParent(parent string) error {
  235. exists, err := ds.store.Exists(parent)
  236. if err != nil {
  237. return err
  238. }
  239. if exists {
  240. return nil
  241. }
  242. return ds.store.Put(parent, []byte{})
  243. }
  244. // List returns of a list of KVObjects belonging to the parent key. The caller
  245. // must pass a KVObject of the same type as the objects that need to be listed.
  246. func (ds *Store) List(key string, kvObject KVObject) ([]KVObject, error) {
  247. ds.mu.Lock()
  248. defer ds.mu.Unlock()
  249. if ds.cache != nil {
  250. return ds.cache.list(kvObject)
  251. }
  252. var kvol []KVObject
  253. cb := func(key string, val KVObject) {
  254. kvol = append(kvol, val)
  255. }
  256. err := ds.iterateKVPairsFromStore(key, kvObject, cb)
  257. if err != nil {
  258. return nil, err
  259. }
  260. return kvol, nil
  261. }
  262. func (ds *Store) iterateKVPairsFromStore(key string, kvObject KVObject, callback func(string, KVObject)) error {
  263. // Bail out right away if the kvObject does not implement KVConstructor
  264. ctor, ok := kvObject.(KVConstructor)
  265. if !ok {
  266. return fmt.Errorf("error listing objects, object does not implement KVConstructor interface")
  267. }
  268. // Make sure the parent key exists
  269. if err := ds.ensureParent(key); err != nil {
  270. return err
  271. }
  272. kvList, err := ds.store.List(key)
  273. if err != nil {
  274. return err
  275. }
  276. for _, kvPair := range kvList {
  277. if len(kvPair.Value) == 0 {
  278. continue
  279. }
  280. dstO := ctor.New()
  281. if err := dstO.SetValue(kvPair.Value); err != nil {
  282. return err
  283. }
  284. // Make sure the object has a correct view of the DB index in
  285. // case we need to modify it and update the DB.
  286. dstO.SetIndex(kvPair.LastIndex)
  287. callback(kvPair.Key, dstO)
  288. }
  289. return nil
  290. }
  291. // Map returns a Map of KVObjects.
  292. func (ds *Store) Map(key string, kvObject KVObject) (map[string]KVObject, error) {
  293. ds.mu.Lock()
  294. defer ds.mu.Unlock()
  295. kvol := make(map[string]KVObject)
  296. cb := func(key string, val KVObject) {
  297. // Trim the leading & trailing "/" to make it consistent across all stores
  298. kvol[strings.Trim(key, "/")] = val
  299. }
  300. err := ds.iterateKVPairsFromStore(key, kvObject, cb)
  301. if err != nil {
  302. return nil, err
  303. }
  304. return kvol, nil
  305. }
  306. // DeleteObjectAtomic performs atomic delete on a record.
  307. func (ds *Store) DeleteObjectAtomic(kvObject KVObject) error {
  308. ds.mu.Lock()
  309. defer ds.mu.Unlock()
  310. if kvObject == nil {
  311. return types.BadRequestErrorf("invalid KV Object : nil")
  312. }
  313. previous := &store.KVPair{Key: Key(kvObject.Key()...), LastIndex: kvObject.Index()}
  314. if kvObject.Skip() {
  315. goto deleteCache
  316. }
  317. if err := ds.store.AtomicDelete(Key(kvObject.Key()...), previous); err != nil {
  318. if err == store.ErrKeyExists {
  319. return ErrKeyModified
  320. }
  321. return err
  322. }
  323. deleteCache:
  324. // cleanup the cache only if AtomicDelete went through successfully
  325. if ds.cache != nil {
  326. // If persistent store is skipped, sequencing needs to
  327. // happen in cache.
  328. return ds.cache.del(kvObject, kvObject.Skip())
  329. }
  330. return nil
  331. }