libnetwork/datastore: unconditionally use ds.cache
ds.cache is never nil so the uncached code paths are unreachable in practice. And given how many KVObject deep-copy implementations shallow copy pointers and other reference-typed values, there is the distinct possibility that disabling the datastore cache could break things. Signed-off-by: Cory Snider <csnider@mirantis.com>
This commit is contained in:
parent
5b3086db1f
commit
43dccc6c1a
4 changed files with 36 additions and 71 deletions
|
@ -1,7 +1,6 @@
|
||||||
package datastore
|
package datastore
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"errors"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
|
@ -34,12 +33,6 @@ func (c *cache) kmap(kvObject KVObject) (kvMap, error) {
|
||||||
|
|
||||||
kmap = kvMap{}
|
kmap = kvMap{}
|
||||||
|
|
||||||
// Bail out right away if the kvObject does not implement KVConstructor
|
|
||||||
ctor, ok := kvObject.(KVConstructor)
|
|
||||||
if !ok {
|
|
||||||
return nil, errors.New("error while populating kmap, object does not implement KVConstructor interface")
|
|
||||||
}
|
|
||||||
|
|
||||||
kvList, err := c.ds.List(keyPrefix)
|
kvList, err := c.ds.List(keyPrefix)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if err == store.ErrKeyNotFound {
|
if err == store.ErrKeyNotFound {
|
||||||
|
@ -57,7 +50,7 @@ func (c *cache) kmap(kvObject KVObject) (kvMap, error) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
dstO := ctor.New()
|
dstO := kvObject.New()
|
||||||
err = dstO.SetValue(kvPair.Value)
|
err = dstO.SetValue(kvPair.Value)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -152,12 +145,7 @@ func (c *cache) get(kvObject KVObject) error {
|
||||||
return ErrKeyNotFound
|
return ErrKeyNotFound
|
||||||
}
|
}
|
||||||
|
|
||||||
ctor, ok := o.(KVConstructor)
|
return o.CopyTo(kvObject)
|
||||||
if !ok {
|
|
||||||
return errors.New("kvobject does not implement KVConstructor interface. could not get object")
|
|
||||||
}
|
|
||||||
|
|
||||||
return ctor.CopyTo(kvObject)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *cache) list(kvObject KVObject) ([]KVObject, error) {
|
func (c *cache) list(kvObject KVObject) ([]KVObject, error) {
|
||||||
|
|
|
@ -47,10 +47,6 @@ type KVObject interface {
|
||||||
DataScope() string
|
DataScope() string
|
||||||
// Skip provides a way for a KV Object to avoid persisting it in the KV Store
|
// Skip provides a way for a KV Object to avoid persisting it in the KV Store
|
||||||
Skip() bool
|
Skip() bool
|
||||||
}
|
|
||||||
|
|
||||||
// KVConstructor interface defines methods which can construct a KVObject from another.
|
|
||||||
type KVConstructor interface {
|
|
||||||
// New returns a new object which is created based on the
|
// New returns a new object which is created based on the
|
||||||
// source object
|
// source object
|
||||||
New() KVObject
|
New() KVObject
|
||||||
|
@ -232,13 +228,9 @@ func (ds *Store) PutObjectAtomic(kvObject KVObject) error {
|
||||||
kvObject.SetIndex(pair.LastIndex)
|
kvObject.SetIndex(pair.LastIndex)
|
||||||
|
|
||||||
add_cache:
|
add_cache:
|
||||||
if ds.cache != nil {
|
// If persistent store is skipped, sequencing needs to
|
||||||
// If persistent store is skipped, sequencing needs to
|
// happen in cache.
|
||||||
// happen in cache.
|
return ds.cache.add(kvObject, kvObject.Skip())
|
||||||
return ds.cache.add(kvObject, kvObject.Skip())
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetObject gets data from the store and unmarshals to the specified object.
|
// GetObject gets data from the store and unmarshals to the specified object.
|
||||||
|
@ -246,23 +238,7 @@ func (ds *Store) GetObject(key string, o KVObject) error {
|
||||||
ds.mu.Lock()
|
ds.mu.Lock()
|
||||||
defer ds.mu.Unlock()
|
defer ds.mu.Unlock()
|
||||||
|
|
||||||
if ds.cache != nil {
|
return ds.cache.get(o)
|
||||||
return ds.cache.get(o)
|
|
||||||
}
|
|
||||||
|
|
||||||
kvPair, err := ds.store.Get(key)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := o.SetValue(kvPair.Value); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// Make sure the object has a correct view of the DB index in
|
|
||||||
// case we need to modify it and update the DB.
|
|
||||||
o.SetIndex(kvPair.LastIndex)
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ds *Store) ensureParent(parent string) error {
|
func (ds *Store) ensureParent(parent string) error {
|
||||||
|
@ -282,27 +258,10 @@ func (ds *Store) List(key string, kvObject KVObject) ([]KVObject, error) {
|
||||||
ds.mu.Lock()
|
ds.mu.Lock()
|
||||||
defer ds.mu.Unlock()
|
defer ds.mu.Unlock()
|
||||||
|
|
||||||
if ds.cache != nil {
|
return ds.cache.list(kvObject)
|
||||||
return ds.cache.list(kvObject)
|
|
||||||
}
|
|
||||||
|
|
||||||
var kvol []KVObject
|
|
||||||
err := ds.iterateKVPairsFromStore(key, kvObject, func(key string, val KVObject) {
|
|
||||||
kvol = append(kvol, val)
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return kvol, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ds *Store) iterateKVPairsFromStore(key string, kvObject KVObject, callback func(string, KVObject)) error {
|
func (ds *Store) iterateKVPairsFromStore(key string, ctor KVObject, callback func(string, KVObject)) error {
|
||||||
// Bail out right away if the kvObject does not implement KVConstructor
|
|
||||||
ctor, ok := kvObject.(KVConstructor)
|
|
||||||
if !ok {
|
|
||||||
return fmt.Errorf("error listing objects, object does not implement KVConstructor interface")
|
|
||||||
}
|
|
||||||
|
|
||||||
// Make sure the parent key exists
|
// Make sure the parent key exists
|
||||||
if err := ds.ensureParent(key); err != nil {
|
if err := ds.ensureParent(key); err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -372,11 +331,7 @@ func (ds *Store) DeleteObjectAtomic(kvObject KVObject) error {
|
||||||
|
|
||||||
deleteCache:
|
deleteCache:
|
||||||
// cleanup the cache only if AtomicDelete went through successfully
|
// cleanup the cache only if AtomicDelete went through successfully
|
||||||
if ds.cache != nil {
|
// If persistent store is skipped, sequencing needs to
|
||||||
// If persistent store is skipped, sequencing needs to
|
// happen in cache.
|
||||||
// happen in cache.
|
return ds.cache.del(kvObject, kvObject.Skip())
|
||||||
return ds.cache.del(kvObject, kvObject.Skip())
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -14,7 +14,8 @@ const dummyKey = "dummy"
|
||||||
|
|
||||||
// NewTestDataStore can be used by other Tests in order to use custom datastore
|
// NewTestDataStore can be used by other Tests in order to use custom datastore
|
||||||
func NewTestDataStore() *Store {
|
func NewTestDataStore() *Store {
|
||||||
return &Store{scope: scope.Local, store: NewMockStore()}
|
s := NewMockStore()
|
||||||
|
return &Store{scope: scope.Local, store: s, cache: newCache(s)}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestKey(t *testing.T) {
|
func TestKey(t *testing.T) {
|
||||||
|
@ -157,6 +158,18 @@ func (n *dummyObject) UnmarshalJSON(b []byte) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (n *dummyObject) New() KVObject {
|
||||||
|
return &dummyObject{}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (n *dummyObject) CopyTo(o KVObject) error {
|
||||||
|
if err := o.SetValue(n.Value()); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
o.SetIndex(n.Index())
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// dummy structure to test "recursive" cases
|
// dummy structure to test "recursive" cases
|
||||||
type recStruct struct {
|
type recStruct struct {
|
||||||
Name string `kv:"leaf"`
|
Name string `kv:"leaf"`
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
package datastore
|
package datastore
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"errors"
|
"strings"
|
||||||
|
|
||||||
store "github.com/docker/docker/libnetwork/internal/kvstore"
|
store "github.com/docker/docker/libnetwork/internal/kvstore"
|
||||||
"github.com/docker/docker/libnetwork/types"
|
"github.com/docker/docker/libnetwork/types"
|
||||||
|
@ -52,7 +52,16 @@ func (s *MockStore) Exists(key string) (bool, error) {
|
||||||
|
|
||||||
// List gets a range of values at "directory"
|
// List gets a range of values at "directory"
|
||||||
func (s *MockStore) List(prefix string) ([]*store.KVPair, error) {
|
func (s *MockStore) List(prefix string) ([]*store.KVPair, error) {
|
||||||
return nil, errors.New("not implemented")
|
var res []*store.KVPair
|
||||||
|
for k, v := range s.db {
|
||||||
|
if strings.HasPrefix(k, prefix) {
|
||||||
|
res = append(res, &store.KVPair{Key: k, Value: v.Data, LastIndex: v.Index})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if len(res) == 0 {
|
||||||
|
return nil, store.ErrKeyNotFound
|
||||||
|
}
|
||||||
|
return res, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// AtomicPut put a value at "key" if the key has not been
|
// AtomicPut put a value at "key" if the key has not been
|
||||||
|
|
Loading…
Add table
Reference in a new issue