123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316 |
- package datastore
- import (
- "fmt"
- "strings"
- "sync"
- "time"
- "github.com/docker/docker/libnetwork/discoverapi"
- store "github.com/docker/docker/libnetwork/internal/kvstore"
- "github.com/docker/docker/libnetwork/internal/kvstore/boltdb"
- "github.com/docker/docker/libnetwork/types"
- )
- // ErrKeyModified is raised for an atomic update when the update is working on a stale state
- var (
- ErrKeyModified = store.ErrKeyModified
- ErrKeyNotFound = store.ErrKeyNotFound
- )
- type Store struct {
- mu sync.Mutex
- store store.Store
- cache *cache
- }
- // KVObject is Key/Value interface used by objects to be part of the Store.
- type KVObject interface {
- // Key method lets an object provide the Key to be used in KV Store
- Key() []string
- // KeyPrefix method lets an object return immediate parent key that can be used for tree walk
- KeyPrefix() []string
- // Value method lets an object marshal its content to be stored in the KV store
- Value() []byte
- // SetValue is used by the datastore to set the object's value when loaded from the data store.
- SetValue([]byte) error
- // Index method returns the latest DB Index as seen by the object
- Index() uint64
- // SetIndex method allows the datastore to store the latest DB Index into the object
- SetIndex(uint64)
- // Exists returns true if the object exists in the datastore, false if it hasn't been stored yet.
- // When SetIndex() is called, the object has been stored.
- Exists() bool
- // Skip provides a way for a KV Object to avoid persisting it in the KV Store
- Skip() bool
- // New returns a new object which is created based on the
- // source object
- New() KVObject
- // CopyTo deep copies the contents of the implementing object
- // to the passed destination object
- CopyTo(KVObject) error
- }
- // ScopeCfg represents Datastore configuration.
- type ScopeCfg struct {
- Client ScopeClientCfg
- }
- // ScopeClientCfg represents Datastore Client-only mode configuration
- type ScopeClientCfg struct {
- Provider string
- Address string
- Config *store.Config
- }
- const (
- // NetworkKeyPrefix is the prefix for network key in the kv store
- NetworkKeyPrefix = "network"
- // EndpointKeyPrefix is the prefix for endpoint key in the kv store
- EndpointKeyPrefix = "endpoint"
- )
- var (
- defaultRootChain = []string{"docker", "network", "v1.0"}
- rootChain = defaultRootChain
- )
- const defaultPrefix = "/var/lib/docker/network/files"
- // DefaultScope returns a default scope config for clients to use.
- func DefaultScope(dataDir string) ScopeCfg {
- var dbpath string
- if dataDir == "" {
- dbpath = defaultPrefix + "/local-kv.db"
- } else {
- dbpath = dataDir + "/network/files/local-kv.db"
- }
- return ScopeCfg{
- Client: ScopeClientCfg{
- Provider: string(store.BOLTDB),
- Address: dbpath,
- Config: &store.Config{
- Bucket: "libnetwork",
- ConnectionTimeout: time.Minute,
- },
- },
- }
- }
- // IsValid checks if the scope config has valid configuration.
- func (cfg *ScopeCfg) IsValid() bool {
- if cfg == nil || strings.TrimSpace(cfg.Client.Provider) == "" || strings.TrimSpace(cfg.Client.Address) == "" {
- return false
- }
- return true
- }
- // Key provides convenient method to create a Key
- func Key(key ...string) string {
- var b strings.Builder
- for _, parts := range [][]string{rootChain, key} {
- for _, part := range parts {
- b.WriteString(part)
- b.WriteString("/")
- }
- }
- return b.String()
- }
- // newClient used to connect to KV Store
- func newClient(kv string, addr string, config *store.Config) (*Store, error) {
- if kv != string(store.BOLTDB) {
- return nil, fmt.Errorf("unsupported KV store")
- }
- if config == nil {
- config = &store.Config{}
- }
- // Parse file path
- s, err := boltdb.New(strings.Split(addr, ","), config)
- if err != nil {
- return nil, err
- }
- return &Store{store: s, cache: newCache(s)}, nil
- }
- // New creates a new Store instance.
- func New(cfg ScopeCfg) (*Store, error) {
- if cfg.Client.Provider == "" || cfg.Client.Address == "" {
- cfg = DefaultScope("")
- }
- return newClient(cfg.Client.Provider, cfg.Client.Address, cfg.Client.Config)
- }
- // FromConfig creates a new instance of LibKV data store starting from the datastore config data.
- func FromConfig(dsc discoverapi.DatastoreConfigData) (*Store, error) {
- var (
- ok bool
- sCfgP *store.Config
- )
- sCfgP, ok = dsc.Config.(*store.Config)
- if !ok && dsc.Config != nil {
- return nil, fmt.Errorf("cannot parse store configuration: %v", dsc.Config)
- }
- ds, err := New(ScopeCfg{
- Client: ScopeClientCfg{
- Address: dsc.Address,
- Provider: dsc.Provider,
- Config: sCfgP,
- },
- })
- if err != nil {
- return nil, fmt.Errorf("failed to construct datastore client from datastore configuration %v: %v", dsc, err)
- }
- return ds, err
- }
- // Close closes the data store.
- func (ds *Store) Close() {
- ds.store.Close()
- }
- // PutObjectAtomic provides an atomic add and update operation for a Record.
- func (ds *Store) PutObjectAtomic(kvObject KVObject) error {
- ds.mu.Lock()
- defer ds.mu.Unlock()
- if kvObject == nil {
- return types.InvalidParameterErrorf("invalid KV Object : nil")
- }
- kvObjValue := kvObject.Value()
- if kvObjValue == nil {
- return types.InvalidParameterErrorf("invalid KV Object with a nil Value for key %s", Key(kvObject.Key()...))
- }
- if !kvObject.Skip() {
- var previous *store.KVPair
- if kvObject.Exists() {
- previous = &store.KVPair{Key: Key(kvObject.Key()...), LastIndex: kvObject.Index()}
- }
- pair, err := ds.store.AtomicPut(Key(kvObject.Key()...), kvObjValue, previous)
- if err != nil {
- if err == store.ErrKeyExists {
- return ErrKeyModified
- }
- return err
- }
- kvObject.SetIndex(pair.LastIndex)
- }
- // If persistent store is skipped, sequencing needs to
- // happen in cache.
- return ds.cache.add(kvObject, kvObject.Skip())
- }
- // GetObject gets data from the store and unmarshals to the specified object.
- func (ds *Store) GetObject(key string, o KVObject) error {
- ds.mu.Lock()
- defer ds.mu.Unlock()
- return ds.cache.get(o)
- }
- func (ds *Store) ensureParent(parent string) error {
- exists, err := ds.store.Exists(parent)
- if err != nil {
- return err
- }
- if exists {
- return nil
- }
- return ds.store.Put(parent, []byte{})
- }
- // List returns of a list of KVObjects belonging to the parent key. The caller
- // must pass a KVObject of the same type as the objects that need to be listed.
- func (ds *Store) List(key string, kvObject KVObject) ([]KVObject, error) {
- ds.mu.Lock()
- defer ds.mu.Unlock()
- return ds.cache.list(kvObject)
- }
- func (ds *Store) iterateKVPairsFromStore(key string, ctor KVObject, callback func(string, KVObject)) error {
- // Make sure the parent key exists
- if err := ds.ensureParent(key); err != nil {
- return err
- }
- kvList, err := ds.store.List(key)
- if err != nil {
- return err
- }
- for _, kvPair := range kvList {
- if len(kvPair.Value) == 0 {
- continue
- }
- dstO := ctor.New()
- if err := dstO.SetValue(kvPair.Value); err != nil {
- return err
- }
- // Make sure the object has a correct view of the DB index in
- // case we need to modify it and update the DB.
- dstO.SetIndex(kvPair.LastIndex)
- callback(kvPair.Key, dstO)
- }
- return nil
- }
- // Map returns a Map of KVObjects.
- func (ds *Store) Map(key string, kvObject KVObject) (map[string]KVObject, error) {
- ds.mu.Lock()
- defer ds.mu.Unlock()
- results := map[string]KVObject{}
- err := ds.iterateKVPairsFromStore(key, kvObject, func(key string, val KVObject) {
- // Trim the leading & trailing "/" to make it consistent across all stores
- results[strings.Trim(key, "/")] = val
- })
- if err != nil {
- return nil, err
- }
- return results, nil
- }
- // DeleteObjectAtomic performs atomic delete on a record.
- func (ds *Store) DeleteObjectAtomic(kvObject KVObject) error {
- ds.mu.Lock()
- defer ds.mu.Unlock()
- if kvObject == nil {
- return types.InvalidParameterErrorf("invalid KV Object : nil")
- }
- previous := &store.KVPair{Key: Key(kvObject.Key()...), LastIndex: kvObject.Index()}
- if !kvObject.Skip() {
- if err := ds.store.AtomicDelete(Key(kvObject.Key()...), previous); err != nil {
- if err == store.ErrKeyExists {
- return ErrKeyModified
- }
- return err
- }
- }
- // cleanup the cache only if AtomicDelete went through successfully
- // If persistent store is skipped, sequencing needs to
- // happen in cache.
- return ds.cache.del(kvObject, kvObject.Skip())
- }
|