Browse Source

Merge pull request #908 from aboch/dds

Allow pass global datastore config after boot
Santhosh Manohar 9 years ago
parent
commit
f845a45b4f

+ 16 - 0
libnetwork/config/config.go

@@ -61,6 +61,22 @@ func ParseConfig(tomlCfgFile string) (*Config, error) {
 	return cfg, nil
 	return cfg, nil
 }
 }
 
 
+// ParseConfigOptions parses the configuration options and returns
+// a reference to the corresponding Config structure
+func ParseConfigOptions(cfgOptions ...Option) *Config {
+	cfg := &Config{
+		Daemon: DaemonCfg{
+			DriverCfg: make(map[string]interface{}),
+		},
+		Scopes: make(map[string]*datastore.ScopeCfg),
+	}
+
+	cfg.ProcessOptions(cfgOptions...)
+	cfg.LoadDefaultScopes(cfg.Daemon.DataDir)
+
+	return cfg
+}
+
 // Option is an option setter function type used to pass various configurations
 // Option is an option setter function type used to pass various configurations
 // to the controller
 // to the controller
 type Option func(c *Config)
 type Option func(c *Config)

+ 103 - 17
libnetwork/controller.go

@@ -106,6 +106,9 @@ type NetworkController interface {
 
 
 	// Stop network controller
 	// Stop network controller
 	Stop()
 	Stop()
+
+	// ReloadCondfiguration updates the controller configuration
+	ReloadConfiguration(cfgOptions ...config.Option) error
 }
 }
 
 
 // NetworkWalker is a client provided function which will be used to walk the Networks.
 // NetworkWalker is a client provided function which will be used to walk the Networks.
@@ -129,7 +132,6 @@ type ipamData struct {
 }
 }
 
 
 type driverTable map[string]*driverData
 type driverTable map[string]*driverData
-
 type ipamTable map[string]*ipamData
 type ipamTable map[string]*ipamData
 type sandboxTable map[string]*sandbox
 type sandboxTable map[string]*sandbox
 
 
