Merge pull request #46041 from thaJeztah/datastore_remove_libkv
libnetwork: remove most of kvstore
This commit is contained in:
commit
c30c3f94c7
8 changed files with 102 additions and 160 deletions
|
@ -8,6 +8,7 @@ import (
|
|||
|
||||
"github.com/docker/docker/libnetwork/discoverapi"
|
||||
store "github.com/docker/docker/libnetwork/internal/kvstore"
|
||||
"github.com/docker/docker/libnetwork/internal/kvstore/boltdb"
|
||||
"github.com/docker/docker/libnetwork/types"
|
||||
)
|
||||
|
||||
|
@ -115,9 +116,7 @@ func DefaultScope(dataDir string) ScopeCfg {
|
|||
|
||||
// IsValid checks if the scope config has valid configuration.
|
||||
func (cfg *ScopeCfg) IsValid() bool {
|
||||
if cfg == nil ||
|
||||
strings.TrimSpace(cfg.Client.Provider) == "" ||
|
||||
strings.TrimSpace(cfg.Client.Address) == "" {
|
||||
if cfg == nil || strings.TrimSpace(cfg.Client.Provider) == "" || strings.TrimSpace(cfg.Client.Address) == "" {
|
||||
return false
|
||||
}
|
||||
|
||||
|
@ -138,27 +137,16 @@ func Key(key ...string) string {
|
|||
|
||||
// newClient used to connect to KV Store
|
||||
func newClient(kv string, addr string, config *store.Config) (*Store, error) {
|
||||
if kv != string(store.BOLTDB) {
|
||||
return nil, fmt.Errorf("unsupported KV store")
|
||||
}
|
||||
|
||||
if config == nil {
|
||||
config = &store.Config{}
|
||||
}
|
||||
|
||||
var addrs []string
|
||||
|
||||
if kv == string(store.BOLTDB) {
|
||||
// Parse file path
|
||||
addrs = strings.Split(addr, ",")
|
||||
} else {
|
||||
// Parse URI
|
||||
parts := strings.SplitN(addr, "/", 2)
|
||||
addrs = strings.Split(parts[0], ",")
|
||||
|
||||
// Add the custom prefix to the root chain
|
||||
if len(parts) == 2 {
|
||||
rootChain = append([]string{parts[1]}, defaultRootChain...)
|
||||
}
|
||||
}
|
||||
|
||||
s, err := store.New(store.Backend(kv), addrs, config)
|
||||
// Parse file path
|
||||
s, err := boltdb.New(strings.Split(addr, ","), config)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -214,11 +202,6 @@ func (ds *Store) Scope() string {
|
|||
return ds.scope
|
||||
}
|
||||
|
||||
// KVStore returns access to the KV Store.
|
||||
func (ds *Store) KVStore() store.Store {
|
||||
return ds.store
|
||||
}
|
||||
|
||||
// PutObjectAtomic provides an atomic add and update operation for a Record.
|
||||
func (ds *Store) PutObjectAtomic(kvObject KVObject) error {
|
||||
var (
|
||||
|
@ -390,7 +373,7 @@ func (ds *Store) DeleteObjectAtomic(kvObject KVObject) error {
|
|||
previous := &store.KVPair{Key: Key(kvObject.Key()...), LastIndex: kvObject.Index()}
|
||||
|
||||
if kvObject.Skip() {
|
||||
goto del_cache
|
||||
goto deleteCache
|
||||
}
|
||||
|
||||
if err := ds.store.AtomicDelete(Key(kvObject.Key()...), previous); err != nil {
|
||||
|
@ -400,7 +383,7 @@ func (ds *Store) DeleteObjectAtomic(kvObject KVObject) error {
|
|||
return err
|
||||
}
|
||||
|
||||
del_cache:
|
||||
deleteCache:
|
||||
// cleanup the cache only if AtomicDelete went through successfully
|
||||
if ds.cache != nil {
|
||||
// If persistent store is skipped, sequencing needs to
|
||||
|
|
|
@ -6,9 +6,10 @@ import (
|
|||
|
||||
"github.com/docker/docker/libnetwork/options"
|
||||
"gotest.tools/v3/assert"
|
||||
is "gotest.tools/v3/assert/cmp"
|
||||
)
|
||||
|
||||
var dummyKey = "dummy"
|
||||
const dummyKey = "dummy"
|
||||
|
||||
// NewTestDataStore can be used by other Tests in order to use custom datastore
|
||||
func NewTestDataStore() *Store {
|
||||
|
@ -16,11 +17,9 @@ func NewTestDataStore() *Store {
|
|||
}
|
||||
|
||||
func TestKey(t *testing.T) {
|
||||
eKey := []string{"hello", "world"}
|
||||
sKey := Key(eKey...)
|
||||
if sKey != "docker/network/v1.0/hello/world/" {
|
||||
t.Fatalf("unexpected key : %s", sKey)
|
||||
}
|
||||
sKey := Key("hello", "world")
|
||||
const expected = "docker/network/v1.0/hello/world/"
|
||||
assert.Check(t, is.Equal(sKey, expected))
|
||||
}
|
||||
|
||||
func TestInvalidDataStore(t *testing.T) {
|
||||
|
@ -30,28 +29,19 @@ func TestInvalidDataStore(t *testing.T) {
|
|||
Address: "localhost:8500",
|
||||
},
|
||||
})
|
||||
if err == nil {
|
||||
t.Fatal("Invalid Datastore connection configuration must result in a failure")
|
||||
}
|
||||
assert.Check(t, is.Error(err, "unsupported KV store"))
|
||||
}
|
||||
|
||||
func TestKVObjectFlatKey(t *testing.T) {
|
||||
store := NewTestDataStore()
|
||||
expected := dummyKVObject("1000", true)
|
||||
err := store.PutObjectAtomic(expected)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
keychain := []string{dummyKey, "1000"}
|
||||
data, err := store.KVStore().Get(Key(keychain...))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
var n dummyObject
|
||||
json.Unmarshal(data.Value, &n)
|
||||
if n.Name != expected.Name {
|
||||
t.Fatal("Dummy object doesn't match the expected object")
|
||||
}
|
||||
assert.Check(t, err)
|
||||
|
||||
n := dummyObject{ID: "1000"} // GetObject uses KVObject.Key() for cache lookup.
|
||||
err = store.GetObject(Key(dummyKey, "1000"), &n)
|
||||
assert.Check(t, err)
|
||||
assert.Check(t, is.Equal(n.Name, expected.Name))
|
||||
}
|
||||
|
||||
func TestAtomicKVObjectFlatKey(t *testing.T) {
|
||||
|
@ -59,45 +49,30 @@ func TestAtomicKVObjectFlatKey(t *testing.T) {
|
|||
expected := dummyKVObject("1111", true)
|
||||
assert.Check(t, !expected.Exists())
|
||||
err := store.PutObjectAtomic(expected)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
assert.Check(t, err)
|
||||
assert.Check(t, expected.Exists())
|
||||
|
||||
// PutObjectAtomic automatically sets the Index again. Hence the following must pass.
|
||||
|
||||
err = store.PutObjectAtomic(expected)
|
||||
if err != nil {
|
||||
t.Fatal("Atomic update should succeed.")
|
||||
}
|
||||
assert.Check(t, err, "Atomic update should succeed.")
|
||||
|
||||
// Get the latest index and try PutObjectAtomic again for the same Key
|
||||
// This must succeed as well
|
||||
data, err := store.KVStore().Get(Key(expected.Key()...))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
n := dummyObject{}
|
||||
json.Unmarshal(data.Value, &n)
|
||||
n.ID = "1111"
|
||||
n.SetIndex(data.LastIndex)
|
||||
n := dummyObject{ID: "1111"} // GetObject uses KVObject.Key() for cache lookup.
|
||||
err = store.GetObject(Key(expected.Key()...), &n)
|
||||
assert.Check(t, err)
|
||||
n.ReturnValue = true
|
||||
err = store.PutObjectAtomic(&n)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
assert.Check(t, err)
|
||||
|
||||
// Get the Object using GetObject, then set again.
|
||||
newObj := dummyObject{}
|
||||
newObj := dummyObject{ID: "1111"} // GetObject uses KVObject.Key() for cache lookup.
|
||||
err = store.GetObject(Key(expected.Key()...), &newObj)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
assert.Check(t, err)
|
||||
assert.Check(t, newObj.Exists())
|
||||
err = store.PutObjectAtomic(&n)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
assert.Check(t, err)
|
||||
}
|
||||
|
||||
// dummy data used to test the datastore
|
||||
|
@ -161,15 +136,15 @@ func (n *dummyObject) DataScope() string {
|
|||
}
|
||||
|
||||
func (n *dummyObject) MarshalJSON() ([]byte, error) {
|
||||
netMap := make(map[string]interface{})
|
||||
netMap["name"] = n.Name
|
||||
netMap["networkType"] = n.NetworkType
|
||||
netMap["enableIPv6"] = n.EnableIPv6
|
||||
netMap["generic"] = n.Generic
|
||||
return json.Marshal(netMap)
|
||||
return json.Marshal(map[string]interface{}{
|
||||
"name": n.Name,
|
||||
"networkType": n.NetworkType,
|
||||
"enableIPv6": n.EnableIPv6,
|
||||
"generic": n.Generic,
|
||||
})
|
||||
}
|
||||
|
||||
func (n *dummyObject) UnmarshalJSON(b []byte) (err error) {
|
||||
func (n *dummyObject) UnmarshalJSON(b []byte) error {
|
||||
var netMap map[string]interface{}
|
||||
if err := json.Unmarshal(b, &netMap); err != nil {
|
||||
return err
|
||||
|
@ -225,23 +200,23 @@ func (r *recStruct) Skip() bool {
|
|||
}
|
||||
|
||||
func dummyKVObject(id string, retValue bool) *dummyObject {
|
||||
cDict := make(map[string]string)
|
||||
cDict["foo"] = "bar"
|
||||
cDict["hello"] = "world"
|
||||
n := dummyObject{
|
||||
cDict := map[string]string{
|
||||
"foo": "bar",
|
||||
"hello": "world",
|
||||
}
|
||||
return &dummyObject{
|
||||
Name: "testNw",
|
||||
NetworkType: "bridge",
|
||||
EnableIPv6: true,
|
||||
Rec: &recStruct{"gen", 5, cDict, 0, false, false},
|
||||
Rec: &recStruct{Name: "gen", Field1: 5, Dict: cDict},
|
||||
ID: id,
|
||||
DBIndex: 0,
|
||||
ReturnValue: retValue,
|
||||
DBExists: false,
|
||||
SkipSave: false,
|
||||
Generic: map[string]interface{}{
|
||||
"label1": &recStruct{Name: "value1", Field1: 1, Dict: cDict},
|
||||
"label2": "subnet=10.1.1.0/16",
|
||||
},
|
||||
}
|
||||
generic := make(map[string]interface{})
|
||||
generic["label1"] = &recStruct{"value1", 1, cDict, 0, false, false}
|
||||
generic["label2"] = "subnet=10.1.1.0/16"
|
||||
n.Generic = generic
|
||||
return &n
|
||||
}
|
||||
|
|
|
@ -6,14 +6,9 @@ import (
|
|||
"testing"
|
||||
|
||||
"github.com/docker/docker/libnetwork/driverapi"
|
||||
"github.com/docker/docker/libnetwork/internal/kvstore/boltdb"
|
||||
"github.com/docker/docker/pkg/plugingetter"
|
||||
)
|
||||
|
||||
func init() {
|
||||
boltdb.Register()
|
||||
}
|
||||
|
||||
type driverTester struct {
|
||||
t *testing.T
|
||||
d *driver
|
||||
|
|
|
@ -45,11 +45,6 @@ const (
|
|||
transientTimeout = time.Duration(10) * time.Second
|
||||
)
|
||||
|
||||
// Register registers boltdb to libkv
|
||||
func Register() {
|
||||
store.AddStore(store.BOLTDB, New)
|
||||
}
|
||||
|
||||
// New opens a new BoltDB connection to the specified path and bucket
|
||||
func New(endpoints []string, options *store.Config) (store.Store, error) {
|
||||
if len(endpoints) > 1 {
|
||||
|
|
|
@ -1,38 +0,0 @@
|
|||
package kvstore
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sort"
|
||||
"strings"
|
||||
)
|
||||
|
||||
// Initialize creates a new Store object, initializing the client
|
||||
type Initialize func(addrs []string, options *Config) (Store, error)
|
||||
|
||||
var (
|
||||
// Backend initializers
|
||||
initializers = make(map[Backend]Initialize)
|
||||
|
||||
supportedBackend = func() string {
|
||||
keys := make([]string, 0, len(initializers))
|
||||
for k := range initializers {
|
||||
keys = append(keys, string(k))
|
||||
}
|
||||
sort.Strings(keys)
|
||||
return strings.Join(keys, ", ")
|
||||
}()
|
||||
)
|
||||
|
||||
// New creates an instance of store
|
||||
func New(backend Backend, addrs []string, options *Config) (Store, error) {
|
||||
if init, exists := initializers[backend]; exists {
|
||||
return init(addrs, options)
|
||||
}
|
||||
|
||||
return nil, fmt.Errorf("%s %s", ErrBackendNotSupported.Error(), supportedBackend)
|
||||
}
|
||||
|
||||
// AddStore adds a new store backend to libkv
|
||||
func AddStore(store Backend, init Initialize) {
|
||||
initializers[store] = init
|
||||
}
|
|
@ -7,16 +7,9 @@ import (
|
|||
|
||||
"github.com/containerd/containerd/log"
|
||||
"github.com/docker/docker/libnetwork/datastore"
|
||||
"github.com/docker/docker/libnetwork/internal/kvstore/boltdb"
|
||||
)
|
||||
|
||||
func registerKVStores() {
|
||||
boltdb.Register()
|
||||
}
|
||||
|
||||
func (c *Controller) initStores() error {
|
||||
registerKVStores()
|
||||
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
|
||||
|
|
|
@ -1,10 +1,12 @@
|
|||
package libnetwork
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
|
||||
"github.com/docker/docker/libnetwork/config"
|
||||
"github.com/docker/docker/libnetwork/datastore"
|
||||
store "github.com/docker/docker/libnetwork/internal/kvstore"
|
||||
)
|
||||
|
@ -19,9 +21,15 @@ func TestBoltdbBackend(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestNoPersist(t *testing.T) {
|
||||
testController, err := New(OptionBoltdbWithRandomDBFile(t))
|
||||
dbFile := filepath.Join(t.TempDir(), "bolt.db")
|
||||
configOption := func(c *config.Config) {
|
||||
c.Scope.Client.Provider = "boltdb"
|
||||
c.Scope.Client.Address = dbFile
|
||||
c.Scope.Client.Config = &store.Config{Bucket: "testBackend"}
|
||||
}
|
||||
testController, err := New(configOption)
|
||||
if err != nil {
|
||||
t.Fatalf("Error new controller: %v", err)
|
||||
t.Fatalf("Error creating new controller: %v", err)
|
||||
}
|
||||
defer testController.Stop()
|
||||
nw, err := testController.NewNetwork("host", "host", "", NetworkOptionPersist(false))
|
||||
|
@ -32,12 +40,32 @@ func TestNoPersist(t *testing.T) {
|
|||
if err != nil {
|
||||
t.Fatalf("Error creating endpoint: %v", err)
|
||||
}
|
||||
kvStore := testController.getStore().KVStore()
|
||||
if exists, _ := kvStore.Exists(datastore.Key(datastore.NetworkKeyPrefix, nw.ID())); exists {
|
||||
t.Fatalf("Network with persist=false should not be stored in KV Store")
|
||||
testController.Stop()
|
||||
|
||||
// Create a new controller using the same database-file. The network
|
||||
// should not have persisted.
|
||||
testController, err = New(configOption)
|
||||
if err != nil {
|
||||
t.Fatalf("Error creating new controller: %v", err)
|
||||
}
|
||||
if exists, _ := kvStore.Exists(datastore.Key([]string{datastore.EndpointKeyPrefix, nw.ID(), ep.ID()}...)); exists {
|
||||
t.Fatalf("Endpoint in Network with persist=false should not be stored in KV Store")
|
||||
defer testController.Stop()
|
||||
|
||||
// FIXME(thaJeztah): GetObject uses the given key for lookups if no cache-store is present, but the KvObject's Key() to look up in cache....
|
||||
nwKVObject := &Network{id: nw.ID()}
|
||||
err = testController.getStore().GetObject(datastore.Key(datastore.NetworkKeyPrefix, nw.ID()), nwKVObject)
|
||||
if !errors.Is(err, store.ErrKeyNotFound) {
|
||||
t.Errorf("Expected %q error when retrieving network from store, got: %q", store.ErrKeyNotFound, err)
|
||||
}
|
||||
if nwKVObject.Exists() {
|
||||
t.Errorf("Network with persist=false should not be stored in KV Store")
|
||||
}
|
||||
|
||||
epKVObject := &Endpoint{network: nw, id: ep.ID()}
|
||||
err = testController.getStore().GetObject(datastore.Key(datastore.EndpointKeyPrefix, nw.ID(), ep.ID()), epKVObject)
|
||||
if !errors.Is(err, store.ErrKeyNotFound) {
|
||||
t.Errorf("Expected %v error when retrieving endpoint from store, got: %v", store.ErrKeyNotFound, err)
|
||||
}
|
||||
if epKVObject.Exists() {
|
||||
t.Errorf("Endpoint in Network with persist=false should not be stored in KV Store")
|
||||
}
|
||||
kvStore.Close()
|
||||
}
|
||||
|
|
|
@ -36,14 +36,25 @@ func testLocalBackend(t *testing.T, provider, url string, storeConfig *store.Con
|
|||
if err != nil {
|
||||
t.Fatalf("Error creating endpoint: %v", err)
|
||||
}
|
||||
kvStore := testController.getStore().KVStore()
|
||||
if exists, err := kvStore.Exists(datastore.Key(datastore.NetworkKeyPrefix, nw.ID())); !exists || err != nil {
|
||||
t.Fatalf("Network key should have been created.")
|
||||
// FIXME(thaJeztah): GetObject uses the given key for lookups if no cache-store is present, but the KvObject's Key() to look up in cache....
|
||||
nwKVObject := &Network{id: nw.ID()}
|
||||
err = testController.getStore().GetObject(datastore.Key(datastore.NetworkKeyPrefix, nw.ID()), nwKVObject)
|
||||
if err != nil {
|
||||
t.Errorf("Error when retrieving network key from store: %v", err)
|
||||
}
|
||||
if exists, err := kvStore.Exists(datastore.Key([]string{datastore.EndpointKeyPrefix, nw.ID(), ep.ID()}...)); !exists || err != nil {
|
||||
t.Fatalf("Endpoint key should have been created.")
|
||||
if !nwKVObject.Exists() {
|
||||
t.Errorf("Network key should have been created.")
|
||||
}
|
||||
kvStore.Close()
|
||||
|
||||
epKVObject := &Endpoint{network: nw, id: ep.ID()}
|
||||
err = testController.getStore().GetObject(datastore.Key(datastore.EndpointKeyPrefix, nw.ID(), ep.ID()), epKVObject)
|
||||
if err != nil {
|
||||
t.Errorf("Error when retrieving Endpoint key from store: %v", err)
|
||||
}
|
||||
if !epKVObject.Exists() {
|
||||
t.Errorf("Endpoint key should have been created.")
|
||||
}
|
||||
testController.Stop()
|
||||
|
||||
// test restore of local store
|
||||
testController, err = New(cfgOptions...)
|
||||
|
@ -52,7 +63,7 @@ func testLocalBackend(t *testing.T, provider, url string, storeConfig *store.Con
|
|||
}
|
||||
defer testController.Stop()
|
||||
if _, err = testController.NetworkByID(nw.ID()); err != nil {
|
||||
t.Fatalf("Error getting network %v", err)
|
||||
t.Errorf("Error getting network %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Add table
Reference in a new issue