datastore.go 7.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311
  1. package datastore
import (
	"errors"
	"fmt"
	"strings"
	"sync"
	"time"

	store "github.com/docker/docker/libnetwork/internal/kvstore"
	"github.com/docker/docker/libnetwork/internal/kvstore/boltdb"
	"github.com/docker/docker/libnetwork/types"
)
// Sentinel errors re-exported from the underlying kvstore package so that
// callers can compare against them without importing that package.
var (
	// ErrKeyModified is raised for an atomic update when the update is working on a stale state.
	ErrKeyModified = store.ErrKeyModified
	// ErrKeyNotFound is the kvstore package's ErrKeyNotFound, re-exported.
	ErrKeyNotFound = store.ErrKeyNotFound
)
// Store is the datastore handle. It pairs a backing KV store with a cache
// built over that store (see newClient).
type Store struct {
	// mu is held by the exported methods that read or modify the store and
	// the cache; Close does not take it.
	mu sync.Mutex
	// store is the backing KV store.
	store store.Store
	// cache is the object cache constructed over store.
	cache *cache
}
// KVObject is the Key/Value interface that objects must implement to be
// handled by the Store.
type KVObject interface {
	// Key method lets an object provide the Key to be used in KV Store.
	// The returned segments are joined by the package-level Key function.
	Key() []string
	// KeyPrefix method lets an object return immediate parent key that can be used for tree walk.
	KeyPrefix() []string
	// Value method lets an object marshal its content to be stored in the KV store.
	// PutObjectAtomic rejects objects whose Value is nil.
	Value() []byte
	// SetValue is used by the datastore to set the object's value when loaded from the data store.
	SetValue([]byte) error
	// Index method returns the latest DB Index as seen by the object.
	Index() uint64
	// SetIndex method allows the datastore to store the latest DB Index into the object.
	SetIndex(uint64)
	// Exists returns true if the object exists in the datastore, false if it hasn't been stored yet.
	// When SetIndex() is called, the object has been stored.
	Exists() bool
	// Skip provides a way for a KV Object to avoid persisting it in the KV Store.
	Skip() bool
	// New returns a new object which is created based on the
	// source object.
	New() KVObject
	// CopyTo deep copies the contents of the implementing object
	// to the passed destination object.
	CopyTo(KVObject) error
}
// ScopeCfg represents Datastore configuration.
type ScopeCfg struct {
	// Client carries the provider, address and options that New hands to
	// newClient.
	Client ScopeClientCfg
}
// ScopeClientCfg represents Datastore Client-only mode configuration.
type ScopeClientCfg struct {
	// Provider names the KV backend. newClient only accepts the value
	// string(store.BOLTDB).
	Provider string
	// Address is passed to boltdb.New as its first argument; DefaultScope
	// sets it to the path of the database file.
	Address string
	// Config holds backend options. It may be nil, in which case newClient
	// substitutes an empty store.Config.
	Config *store.Config
}
const (
	// NetworkKeyPrefix is the prefix for network keys in the KV store.
	NetworkKeyPrefix = "network"
	// EndpointKeyPrefix is the prefix for endpoint keys in the KV store.
	EndpointKeyPrefix = "endpoint"
)
var (
	// defaultRootChain lists the leading key segments used by default.
	defaultRootChain = []string{"docker", "network", "v1.0"}
	// rootChain holds the segments that Key prepends to every key it builds.
	// It is initialised to defaultRootChain (the two share a backing array).
	rootChain = defaultRootChain
)
// defaultPrefix is the directory in which DefaultScope places the local KV
// database file when it is called with an empty data directory.
const defaultPrefix = "/var/lib/docker/network/files"
  68. // DefaultScope returns a default scope config for clients to use.
  69. func DefaultScope(dataDir string) ScopeCfg {
  70. var dbpath string
  71. if dataDir == "" {
  72. dbpath = defaultPrefix + "/local-kv.db"
  73. } else {
  74. dbpath = dataDir + "/network/files/local-kv.db"
  75. }
  76. return ScopeCfg{
  77. Client: ScopeClientCfg{
  78. Provider: string(store.BOLTDB),
  79. Address: dbpath,
  80. Config: &store.Config{
  81. Bucket: "libnetwork",
  82. ConnectionTimeout: time.Minute,
  83. },
  84. },
  85. }
  86. }
  87. // IsValid checks if the scope config has valid configuration.
  88. func (cfg *ScopeCfg) IsValid() bool {
  89. if cfg == nil || strings.TrimSpace(cfg.Client.Provider) == "" || strings.TrimSpace(cfg.Client.Address) == "" {
  90. return false
  91. }
  92. return true
  93. }
  94. // Key provides convenient method to create a Key
  95. func Key(key ...string) string {
  96. var b strings.Builder
  97. for _, parts := range [][]string{rootChain, key} {
  98. for _, part := range parts {
  99. b.WriteString(part)
  100. b.WriteString("/")
  101. }
  102. }
  103. return b.String()
  104. }
  105. // newClient used to connect to KV Store
  106. func newClient(kv string, addr string, config *store.Config) (*Store, error) {
  107. if kv != string(store.BOLTDB) {
  108. return nil, fmt.Errorf("unsupported KV store")
  109. }
  110. if config == nil {
  111. config = &store.Config{}
  112. }
  113. s, err := boltdb.New(addr, config)
  114. if err != nil {
  115. return nil, err
  116. }
  117. return &Store{store: s, cache: newCache(s)}, nil
  118. }
  119. // New creates a new Store instance.
  120. func New(cfg ScopeCfg) (*Store, error) {
  121. if cfg.Client.Provider == "" || cfg.Client.Address == "" {
  122. cfg = DefaultScope("")
  123. }
  124. return newClient(cfg.Client.Provider, cfg.Client.Address, cfg.Client.Config)
  125. }
