Pārlūkot izejas kodu

Handle datastore update in Ipam and overlay drivers

Signed-off-by: Alessandro Boch <aboch@docker.com>
Alessandro Boch 9 gadi atpakaļ
vecāks
revīzija
5dc5acfa58

+ 24 - 0
libnetwork/datastore/datastore.go

@@ -13,6 +13,7 @@ import (
 	"github.com/docker/libkv/store/consul"
 	"github.com/docker/libkv/store/etcd"
 	"github.com/docker/libkv/store/zookeeper"
+	"github.com/docker/libnetwork/discoverapi"
 	"github.com/docker/libnetwork/types"
 )
 
@@ -253,6 +254,29 @@ func NewDataStore(scope string, cfg *ScopeCfg) (DataStore, error) {
 	return newClient(scope, cfg.Client.Provider, cfg.Client.Address, cfg.Client.Config, cached)
 }
 
+// NewDataStoreFromConfig creates a new instance of LibKV data store starting from the datastore config data
+func NewDataStoreFromConfig(dsc discoverapi.DatastoreConfigData) (DataStore, error) {
+	sCfg, ok := dsc.Config.(*store.Config)
+	if !ok {
+		return nil, fmt.Errorf("cannot parse store configuration: %v", dsc.Config)
+	}
+
+	scopeCfg := &ScopeCfg{
+		Client: ScopeClientCfg{
+			Address:  dsc.Address,
+			Provider: dsc.Provider,
+			Config:   sCfg,
+		},
+	}
+
+	ds, err := NewDataStore(dsc.Scope, scopeCfg)
+	if err != nil {
+		return nil, fmt.Errorf("failed to construct datastore client from datastore configuration %v: %v", dsc, err)
+	}
+
+	return ds, err
+}
+
 func (ds *datastore) Close() {
 	ds.store.Close()
 }

+ 5 - 4
libnetwork/discoverapi/discoverapi.go

@@ -16,8 +16,8 @@ type DiscoveryType int
 const (
 	// NodeDiscovery represents Node join/leave events provided by discovery
 	NodeDiscovery = iota + 1
-	// DatastoreUpdate represents a add/remove datastore event
-	DatastoreUpdate
+	// DatastoreConfig represents a add/remove datastore event
+	DatastoreConfig
 )
 
 // NodeDiscoveryData represents the structure backing the node discovery data json string
@@ -26,8 +26,9 @@ type NodeDiscoveryData struct {
 	Self    bool
 }
 
