Merge pull request #46681 from corhere/libn/datastore-misc-cleanups

This commit is contained in:
Brian Goff 2023-11-09 11:31:30 -08:00 committed by GitHub
commit 524eef5d75
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
15 changed files with 84 additions and 257 deletions

View file

@ -1,7 +1,6 @@
package datastore
import (
"errors"
"fmt"
"sync"
@ -11,22 +10,22 @@ import (
type kvMap map[string]KVObject
type cache struct {
sync.Mutex
mu sync.Mutex
kmm map[string]kvMap
ds *Store
ds store.Store
}
func newCache(ds *Store) *cache {
func newCache(ds store.Store) *cache {
return &cache{kmm: make(map[string]kvMap), ds: ds}
}
func (c *cache) kmap(kvObject KVObject) (kvMap, error) {
var err error
c.Lock()
c.mu.Lock()
keyPrefix := Key(kvObject.KeyPrefix()...)
kmap, ok := c.kmm[keyPrefix]
c.Unlock()
c.mu.Unlock()
if ok {
return kmap, nil
@ -34,13 +33,7 @@ func (c *cache) kmap(kvObject KVObject) (kvMap, error) {
kmap = kvMap{}
// Bail out right away if the kvObject does not implement KVConstructor
ctor, ok := kvObject.(KVConstructor)
if !ok {
return nil, errors.New("error while populating kmap, object does not implement KVConstructor interface")
}
kvList, err := c.ds.store.List(keyPrefix)
kvList, err := c.ds.List(keyPrefix)
if err != nil {
if err == store.ErrKeyNotFound {
// If the store doesn't have anything then there is nothing to
@ -57,7 +50,7 @@ func (c *cache) kmap(kvObject KVObject) (kvMap, error) {
continue
}
dstO := ctor.New()
dstO := kvObject.New()
err = dstO.SetValue(kvPair.Value)
if err != nil {
return nil, err
@ -74,15 +67,15 @@ out:
// There may multiple go routines racing to fill the
// cache. The one which places the kmap in c.kmm first
// wins. The others should just use what the first populated.
c.Lock()
c.mu.Lock()
kmapNew, ok := c.kmm[keyPrefix]
if ok {
c.Unlock()
c.mu.Unlock()
return kmapNew, nil
}
c.kmm[keyPrefix] = kmap
c.Unlock()
c.mu.Unlock()
return kmap, nil
}
@ -93,13 +86,13 @@ func (c *cache) add(kvObject KVObject, atomic bool) error {
return err
}
c.Lock()
c.mu.Lock()
// If atomic is true, cache needs to maintain its own index
// for atomicity and the add needs to be atomic.
if atomic {
if prev, ok := kmap[Key(kvObject.Key()...)]; ok {
if prev.Index() != kvObject.Index() {
c.Unlock()
c.mu.Unlock()
return ErrKeyModified
}
}
@ -111,7 +104,7 @@ func (c *cache) add(kvObject KVObject, atomic bool) error {
}
kmap[Key(kvObject.Key()...)] = kvObject
c.Unlock()
c.mu.Unlock()
return nil
}
@ -121,20 +114,20 @@ func (c *cache) del(kvObject KVObject, atomic bool) error {
return err
}
c.Lock()
c.mu.Lock()
// If atomic is true, cache needs to maintain its own index
// for atomicity and del needs to be atomic.
if atomic {
if prev, ok := kmap[Key(kvObject.Key()...)]; ok {
if prev.Index() != kvObject.Index() {
c.Unlock()
c.mu.Unlock()
return ErrKeyModified
}
}
}
delete(kmap, Key(kvObject.Key()...))
c.Unlock()
c.mu.Unlock()
return nil
}
@ -144,20 +137,15 @@ func (c *cache) get(kvObject KVObject) error {
return err
}
c.Lock()
defer c.Unlock()
c.mu.Lock()
defer c.mu.Unlock()
o, ok := kmap[Key(kvObject.Key()...)]
if !ok {
return ErrKeyNotFound
}
ctor, ok := o.(KVConstructor)
if !ok {
return errors.New("kvobject does not implement KVConstructor interface. could not get object")
}
return ctor.CopyTo(kvObject)
return o.CopyTo(kvObject)
}
func (c *cache) list(kvObject KVObject) ([]KVObject, error) {
@ -166,8 +154,8 @@ func (c *cache) list(kvObject KVObject) ([]KVObject, error) {
return nil, err
}
c.Lock()
defer c.Unlock()
c.mu.Lock()
defer c.mu.Unlock()
var kvol []KVObject
for _, v := range kmap {

View file

@ -9,7 +9,6 @@ import (
"github.com/docker/docker/libnetwork/discoverapi"
store "github.com/docker/docker/libnetwork/internal/kvstore"
"github.com/docker/docker/libnetwork/internal/kvstore/boltdb"
"github.com/docker/docker/libnetwork/scope"
"github.com/docker/docker/libnetwork/types"
)
@ -21,7 +20,6 @@ var (
type Store struct {
mu sync.Mutex
scope string
store store.Store
cache *cache
}
@ -43,14 +41,8 @@ type KVObject interface {
// Exists returns true if the object exists in the datastore, false if it hasn't been stored yet.
// When SetIndex() is called, the object has been stored.
Exists() bool
// DataScope indicates the storage scope of the KV object
DataScope() string
// Skip provides a way for a KV Object to avoid persisting it in the KV Store
Skip() bool
}
// KVConstructor interface defines methods which can construct a KVObject from another.
type KVConstructor interface {
// New returns a new object which is created based on the
// source object
New() KVObject
@ -143,10 +135,7 @@ func newClient(kv string, addr string, config *store.Config) (*Store, error) {
return nil, err
}
ds := &Store{scope: scope.Local, store: s}
ds.cache = newCache(ds)
return ds, nil
return &Store{store: s, cache: newCache(s)}, nil
}
// New creates a new Store instance.
@ -189,18 +178,8 @@ func (ds *Store) Close() {
ds.store.Close()
}
// Scope returns the scope of the store.
func (ds *Store) Scope() string {
return ds.scope
}
// PutObjectAtomic provides an atomic add and update operation for a Record.
func (ds *Store) PutObjectAtomic(kvObject KVObject) error {
var (
previous *store.KVPair
pair *store.KVPair
err error
)
ds.mu.Lock()
defer ds.mu.Unlock()
@ -214,34 +193,26 @@ func (ds *Store) PutObjectAtomic(kvObject KVObject) error {
return types.InvalidParameterErrorf("invalid KV Object with a nil Value for key %s", Key(kvObject.Key()...))
}
if kvObject.Skip() {
goto add_cache
}
if kvObject.Exists() {
previous = &store.KVPair{Key: Key(kvObject.Key()...), LastIndex: kvObject.Index()}
} else {
previous = nil
}
pair, err = ds.store.AtomicPut(Key(kvObject.Key()...), kvObjValue, previous)
if err != nil {
if err == store.ErrKeyExists {
return ErrKeyModified
if !kvObject.Skip() {
var previous *store.KVPair
if kvObject.Exists() {
previous = &store.KVPair{Key: Key(kvObject.Key()...), LastIndex: kvObject.Index()}
}
return err
pair, err := ds.store.AtomicPut(Key(kvObject.Key()...), kvObjValue, previous)
if err != nil {
if err == store.ErrKeyExists {
return ErrKeyModified
}
return err
}
kvObject.SetIndex(pair.LastIndex)
}
kvObject.SetIndex(pair.LastIndex)
add_cache:
if ds.cache != nil {
// If persistent store is skipped, sequencing needs to
// happen in cache.
return ds.cache.add(kvObject, kvObject.Skip())
}
return nil
// If persistent store is skipped, sequencing needs to
// happen in cache.
return ds.cache.add(kvObject, kvObject.Skip())
}
// GetObject gets data from the store and unmarshals to the specified object.
@ -249,23 +220,7 @@ func (ds *Store) GetObject(key string, o KVObject) error {
ds.mu.Lock()
defer ds.mu.Unlock()
if ds.cache != nil {
return ds.cache.get(o)
}
kvPair, err := ds.store.Get(key)
if err != nil {
return err
}
if err := o.SetValue(kvPair.Value); err != nil {
return err
}
// Make sure the object has a correct view of the DB index in
// case we need to modify it and update the DB.
o.SetIndex(kvPair.LastIndex)
return nil
return ds.cache.get(o)
}
func (ds *Store) ensureParent(parent string) error {
@ -285,27 +240,10 @@ func (ds *Store) List(key string, kvObject KVObject) ([]KVObject, error) {
ds.mu.Lock()
defer ds.mu.Unlock()
if ds.cache != nil {
return ds.cache.list(kvObject)
}
var kvol []KVObject
err := ds.iterateKVPairsFromStore(key, kvObject, func(key string, val KVObject) {
kvol = append(kvol, val)
})
if err != nil {
return nil, err
}
return kvol, nil
return ds.cache.list(kvObject)
}
func (ds *Store) iterateKVPairsFromStore(key string, kvObject KVObject, callback func(string, KVObject)) error {
// Bail out right away if the kvObject does not implement KVConstructor
ctor, ok := kvObject.(KVConstructor)
if !ok {
return fmt.Errorf("error listing objects, object does not implement KVConstructor interface")
}
func (ds *Store) iterateKVPairsFromStore(key string, ctor KVObject, callback func(string, KVObject)) error {
// Make sure the parent key exists
if err := ds.ensureParent(key); err != nil {
return err
@ -362,24 +300,17 @@ func (ds *Store) DeleteObjectAtomic(kvObject KVObject) error {
previous := &store.KVPair{Key: Key(kvObject.Key()...), LastIndex: kvObject.Index()}
if kvObject.Skip() {
goto deleteCache
}
if err := ds.store.AtomicDelete(Key(kvObject.Key()...), previous); err != nil {
if err == store.ErrKeyExists {
return ErrKeyModified
if !kvObject.Skip() {
if err := ds.store.AtomicDelete(Key(kvObject.Key()...), previous); err != nil {
if err == store.ErrKeyExists {
return ErrKeyModified
}
return err
}
return err
}
deleteCache:
// cleanup the cache only if AtomicDelete went through successfully
if ds.cache != nil {
// If persistent store is skipped, sequencing needs to
// happen in cache.
return ds.cache.del(kvObject, kvObject.Skip())
}
return nil
// If persistent store is skipped, sequencing needs to
// happen in cache.
return ds.cache.del(kvObject, kvObject.Skip())
}

View file

@ -5,7 +5,6 @@ import (
"testing"
"github.com/docker/docker/libnetwork/options"
"github.com/docker/docker/libnetwork/scope"
"gotest.tools/v3/assert"
is "gotest.tools/v3/assert/cmp"
)
@ -14,7 +13,8 @@ const dummyKey = "dummy"
// NewTestDataStore can be used by other Tests in order to use custom datastore
func NewTestDataStore() *Store {
return &Store{scope: scope.Local, store: NewMockStore()}
s := NewMockStore()
return &Store{store: s, cache: newCache(s)}
}
func TestKey(t *testing.T) {
@ -132,10 +132,6 @@ func (n *dummyObject) Skip() bool {
return n.SkipSave
}
func (n *dummyObject) DataScope() string {
return scope.Local
}
func (n *dummyObject) MarshalJSON() ([]byte, error) {
return json.Marshal(map[string]interface{}{
"name": n.Name,
@ -157,6 +153,18 @@ func (n *dummyObject) UnmarshalJSON(b []byte) error {
return nil
}
func (n *dummyObject) New() KVObject {
return &dummyObject{}
}
func (n *dummyObject) CopyTo(o KVObject) error {
if err := o.SetValue(n.Value()); err != nil {
return err
}
o.SetIndex(n.Index())
return nil
}
// dummy structure to test "recursive" cases
type recStruct struct {
Name string `kv:"leaf"`

View file

@ -1,7 +1,7 @@
package datastore
import (
"errors"
"strings"
store "github.com/docker/docker/libnetwork/internal/kvstore"
"github.com/docker/docker/libnetwork/types"
@ -23,16 +23,6 @@ func NewMockStore() *MockStore {
return &MockStore{db: make(map[string]*MockData)}
}
// Get the value at "key", returns the last modified index
// to use in conjunction to CAS calls
func (s *MockStore) Get(key string) (*store.KVPair, error) {
mData := s.db[key]
if mData == nil {
return nil, nil
}
return &store.KVPair{Value: mData.Data, LastIndex: mData.Index}, nil
}
// Put a value at "key"
func (s *MockStore) Put(key string, value []byte) error {
mData := s.db[key]
@ -52,7 +42,16 @@ func (s *MockStore) Exists(key string) (bool, error) {
// List gets a range of values at "directory"
func (s *MockStore) List(prefix string) ([]*store.KVPair, error) {
return nil, errors.New("not implemented")
var res []*store.KVPair
for k, v := range s.db {
if strings.HasPrefix(k, prefix) {
res = append(res, &store.KVPair{Key: k, Value: v.Data, LastIndex: v.Index})
}
}
if len(res) == 0 {
return nil, store.ErrKeyNotFound
}
return res, nil
}
// AtomicPut put a value at "key" if the key has not been

View file

@ -12,7 +12,6 @@ import (
"github.com/docker/docker/libnetwork/datastore"
"github.com/docker/docker/libnetwork/discoverapi"
"github.com/docker/docker/libnetwork/netlabel"
"github.com/docker/docker/libnetwork/scope"
"github.com/docker/docker/libnetwork/types"
)
@ -267,10 +266,6 @@ func (ncfg *networkConfiguration) CopyTo(o datastore.KVObject) error {
return nil
}
func (ncfg *networkConfiguration) DataScope() string {
return scope.Local
}
func (ep *bridgeEndpoint) MarshalJSON() ([]byte, error) {
epMap := make(map[string]interface{})
epMap["id"] = ep.id
@ -384,10 +379,6 @@ func (ep *bridgeEndpoint) CopyTo(o datastore.KVObject) error {
return nil
}
func (ep *bridgeEndpoint) DataScope() string {
return scope.Local
}
func (n *bridgeNetwork) restorePortAllocations(ep *bridgeEndpoint) {
if ep.extConnConfig == nil ||
ep.extConnConfig.ExposedPorts == nil ||

View file

@ -12,7 +12,6 @@ import (
"github.com/docker/docker/libnetwork/datastore"
"github.com/docker/docker/libnetwork/discoverapi"
"github.com/docker/docker/libnetwork/netlabel"
"github.com/docker/docker/libnetwork/scope"
"github.com/docker/docker/libnetwork/types"
)
@ -258,10 +257,6 @@ func (config *configuration) CopyTo(o datastore.KVObject) error {
return nil
}
func (config *configuration) DataScope() string {
return scope.Local
}
func (ep *endpoint) MarshalJSON() ([]byte, error) {
epMap := make(map[string]interface{})
epMap["id"] = ep.id
@ -357,7 +352,3 @@ func (ep *endpoint) CopyTo(o datastore.KVObject) error {
*dstEp = *ep
return nil
}
func (ep *endpoint) DataScope() string {
return scope.Local
}

View file

@ -12,7 +12,6 @@ import (
"github.com/docker/docker/libnetwork/datastore"
"github.com/docker/docker/libnetwork/discoverapi"
"github.com/docker/docker/libnetwork/netlabel"
"github.com/docker/docker/libnetwork/scope"
"github.com/docker/docker/libnetwork/types"
)
@ -252,10 +251,6 @@ func (config *configuration) CopyTo(o datastore.KVObject) error {
return nil
}
func (config *configuration) DataScope() string {
return scope.Local
}
func (ep *endpoint) MarshalJSON() ([]byte, error) {
epMap := make(map[string]interface{})
epMap["id"] = ep.id
@ -351,7 +346,3 @@ func (ep *endpoint) CopyTo(o datastore.KVObject) error {
*dstEp = *ep
return nil
}
func (ep *endpoint) DataScope() string {
return scope.Local
}

View file

@ -12,7 +12,6 @@ import (
"github.com/docker/docker/libnetwork/datastore"
"github.com/docker/docker/libnetwork/discoverapi"
"github.com/docker/docker/libnetwork/netlabel"
"github.com/docker/docker/libnetwork/scope"
"github.com/docker/docker/libnetwork/types"
)
@ -221,10 +220,6 @@ func (ncfg *networkConfiguration) CopyTo(o datastore.KVObject) error {
return nil
}
func (ncfg *networkConfiguration) DataScope() string {
return scope.Local
}
func (ep *hnsEndpoint) MarshalJSON() ([]byte, error) {
epMap := make(map[string]interface{})
epMap["id"] = ep.id
@ -333,7 +328,3 @@ func (ep *hnsEndpoint) CopyTo(o datastore.KVObject) error {
*dstEp = *ep
return nil
}
func (ep *hnsEndpoint) DataScope() string {
return scope.Local
}

View file

@ -1019,10 +1019,6 @@ func JoinOptionPriority(prio int) EndpointOption {
}
}
func (ep *Endpoint) DataScope() string {
return ep.getNetwork().DataScope()
}
func (ep *Endpoint) assignAddress(ipam ipamapi.Ipam, assignIPv4, assignIPv6 bool) error {
var err error

View file

@ -97,10 +97,6 @@ func (ec *endpointCnt) CopyTo(o datastore.KVObject) error {
return nil
}
func (ec *endpointCnt) DataScope() string {
return ec.n.DataScope()
}
func (ec *endpointCnt) EndpointCnt() uint64 {
ec.Lock()
defer ec.Unlock()
@ -111,7 +107,7 @@ func (ec *endpointCnt) EndpointCnt() uint64 {
func (ec *endpointCnt) updateStore() error {
store := ec.n.getController().getStore()
if store == nil {
return fmt.Errorf("store not found for scope %s on endpoint count update", ec.DataScope())
return fmt.Errorf("store not found on endpoint count update")
}
// make a copy of count and n to avoid being overwritten by store.GetObject
count := ec.EndpointCnt()
@ -140,7 +136,7 @@ func (ec *endpointCnt) setCnt(cnt uint64) error {
func (ec *endpointCnt) atomicIncDecEpCnt(inc bool) error {
store := ec.n.getController().getStore()
if store == nil {
return fmt.Errorf("store not found for scope %s", ec.DataScope())
return fmt.Errorf("store not found on endpoint count atomic inc/dec")
}
tmp := &endpointCnt{n: ec.n}

View file

@ -109,44 +109,6 @@ func (b *BoltDB) releaseDBhandle() {
}
}
// Get the value at "key". BoltDB doesn't provide an inbuilt last modified index with every kv pair. Its implemented by
// by a atomic counter maintained by the libkv and appened to the value passed by the client.
func (b *BoltDB) Get(key string) (*store.KVPair, error) {
b.mu.Lock()
defer b.mu.Unlock()
db, err := b.getDBhandle()
if err != nil {
return nil, err
}
defer b.releaseDBhandle()
var val []byte
err = db.View(func(tx *bolt.Tx) error {
bucket := tx.Bucket(b.boltBucket)
if bucket == nil {
return store.ErrKeyNotFound
}
v := bucket.Get([]byte(key))
val = make([]byte, len(v))
copy(val, v)
return nil
})
if err != nil {
return nil, err
}
if len(val) == 0 {
return nil, store.ErrKeyNotFound
}
dbIndex := binary.LittleEndian.Uint64(val[:libkvmetadatalen])
val = val[libkvmetadatalen:]
return &store.KVPair{Key: key, Value: val, LastIndex: dbIndex}, nil
}
// Put the key, value pair. index number metadata is prepended to the value
func (b *BoltDB) Put(key string, value []byte) error {
b.mu.Lock()

View file

@ -39,9 +39,6 @@ type Store interface {
// Put a value at the specified key
Put(key string, value []byte) error
// Get a value given its key
Get(key string) (*KVPair, error)
// Exists verifies if a Key exists in the store.
Exists(key string) (bool, error)

View file

@ -510,15 +510,6 @@ func (n *Network) CopyTo(o datastore.KVObject) error {
return nil
}
func (n *Network) DataScope() string {
s := n.Scope()
// All swarm scope networks have local datascope
if s == scope.Swarm {
s = scope.Local
}
return s
}
func (n *Network) getEpCnt() *endpointCnt {
n.mu.Lock()
defer n.mu.Unlock()
@ -1773,7 +1764,7 @@ func (n *Network) deriveAddressSpace() (string, error) {
if err != nil {
return "", types.NotFoundErrorf("failed to get default address space: %v", err)
}
if n.DataScope() == scope.Global {
if n.Scope() == scope.Global {
return global, nil
}
return local, nil

View file

@ -9,7 +9,6 @@ import (
"github.com/containerd/log"
"github.com/docker/docker/libnetwork/datastore"
"github.com/docker/docker/libnetwork/osl"
"github.com/docker/docker/libnetwork/scope"
)
const (
@ -122,10 +121,6 @@ func (sbs *sbState) CopyTo(o datastore.KVObject) error {
return nil
}
func (sbs *sbState) DataScope() string {
return scope.Local
}
func (sb *Sandbox) storeUpdate() error {
sbs := &sbState{
c: sb.controller,
@ -184,7 +179,7 @@ func (c *Controller) sandboxCleanup(activeSandboxes map[string]interface{}) erro
// It's normal for no sandboxes to be found. Just bail out.
return nil
}
return fmt.Errorf("failed to get sandboxes for scope %s: %v", store.Scope(), err)
return fmt.Errorf("failed to get sandboxes: %v", err)
}
for _, s := range sandboxStates {

View file

@ -68,7 +68,7 @@ func (c *Controller) getNetworks() ([]*Network, error) {
n.epCnt = ec
if n.scope == "" {
n.scope = store.Scope()
n.scope = scope.Local
}
nl = append(nl, n)
}
@ -105,7 +105,7 @@ func (c *Controller) getNetworksFromStore() []*Network { // FIXME: unify with c.
n.epCnt = ec
}
if n.scope == "" {
n.scope = store.Scope()
n.scope = scope.Local
}
n.mu.Unlock()
nl = append(nl, n)
@ -132,8 +132,8 @@ func (n *Network) getEndpointsFromStore() ([]*Endpoint, error) {
kvol, err := store.List(datastore.Key(tmp.KeyPrefix()...), &Endpoint{network: n})
if err != nil {
if err != datastore.ErrKeyNotFound {
return nil, fmt.Errorf("failed to get endpoints for network %s scope %s: %w",
n.Name(), store.Scope(), err)
return nil, fmt.Errorf("failed to get endpoints for network %s: %w",
n.Name(), err)
}
return nil, nil
}