// Close closes the data store by closing the backing KV store.
// It does not acquire ds.mu.
func (ds *Store) Close() {
	ds.store.Close()
}
  130. // PutObjectAtomic provides an atomic add and update operation for a Record.
  131. func (ds *Store) PutObjectAtomic(kvObject KVObject) error {
  132. ds.mu.Lock()
  133. defer ds.mu.Unlock()
  134. if kvObject == nil {
  135. return types.InvalidParameterErrorf("invalid KV Object: nil")
  136. }
  137. kvObjValue := kvObject.Value()
  138. if kvObjValue == nil {
  139. return types.InvalidParameterErrorf("invalid KV Object with a nil Value for key %s", Key(kvObject.Key()...))
  140. }
  141. if !kvObject.Skip() {
  142. var previous *store.KVPair
  143. if kvObject.Exists() {
  144. previous = &store.KVPair{Key: Key(kvObject.Key()...), LastIndex: kvObject.Index()}
  145. }
  146. pair, err := ds.store.AtomicPut(Key(kvObject.Key()...), kvObjValue, previous)
  147. if err != nil {
  148. if err == store.ErrKeyExists {
  149. return ErrKeyModified
  150. }
  151. return err
  152. }
  153. kvObject.SetIndex(pair.LastIndex)
  154. }
  155. // If persistent store is skipped, sequencing needs to
  156. // happen in cache.
  157. return ds.cache.add(kvObject, kvObject.Skip())
  158. }
// GetObject gets data for the specified object and unmarshals it into o.
// The lookup goes through the datastore's cache (cache.get) while ds.mu is
// held; the cache's error, if any, is returned unchanged.
func (ds *Store) GetObject(o KVObject) error {
	ds.mu.Lock()
	defer ds.mu.Unlock()
	return ds.cache.get(o)
}
  165. func (ds *Store) ensureParent(parent string) error {
  166. exists, err := ds.store.Exists(parent)
  167. if err != nil {
  168. return err
  169. }
  170. if exists {
  171. return nil
  172. }
  173. return ds.store.Put(parent, []byte{})
  174. }
