Przeglądaj źródła

Initial kv store integration and datastore implementation

Signed-off-by: Madhu Venugopal <madhu@docker.com>
Madhu Venugopal 10 lat temu
rodzic
commit
9b952fc982

+ 16 - 0
libnetwork/controller.go

@@ -48,8 +48,10 @@ package libnetwork
 import (
 import (
 	"sync"
 	"sync"
 
 
+	log "github.com/Sirupsen/logrus"
 	"github.com/docker/docker/pkg/plugins"
 	"github.com/docker/docker/pkg/plugins"
 	"github.com/docker/docker/pkg/stringid"
 	"github.com/docker/docker/pkg/stringid"
+	"github.com/docker/libnetwork/datastore"
 	"github.com/docker/libnetwork/driverapi"
 	"github.com/docker/libnetwork/driverapi"
 	"github.com/docker/libnetwork/sandbox"
 	"github.com/docker/libnetwork/sandbox"
 	"github.com/docker/libnetwork/types"
 	"github.com/docker/libnetwork/types"
@@ -95,6 +97,7 @@ type controller struct {
 	networks  networkTable
 	networks  networkTable
 	drivers   driverTable
 	drivers   driverTable
 	sandboxes sandboxTable
 	sandboxes sandboxTable
+	store     datastore.DataStore
 	sync.Mutex
 	sync.Mutex
 }
 }
 
 
@@ -107,6 +110,18 @@ func New() (NetworkController, error) {
 	if err := initDrivers(c); err != nil {
 	if err := initDrivers(c); err != nil {
 		return nil, err
 		return nil, err
 	}
 	}
+
+	/* TODO : Duh ! make this configurable :-) */
+	config := &datastore.StoreConfiguration{}
+	config.Provider = "consul"
+	config.Addrs = []string{"localhost:8500"}
+
+	store, err := datastore.NewDataStore(config)
+	if err != nil {
+		log.Error("Failed to connect with Consul server")
+	}
+	c.store = store
+
 	return c, nil
 	return c, nil
 }
 }
 
 