@@ -153,22 +155,9 @@ type controller struct {
 
 
 // New creates a new instance of network controller.
 // New creates a new instance of network controller.
 func New(cfgOptions ...config.Option) (NetworkController, error) {
 func New(cfgOptions ...config.Option) (NetworkController, error) {
-	var cfg *config.Config
-	cfg = &config.Config{
-		Daemon: config.DaemonCfg{
-			DriverCfg: make(map[string]interface{}),
-		},
-		Scopes: make(map[string]*datastore.ScopeCfg),
-	}
-
-	if len(cfgOptions) > 0 {
-		cfg.ProcessOptions(cfgOptions...)
-	}
-	cfg.LoadDefaultScopes(cfg.Daemon.DataDir)
-
 	c := &controller{
 	c := &controller{
 		id:          stringid.GenerateRandomID(),
 		id:          stringid.GenerateRandomID(),
-		cfg:         cfg,
+		cfg:         config.ParseConfigOptions(cfgOptions...),
 		sandboxes:   sandboxTable{},
 		sandboxes:   sandboxTable{},
 		drivers:     driverTable{},
 		drivers:     driverTable{},
 		ipamDrivers: ipamTable{},
 		ipamDrivers: ipamTable{},
@@ -179,8 +168,8 @@ func New(cfgOptions ...config.Option) (NetworkController, error) {
 		return nil, err
 		return nil, err
 	}
 	}
 
 
-	if cfg != nil && cfg.Cluster.Watcher != nil {
-		if err := c.initDiscovery(cfg.Cluster.Watcher); err != nil {
+	if c.cfg != nil && c.cfg.Cluster.Watcher != nil {
+		if err := c.initDiscovery(c.cfg.Cluster.Watcher); err != nil {
 			// Failing to initalize discovery is a bad situation to be in.
 			// Failing to initalize discovery is a bad situation to be in.
 			// But it cannot fail creating the Controller
 			// But it cannot fail creating the Controller
 			log.Errorf("Failed to Initialize Discovery : %v", err)
 			log.Errorf("Failed to Initialize Discovery : %v", err)
@@ -206,6 +195,83 @@ func New(cfgOptions ...config.Option) (NetworkController, error) {
 	return c, nil
 	return c, nil
 }
 }
 
 
+var procReloadConfig = make(chan (bool), 1)
+
+func (c *controller) ReloadConfiguration(cfgOptions ...config.Option) error {
+	procReloadConfig <- true
+	defer func() { <-procReloadConfig }()
+
+	// For now we accept the configuration reload only as a mean to provide a global store config after boot.
+	// Refuse the configuration if it alters an existing datastore client configuration.
+	update := false
+	cfg := config.ParseConfigOptions(cfgOptions...)
+	for s := range c.cfg.Scopes {
+		if _, ok := cfg.Scopes[s]; !ok {
+			return types.ForbiddenErrorf("cannot accept new configuration because it removes an existing datastore client")
+		}
+	}
+	for s, nSCfg := range cfg.Scopes {
+		if eSCfg, ok := c.cfg.Scopes[s]; ok {
+			if eSCfg.Client.Provider != nSCfg.Client.Provider ||
+				eSCfg.Client.Address != nSCfg.Client.Address {
+				return types.ForbiddenErrorf("cannot accept new configuration because it modifies an existing datastore client")
+			}
+		} else {
+			update = true
+		}
+	}
+	if !update {
+		return nil
+	}
+
+	c.Lock()
+	c.cfg = cfg
+	c.Unlock()
+
+	if err := c.initStores(); err != nil {
+		return err
+	}
+
+	if c.discovery == nil && c.cfg.Cluster.Watcher != nil {
+		if err := c.initDiscovery(c.cfg.Cluster.Watcher); err != nil {
+			log.Errorf("Failed to Initialize Discovery after configuration update: %v", err)
+		}
+	}
+
+	var dsConfig *discoverapi.DatastoreConfigData
+	for scope, sCfg := range cfg.Scopes {
+		if scope == datastore.LocalScope || !sCfg.IsValid() {
+			continue
+		}
+		dsConfig = &discoverapi.DatastoreConfigData{
+			Scope:    scope,
+			Provider: sCfg.Client.Provider,
+			Address:  sCfg.Client.Address,
+			Config:   sCfg.Client.Config,
+		}
+		break
+	}
+	if dsConfig == nil {
+		return nil
+	}
+
+	for nm, id := range c.getIpamDrivers() {
+		err := id.driver.DiscoverNew(discoverapi.DatastoreConfig, *dsConfig)
+		if err != nil {
+			log.Errorf("Failed to set datastore in driver %s: %v", nm, err)
+		}
+	}
+
+	for nm, id := range c.getNetDrivers() {
+		err := id.driver.DiscoverNew(discoverapi.DatastoreConfig, *dsConfig)
+		if err != nil {
+			log.Errorf("Failed to set datastore in driver %s: %v", nm, err)
+		}
+	}
+
+	return nil
+}
+
 func (c *controller) ID() string {
 func (c *controller) ID() string {
 	return c.id
 	return c.id
 }
 }
@@ -726,6 +792,26 @@ func (c *controller) getIpamDriver(name string) (ipamapi.Ipam, error) {
 	return id.driver, nil
 	return id.driver, nil
 }
 }
 
 
+func (c *controller) getIpamDrivers() ipamTable {
+	c.Lock()
+	defer c.Unlock()
+	table := ipamTable{}
+	for i, d := range c.ipamDrivers {
+		table[i] = d
+	}
+	return table
+}
+
+func (c *controller) getNetDrivers() driverTable {
+	c.Lock()
+	defer c.Unlock()
+	table := driverTable{}
+	for i, d := range c.drivers {
+		table[i] = d
+	}
+	return table
+}
+
 func (c *controller) Stop() {
 func (c *controller) Stop() {
 	c.closeStores()
 	c.closeStores()
 	c.stopExternalKeyListener()
 	c.stopExternalKeyListener()

+ 29 - 0
libnetwork/datastore/datastore.go

@@ -13,6 +13,7 @@ import (
 	"github.com/docker/libkv/store/consul"
 	"github.com/docker/libkv/store/consul"
 	"github.com/docker/libkv/store/etcd"
 	"github.com/docker/libkv/store/etcd"
 	"github.com/docker/libkv/store/zookeeper"
 	"github.com/docker/libkv/store/zookeeper"
+	"github.com/docker/libnetwork/discoverapi"
 	"github.com/docker/libnetwork/types"
 	"github.com/docker/libnetwork/types"
 )
 )
 
 
@@ -253,6 +254,34 @@ func NewDataStore(scope string, cfg *ScopeCfg) (DataStore, error) {
 	return newClient(scope, cfg.Client.Provider, cfg.Client.Address, cfg.Client.Config, cached)
 	return newClient(scope, cfg.Client.Provider, cfg.Client.Address, cfg.Client.Config, cached)
 }
 }
 
 
+// NewDataStoreFromConfig creates a new instance of LibKV data store starting from the datastore config data
+func NewDataStoreFromConfig(dsc discoverapi.DatastoreConfigData) (DataStore, error) {
+	var (
+		ok    bool
+		sCfgP *store.Config
+	)
+
+	sCfgP, ok = dsc.Config.(*store.Config)
+	if !ok && dsc.Config != nil {
+		return nil, fmt.Errorf("cannot parse store configuration: %v", dsc.Config)
+	}
+
+	scopeCfg := &ScopeCfg{
+		Client: ScopeClientCfg{
+			Address:  dsc.Address,
+			Provider: dsc.Provider,
+			Config:   sCfgP,
+		},
+	}
+
+	ds, err := NewDataStore(dsc.Scope, scopeCfg)
+	if err != nil {
+		return nil, fmt.Errorf("failed to construct datastore client from datastore configuration %v: %v", dsc, err)
+	}
+
+	return ds, err
+}
+
 func (ds *datastore) Close() {
 func (ds *datastore) Close() {
 	ds.store.Close()
 	ds.store.Close()
 }
 }

+ 5 - 4
libnetwork/discoverapi/discoverapi.go

@@ -16,8 +16,8 @@ type DiscoveryType int
 const (
 const (
 	// NodeDiscovery represents Node join/leave events provided by discovery
 	// NodeDiscovery represents Node join/leave events provided by discovery
 	NodeDiscovery = iota + 1
 	NodeDiscovery = iota + 1
-	// DatastoreUpdate represents a add/remove datastore event
-	DatastoreUpdate
+	// DatastoreConfig represents a add/remove datastore event
+	DatastoreConfig
 )
 )
 
 
 // NodeDiscoveryData represents the structure backing the node discovery data json string
 // NodeDiscoveryData represents the structure backing the node discovery data json string
@@ -26,8 +26,9 @@ type NodeDiscoveryData struct {
 	Self    bool
 	Self    bool
 }
 }
 
 
-// DatastoreUpdateData is the data for the datastore update event message
-type DatastoreUpdateData struct {
+// DatastoreConfigData is the data for the datastore update event message
+type DatastoreConfigData struct {
+	Scope    string
 	Provider string
 	Provider string
 	Address  string
 	Address  string
 	Config   interface{}
 	Config   interface{}

+ 7 - 4
libnetwork/drivers.go

@@ -3,6 +3,7 @@ package libnetwork
 import (
 import (
 	"strings"
 	"strings"
 
 
+	"github.com/docker/libnetwork/discoverapi"
 	"github.com/docker/libnetwork/driverapi"
 	"github.com/docker/libnetwork/driverapi"
 	"github.com/docker/libnetwork/ipamapi"
 	"github.com/docker/libnetwork/ipamapi"
 	"github.com/docker/libnetwork/netlabel"
 	"github.com/docker/libnetwork/netlabel"
@@ -57,10 +58,12 @@ func makeDriverConfig(c *controller, ntype string) map[string]interface{} {
 		if !v.IsValid() {
 		if !v.IsValid() {
 			continue
 			continue
 		}
 		}
-
-		config[netlabel.MakeKVProvider(k)] = v.Client.Provider
-		config[netlabel.MakeKVProviderURL(k)] = v.Client.Address
-		config[netlabel.MakeKVProviderConfig(k)] = v.Client.Config
+		config[netlabel.MakeKVClient(k)] = discoverapi.DatastoreConfigData{
+			Scope:    k,
+			Provider: v.Client.Provider,
+			Address:  v.Client.Address,
+			Config:   v.Client.Config,
+		}
 	}
 	}
 
 
 	return config
 	return config

+ 8 - 20
libnetwork/drivers/bridge/bridge_store.go

@@ -6,9 +6,9 @@ import (
 	"net"
 	"net"
 
 
 	"github.com/Sirupsen/logrus"
 	"github.com/Sirupsen/logrus"
-	"github.com/docker/libkv/store"
 	"github.com/docker/libkv/store/boltdb"
 	"github.com/docker/libkv/store/boltdb"
 	"github.com/docker/libnetwork/datastore"
 	"github.com/docker/libnetwork/datastore"
+	"github.com/docker/libnetwork/discoverapi"
 	"github.com/docker/libnetwork/netlabel"
 	"github.com/docker/libnetwork/netlabel"
 	"github.com/docker/libnetwork/types"
 	"github.com/docker/libnetwork/types"
 )
 )
@@ -16,27 +16,15 @@ import (
 const bridgePrefix = "bridge"
 const bridgePrefix = "bridge"
 
 
 func (d *driver) initStore(option map[string]interface{}) error {
 func (d *driver) initStore(option map[string]interface{}) error {
-	var err error
-
-	provider, provOk := option[netlabel.LocalKVProvider]
-	provURL, urlOk := option[netlabel.LocalKVProviderURL]
-
-	if provOk && urlOk {
-		cfg := &datastore.ScopeCfg{
-			Client: datastore.ScopeClientCfg{
-				Provider: provider.(string),
-				Address:  provURL.(string),
-			},
+	if data, ok := option[netlabel.LocalKVClient]; ok {
+		var err error
+		dsc, ok := data.(discoverapi.DatastoreConfigData)
+		if !ok {
+			return types.InternalErrorf("incorrect data in datastore configuration: %v", data)
 		}
 		}
-
-		provConfig, confOk := option[netlabel.LocalKVProviderConfig]
-		if confOk {
-			cfg.Client.Config = provConfig.(*store.Config)
-		}
-
-		d.store, err = datastore.NewDataStore(datastore.LocalScope, cfg)
+		d.store, err = datastore.NewDataStoreFromConfig(dsc)
 		if err != nil {
 		if err != nil {
-			return fmt.Errorf("bridge driver failed to initialize data store: %v", err)
+			return types.InternalErrorf("bridge driver failed to initialize data store: %v", err)
 		}
 		}
 
 
 		return d.populateNetworks()
 		return d.populateNetworks()

+ 52 - 32
libnetwork/drivers/overlay/overlay.go

@@ -6,12 +6,12 @@ import (
 	"sync"
 	"sync"
 
 
 	"github.com/Sirupsen/logrus"
 	"github.com/Sirupsen/logrus"
-	"github.com/docker/libkv/store"
 	"github.com/docker/libnetwork/datastore"
 	"github.com/docker/libnetwork/datastore"
 	"github.com/docker/libnetwork/discoverapi"
 	"github.com/docker/libnetwork/discoverapi"
 	"github.com/docker/libnetwork/driverapi"
 	"github.com/docker/libnetwork/driverapi"
 	"github.com/docker/libnetwork/idm"
 	"github.com/docker/libnetwork/idm"
 	"github.com/docker/libnetwork/netlabel"
 	"github.com/docker/libnetwork/netlabel"
+	"github.com/docker/libnetwork/types"
 	"github.com/hashicorp/serf/serf"
 	"github.com/hashicorp/serf/serf"
 )
 )
 
 
@@ -25,6 +25,8 @@ const (
 	vxlanVethMTU = 1450
 	vxlanVethMTU = 1450
 )
 )
 
 
+var initVxlanIdm = make(chan (bool), 1)
+
 type driver struct {
 type driver struct {
 	eventCh      chan serf.Event
 	eventCh      chan serf.Event
 	notifyCh     chan ovNotify
 	notifyCh     chan ovNotify
@@ -56,6 +58,18 @@ func Init(dc driverapi.DriverCallback, config map[string]interface{}) error {
 		config: config,
 		config: config,
 	}
 	}
 
 
+	if data, ok := config[netlabel.GlobalKVClient]; ok {
+		var err error
+		dsc, ok := data.(discoverapi.DatastoreConfigData)
+		if !ok {
+			return types.InternalErrorf("incorrect data in datastore configuration: %v", data)
+		}
+		d.store, err = datastore.NewDataStoreFromConfig(dsc)
+		if err != nil {
+			return types.InternalErrorf("failed to initialize data store: %v", err)
+		}
+	}
+
 	return dc.RegisterDriver(networkType, d, c)
 	return dc.RegisterDriver(networkType, d, c)
 }
 }
 
 
@@ -73,42 +87,33 @@ func Fini(drv driverapi.Driver) {
 }
 }
 
 
 func (d *driver) configure() error {
 func (d *driver) configure() error {
-	var err error
+	if d.store == nil {
+		return types.NoServiceErrorf("datastore is not available")
+	}
 
 
-	if len(d.config) == 0 {
-		return nil
+	if d.vxlanIdm == nil {
+		return d.initializeVxlanIdm()
 	}
 	}
 
 
-	d.once.Do(func() {
-		provider, provOk := d.config[netlabel.GlobalKVProvider]
-		provURL, urlOk := d.config[netlabel.GlobalKVProviderURL]
+	return nil
+}
 
 
-		if provOk && urlOk {
-			cfg := &datastore.ScopeCfg{
-				Client: datastore.ScopeClientCfg{
-					Provider: provider.(string),
-					Address:  provURL.(string),
-				},
-			}
-			provConfig, confOk := d.config[netlabel.GlobalKVProviderConfig]
-			if confOk {
-				cfg.Client.Config = provConfig.(*store.Config)
-			}
-			d.store, err = datastore.NewDataStore(datastore.GlobalScope, cfg)
-			if err != nil {
-				err = fmt.Errorf("failed to initialize data store: %v", err)
-				return
-			}
-		}
+func (d *driver) initializeVxlanIdm() error {
+	var err error
 
 
-		d.vxlanIdm, err = idm.New(d.store, "vxlan-id", vxlanIDStart, vxlanIDEnd)
-		if err != nil {
-			err = fmt.Errorf("failed to initialize vxlan id manager: %v", err)
-			return
-		}
-	})
+	initVxlanIdm <- true
+	defer func() { <-initVxlanIdm }()
+
+	if d.vxlanIdm != nil {
+		return nil
+	}
+
+	d.vxlanIdm, err = idm.New(d.store, "vxlan-id", vxlanIDStart, vxlanIDEnd)
+	if err != nil {
+		return fmt.Errorf("failed to initialize vxlan id manager: %v", err)
+	}
 
 
-	return err
+	return nil
 }
 }
 
 
 func (d *driver) Type() string {
 func (d *driver) Type() string {
@@ -187,12 +192,27 @@ func (d *driver) pushLocalEndpointEvent(action, nid, eid string) {
 
 
 // DiscoverNew is a notification for a new discovery event, such as a new node joining a cluster
 // DiscoverNew is a notification for a new discovery event, such as a new node joining a cluster
 func (d *driver) DiscoverNew(dType discoverapi.DiscoveryType, data interface{}) error {
 func (d *driver) DiscoverNew(dType discoverapi.DiscoveryType, data interface{}) error {
-	if dType == discoverapi.NodeDiscovery {
+	switch dType {
+	case discoverapi.NodeDiscovery:
 		nodeData, ok := data.(discoverapi.NodeDiscoveryData)
 		nodeData, ok := data.(discoverapi.NodeDiscoveryData)
 		if !ok || nodeData.Address == "" {
 		if !ok || nodeData.Address == "" {
 			return fmt.Errorf("invalid discovery data")
 			return fmt.Errorf("invalid discovery data")
 		}
 		}
 		d.nodeJoin(nodeData.Address, nodeData.Self)
 		d.nodeJoin(nodeData.Address, nodeData.Self)
+	case discoverapi.DatastoreConfig:
+		var err error
+		if d.store != nil {
+			return types.ForbiddenErrorf("cannot accept datastore configuration: Overlay driver has a datastore configured already")
+		}
+		dsc, ok := data.(discoverapi.DatastoreConfigData)
+		if !ok {
+			return types.InternalErrorf("incorrect data in datastore configuration: %v", data)
+		}
+		d.store, err = datastore.NewDataStoreFromConfig(dsc)
+		if err != nil {
+			return types.InternalErrorf("failed to initialize data store: %v", err)
+		}
+	default:
 	}
 	}
 	return nil
 	return nil
 }
 }

+ 13 - 4
libnetwork/drivers/overlay/overlay_test.go

@@ -8,6 +8,7 @@ import (
 	"github.com/docker/libnetwork/discoverapi"
 	"github.com/docker/libnetwork/discoverapi"
 	"github.com/docker/libnetwork/driverapi"
 	"github.com/docker/libnetwork/driverapi"
 	_ "github.com/docker/libnetwork/testutils"
 	_ "github.com/docker/libnetwork/testutils"
+	"github.com/docker/libnetwork/types"
 )
 )
 
 
 type driverTester struct {
 type driverTester struct {
@@ -23,8 +24,12 @@ func setupDriver(t *testing.T) *driverTester {
 		t.Fatal(err)
 		t.Fatal(err)
 	}
 	}
 
 
-	if err := dt.d.configure(); err != nil {
-		t.Fatal(err)
+	err := dt.d.configure()
+	if err == nil {
+		t.Fatalf("Failed to detect nil store")
+	}
+	if _, ok := err.(types.NoServiceError); !ok {
+		t.Fatalf("Unexpected error type: %v", err)
 	}
 	}
 
 
 	iface, err := net.InterfaceByName("eth0")
 	iface, err := net.InterfaceByName("eth0")
@@ -94,8 +99,12 @@ func TestOverlayNilConfig(t *testing.T) {
 		t.Fatal(err)
 		t.Fatal(err)
 	}
 	}
 
 
-	if err := dt.d.configure(); err != nil {
-		t.Fatal(err)
+	err := dt.d.configure()
+	if err == nil {
+		t.Fatalf("Failed to detect nil store")
+	}
+	if _, ok := err.(types.NoServiceError); !ok {
+		t.Fatalf("Unexpected error type: %v", err)
 	}
 	}
 
 
 	cleanupDriver(t, dt)
 	cleanupDriver(t, dt)

+ 62 - 13
libnetwork/ipam/allocator.go

@@ -8,6 +8,7 @@ import (
 	log "github.com/Sirupsen/logrus"
 	log "github.com/Sirupsen/logrus"
 	"github.com/docker/libnetwork/bitseq"
 	"github.com/docker/libnetwork/bitseq"
 	"github.com/docker/libnetwork/datastore"
 	"github.com/docker/libnetwork/datastore"
+	"github.com/docker/libnetwork/discoverapi"
 	"github.com/docker/libnetwork/ipamapi"
 	"github.com/docker/libnetwork/ipamapi"
 	"github.com/docker/libnetwork/ipamutils"
 	"github.com/docker/libnetwork/ipamutils"
 	"github.com/docker/libnetwork/types"
 	"github.com/docker/libnetwork/types"
@@ -60,19 +61,9 @@ func NewAllocator(lcDs, glDs datastore.DataStore) (*Allocator, error) {
 		if aspc.ds == nil {
 		if aspc.ds == nil {
 			continue
 			continue
 		}
 		}
-
-		a.addrSpaces[aspc.as] = &addrSpace{
-			subnets: map[SubnetKey]*PoolData{},
-			id:      dsConfigKey + "/" + aspc.as,
-			scope:   aspc.ds.Scope(),
-			ds:      aspc.ds,
-			alloc:   a,
-		}
+		a.initializeAddressSpace(aspc.as, aspc.ds)
 	}
 	}
 
 
-	a.checkConsistency(localAddressSpace)
-	a.checkConsistency(globalAddressSpace)
-
 	return a, nil
 	return a, nil
 }
 }
 
 
@@ -118,25 +109,83 @@ func (a *Allocator) updateBitMasks(aSpace *addrSpace) error {
 	return nil
 	return nil
 }
 }
 
 
-// Checks for and fixes damaged bitmask. Meant to be called in constructor only.
+// Checks for and fixes damaged bitmask.
 func (a *Allocator) checkConsistency(as string) {
 func (a *Allocator) checkConsistency(as string) {
+	var sKeyList []SubnetKey
+
 	// Retrieve this address space's configuration and bitmasks from the datastore
 	// Retrieve this address space's configuration and bitmasks from the datastore
 	a.refresh(as)
 	a.refresh(as)
+	a.Lock()
 	aSpace, ok := a.addrSpaces[as]
 	aSpace, ok := a.addrSpaces[as]
+	a.Unlock()
 	if !ok {
 	if !ok {
 		return
 		return
 	}
 	}
 	a.updateBitMasks(aSpace)
 	a.updateBitMasks(aSpace)
+
+	aSpace.Lock()
 	for sk, pd := range aSpace.subnets {
 	for sk, pd := range aSpace.subnets {
 		if pd.Range != nil {
 		if pd.Range != nil {
 			continue
 			continue
 		}
 		}
-		if err := a.addresses[sk].CheckConsistency(); err != nil {
+		sKeyList = append(sKeyList, sk)
+	}
+	aSpace.Unlock()
+
+	for _, sk := range sKeyList {
+		a.Lock()
+		bm := a.addresses[sk]
+		a.Unlock()
+		if err := bm.CheckConsistency(); err != nil {
 			log.Warnf("Error while running consistency check for %s: %v", sk, err)
 			log.Warnf("Error while running consistency check for %s: %v", sk, err)
 		}
 		}
 	}
 	}
 }
 }
 
 
+func (a *Allocator) initializeAddressSpace(as string, ds datastore.DataStore) error {
+	a.Lock()
+	if _, ok := a.addrSpaces[as]; ok {
+		a.Unlock()
+		return types.ForbiddenErrorf("tried to add an axisting address space: %s", as)
+	}
+	a.addrSpaces[as] = &addrSpace{
+		subnets: map[SubnetKey]*PoolData{},
+		id:      dsConfigKey + "/" + as,
+		scope:   ds.Scope(),
+		ds:      ds,
+		alloc:   a,
+	}
+	a.Unlock()
+
+	a.checkConsistency(as)
+
+	return nil
+}
+
+// DiscoverNew informs the allocator about a new global scope datastore
+func (a *Allocator) DiscoverNew(dType discoverapi.DiscoveryType, data interface{}) error {
+	if dType != discoverapi.DatastoreConfig {
+		return nil
+	}
+
+	dsc, ok := data.(discoverapi.DatastoreConfigData)
+	if !ok {
+		return types.InternalErrorf("incorrect data in datastore update notification: %v", data)
+	}
+
+	ds, err := datastore.NewDataStoreFromConfig(dsc)
+	if err != nil {
+		return err
+	}
+
+	return a.initializeAddressSpace(globalAddressSpace, ds)
+}
+
+// DiscoverDelete is a notification of no interest for the allocator
+func (a *Allocator) DiscoverDelete(dType discoverapi.DiscoveryType, data interface{}) error {
+	return nil
+}
+
 // GetDefaultAddressSpaces returns the local and global default address spaces
 // GetDefaultAddressSpaces returns the local and global default address spaces
 func (a *Allocator) GetDefaultAddressSpaces() (string, string, error) {
 func (a *Allocator) GetDefaultAddressSpaces() (string, string, error) {
 	return localAddressSpace, globalAddressSpace, nil
 	return localAddressSpace, globalAddressSpace, nil

+ 3 - 0
libnetwork/ipamapi/contract.go

@@ -4,6 +4,7 @@ package ipamapi
 import (
 import (
 	"net"
 	"net"
 
 
+	"github.com/docker/libnetwork/discoverapi"
 	"github.com/docker/libnetwork/types"
 	"github.com/docker/libnetwork/types"
 )
 )
 
 
@@ -56,6 +57,8 @@ var (
 // Ipam represents the interface the IPAM service plugins must implement
 // Ipam represents the interface the IPAM service plugins must implement
 // in order to allow injection/modification of IPAM database.
 // in order to allow injection/modification of IPAM database.
 type Ipam interface {
 type Ipam interface {
+	discoverapi.Discover
+
 	// GetDefaultAddressSpaces returns the default local and global address spaces for this ipam
 	// GetDefaultAddressSpaces returns the default local and global address spaces for this ipam
 	GetDefaultAddressSpaces() (string, string, error)
 	GetDefaultAddressSpaces() (string, string, error)
 	// RequestPool returns an address pool along with its unique id. Address space is a mandatory field
 	// RequestPool returns an address pool along with its unique id. Address space is a mandatory field

+ 11 - 0
libnetwork/ipams/remote/remote.go

@@ -6,6 +6,7 @@ import (
 
 
 	log "github.com/Sirupsen/logrus"
 	log "github.com/Sirupsen/logrus"
 	"github.com/docker/docker/pkg/plugins"
 	"github.com/docker/docker/pkg/plugins"
+	"github.com/docker/libnetwork/discoverapi"
 	"github.com/docker/libnetwork/ipamapi"
 	"github.com/docker/libnetwork/ipamapi"
 	"github.com/docker/libnetwork/ipams/remote/api"
 	"github.com/docker/libnetwork/ipams/remote/api"
 	"github.com/docker/libnetwork/types"
 	"github.com/docker/libnetwork/types"
@@ -124,3 +125,13 @@ func (a *allocator) ReleaseAddress(poolID string, address net.IP) error {
 	res := &api.ReleaseAddressResponse{}
 	res := &api.ReleaseAddressResponse{}
 	return a.call("ReleaseAddress", req, res)
 	return a.call("ReleaseAddress", req, res)
 }
 }
+
+// DiscoverNew is a notification for a new discovery event, such as a new global datastore
+func (a *allocator) DiscoverNew(dType discoverapi.DiscoveryType, data interface{}) error {
+	return nil
+}
+
+// DiscoverDelete is a notification for a discovery delete event, such as a node leaving a cluster
+func (a *allocator) DiscoverDelete(dType discoverapi.DiscoveryType, data interface{}) error {
+	return nil
+}

+ 11 - 0
libnetwork/netlabel/labels.go

@@ -56,6 +56,9 @@ var (
 	// GlobalKVProviderConfig constant represents the KV provider Config
 	// GlobalKVProviderConfig constant represents the KV provider Config
 	GlobalKVProviderConfig = MakeKVProviderConfig("global")
 	GlobalKVProviderConfig = MakeKVProviderConfig("global")
 
 
+	// GlobalKVClient constants represents the global kv store client
+	GlobalKVClient = MakeKVClient("global")
+
 	// LocalKVProvider constant represents the KV provider backend
 	// LocalKVProvider constant represents the KV provider backend
 	LocalKVProvider = MakeKVProvider("local")
 	LocalKVProvider = MakeKVProvider("local")
 
 
@@ -64,6 +67,9 @@ var (
 
 
 	// LocalKVProviderConfig constant represents the KV provider Config
 	// LocalKVProviderConfig constant represents the KV provider Config
 	LocalKVProviderConfig = MakeKVProviderConfig("local")
 	LocalKVProviderConfig = MakeKVProviderConfig("local")
+
+	// LocalKVClient constants represents the local kv store client
+	LocalKVClient = MakeKVClient("local")
 )
 )
 
 
 // MakeKVProvider returns the kvprovider label for the scope
 // MakeKVProvider returns the kvprovider label for the scope
@@ -81,6 +87,11 @@ func MakeKVProviderConfig(scope string) string {
 	return DriverPrivatePrefix + scope + "kv_provider_config"
 	return DriverPrivatePrefix + scope + "kv_provider_config"
 }
 }
 
 
+// MakeKVClient returns the kv client label for the scope
+func MakeKVClient(scope string) string {
+	return DriverPrivatePrefix + scope + "kv_client"
+}
+
 // Key extracts the key portion of the label
 // Key extracts the key portion of the label
 func Key(label string) (key string) {
 func Key(label string) (key string) {
 	if kv := strings.SplitN(label, "=", 2); len(kv) > 0 {
 	if kv := strings.SplitN(label, "=", 2); len(kv) > 0 {

+ 4 - 0
libnetwork/store.go

@@ -14,6 +14,7 @@ func (c *controller) initStores() error {
 		return nil
 		return nil
 	}
 	}
 	scopeConfigs := c.cfg.Scopes
 	scopeConfigs := c.cfg.Scopes
+	c.stores = nil
 	c.Unlock()
 	c.Unlock()
 
 
 	for scope, scfg := range scopeConfigs {
 	for scope, scfg := range scopeConfigs {
@@ -418,6 +419,9 @@ func (c *controller) watchLoop() {
 }
 }
 
 
 func (c *controller) startWatch() {
 func (c *controller) startWatch() {
+	if c.watchCh != nil {
+		return
+	}
 	c.watchCh = make(chan *endpoint)
 	c.watchCh = make(chan *endpoint)
 	c.unWatchCh = make(chan *endpoint)
 	c.unWatchCh = make(chan *endpoint)
 	c.nmap = make(map[string]*netWatch)
 	c.nmap = make(map[string]*netWatch)