// List returns a list of KVObjects belonging to the parent key. The caller
// must pass a KVObject of the same type as the objects that need to be listed.
// The listing is served by the datastore's cache (cache.list) while ds.mu is
// held.
func (ds *Store) List(kvObject KVObject) ([]KVObject, error) {
	ds.mu.Lock()
	defer ds.mu.Unlock()
	return ds.cache.list(kvObject)
}
  182. func (ds *Store) iterateKVPairsFromStore(key string, ctor KVObject, callback func(string, KVObject)) error {
  183. // Make sure the parent key exists
  184. if err := ds.ensureParent(key); err != nil {
  185. return err
  186. }
  187. kvList, err := ds.store.List(key)
  188. if err != nil {
  189. return err
  190. }
  191. for _, kvPair := range kvList {
  192. if len(kvPair.Value) == 0 {
  193. continue
  194. }
  195. dstO := ctor.New()
  196. if err := dstO.SetValue(kvPair.Value); err != nil {
  197. return err
  198. }
  199. // Make sure the object has a correct view of the DB index in
  200. // case we need to modify it and update the DB.
  201. dstO.SetIndex(kvPair.LastIndex)
  202. callback(kvPair.Key, dstO)
  203. }
  204. return nil
  205. }
  206. // Map returns a Map of KVObjects.
  207. func (ds *Store) Map(key string, kvObject KVObject) (map[string]KVObject, error) {
  208. ds.mu.Lock()
  209. defer ds.mu.Unlock()
  210. results := map[string]KVObject{}
  211. err := ds.iterateKVPairsFromStore(key, kvObject, func(key string, val KVObject) {
  212. // Trim the leading & trailing "/" to make it consistent across all stores
  213. results[strings.Trim(key, "/")] = val
  214. })
  215. if err != nil {
  216. return nil, err
  217. }
  218. return results, nil
  219. }
  220. // DeleteObject deletes a kvObject from the on-disk DB and the in-memory cache.
  221. // Unlike DeleteObjectAtomic, it doesn't check the optimistic lock of the
  222. // passed kvObject.
  223. func (ds *Store) DeleteObject(kvObject KVObject) error {
  224. ds.mu.Lock()
  225. defer ds.mu.Unlock()
  226. if kvObject == nil {
  227. return types.InvalidParameterErrorf("invalid KV Object: nil")
  228. }
  229. if !kvObject.Skip() {
  230. if err := ds.store.Delete(Key(kvObject.Key()...)); err != nil {
  231. return err
  232. }
  233. }
  234. // cleanup the cache only if AtomicDelete went through successfully
  235. // If persistent store is skipped, sequencing needs to
  236. // happen in cache.
  237. return ds.cache.del(kvObject, false)
  238. }
  239. // DeleteObjectAtomic performs atomic delete on a record.
  240. func (ds *Store) DeleteObjectAtomic(kvObject KVObject) error {
  241. ds.mu.Lock()
  242. defer ds.mu.Unlock()
  243. if kvObject == nil {
  244. return types.InvalidParameterErrorf("invalid KV Object: nil")
  245. }
  246. previous := &store.KVPair{Key: Key(kvObject.Key()...), LastIndex: kvObject.Index()}
  247. if !kvObject.Skip() {
  248. if err := ds.store.AtomicDelete(Key(kvObject.Key()...), previous); err != nil {
  249. if err == store.ErrKeyExists {
  250. return ErrKeyModified
  251. }
  252. return err
  253. }
  254. }
  255. // cleanup the cache only if AtomicDelete went through successfully
  256. // If persistent store is skipped, sequencing needs to
  257. // happen in cache.
  258. return ds.cache.del(kvObject, kvObject.Skip())
  259. }