-// DatastoreUpdateData is the data for the datastore update event message
-type DatastoreUpdateData struct {
+// DatastoreConfigData is the data for the datastore update event message
+type DatastoreConfigData struct {
+	Scope    string
 	Provider string
 	Address  string
 	Config   interface{}

+ 7 - 4
libnetwork/drivers.go

@@ -3,6 +3,7 @@ package libnetwork
 import (
 	"strings"
 
+	"github.com/docker/libnetwork/discoverapi"
 	"github.com/docker/libnetwork/driverapi"
 	"github.com/docker/libnetwork/ipamapi"
 	builtinIpam "github.com/docker/libnetwork/ipams/builtin"
@@ -56,10 +57,12 @@ func makeDriverConfig(c *controller, ntype string) map[string]interface{} {
 		if !v.IsValid() {
 			continue
 		}
-
-		config[netlabel.MakeKVProvider(k)] = v.Client.Provider
-		config[netlabel.MakeKVProviderURL(k)] = v.Client.Address
-		config[netlabel.MakeKVProviderConfig(k)] = v.Client.Config
+		config[netlabel.MakeKVClient(k)] = discoverapi.DatastoreConfigData{
+			Scope:    k,
+			Provider: v.Client.Provider,
+			Address:  v.Client.Address,
+			Config:   v.Client.Config,
+		}
 	}
 
 	return config

+ 8 - 20
libnetwork/drivers/bridge/bridge_store.go

@@ -6,9 +6,9 @@ import (
 	"net"
 
 	"github.com/Sirupsen/logrus"
-	"github.com/docker/libkv/store"
 	"github.com/docker/libkv/store/boltdb"
 	"github.com/docker/libnetwork/datastore"
+	"github.com/docker/libnetwork/discoverapi"
 	"github.com/docker/libnetwork/netlabel"
 	"github.com/docker/libnetwork/types"
 )
@@ -16,27 +16,15 @@ import (
 const bridgePrefix = "bridge"
 
 func (d *driver) initStore(option map[string]interface{}) error {
-	var err error
-
-	provider, provOk := option[netlabel.LocalKVProvider]
-	provURL, urlOk := option[netlabel.LocalKVProviderURL]
-
-	if provOk && urlOk {
-		cfg := &datastore.ScopeCfg{
-			Client: datastore.ScopeClientCfg{
-				Provider: provider.(string),
-				Address:  provURL.(string),
-			},
+	if data, ok := option[netlabel.LocalKVClient]; ok {
+		var err error
+		dsc, ok := data.(discoverapi.DatastoreConfigData)
+		if !ok {
+			return types.InternalErrorf("incorrect data in datastore configuration: %v", data)
 		}
-
-		provConfig, confOk := option[netlabel.LocalKVProviderConfig]
-		if confOk {
-			cfg.Client.Config = provConfig.(*store.Config)
-		}
-
-		d.store, err = datastore.NewDataStore(datastore.LocalScope, cfg)
+		d.store, err = datastore.NewDataStoreFromConfig(dsc)
 		if err != nil {
-			return fmt.Errorf("bridge driver failed to initialize data store: %v", err)
+			return types.InternalErrorf("bridge driver failed to initialize data store: %v", err)
 		}
 
 		return d.populateNetworks()

+ 52 - 32
libnetwork/drivers/overlay/overlay.go

@@ -6,12 +6,12 @@ import (
 	"sync"
 
 	"github.com/Sirupsen/logrus"
-	"github.com/docker/libkv/store"
 	"github.com/docker/libnetwork/datastore"
 	"github.com/docker/libnetwork/discoverapi"
 	"github.com/docker/libnetwork/driverapi"
 	"github.com/docker/libnetwork/idm"
 	"github.com/docker/libnetwork/netlabel"
+	"github.com/docker/libnetwork/types"
 	"github.com/hashicorp/serf/serf"
 )
 
@@ -25,6 +25,8 @@ const (
 	vxlanVethMTU = 1450
 )
 
+var initVxlanIdm = make(chan (bool), 1)
+
 type driver struct {
 	eventCh      chan serf.Event
 	notifyCh     chan ovNotify
@@ -56,6 +58,18 @@ func Init(dc driverapi.DriverCallback, config map[string]interface{}) error {
 		config: config,
 	}
 
+	if data, ok := config[netlabel.GlobalKVClient]; ok {
+		var err error
+		dsc, ok := data.(discoverapi.DatastoreConfigData)
+		if !ok {
+			return types.InternalErrorf("incorrect data in datastore configuration: %v", data)
+		}
+		d.store, err = datastore.NewDataStoreFromConfig(dsc)
+		if err != nil {
+			return types.InternalErrorf("failed to initialize data store: %v", err)
+		}
+	}
+
 	return dc.RegisterDriver(networkType, d, c)
 }
 
@@ -73,42 +87,33 @@ func Fini(drv driverapi.Driver) {
 }
 
 func (d *driver) configure() error {
-	var err error
+	if d.store == nil {
+		return types.NoServiceErrorf("datastore is not available")
+	}
 
-	if len(d.config) == 0 {
-		return nil
+	if d.vxlanIdm == nil {
+		return d.initializeVxlanIdm()
 	}
 
-	d.once.Do(func() {
-		provider, provOk := d.config[netlabel.GlobalKVProvider]
-		provURL, urlOk := d.config[netlabel.GlobalKVProviderURL]
+	return nil
+}
 
-		if provOk && urlOk {
-			cfg := &datastore.ScopeCfg{
-				Client: datastore.ScopeClientCfg{
-					Provider: provider.(string),
-					Address:  provURL.(string),
-				},
-			}
-			provConfig, confOk := d.config[netlabel.GlobalKVProviderConfig]
-			if confOk {
-				cfg.Client.Config = provConfig.(*store.Config)
-			}
-			d.store, err = datastore.NewDataStore(datastore.GlobalScope, cfg)
-			if err != nil {
-				err = fmt.Errorf("failed to initialize data store: %v", err)
-				return
-			}
-		}
+func (d *driver) initializeVxlanIdm() error {
+	var err error
 
-		d.vxlanIdm, err = idm.New(d.store, "vxlan-id", vxlanIDStart, vxlanIDEnd)
-		if err != nil {
-			err = fmt.Errorf("failed to initialize vxlan id manager: %v", err)
-			return
-		}
-	})
+	initVxlanIdm <- true
+	defer func() { <-initVxlanIdm }()
+
+	if d.vxlanIdm != nil {
+		return nil
+	}
+
+	d.vxlanIdm, err = idm.New(d.store, "vxlan-id", vxlanIDStart, vxlanIDEnd)
+	if err != nil {
+		return fmt.Errorf("failed to initialize vxlan id manager: %v", err)
+	}
 
-	return err
+	return nil
 }
 
 func (d *driver) Type() string {
@@ -187,12 +192,27 @@ func (d *driver) pushLocalEndpointEvent(action, nid, eid string) {
 
 // DiscoverNew is a notification for a new discovery event, such as a new node joining a cluster
 func (d *driver) DiscoverNew(dType discoverapi.DiscoveryType, data interface{}) error {
-	if dType == discoverapi.NodeDiscovery {
+	switch dType {
+	case discoverapi.NodeDiscovery:
 		nodeData, ok := data.(discoverapi.NodeDiscoveryData)
 		if !ok || nodeData.Address == "" {
 			return fmt.Errorf("invalid discovery data")
 		}
 		d.nodeJoin(nodeData.Address, nodeData.Self)
+	case discoverapi.DatastoreConfig:
+		var err error
+		if d.store != nil {
+			return types.ForbiddenErrorf("cannot accept datastore configuration: Overlay driver has a datastore configured already")
+		}
+		dsc, ok := data.(discoverapi.DatastoreConfigData)
+		if !ok {
+			return types.InternalErrorf("incorrect data in datastore configuration: %v", data)
+		}
+		d.store, err = datastore.NewDataStoreFromConfig(dsc)
+		if err != nil {
+			return types.InternalErrorf("failed to initialize data store: %v", err)
+		}
+	default:
 	}
 	return nil
 }

+ 13 - 4
libnetwork/drivers/overlay/overlay_test.go

@@ -8,6 +8,7 @@ import (
 	"github.com/docker/libnetwork/discoverapi"
 	"github.com/docker/libnetwork/driverapi"
 	_ "github.com/docker/libnetwork/testutils"
+	"github.com/docker/libnetwork/types"
 )
 
 type driverTester struct {
@@ -23,8 +24,12 @@ func setupDriver(t *testing.T) *driverTester {
 		t.Fatal(err)
 	}
 
-	if err := dt.d.configure(); err != nil {
-		t.Fatal(err)
+	err := dt.d.configure()
+	if err == nil {
+		t.Fatalf("Failed to detect nil store")
+	}
+	if _, ok := err.(types.NoServiceError); !ok {
+		t.Fatalf("Unexpected error type: %v", err)
 	}
 
 	iface, err := net.InterfaceByName("eth0")
@@ -94,8 +99,12 @@ func TestOverlayNilConfig(t *testing.T) {
 		t.Fatal(err)
 	}
 
-	if err := dt.d.configure(); err != nil {
-		t.Fatal(err)
+	err := dt.d.configure()
+	if err == nil {
+		t.Fatalf("Failed to detect nil store")
+	}
+	if _, ok := err.(types.NoServiceError); !ok {
+		t.Fatalf("Unexpected error type: %v", err)
 	}
 
 	cleanupDriver(t, dt)

+ 62 - 13
libnetwork/ipam/allocator.go

@@ -8,6 +8,7 @@ import (
 	log "github.com/Sirupsen/logrus"
 	"github.com/docker/libnetwork/bitseq"
 	"github.com/docker/libnetwork/datastore"
+	"github.com/docker/libnetwork/discoverapi"
 	"github.com/docker/libnetwork/ipamapi"
 	"github.com/docker/libnetwork/ipamutils"
 	"github.com/docker/libnetwork/types"
@@ -60,19 +61,9 @@ func NewAllocator(lcDs, glDs datastore.DataStore) (*Allocator, error) {
 		if aspc.ds == nil {
 			continue
 		}
-
-		a.addrSpaces[aspc.as] = &addrSpace{
-			subnets: map[SubnetKey]*PoolData{},
-			id:      dsConfigKey + "/" + aspc.as,
-			scope:   aspc.ds.Scope(),
-			ds:      aspc.ds,
-			alloc:   a,
-		}
+		a.initializeAddressSpace(aspc.as, aspc.ds)
 	}
 
-	a.checkConsistency(localAddressSpace)
-	a.checkConsistency(globalAddressSpace)
-
 	return a, nil
 }
 
@@ -118,25 +109,83 @@ func (a *Allocator) updateBitMasks(aSpace *addrSpace) error {
 	return nil
 }
 
-// Checks for and fixes damaged bitmask. Meant to be called in constructor only.
+// Checks for and fixes damaged bitmask.
 func (a *Allocator) checkConsistency(as string) {
+	var sKeyList []SubnetKey
+
 	// Retrieve this address space's configuration and bitmasks from the datastore
 	a.refresh(as)
+	a.Lock()
 	aSpace, ok := a.addrSpaces[as]
+	a.Unlock()
 	if !ok {
 		return
 	}
 	a.updateBitMasks(aSpace)
+
+	aSpace.Lock()
 	for sk, pd := range aSpace.subnets {
 		if pd.Range != nil {
 			continue
 		}
-		if err := a.addresses[sk].CheckConsistency(); err != nil {
+		sKeyList = append(sKeyList, sk)
+	}
+	aSpace.Unlock()
+
+	for _, sk := range sKeyList {
+		a.Lock()
+		bm := a.addresses[sk]
+		a.Unlock()
+		if err := bm.CheckConsistency(); err != nil {
 			log.Warnf("Error while running consistency check for %s: %v", sk, err)
 		}
 	}
 }
 
+func (a *Allocator) initializeAddressSpace(as string, ds datastore.DataStore) error {
+	a.Lock()
+	if _, ok := a.addrSpaces[as]; ok {
+		a.Unlock()
+		return types.ForbiddenErrorf("tried to add an axisting address space: %s", as)
+	}
+	a.addrSpaces[as] = &addrSpace{
+		subnets: map[SubnetKey]*PoolData{},
+		id:      dsConfigKey + "/" + as,
+		scope:   ds.Scope(),
+		ds:      ds,
+		alloc:   a,
+	}
+	a.Unlock()
+
+	a.checkConsistency(as)
+
+	return nil
+}
+
+// DiscoverNew informs the allocator about a new global scope datastore
+func (a *Allocator) DiscoverNew(dType discoverapi.DiscoveryType, data interface{}) error {
+	if dType != discoverapi.DatastoreConfig {
+		return nil
+	}
+
+	dsc, ok := data.(discoverapi.DatastoreConfigData)
+	if !ok {
+		return types.InternalErrorf("incorrect data in datastore update notification: %v", data)
+	}
+
+	ds, err := datastore.NewDataStoreFromConfig(dsc)
+	if err != nil {
+		return err
+	}
+
+	return a.initializeAddressSpace(globalAddressSpace, ds)
+}
+
+// DiscoverDelete is a notification of no interest for the allocator
+func (a *Allocator) DiscoverDelete(dType discoverapi.DiscoveryType, data interface{}) error {
+	return nil
+}
+
 // GetDefaultAddressSpaces returns the local and global default address spaces
 func (a *Allocator) GetDefaultAddressSpaces() (string, string, error) {
 	return localAddressSpace, globalAddressSpace, nil

+ 3 - 0
libnetwork/ipamapi/contract.go

@@ -4,6 +4,7 @@ package ipamapi
 import (
 	"net"
 
+	"github.com/docker/libnetwork/discoverapi"
 	"github.com/docker/libnetwork/types"
 )
 
@@ -56,6 +57,8 @@ var (
 // Ipam represents the interface the IPAM service plugins must implement
 // in order to allow injection/modification of IPAM database.
 type Ipam interface {
+	discoverapi.Discover
+
 	// GetDefaultAddressSpaces returns the default local and global address spaces for this ipam
 	GetDefaultAddressSpaces() (string, string, error)
 	// RequestPool returns an address pool along with its unique id. Address space is a mandatory field

+ 11 - 0
libnetwork/ipams/remote/remote.go

@@ -6,6 +6,7 @@ import (
 
 	log "github.com/Sirupsen/logrus"
 	"github.com/docker/docker/pkg/plugins"
+	"github.com/docker/libnetwork/discoverapi"
 	"github.com/docker/libnetwork/ipamapi"
 	"github.com/docker/libnetwork/ipams/remote/api"
 	"github.com/docker/libnetwork/types"
@@ -124,3 +125,13 @@ func (a *allocator) ReleaseAddress(poolID string, address net.IP) error {
 	res := &api.ReleaseAddressResponse{}
 	return a.call("ReleaseAddress", req, res)
 }
+
+// DiscoverNew is a notification for a new discovery event, such as a new global datastore
+func (a *allocator) DiscoverNew(dType discoverapi.DiscoveryType, data interface{}) error {
+	return nil
+}
+
+// DiscoverDelete is a notification for a discovery delete event, such as a node leaving a cluster
+func (a *allocator) DiscoverDelete(dType discoverapi.DiscoveryType, data interface{}) error {
+	return nil
+}

+ 11 - 0
libnetwork/netlabel/labels.go

@@ -56,6 +56,9 @@ var (
 	// GlobalKVProviderConfig constant represents the KV provider Config
 	GlobalKVProviderConfig = MakeKVProviderConfig("global")
 
+	// GlobalKVClient constants represents the global kv store client
+	GlobalKVClient = MakeKVClient("global")
+
 	// LocalKVProvider constant represents the KV provider backend
 	LocalKVProvider = MakeKVProvider("local")
 
@@ -64,6 +67,9 @@ var (
 
 	// LocalKVProviderConfig constant represents the KV provider Config
 	LocalKVProviderConfig = MakeKVProviderConfig("local")
+
+	// LocalKVClient constants represents the local kv store client
+	LocalKVClient = MakeKVClient("local")
 )
 
 // MakeKVProvider returns the kvprovider label for the scope
@@ -81,6 +87,11 @@ func MakeKVProviderConfig(scope string) string {
 	return DriverPrivatePrefix + scope + "kv_provider_config"
 }
 
+// MakeKVClient returns the kv client label for the scope
+func MakeKVClient(scope string) string {
+	return DriverPrivatePrefix + scope + "kv_client"
+}
+
 // Key extracts the key portion of the label
 func Key(label string) (key string) {
 	if kv := strings.SplitN(label, "=", 2); len(kv) > 0 {