@@ -176,6 +191,7 @@ func (c *controller) NewNetwork(networkType, name string, options ...NetworkOpti
 	// Store the network handler in controller
 	// Store the network handler in controller
 	c.Lock()
 	c.Lock()
 	c.networks[network.id] = network
 	c.networks[network.id] = network
+	c.store.PutObjectAtomic(network)
 	c.Unlock()
 	c.Unlock()
 
 
 	return network, nil
 	return network, nil

+ 111 - 0
libnetwork/datastore/datastore.go

@@ -0,0 +1,111 @@
+package datastore
+
+import (
+	"errors"
+	"strings"
+
+	"github.com/docker/swarm/pkg/store"
+)
+
+//DataStore exported
+type DataStore interface {
+	// PutObject adds a new Record based on an object into the datastore
+	PutObject(kvObject KV) error
+	// PutObjectAtomic provides an atomic add and update operation for a Record
+	PutObjectAtomic(kvObject KV) error
+	// KVStore returns access to the KV Store
+	KVStore() store.Store
+}
+
+type datastore struct {
+	store  store.Store
+	config *StoreConfiguration
+}
+
+//StoreConfiguration exported
+type StoreConfiguration struct {
+	Addrs    []string
+	Provider string
+}
+
+//KV Key Value interface used by objects to be part of the DataStore
+type KV interface {
+	Key() []string
+	Value() []byte
+	Index() uint64
+	SetIndex(uint64)
+}
+
+//Key provides convenient method to create a Key
+func Key(key ...string) string {
+	keychain := []string{"docker", "libnetwork"}
+	keychain = append(keychain, key...)
+	str := strings.Join(keychain, "/")
+	return str + "/"
+}
+
+var errNewDatastore = errors.New("Error creating new Datastore")
+var errInvalidConfiguration = errors.New("Invalid Configuration passed to Datastore")
+var errInvalidAtomicRequest = errors.New("Invalid Atomic Request")
+
+// newClient used to connect to KV Store
+func newClient(kv string, addrs []string) (DataStore, error) {
+	store, err := store.CreateStore(kv, addrs, store.Config{})
+	if err != nil {
+		return nil, err
+	}
+	ds := &datastore{store: store}
+	return ds, nil
+}
+
+// NewDataStore creates a new instance of LibKV data store
+func NewDataStore(config *StoreConfiguration) (DataStore, error) {
+	if config == nil {
+		return nil, errInvalidConfiguration
+	}
+	return newClient(config.Provider, config.Addrs)
+}
+
+func (ds *datastore) KVStore() store.Store {
+	return ds.store
+}
+
+// PutObjectAtomic adds a new Record based on an object into the datastore
+func (ds *datastore) PutObjectAtomic(kvObject KV) error {
+	if kvObject == nil {
+		return errors.New("kvObject is nil")
+	}
+	kvObjValue := kvObject.Value()
+
+	if kvObjValue == nil {
+		return errInvalidAtomicRequest
+	}
+	_, err := ds.store.AtomicPut(Key(kvObject.Key()...), []byte{}, kvObjValue, kvObject.Index())
+	if err != nil {
+		return err
+	}
+
+	_, index, err := ds.store.Get(Key(kvObject.Key()...))
+	if err != nil {
+		return err
+	}
+	kvObject.SetIndex(index)
+	return nil
+}
+
+// PutObject adds a new Record based on an object into the datastore
+func (ds *datastore) PutObject(kvObject KV) error {
+	if kvObject == nil {
+		return errors.New("kvObject is nil")
+	}
+	return ds.putObjectWithKey(kvObject, kvObject.Key()...)
+}
+
+func (ds *datastore) putObjectWithKey(kvObject KV, key ...string) error {
+	kvObjValue := kvObject.Value()
+
+	if kvObjValue == nil {
+		return errors.New("Object must provide marshalled data for key : " + Key(kvObject.Key()...))
+	}
+	return ds.store.Put(Key(key...), kvObjValue)
+}

+ 177 - 0
libnetwork/datastore/datastore_test.go

@@ -0,0 +1,177 @@
+package datastore
+
+import (
+	"encoding/json"
+	"testing"
+
+	_ "github.com/docker/libnetwork/netutils"
+	"github.com/docker/libnetwork/options"
+)
+
+var dummyKey = "dummy"
+
+func TestInvalidDataStore(t *testing.T) {
+	config := &StoreConfiguration{}
+	config.Provider = "invalid"
+	config.Addrs = []string{"localhost:8500"}
+	_, err := NewDataStore(config)
+	if err == nil {
+		t.Fatal("Invalid Datastore connection configuration must result in a failure")
+	}
+}
+
+func TestKVObjectFlatKey(t *testing.T) {
+	mockStore := newMockStore()
+	store := datastore{store: mockStore}
+	expected := dummyKVObject("1000", true)
+	err := store.PutObject(expected)
+	if err != nil {
+		t.Fatal(err)
+	}
+	keychain := []string{dummyKey, "1000"}
+	data, _, err := store.KVStore().Get(Key(keychain...))
+	if err != nil {
+		t.Fatal(err)
+	}
+	var n dummyObject
+	json.Unmarshal(data, &n)
+	if n.Name != expected.Name {
+		t.Fatalf("Dummy object doesnt match the expected object")
+	}
+}
+
+func TestAtomicKVObjectFlatKey(t *testing.T) {
+	mockStore := newMockStore()
+	store := datastore{store: mockStore}
+	expected := dummyKVObject("1111", true)
+	err := store.PutObjectAtomic(expected)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// PutObjectAtomic automatically sets the Index again. Hence the following must pass.
+
+	err = store.PutObjectAtomic(expected)
+	if err != nil {
+		t.Fatal("Atomic update with an older Index must fail")
+	}
+
+	// Get the latest index and try PutObjectAtomic again for the same Key
+	// This must succeed as well
+	data, index, err := store.KVStore().Get(Key(expected.Key()...))
+	if err != nil {
+		t.Fatal(err)
+	}
+	n := dummyObject{}
+	json.Unmarshal(data, &n)
+	n.ID = "1111"
+	n.DBIndex = index
+	n.ReturnValue = true
+	err = store.PutObjectAtomic(&n)
+	if err != nil {
+		t.Fatal(err)
+	}
+}
+
+// dummy data used to test the datastore
+type dummyObject struct {
+	Name        string                `kv:"leaf"`
+	NetworkType string                `kv:"leaf"`
+	EnableIPv6  bool                  `kv:"leaf"`
+	Rec         *recStruct            `kv:"recursive"`
+	Dict        map[string]*recStruct `kv:"iterative"`
+	Generic     options.Generic       `kv:"iterative"`
+	ID          string
+	DBIndex     uint64
+	ReturnValue bool
+}
+
+func (n *dummyObject) Key() []string {
+	return []string{dummyKey, n.ID}
+}
+func (n *dummyObject) Value() []byte {
+	if !n.ReturnValue {
+		return nil
+	}
+
+	b, err := json.Marshal(n)
+	if err != nil {
+		return nil
+	}
+	return b
+}
+
+func (n *dummyObject) Index() uint64 {
+	return n.DBIndex
+}
+
+func (n *dummyObject) SetIndex(index uint64) {
+	n.DBIndex = index
+}
+
+func (n *dummyObject) MarshalJSON() ([]byte, error) {
+	netMap := make(map[string]interface{})
+	netMap["name"] = n.Name
+	netMap["networkType"] = n.NetworkType
+	netMap["enableIPv6"] = n.EnableIPv6
+	netMap["generic"] = n.Generic
+	return json.Marshal(netMap)
+}
+
+func (n *dummyObject) UnmarshalJSON(b []byte) (err error) {
+	var netMap map[string]interface{}
+	if err := json.Unmarshal(b, &netMap); err != nil {
+		return err
+	}
+	n.Name = netMap["name"].(string)
+	n.NetworkType = netMap["networkType"].(string)
+	n.EnableIPv6 = netMap["enableIPv6"].(bool)
+	n.Generic = netMap["generic"].(map[string]interface{})
+	return nil
+}
+
+// dummy structure to test "recursive" cases
+type recStruct struct {
+	Name    string            `kv:"leaf"`
+	Field1  int               `kv:"leaf"`
+	Dict    map[string]string `kv:"iterative"`
+	DBIndex uint64
+}
+
+func (r *recStruct) Key() []string {
+	return []string{"recStruct"}
+}
+func (r *recStruct) Value() []byte {
+	b, err := json.Marshal(r)
+	if err != nil {
+		return nil
+	}
+	return b
+}
+
+func (r *recStruct) Index() uint64 {
+	return r.DBIndex
+}
+
+func (r *recStruct) SetIndex(index uint64) {
+	r.DBIndex = index
+}
+
+func dummyKVObject(id string, retValue bool) *dummyObject {
+	cDict := make(map[string]string)
+	cDict["foo"] = "bar"
+	cDict["hello"] = "world"
+	n := dummyObject{
+		Name:        "testNw",
+		NetworkType: "bridge",
+		EnableIPv6:  true,
+		Rec:         &recStruct{"gen", 5, cDict, 0},
+		ID:          id,
+		DBIndex:     0,
+		ReturnValue: retValue}
+	generic := make(map[string]interface{})
+	generic["label1"] = &recStruct{"value1", 1, cDict, 0}
+	generic["label2"] = "subnet=10.1.1.0/16"
+	n.Generic = generic
+	return &n
+}

+ 131 - 0
libnetwork/datastore/mock_store.go

@@ -0,0 +1,131 @@
+package datastore
+
+import (
+	"errors"
+	"time"
+
+	"github.com/docker/swarm/pkg/store"
+)
+
+var (
+	// ErrNotImplmented exported
+	ErrNotImplmented = errors.New("Functionality not implemented")
+)
+
+// MockData exported
+type MockData struct {
+	Data  []byte
+	Index uint64
+}
+
+// MockStore exported
+type MockStore struct {
+	db map[string]*MockData
+}
+
+// NewMockStore creates a Map backed Datastore that is useful for mocking
+func NewMockStore() *MockStore {
+	db := make(map[string]*MockData)
+	return &MockStore{db}
+}
+
+// Get the value at "key", returns the last modified index
+// to use in conjunction to CAS calls
+func (s *MockStore) Get(key string) (value []byte, lastIndex uint64, err error) {
+	mData := s.db[key]
+	if mData == nil {
+		return nil, 0, nil
+	}
+	return mData.Data, mData.Index, nil
+
+}
+
+// Put a value at "key"
+func (s *MockStore) Put(key string, value []byte) error {
+	mData := s.db[key]
+	if mData == nil {
+		mData = &MockData{value, 0}
+	}
+	mData.Index = mData.Index + 1
+	s.db[key] = mData
+	return nil
+}
+
+// Delete a value at "key"
+func (s *MockStore) Delete(key string) error {
+	delete(s.db, key)
+	return nil
+}
+
+// Exists checks that the key exists inside the store
+func (s *MockStore) Exists(key string) (bool, error) {
+	_, ok := s.db[key]
+	return ok, nil
+}
+
+// GetRange gets a range of values at "directory"
+func (s *MockStore) GetRange(prefix string) (values []store.KVEntry, err error) {
+	return nil, ErrNotImplmented
+}
+
+// DeleteRange deletes a range of values at "directory"
+func (s *MockStore) DeleteRange(prefix string) error {
+	return ErrNotImplmented
+}
+
+// Watch a single key for modifications
+func (s *MockStore) Watch(key string, heartbeat time.Duration, callback store.WatchCallback) error {
+	return ErrNotImplmented
+}
+
+// CancelWatch cancels a watch, sends a signal to the appropriate
+// stop channel
+func (s *MockStore) CancelWatch(key string) error {
+	return ErrNotImplmented
+}
+
+// Internal function to check if a key has changed
+func (s *MockStore) waitForChange(key string) <-chan uint64 {
+	return nil
+}
+
+// WatchRange triggers a watch on a range of values at "directory"
+func (s *MockStore) WatchRange(prefix string, filter string, heartbeat time.Duration, callback store.WatchCallback) error {
+	return ErrNotImplmented
+}
+
+// CancelWatchRange stops the watch on the range of values, sends
+// a signal to the appropriate stop channel
+func (s *MockStore) CancelWatchRange(prefix string) error {
+	return ErrNotImplmented
+}
+
+// Acquire the lock for "key"/"directory"
+func (s *MockStore) Acquire(key string, value []byte) (string, error) {
+	return "", ErrNotImplmented
+}
+
+// Release the lock for "key"/"directory"
+func (s *MockStore) Release(id string) error {
+	return ErrNotImplmented
+}
+
+// AtomicPut put a value at "key" if the key has not been
+// modified in the meantime, throws an error if this is the case
+func (s *MockStore) AtomicPut(key string, _ []byte, newValue []byte, index uint64) (bool, error) {
+	mData := s.db[key]
+	if mData != nil && mData.Index != index {
+		return false, errInvalidAtomicRequest
+	}
+	return true, s.Put(key, newValue)
+}
+
+// AtomicDelete deletes a value at "key" if the key has not
+// been modified in the meantime, throws an error if this is the case
+func (s *MockStore) AtomicDelete(key string, oldValue []byte, index uint64) (bool, error) {
+	mData := s.db[key]
+	if mData != nil && mData.Index != index {
+		return false, errInvalidAtomicRequest
+	}
+	return true, s.Delete(key)
+}

+ 47 - 0
libnetwork/network.go

@@ -1,6 +1,7 @@
 package libnetwork
 package libnetwork
 
 
 import (
 import (
+	"encoding/json"
 	"sync"
 	"sync"
 
 
 	"github.com/docker/docker/pkg/stringid"
 	"github.com/docker/docker/pkg/stringid"
@@ -56,6 +57,7 @@ type network struct {
 	enableIPv6  bool
 	enableIPv6  bool
 	endpoints   endpointTable
 	endpoints   endpointTable
 	generic     options.Generic
 	generic     options.Generic
+	dbIndex     uint64
 	sync.Mutex
 	sync.Mutex
 }
 }
 
 
@@ -75,6 +77,51 @@ func (n *network) Type() string {
 	return n.driver.Type()
 	return n.driver.Type()
 }
 }
 
 
+func (n *network) Key() []string {
+	return []string{"network", string(n.id)}
+}
+
+func (n *network) Value() []byte {
+	b, err := json.Marshal(n)
+	if err != nil {
+		return nil
+	}
+	return b
+}
+
+func (n *network) Index() uint64 {
+	return n.dbIndex
+}
+
+func (n *network) SetIndex(index uint64) {
+	n.dbIndex = index
+}
+
+// TODO : Can be made much more generic with the help of reflection (but has some golang limitations)
+func (n *network) MarshalJSON() ([]byte, error) {
+	netMap := make(map[string]interface{})
+	netMap["name"] = n.name
+	netMap["id"] = string(n.id)
+	netMap["networkType"] = n.networkType
+	netMap["enableIPv6"] = n.enableIPv6
+	netMap["generic"] = n.generic
+	return json.Marshal(netMap)
+}
+
+// TODO : Can be made much more generic with the help of reflection (but has some golang limitations)
+func (n *network) UnmarshalJSON(b []byte) (err error) {
+	var netMap map[string]interface{}
+	if err := json.Unmarshal(b, &netMap); err != nil {
+		return err
+	}
+	n.name = netMap["name"].(string)
+	n.id = netMap["id"].(types.UUID)
+	n.networkType = netMap["networkType"].(string)
+	n.enableIPv6 = netMap["enableIPv6"].(bool)
+	n.generic = netMap["generic"].(map[string]interface{})
+	return nil
+}
+
 // NetworkOption is a option setter function type used to pass varios options to
 // NetworkOption is a option setter function type used to pass varios options to
 // NewNetwork method. The various setter functions of type NetworkOption are
 // NewNetwork method. The various setter functions of type NetworkOption are
 // provided by libnetwork, they look like NetworkOptionXXXX(...)
 // provided by libnetwork, they look like NetworkOptionXXXX(...)