datastore.go 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316
  1. package datastore
  2. import (
  3. "fmt"
  4. "strings"
  5. "sync"
  6. "time"
  7. "github.com/docker/docker/libnetwork/discoverapi"
  8. store "github.com/docker/docker/libnetwork/internal/kvstore"
  9. "github.com/docker/docker/libnetwork/internal/kvstore/boltdb"
  10. "github.com/docker/docker/libnetwork/types"
  11. )
  12. // ErrKeyModified is raised for an atomic update when the update is working on a stale state
  13. var (
  14. ErrKeyModified = store.ErrKeyModified
  15. ErrKeyNotFound = store.ErrKeyNotFound
  16. )
  17. type Store struct {
  18. mu sync.Mutex
  19. store store.Store
  20. cache *cache
  21. }
  22. // KVObject is Key/Value interface used by objects to be part of the Store.
  23. type KVObject interface {
  24. // Key method lets an object provide the Key to be used in KV Store
  25. Key() []string
  26. // KeyPrefix method lets an object return immediate parent key that can be used for tree walk
  27. KeyPrefix() []string
  28. // Value method lets an object marshal its content to be stored in the KV store
  29. Value() []byte
  30. // SetValue is used by the datastore to set the object's value when loaded from the data store.
  31. SetValue([]byte) error
  32. // Index method returns the latest DB Index as seen by the object
  33. Index() uint64
  34. // SetIndex method allows the datastore to store the latest DB Index into the object
  35. SetIndex(uint64)
  36. // Exists returns true if the object exists in the datastore, false if it hasn't been stored yet.
  37. // When SetIndex() is called, the object has been stored.
  38. Exists() bool
  39. // Skip provides a way for a KV Object to avoid persisting it in the KV Store
  40. Skip() bool
  41. // New returns a new object which is created based on the
  42. // source object
  43. New() KVObject
  44. // CopyTo deep copies the contents of the implementing object
  45. // to the passed destination object
  46. CopyTo(KVObject) error
  47. }
  48. // ScopeCfg represents Datastore configuration.
  49. type ScopeCfg struct {
  50. Client ScopeClientCfg
  51. }
  52. // ScopeClientCfg represents Datastore Client-only mode configuration
  53. type ScopeClientCfg struct {
  54. Provider string
  55. Address string
  56. Config *store.Config
  57. }
  58. const (
  59. // NetworkKeyPrefix is the prefix for network key in the kv store
  60. NetworkKeyPrefix = "network"
  61. // EndpointKeyPrefix is the prefix for endpoint key in the kv store
  62. EndpointKeyPrefix = "endpoint"
  63. )
  64. var (
  65. defaultRootChain = []string{"docker", "network", "v1.0"}
  66. rootChain = defaultRootChain
  67. )
  68. const defaultPrefix = "/var/lib/docker/network/files"
  69. // DefaultScope returns a default scope config for clients to use.
  70. func DefaultScope(dataDir string) ScopeCfg {
  71. var dbpath string
  72. if dataDir == "" {
  73. dbpath = defaultPrefix + "/local-kv.db"
  74. } else {
  75. dbpath = dataDir + "/network/files/local-kv.db"
  76. }
  77. return ScopeCfg{
  78. Client: ScopeClientCfg{
  79. Provider: string(store.BOLTDB),
  80. Address: dbpath,
  81. Config: &store.Config{
  82. Bucket: "libnetwork",
  83. ConnectionTimeout: time.Minute,
  84. },
  85. },
  86. }
  87. }
  88. // IsValid checks if the scope config has valid configuration.
  89. func (cfg *ScopeCfg) IsValid() bool {
  90. if cfg == nil || strings.TrimSpace(cfg.Client.Provider) == "" || strings.TrimSpace(cfg.Client.Address) == "" {
  91. return false
  92. }
  93. return true
  94. }
  95. // Key provides convenient method to create a Key
  96. func Key(key ...string) string {
  97. var b strings.Builder
  98. for _, parts := range [][]string{rootChain, key} {
  99. for _, part := range parts {
  100. b.WriteString(part)
  101. b.WriteString("/")
  102. }
  103. }
  104. return b.String()
  105. }
  106. // newClient used to connect to KV Store
  107. func newClient(kv string, addr string, config *store.Config) (*Store, error) {
  108. if kv != string(store.BOLTDB) {
  109. return nil, fmt.Errorf("unsupported KV store")
  110. }
  111. if config == nil {
  112. config = &store.Config{}
  113. }
  114. // Parse file path
  115. s, err := boltdb.New(strings.Split(addr, ","), config)
  116. if err != nil {
  117. return nil, err
  118. }
  119. return &Store{store: s, cache: newCache(s)}, nil
  120. }
  121. // New creates a new Store instance.
  122. func New(cfg ScopeCfg) (*Store, error) {
  123. if cfg.Client.Provider == "" || cfg.Client.Address == "" {
  124. cfg = DefaultScope("")
  125. }
  126. return newClient(cfg.Client.Provider, cfg.Client.Address, cfg.Client.Config)
  127. }
  128. // FromConfig creates a new instance of LibKV data store starting from the datastore config data.
  129. func FromConfig(dsc discoverapi.DatastoreConfigData) (*Store, error) {
  130. var (
  131. ok bool
  132. sCfgP *store.Config
  133. )
  134. sCfgP, ok = dsc.Config.(*store.Config)
  135. if !ok && dsc.Config != nil {
  136. return nil, fmt.Errorf("cannot parse store configuration: %v", dsc.Config)
  137. }
  138. ds, err := New(ScopeCfg{
  139. Client: ScopeClientCfg{
  140. Address: dsc.Address,
  141. Provider: dsc.Provider,
  142. Config: sCfgP,
  143. },
  144. })
  145. if err != nil {
  146. return nil, fmt.Errorf("failed to construct datastore client from datastore configuration %v: %v", dsc, err)
  147. }
  148. return ds, err
  149. }
  150. // Close closes the data store.
  151. func (ds *Store) Close() {
  152. ds.store.Close()
  153. }
  154. // PutObjectAtomic provides an atomic add and update operation for a Record.
  155. func (ds *Store) PutObjectAtomic(kvObject KVObject) error {
  156. ds.mu.Lock()
  157. defer ds.mu.Unlock()
  158. if kvObject == nil {
  159. return types.InvalidParameterErrorf("invalid KV Object : nil")
  160. }
  161. kvObjValue := kvObject.Value()
  162. if kvObjValue == nil {
  163. return types.InvalidParameterErrorf("invalid KV Object with a nil Value for key %s", Key(kvObject.Key()...))
  164. }
  165. if !kvObject.Skip() {
  166. var previous *store.KVPair
  167. if kvObject.Exists() {
  168. previous = &store.KVPair{Key: Key(kvObject.Key()...), LastIndex: kvObject.Index()}
  169. }
  170. pair, err := ds.store.AtomicPut(Key(kvObject.Key()...), kvObjValue, previous)
  171. if err != nil {
  172. if err == store.ErrKeyExists {
  173. return ErrKeyModified
  174. }
  175. return err
  176. }
  177. kvObject.SetIndex(pair.LastIndex)
  178. }
  179. // If persistent store is skipped, sequencing needs to
  180. // happen in cache.
  181. return ds.cache.add(kvObject, kvObject.Skip())
  182. }
  183. // GetObject gets data from the store and unmarshals to the specified object.
  184. func (ds *Store) GetObject(key string, o KVObject) error {
  185. ds.mu.Lock()
  186. defer ds.mu.Unlock()
  187. return ds.cache.get(o)
  188. }
  189. func (ds *Store) ensureParent(parent string) error {
  190. exists, err := ds.store.Exists(parent)
  191. if err != nil {
  192. return err
  193. }
  194. if exists {
  195. return nil
  196. }
  197. return ds.store.Put(parent, []byte{})
  198. }
  199. // List returns of a list of KVObjects belonging to the parent key. The caller
  200. // must pass a KVObject of the same type as the objects that need to be listed.
  201. func (ds *Store) List(key string, kvObject KVObject) ([]KVObject, error) {
  202. ds.mu.Lock()
  203. defer ds.mu.Unlock()
  204. return ds.cache.list(kvObject)
  205. }
  206. func (ds *Store) iterateKVPairsFromStore(key string, ctor KVObject, callback func(string, KVObject)) error {
  207. // Make sure the parent key exists
  208. if err := ds.ensureParent(key); err != nil {
  209. return err
  210. }
  211. kvList, err := ds.store.List(key)
  212. if err != nil {
  213. return err
  214. }
  215. for _, kvPair := range kvList {
  216. if len(kvPair.Value) == 0 {
  217. continue
  218. }
  219. dstO := ctor.New()
  220. if err := dstO.SetValue(kvPair.Value); err != nil {
  221. return err
  222. }
  223. // Make sure the object has a correct view of the DB index in
  224. // case we need to modify it and update the DB.
  225. dstO.SetIndex(kvPair.LastIndex)
  226. callback(kvPair.Key, dstO)
  227. }
  228. return nil
  229. }
  230. // Map returns a Map of KVObjects.
  231. func (ds *Store) Map(key string, kvObject KVObject) (map[string]KVObject, error) {
  232. ds.mu.Lock()
  233. defer ds.mu.Unlock()
  234. results := map[string]KVObject{}
  235. err := ds.iterateKVPairsFromStore(key, kvObject, func(key string, val KVObject) {
  236. // Trim the leading & trailing "/" to make it consistent across all stores
  237. results[strings.Trim(key, "/")] = val
  238. })
  239. if err != nil {
  240. return nil, err
  241. }
  242. return results, nil
  243. }
  244. // DeleteObjectAtomic performs atomic delete on a record.
  245. func (ds *Store) DeleteObjectAtomic(kvObject KVObject) error {
  246. ds.mu.Lock()
  247. defer ds.mu.Unlock()
  248. if kvObject == nil {
  249. return types.InvalidParameterErrorf("invalid KV Object : nil")
  250. }
  251. previous := &store.KVPair{Key: Key(kvObject.Key()...), LastIndex: kvObject.Index()}
  252. if !kvObject.Skip() {
  253. if err := ds.store.AtomicDelete(Key(kvObject.Key()...), previous); err != nil {
  254. if err == store.ErrKeyExists {
  255. return ErrKeyModified
  256. }
  257. return err
  258. }
  259. }
  260. // cleanup the cache only if AtomicDelete went through successfully
  261. // If persistent store is skipped, sequencing needs to
  262. // happen in cache.
  263. return ds.cache.del(kvObject, kvObject.Skip())
  264. }