ソースを参照

IPAM watch removal and multistore support

Remove the need for watching for IPAM data
structures and add multi store support code and
data reorganization to simplify address space
management.

Signed-off-by: Jana Radhakrishnan <mrjana@docker.com>
Jana Radhakrishnan 9 年 前
コミット
a13f78369f

+ 6 - 3
libnetwork/bitseq/sequence.go

@@ -57,9 +57,6 @@ func NewHandle(app string, ds datastore.DataStore, id string, numElements uint32
 		return h, nil
 	}
 
-	// Register for status changes
-	h.watchForChanges()
-
 	// Get the initial status from the ds if present.
 	if err := h.store.GetObject(datastore.Key(h.Key()...), h); err != nil && err != datastore.ErrKeyNotFound {
 		return nil, err
@@ -252,6 +249,12 @@ func (h *Handle) set(ordinal, start, end uint32, any bool, release bool) (uint32
 	)
 
 	for {
+		if h.store != nil {
+			if err := h.store.GetObject(datastore.Key(h.Key()...), h); err != nil && err != datastore.ErrKeyNotFound {
+				return ret, err
+			}
+		}
+
 		h.Lock()
 		// Get position if available
 		if release {

+ 33 - 32
libnetwork/bitseq/store.go

@@ -70,46 +70,47 @@ func (h *Handle) Exists() bool {
 	return h.dbExists
 }
 
+// New method returns a handle based on the receiver handle
+func (h *Handle) New() datastore.KVObject {
+	h.Lock()
+	defer h.Unlock()
+
+	return &Handle{
+		app:   h.app,
+		id:    h.id,
+		store: h.store,
+	}
+}
+
+// CopyTo deep copies the handle into the passed destination object
+func (h *Handle) CopyTo(o datastore.KVObject) error {
+	h.Lock()
+	defer h.Unlock()
+
+	dstH := o.(*Handle)
+	dstH.bits = h.bits
+	dstH.unselected = h.unselected
+	dstH.head = h.head.getCopy()
+	dstH.app = h.app
+	dstH.id = h.id
+	dstH.dbIndex = h.dbIndex
+	dstH.dbExists = h.dbExists
+	dstH.store = h.store
+
+	return nil
+}
+
 // Skip provides a way for a KV Object to avoid persisting it in the KV Store
 func (h *Handle) Skip() bool {
 	return false
 }
 
 // DataScope method returns the storage scope of the datastore
-func (h *Handle) DataScope() datastore.DataScope {
-	return datastore.GlobalScope
-}
-
-func (h *Handle) watchForChanges() error {
+func (h *Handle) DataScope() string {
 	h.Lock()
-	store := h.store
-	h.Unlock()
-
-	if store == nil {
-		return nil
-	}
+	defer h.Unlock()
 
-	kvpChan, err := store.KVStore().Watch(datastore.Key(h.Key()...), nil)
-	if err != nil {
-		return err
-	}
-	go func() {
-		for {
-			select {
-			case kvPair := <-kvpChan:
-				// Only process remote update
-				if kvPair != nil && (kvPair.LastIndex != h.Index()) {
-					err := h.fromDsValue(kvPair.Value)
-					if err != nil {
-						log.Warnf("Failed to reconstruct bitseq handle from ds watch: %s", err.Error())
-					} else {
-						h.SetIndex(kvPair.LastIndex)
-					}
-				}
-			}
-		}
-	}()
-	return nil
+	return h.store.Scope()
 }
 
 func (h *Handle) fromDsValue(value []byte) error {

+ 130 - 104
libnetwork/ipam/allocator.go

@@ -6,7 +6,6 @@ import (
 	"sync"
 
 	log "github.com/Sirupsen/logrus"
-	"github.com/docker/libkv/store"
 	"github.com/docker/libnetwork/bitseq"
 	"github.com/docker/libnetwork/datastore"
 	"github.com/docker/libnetwork/ipamapi"
@@ -30,13 +29,10 @@ const (
 type Allocator struct {
 	// Predefined pools for default address spaces
 	predefined map[string][]*net.IPNet
-	// Static subnet information
-	localSubnets  *PoolsConfig
-	globalSubnets *PoolsConfig
+	addrSpaces map[string]*addrSpace
+	// stores        []datastore.Datastore
 	// Allocated addresses in each address space's subnet
 	addresses map[SubnetKey]*bitseq.Handle
-	// Datastore
-	addrSpace2Configs map[string]*PoolsConfig
 	sync.Mutex
 }
 
@@ -44,71 +40,84 @@ type Allocator struct {
 func NewAllocator(lcDs, glDs datastore.DataStore) (*Allocator, error) {
 	a := &Allocator{}
 
-	a.localSubnets = &PoolsConfig{
-		subnets: map[SubnetKey]*PoolData{},
-		id:      dsConfigKey + "/Pools",
-		scope:   datastore.LocalScope,
-		ds:      lcDs,
-		alloc:   a,
-	}
-
-	a.globalSubnets = &PoolsConfig{
-		subnets: map[SubnetKey]*PoolData{},
-		id:      dsConfigKey + "/Pools",
-		scope:   datastore.GlobalScope,
-		ds:      glDs,
-		alloc:   a,
-	}
-
+	// Load predefined subnet pools
 	a.predefined = map[string][]*net.IPNet{
 		localAddressSpace:  initLocalPredefinedPools(),
 		globalAddressSpace: initGlobalPredefinedPools(),
 	}
 
-	a.addrSpace2Configs = map[string]*PoolsConfig{
-		localAddressSpace:  a.localSubnets,
-		globalAddressSpace: a.globalSubnets,
-	}
-
+	// Initialize bitseq map
 	a.addresses = make(map[SubnetKey]*bitseq.Handle)
 
-	cfgs := []struct {
-		cfg *PoolsConfig
-		dsc string
+	// Initialize address spaces
+	a.addrSpaces = make(map[string]*addrSpace)
+	for _, aspc := range []struct {
+		as string
+		ds datastore.DataStore
 	}{
-		{a.localSubnets, "local"},
-		{a.globalSubnets, "global"},
-	}
-	// Get the initial local/global pools configfrom the datastores
-	var inserterList []func() error
-	for _, e := range cfgs {
-		if e.cfg.ds == nil {
+		{localAddressSpace, lcDs},
+		{globalAddressSpace, glDs},
+	} {
+		if aspc.ds == nil {
 			continue
 		}
-		if err := e.cfg.watchForChanges(); err != nil {
-			log.Warnf("Error on registering watch for %s datastore: %v", e.dsc, err)
-		}
-		if err := e.cfg.readFromStore(); err != nil && err != store.ErrKeyNotFound {
-			return nil, fmt.Errorf("failed to retrieve the ipam %s pools config from datastore: %v", e.dsc, err)
+
+		a.addrSpaces[aspc.as] = &addrSpace{
+			subnets: map[SubnetKey]*PoolData{},
+			id:      dsConfigKey + "/" + aspc.as,
+			scope:   aspc.ds.Scope(),
+			ds:      aspc.ds,
+			alloc:   a,
 		}
-		e.cfg.Lock()
-		for k, v := range e.cfg.subnets {
-			if v.Range == nil {
-				inserterList = append(inserterList, func() error { return a.insertBitMask(e.cfg.ds, k, v.Pool) })
-			}
+	}
+
+	return a, nil
+}
+
+func (a *Allocator) refresh(as string) error {
+	aSpace, err := a.getAddressSpaceFromStore(as)
+	if err != nil {
+		return fmt.Errorf("error getting pools config from store during init: %v",
+			err)
+	}
+
+	if aSpace == nil {
+		return nil
+	}
+
+	if err := a.updateBitMasks(aSpace); err != nil {
+		return fmt.Errorf("error updating bit masks during init: %v", err)
+	}
+
+	a.Lock()
+	a.addrSpaces[as] = aSpace
+	a.Unlock()
+
+	return nil
+}
+
+func (a *Allocator) updateBitMasks(aSpace *addrSpace) error {
+	var inserterList []func() error
+
+	aSpace.Lock()
+	for k, v := range aSpace.subnets {
+		if v.Range == nil {
+			inserterList = append(inserterList,
+				func() error { return a.insertBitMask(k, v.Pool) })
 		}
-		e.cfg.Unlock()
 	}
+	aSpace.Unlock()
+
 	// Add the bitmasks (data could come from datastore)
 	if inserterList != nil {
 		for _, f := range inserterList {
 			if err := f(); err != nil {
-				return nil, err
+				return err
 			}
 		}
 	}
 
-	return a, nil
+	return nil
 }
 
 // GetDefaultAddressSpaces returns the local and global default address spaces
@@ -123,25 +132,29 @@ func (a *Allocator) RequestPool(addressSpace, pool, subPool string, options map[
 		return "", nil, nil, ipamapi.ErrInvalidPool
 	}
 
-	cfg, err := a.getPoolsConfig(addressSpace)
+retry:
+	if err := a.refresh(addressSpace); err != nil {
+		return "", nil, nil, err
+	}
+
+	aSpace, err := a.getAddrSpace(addressSpace)
 	if err != nil {
 		return "", nil, nil, err
 	}
 
-retry:
-	insert, err := cfg.updatePoolDBOnAdd(*k, nw, ipr)
+	insert, err := aSpace.updatePoolDBOnAdd(*k, nw, ipr)
 	if err != nil {
 		return "", nil, nil, err
 	}
-	if err := cfg.writeToStore(); err != nil {
+
+	if err := a.writeToStore(aSpace); err != nil {
 		if _, ok := err.(types.RetryError); !ok {
 			return "", nil, nil, types.InternalErrorf("pool configuration failed because of %s", err.Error())
 		}
-		if erru := cfg.readFromStore(); erru != nil {
-			return "", nil, nil, fmt.Errorf("failed to get updated pool config from datastore (%v) after (%v)", erru, err)
-		}
+
 		goto retry
 	}
+
 	return k.String(), aw, nil, insert()
 }
 
@@ -152,23 +165,25 @@ func (a *Allocator) ReleasePool(poolID string) error {
 		return types.BadRequestErrorf("invalid pool id: %s", poolID)
 	}
 
-	cfg, err := a.getPoolsConfig(k.AddressSpace)
+retry:
+	if err := a.refresh(k.AddressSpace); err != nil {
+		return err
+	}
+
+	aSpace, err := a.getAddrSpace(k.AddressSpace)
 	if err != nil {
 		return err
 	}
 
-retry:
-	remove, err := cfg.updatePoolDBOnRemoval(k)
+	remove, err := aSpace.updatePoolDBOnRemoval(k)
 	if err != nil {
 		return err
 	}
-	if err = cfg.writeToStore(); err != nil {
+
+	if err = a.writeToStore(aSpace); err != nil {
 		if _, ok := err.(types.RetryError); !ok {
 			return types.InternalErrorf("pool (%s) removal failed because of %v", poolID, err)
 		}
-		if erru := cfg.readFromStore(); erru != nil {
-			return fmt.Errorf("failed to get updated pool config from datastore (%v) after (%v)", erru, err)
-		}
 		goto retry
 	}
 
@@ -177,14 +192,14 @@ retry:
 
 // Given the address space, returns the local or global PoolConfig based on the
 // address space is local or global. AddressSpace locality is being registered with IPAM out of band.
-func (a *Allocator) getPoolsConfig(addrSpace string) (*PoolsConfig, error) {
+func (a *Allocator) getAddrSpace(as string) (*addrSpace, error) {
 	a.Lock()
 	defer a.Unlock()
-	cfg, ok := a.addrSpace2Configs[addrSpace]
+	aSpace, ok := a.addrSpaces[as]
 	if !ok {
-		return nil, types.BadRequestErrorf("cannot find locality of address space: %s", addrSpace)
+		return nil, types.BadRequestErrorf("cannot find locality of address space: %s", as)
 	}
-	return cfg, nil
+	return aSpace, nil
 }
 
 func (a *Allocator) parsePoolRequest(addressSpace, pool, subPool string, v6 bool) (*SubnetKey, *net.IPNet, *net.IPNet, *AddressRange, error) {
@@ -224,8 +239,14 @@ func (a *Allocator) parsePoolRequest(addressSpace, pool, subPool string, v6 bool
 	return &SubnetKey{AddressSpace: addressSpace, Subnet: nw.String(), ChildSubnet: subPool}, nw, aw, ipr, nil
 }
 
-func (a *Allocator) insertBitMask(store datastore.DataStore, key SubnetKey, pool *net.IPNet) error {
+func (a *Allocator) insertBitMask(key SubnetKey, pool *net.IPNet) error {
 	log.Debugf("Inserting bitmask (%s, %s)", key.String(), pool.String())
+
+	store := a.getStore(key.AddressSpace)
+	if store == nil {
+		return fmt.Errorf("could not find store for address space %s while inserting bit mask", key.AddressSpace)
+	}
+
 	ipVer := getAddressVersion(pool.IP)
 	ones, bits := pool.Mask.Size()
 	numAddresses := uint32(1 << uint(bits-ones))
@@ -252,13 +273,13 @@ func (a *Allocator) insertBitMask(store datastore.DataStore, key SubnetKey, pool
 	return nil
 }
 
-func (a *Allocator) retrieveBitmask(ds datastore.DataStore, k SubnetKey, n *net.IPNet) (*bitseq.Handle, error) {
+func (a *Allocator) retrieveBitmask(k SubnetKey, n *net.IPNet) (*bitseq.Handle, error) {
 	a.Lock()
 	bm, ok := a.addresses[k]
 	a.Unlock()
 	if !ok {
 		log.Debugf("Retrieving bitmask (%s, %s)", k.String(), n.String())
-		if err := a.insertBitMask(ds, k, n); err != nil {
+		if err := a.insertBitMask(k, n); err != nil {
 			return nil, fmt.Errorf("could not find bitmask in datastore for %s", k.String())
 		}
 		a.Lock()
@@ -289,7 +310,7 @@ func (a *Allocator) getPredefinedPool(as string, ipV6 bool) (*net.IPNet, error)
 		return nil, fmt.Errorf("no default pool availbale for non-default addresss spaces")
 	}
 
-	cfg, err := a.getPoolsConfig(as)
+	aSpace, err := a.getAddrSpace(as)
 	if err != nil {
 		return nil, err
 	}
@@ -298,14 +319,14 @@ func (a *Allocator) getPredefinedPool(as string, ipV6 bool) (*net.IPNet, error)
 		if v != getAddressVersion(nw.IP) {
 			continue
 		}
-		cfg.Lock()
-		_, ok := cfg.subnets[SubnetKey{AddressSpace: as, Subnet: nw.String()}]
-		cfg.Unlock()
+		aSpace.Lock()
+		_, ok := aSpace.subnets[SubnetKey{AddressSpace: as, Subnet: nw.String()}]
+		aSpace.Unlock()
 		if ok {
 			continue
 		}
 
-		if !cfg.contains(as, nw) {
+		if !aSpace.contains(as, nw) {
 			if as == localAddressSpace {
 				if err := netutils.CheckRouteOverlaps(nw); err == nil {
 					return nw, nil
@@ -326,31 +347,35 @@ func (a *Allocator) RequestAddress(poolID string, prefAddress net.IP, opts map[s
 		return nil, nil, types.BadRequestErrorf("invalid pool id: %s", poolID)
 	}
 
-	cfg, err := a.getPoolsConfig(k.AddressSpace)
+	if err := a.refresh(k.AddressSpace); err != nil {
+		return nil, nil, err
+	}
+
+	aSpace, err := a.getAddrSpace(k.AddressSpace)
 	if err != nil {
 		return nil, nil, err
 	}
 
-	cfg.Lock()
-	p, ok := cfg.subnets[k]
+	aSpace.Lock()
+	p, ok := aSpace.subnets[k]
 	if !ok {
-		cfg.Unlock()
+		aSpace.Unlock()
 		return nil, nil, types.NotFoundErrorf("cannot find address pool for poolID:%s", poolID)
 	}
 
 	if prefAddress != nil && !p.Pool.Contains(prefAddress) {
-		cfg.Unlock()
+		aSpace.Unlock()
 		return nil, nil, ipamapi.ErrIPOutOfRange
 	}
 
 	c := p
 	for c.Range != nil {
 		k = c.ParentKey
-		c, ok = cfg.subnets[k]
+		c, ok = aSpace.subnets[k]
 	}
-	cfg.Unlock()
+	aSpace.Unlock()
 
-	bm, err := a.retrieveBitmask(cfg.ds, k, c.Pool)
+	bm, err := a.retrieveBitmask(k, c.Pool)
 	if err != nil {
 		return nil, nil, fmt.Errorf("could not find bitmask in datastore for %s on address %v request from pool %s: %v",
 			k.String(), prefAddress, poolID, err)
@@ -370,29 +395,33 @@ func (a *Allocator) ReleaseAddress(poolID string, address net.IP) error {
 		return types.BadRequestErrorf("invalid pool id: %s", poolID)
 	}
 
-	cfg, err := a.getPoolsConfig(k.AddressSpace)
+	if err := a.refresh(k.AddressSpace); err != nil {
+		return err
+	}
+
+	aSpace, err := a.getAddrSpace(k.AddressSpace)
 	if err != nil {
 		return err
 	}
 
-	cfg.Lock()
-	p, ok := cfg.subnets[k]
+	aSpace.Lock()
+	p, ok := aSpace.subnets[k]
 	if !ok {
-		cfg.Unlock()
+		aSpace.Unlock()
 		return ipamapi.ErrBadPool
 	}
 
 	if address == nil || !p.Pool.Contains(address) {
-		cfg.Unlock()
+		aSpace.Unlock()
 		return ipamapi.ErrInvalidRequest
 	}
 
 	c := p
 	for c.Range != nil {
 		k = c.ParentKey
-		c = cfg.subnets[k]
+		c = aSpace.subnets[k]
 	}
-	cfg.Unlock()
+	aSpace.Unlock()
 
 	mask := p.Pool.Mask
 	if p.Range != nil {
@@ -403,7 +432,7 @@ func (a *Allocator) ReleaseAddress(poolID string, address net.IP) error {
 		return fmt.Errorf("failed to release address %s: %v", address.String(), err)
 	}
 
-	bm, err := cfg.alloc.retrieveBitmask(cfg.ds, k, c.Pool)
+	bm, err := a.retrieveBitmask(k, c.Pool)
 	if err != nil {
 		return fmt.Errorf("could not find bitmask in datastore for %s on address %v release from pool %s: %v",
 			k.String(), address, poolID, err)
@@ -449,23 +478,20 @@ func (a *Allocator) DumpDatabase() string {
 	a.Lock()
 	defer a.Unlock()
 
-	s := fmt.Sprintf("\n\nLocal Pool Config")
-	a.localSubnets.Lock()
-	for k, config := range a.localSubnets.subnets {
-		s = fmt.Sprintf("%s%s", s, fmt.Sprintf("\n%v: %v", k, config))
-	}
-	a.localSubnets.Unlock()
-
-	s = fmt.Sprintf("%s\n\nGlobal Pool Config", s)
-	a.globalSubnets.Lock()
-	for k, config := range a.globalSubnets.subnets {
-		s = fmt.Sprintf("%s%s", s, fmt.Sprintf("\n%v: %v", k, config))
+	var s string
+	for as, aSpace := range a.addrSpaces {
+		s = fmt.Sprintf("\n\n%s Config", as)
+		aSpace.Lock()
+		for k, config := range aSpace.subnets {
+			s = fmt.Sprintf("%s%s", s, fmt.Sprintf("\n%v: %v", k, config))
+		}
+		aSpace.Unlock()
 	}
-	a.globalSubnets.Unlock()
 
 	s = fmt.Sprintf("%s\n\nBitmasks", s)
 	for k, bm := range a.addresses {
 		s = fmt.Sprintf("%s%s", s, fmt.Sprintf("\n\t%s: %s\n\t%d", k, bm, bm.Unselected()))
 	}
+
 	return s
 }

+ 114 - 28
libnetwork/ipam/allocator_test.go

@@ -11,7 +11,6 @@ import (
 
 	"github.com/docker/libkv/store"
 	"github.com/docker/libnetwork/bitseq"
-	"github.com/docker/libnetwork/config"
 	"github.com/docker/libnetwork/datastore"
 	"github.com/docker/libnetwork/ipamapi"
 	"github.com/docker/libnetwork/netutils"
@@ -32,9 +31,9 @@ func randomLocalStore() (datastore.DataStore, error) {
 	if err := tmp.Close(); err != nil {
 		return nil, fmt.Errorf("Error closing temp file: %v", err)
 	}
-	return datastore.NewDataStore(&config.DatastoreCfg{
+	return datastore.NewDataStore(datastore.LocalScope, &datastore.ScopeCfg{
 		Embedded: true,
-		Client: config.DatastoreClientCfg{
+		Client: datastore.ScopeClientCfg{
 			Provider: "boltdb",
 			Address:  defaultPrefix + tmp.Name(),
 			Config: &store.Config{
@@ -191,7 +190,11 @@ func TestSubnetsMarshal(t *testing.T) {
 		t.Fatal(err)
 	}
 
-	cfg := a.localSubnets
+	cfg, err := a.getAddrSpace(localAddressSpace)
+	if err != nil {
+		t.Fatal(err)
+	}
+
 	ba := cfg.Value()
 	if err := cfg.SetValue(ba); err != nil {
 		t.Fatal(err)
@@ -221,7 +224,7 @@ func TestAddSubnets(t *testing.T) {
 	if err != nil {
 		t.Fatal(err)
 	}
-	a.addrSpace2Configs["abc"] = a.addrSpace2Configs[localAddressSpace]
+	a.addrSpaces["abc"] = a.addrSpaces[localAddressSpace]
 
 	pid0, _, _, err := a.RequestPool(localAddressSpace, "10.0.0.0/8", "", nil, false)
 	if err != nil {
@@ -290,7 +293,13 @@ func TestAddReleasePoolID(t *testing.T) {
 	if err != nil {
 		t.Fatal(err)
 	}
-	subnets := a.localSubnets.subnets
+
+	aSpace, err := a.getAddrSpace(localAddressSpace)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	subnets := aSpace.subnets
 	pid0, _, _, err := a.RequestPool(localAddressSpace, "10.0.0.0/8", "", nil, false)
 	if err != nil {
 		t.Fatalf("Unexpected failure in adding pool")
@@ -298,6 +307,14 @@ func TestAddReleasePoolID(t *testing.T) {
 	if err := k0.FromString(pid0); err != nil {
 		t.Fatal(err)
 	}
+
+	aSpace, err = a.getAddrSpace(localAddressSpace)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	subnets = aSpace.subnets
+
 	if subnets[k0].RefCount != 1 {
 		t.Fatalf("Unexpected ref count for %s: %d", k0, subnets[k0].RefCount)
 	}
@@ -309,6 +326,13 @@ func TestAddReleasePoolID(t *testing.T) {
 	if err := k1.FromString(pid1); err != nil {
 		t.Fatal(err)
 	}
+
+	aSpace, err = a.getAddrSpace(localAddressSpace)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	subnets = aSpace.subnets
 	if subnets[k1].RefCount != 1 {
 		t.Fatalf("Unexpected ref count for %s: %d", k1, subnets[k1].RefCount)
 	}
@@ -323,6 +347,13 @@ func TestAddReleasePoolID(t *testing.T) {
 	if err := k2.FromString(pid2); err != nil {
 		t.Fatal(err)
 	}
+
+	aSpace, err = a.getAddrSpace(localAddressSpace)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	subnets = aSpace.subnets
 	if subnets[k2].RefCount != 2 {
 		t.Fatalf("Unexpected ref count for %s: %d", k2, subnets[k2].RefCount)
 	}
@@ -334,12 +365,26 @@ func TestAddReleasePoolID(t *testing.T) {
 	if err := a.ReleasePool(pid1); err != nil {
 		t.Fatal(err)
 	}
+
+	aSpace, err = a.getAddrSpace(localAddressSpace)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	subnets = aSpace.subnets
 	if subnets[k0].RefCount != 2 {
 		t.Fatalf("Unexpected ref count for %s: %d", k0, subnets[k0].RefCount)
 	}
 	if err := a.ReleasePool(pid0); err != nil {
 		t.Fatal(err)
 	}
+
+	aSpace, err = a.getAddrSpace(localAddressSpace)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	subnets = aSpace.subnets
 	if subnets[k0].RefCount != 1 {
 		t.Fatalf("Unexpected ref count for %s: %d", k0, subnets[k0].RefCount)
 	}
@@ -351,6 +396,13 @@ func TestAddReleasePoolID(t *testing.T) {
 	if pid00 != pid0 {
 		t.Fatalf("main pool should still exist")
 	}
+
+	aSpace, err = a.getAddrSpace(localAddressSpace)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	subnets = aSpace.subnets
 	if subnets[k0].RefCount != 2 {
 		t.Fatalf("Unexpected ref count for %s: %d", k0, subnets[k0].RefCount)
 	}
@@ -358,6 +410,13 @@ func TestAddReleasePoolID(t *testing.T) {
 	if err := a.ReleasePool(pid2); err != nil {
 		t.Fatal(err)
 	}
+
+	aSpace, err = a.getAddrSpace(localAddressSpace)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	subnets = aSpace.subnets
 	if subnets[k0].RefCount != 1 {
 		t.Fatalf("Unexpected ref count for %s: %d", k0, subnets[k0].RefCount)
 	}
@@ -365,6 +424,13 @@ func TestAddReleasePoolID(t *testing.T) {
 	if err := a.ReleasePool(pid00); err != nil {
 		t.Fatal(err)
 	}
+
+	aSpace, err = a.getAddrSpace(localAddressSpace)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	subnets = aSpace.subnets
 	if bp, ok := subnets[k0]; ok {
 		t.Fatalf("Base pool %s is still present: %v", k0, bp)
 	}
@@ -373,6 +439,13 @@ func TestAddReleasePoolID(t *testing.T) {
 	if err != nil {
 		t.Fatalf("Unexpected failure in adding pool")
 	}
+
+	aSpace, err = a.getAddrSpace(localAddressSpace)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	subnets = aSpace.subnets
 	if subnets[k0].RefCount != 1 {
 		t.Fatalf("Unexpected ref count for %s: %d", k0, subnets[k0].RefCount)
 	}
@@ -417,18 +490,6 @@ func TestPredefinedPool(t *testing.T) {
 	if nw != a.predefined[localAddressSpace][i] {
 		t.Fatalf("Unexpected default network returned: %s", nw)
 	}
-
-	i, available, err = getFirstAvailablePool(a, globalAddressSpace, 2)
-	if err != nil {
-		t.Skip(err)
-	}
-	nw, err = a.getPredefinedPool(globalAddressSpace, false)
-	if err != nil {
-		t.Fatal(err)
-	}
-	if nw != available {
-		t.Fatalf("Unexpected default network returned: %s", nw)
-	}
 }
 
 func getFirstAvailablePool(a *Allocator, as string, atLeast int) (int, *net.IPNet, error) {
@@ -475,7 +536,13 @@ func TestRemoveSubnet(t *testing.T) {
 	if err != nil {
 		t.Fatal(err)
 	}
-	a.addrSpace2Configs["splane"] = a.addrSpace2Configs[localAddressSpace]
+	a.addrSpaces["splane"] = &addrSpace{
+		id:      dsConfigKey + "/" + "splane",
+		ds:      a.addrSpaces[localAddressSpace].ds,
+		alloc:   a.addrSpaces[localAddressSpace].alloc,
+		scope:   a.addrSpaces[localAddressSpace].scope,
+		subnets: map[SubnetKey]*PoolData{},
+	}
 
 	input := []struct {
 		addrSpace string
@@ -512,7 +579,13 @@ func TestGetSameAddress(t *testing.T) {
 	if err != nil {
 		t.Fatal(err)
 	}
-	a.addrSpace2Configs["giallo"] = a.addrSpace2Configs[localAddressSpace]
+	a.addrSpaces["giallo"] = &addrSpace{
+		id:      dsConfigKey + "/" + "giallo",
+		ds:      a.addrSpaces[localAddressSpace].ds,
+		alloc:   a.addrSpaces[localAddressSpace].alloc,
+		scope:   a.addrSpaces[localAddressSpace].scope,
+		subnets: map[SubnetKey]*PoolData{},
+	}
 
 	pid, _, _, err := a.RequestPool("giallo", "192.168.100.0/24", "", nil, false)
 	if err != nil {
@@ -536,7 +609,13 @@ func TestRequestReleaseAddressFromSubPool(t *testing.T) {
 	if err != nil {
 		t.Fatal(err)
 	}
-	a.addrSpace2Configs["rosso"] = a.addrSpace2Configs[localAddressSpace]
+	a.addrSpaces["rosso"] = &addrSpace{
+		id:      dsConfigKey + "/" + "rosso",
+		ds:      a.addrSpaces[localAddressSpace].ds,
+		alloc:   a.addrSpaces[localAddressSpace].alloc,
+		scope:   a.addrSpaces[localAddressSpace].scope,
+		subnets: map[SubnetKey]*PoolData{},
+	}
 
 	poolID, _, _, err := a.RequestPool("rosso", "172.28.0.0/16", "172.28.30.0/24", nil, false)
 	if err != nil {
@@ -615,17 +694,23 @@ func TestGetAddress(t *testing.T) {
 
 func TestRequestSyntaxCheck(t *testing.T) {
 	var (
-		pool      = "192.168.0.0/16"
-		subPool   = "192.168.0.0/24"
-		addrSpace = "green"
-		err       error
+		pool    = "192.168.0.0/16"
+		subPool = "192.168.0.0/24"
+		as      = "green"
+		err     error
 	)
 
 	a, err := getAllocator()
 	if err != nil {
 		t.Fatal(err)
 	}
-	a.addrSpace2Configs[addrSpace] = a.addrSpace2Configs[localAddressSpace]
+	a.addrSpaces[as] = &addrSpace{
+		id:      dsConfigKey + "/" + as,
+		ds:      a.addrSpaces[localAddressSpace].ds,
+		alloc:   a.addrSpaces[localAddressSpace].alloc,
+		scope:   a.addrSpaces[localAddressSpace].scope,
+		subnets: map[SubnetKey]*PoolData{},
+	}
 
 	_, _, _, err = a.RequestPool("", pool, "", nil, false)
 	if err == nil {
@@ -637,12 +722,12 @@ func TestRequestSyntaxCheck(t *testing.T) {
 		t.Fatalf("Failed to detect wrong request: empty address space")
 	}
 
-	_, _, _, err = a.RequestPool(addrSpace, "", subPool, nil, false)
+	_, _, _, err = a.RequestPool(as, "", subPool, nil, false)
 	if err == nil {
 		t.Fatalf("Failed to detect wrong request: subPool specified and no pool")
 	}
 
-	pid, _, _, err := a.RequestPool(addrSpace, pool, subPool, nil, false)
+	pid, _, _, err := a.RequestPool(as, pool, subPool, nil, false)
 	if err != nil {
 		t.Fatalf("Unexpected failure: %v", err)
 	}
@@ -764,6 +849,7 @@ func TestRelease(t *testing.T) {
 	for i, inp := range toRelease {
 		ip0 := net.ParseIP(inp.address)
 		a.ReleaseAddress(pid, ip0)
+		bm = a.addresses[SubnetKey{localAddressSpace, subnet, ""}]
 		if bm.Unselected() != 1 {
 			t.Fatalf("Failed to update free address count after release. Expected %d, Found: %d", i+1, bm.Unselected())
 		}

+ 65 - 68
libnetwork/ipam/store.go

@@ -2,30 +2,30 @@ package ipam
 
 import (
 	"encoding/json"
+	"fmt"
 
 	log "github.com/Sirupsen/logrus"
-	"github.com/docker/libkv/store"
 	"github.com/docker/libnetwork/datastore"
 	"github.com/docker/libnetwork/types"
 )
 
 // Key provides the Key to be used in KV Store
-func (cfg *PoolsConfig) Key() []string {
-	cfg.Lock()
-	defer cfg.Unlock()
-	return []string{cfg.id}
+func (aSpace *addrSpace) Key() []string {
+	aSpace.Lock()
+	defer aSpace.Unlock()
+	return []string{aSpace.id}
 }
 
 // KeyPrefix returns the immediate parent key that can be used for tree walk
-func (cfg *PoolsConfig) KeyPrefix() []string {
-	cfg.Lock()
-	defer cfg.Unlock()
+func (aSpace *addrSpace) KeyPrefix() []string {
+	aSpace.Lock()
+	defer aSpace.Unlock()
 	return []string{dsConfigKey}
 }
 
 // Value marshals the data to be stored in the KV store
-func (cfg *PoolsConfig) Value() []byte {
-	b, err := json.Marshal(cfg)
+func (aSpace *addrSpace) Value() []byte {
+	b, err := json.Marshal(aSpace)
 	if err != nil {
 		log.Warnf("Failed to marshal ipam configured pools: %v", err)
 		return nil
@@ -34,97 +34,94 @@ func (cfg *PoolsConfig) Value() []byte {
 }
 
 // SetValue unmarshalls the data from the KV store.
-func (cfg *PoolsConfig) SetValue(value []byte) error {
-	rc := &PoolsConfig{subnets: make(map[SubnetKey]*PoolData)}
+func (aSpace *addrSpace) SetValue(value []byte) error {
+	rc := &addrSpace{subnets: make(map[SubnetKey]*PoolData)}
 	if err := json.Unmarshal(value, rc); err != nil {
 		return err
 	}
-	cfg.subnets = rc.subnets
+	aSpace.subnets = rc.subnets
 	return nil
 }
 
 // Index returns the latest DB Index as seen by this object
-func (cfg *PoolsConfig) Index() uint64 {
-	cfg.Lock()
-	defer cfg.Unlock()
-	return cfg.dbIndex
+func (aSpace *addrSpace) Index() uint64 {
+	aSpace.Lock()
+	defer aSpace.Unlock()
+	return aSpace.dbIndex
 }
 
 // SetIndex method allows the datastore to store the latest DB Index into this object
-func (cfg *PoolsConfig) SetIndex(index uint64) {
-	cfg.Lock()
-	cfg.dbIndex = index
-	cfg.dbExists = true
-	cfg.Unlock()
+func (aSpace *addrSpace) SetIndex(index uint64) {
+	aSpace.Lock()
+	aSpace.dbIndex = index
+	aSpace.dbExists = true
+	aSpace.Unlock()
 }
 
 // Exists method is true if this object has been stored in the DB.
-func (cfg *PoolsConfig) Exists() bool {
-	cfg.Lock()
-	defer cfg.Unlock()
-	return cfg.dbExists
+func (aSpace *addrSpace) Exists() bool {
+	aSpace.Lock()
+	defer aSpace.Unlock()
+	return aSpace.dbExists
 }
 
 // Skip provides a way for a KV Object to avoid persisting it in the KV Store
-func (cfg *PoolsConfig) Skip() bool {
+func (aSpace *addrSpace) Skip() bool {
 	return false
 }
 
-func (cfg *PoolsConfig) watchForChanges() error {
-	if cfg.ds == nil {
-		return nil
-	}
-	kvpChan, err := cfg.ds.KVStore().Watch(datastore.Key(cfg.Key()...), nil)
-	if err != nil {
-		return err
+func (a *Allocator) getStore(as string) datastore.DataStore {
+	a.Lock()
+	defer a.Unlock()
+
+	return a.addrSpaces[as].ds
+}
+
+func (a *Allocator) getAddressSpaceFromStore(as string) (*addrSpace, error) {
+	store := a.getStore(as)
+	if store == nil {
+		return nil, fmt.Errorf("store for address space %s not found", as)
 	}
-	go func() {
-		for {
-			select {
-			case kvPair := <-kvpChan:
-				if kvPair != nil {
-					cfg.readFromKey(kvPair)
-				}
-			}
+
+	pc := &addrSpace{id: dsConfigKey + "/" + as, ds: store, alloc: a}
+	if err := store.GetObject(datastore.Key(pc.Key()...), pc); err != nil {
+		if err == datastore.ErrKeyNotFound {
+			return nil, nil
 		}
-	}()
-	return nil
+
+		return nil, fmt.Errorf("could not get pools config from store: %v", err)
+	}
+
+	return pc, nil
 }
 
-func (cfg *PoolsConfig) writeToStore() error {
-	if cfg.ds == nil {
-		return nil
+func (a *Allocator) writeToStore(aSpace *addrSpace) error {
+	store := aSpace.store()
+	if store == nil {
+		return fmt.Errorf("invalid store while trying to write %s address space", aSpace.DataScope())
 	}
-	err := cfg.ds.PutObjectAtomic(cfg)
+
+	err := store.PutObjectAtomic(aSpace)
 	if err == datastore.ErrKeyModified {
 		return types.RetryErrorf("failed to perform atomic write (%v). retry might fix the error", err)
 	}
-	return err
-}
 
-func (cfg *PoolsConfig) readFromStore() error {
-	if cfg.ds == nil {
-		return nil
-	}
-	return cfg.ds.GetObject(datastore.Key(cfg.Key()...), cfg)
+	return err
 }
 
-func (cfg *PoolsConfig) readFromKey(kvPair *store.KVPair) {
-	if cfg.dbIndex < kvPair.LastIndex {
-		cfg.SetValue(kvPair.Value)
-		cfg.dbIndex = kvPair.LastIndex
-		cfg.dbExists = true
+func (a *Allocator) deleteFromStore(aSpace *addrSpace) error {
+	store := aSpace.store()
+	if store == nil {
+		return fmt.Errorf("invalid store while trying to delete %s address space", aSpace.DataScope())
 	}
-}
 
-func (cfg *PoolsConfig) deleteFromStore() error {
-	if cfg.ds == nil {
-		return nil
-	}
-	return cfg.ds.DeleteObjectAtomic(cfg)
+	return store.DeleteObjectAtomic(aSpace)
 }
 
 // DataScope method returns the storage scope of the datastore
-func (cfg *PoolsConfig) DataScope() datastore.DataScope {
-	return cfg.scope
+func (aSpace *addrSpace) DataScope() string {
+	aSpace.Lock()
+	defer aSpace.Unlock()
+
+	return aSpace.scope
 }

+ 99 - 42
libnetwork/ipam/structures.go

@@ -27,13 +27,13 @@ type PoolData struct {
 	RefCount  int
 }
 
-// PoolsConfig contains the pool configurations
-type PoolsConfig struct {
+// addrSpace contains the pool configurations for the address space
+type addrSpace struct {
 	subnets  map[SubnetKey]*PoolData
 	dbIndex  uint64
 	dbExists bool
 	id       string
-	scope    datastore.DataScope
+	scope    string
 	ds       datastore.DataStore
 	alloc    *Allocator
 	sync.Mutex
@@ -153,18 +153,18 @@ func (p *PoolData) UnmarshalJSON(data []byte) error {
 	return nil
 }
 
-// MarshalJSON returns the JSON encoding of the PoolsConfig object
-func (cfg *PoolsConfig) MarshalJSON() ([]byte, error) {
-	cfg.Lock()
-	defer cfg.Unlock()
+// MarshalJSON returns the JSON encoding of the addrSpace object
+func (aSpace *addrSpace) MarshalJSON() ([]byte, error) {
+	aSpace.Lock()
+	defer aSpace.Unlock()
 
 	m := map[string]interface{}{
-		"Scope": string(cfg.scope),
+		"Scope": string(aSpace.scope),
 	}
 
-	if cfg.subnets != nil {
+	if aSpace.subnets != nil {
 		s := map[string]*PoolData{}
-		for k, v := range cfg.subnets {
+		for k, v := range aSpace.subnets {
 			s[k.String()] = v
 		}
 		m["Subnets"] = s
@@ -173,10 +173,10 @@ func (cfg *PoolsConfig) MarshalJSON() ([]byte, error) {
 	return json.Marshal(m)
 }
 
-// UnmarshalJSON decodes data into the PoolsConfig object
-func (cfg *PoolsConfig) UnmarshalJSON(data []byte) error {
-	cfg.Lock()
-	defer cfg.Unlock()
+// UnmarshalJSON decodes data into the addrSpace object
+func (aSpace *addrSpace) UnmarshalJSON(data []byte) error {
+	aSpace.Lock()
+	defer aSpace.Unlock()
 
 	m := map[string]interface{}{}
 	err := json.Unmarshal(data, &m)
@@ -184,10 +184,10 @@ func (cfg *PoolsConfig) UnmarshalJSON(data []byte) error {
 		return err
 	}
 
-	cfg.scope = datastore.LocalScope
+	aSpace.scope = datastore.LocalScope
 	s := m["Scope"].(string)
 	if s == string(datastore.GlobalScope) {
-		cfg.scope = datastore.GlobalScope
+		aSpace.scope = datastore.GlobalScope
 	}
 
 	if v, ok := m["Subnets"]; ok {
@@ -200,31 +200,81 @@ func (cfg *PoolsConfig) UnmarshalJSON(data []byte) error {
 		for ks, v := range s {
 			k := SubnetKey{}
 			k.FromString(ks)
-			cfg.subnets[k] = v
+			aSpace.subnets[k] = v
 		}
 	}
 
 	return nil
 }
 
-func (cfg *PoolsConfig) updatePoolDBOnAdd(k SubnetKey, nw *net.IPNet, ipr *AddressRange) (func() error, error) {
-	cfg.Lock()
-	defer cfg.Unlock()
+// CopyTo deep copies the pool data to the destination pooldata
+func (p *PoolData) CopyTo(dstP *PoolData) error {
+	dstP.ParentKey = p.ParentKey
+	dstP.Pool = types.GetIPNetCopy(p.Pool)
+
+	if p.Range != nil {
+		dstP.Range = &AddressRange{}
+		dstP.Range.Sub = types.GetIPNetCopy(p.Range.Sub)
+		dstP.Range.Start = p.Range.Start
+		dstP.Range.End = p.Range.End
+	}
+
+	dstP.RefCount = p.RefCount
+	return nil
+}
+
+func (aSpace *addrSpace) CopyTo(o datastore.KVObject) error {
+	aSpace.Lock()
+	defer aSpace.Unlock()
+
+	dstAspace := o.(*addrSpace)
+
+	dstAspace.id = aSpace.id
+	dstAspace.ds = aSpace.ds
+	dstAspace.alloc = aSpace.alloc
+	dstAspace.scope = aSpace.scope
+	dstAspace.dbIndex = aSpace.dbIndex
+	dstAspace.dbExists = aSpace.dbExists
+
+	dstAspace.subnets = make(map[SubnetKey]*PoolData)
+	for k, v := range aSpace.subnets {
+		dstAspace.subnets[k] = &PoolData{}
+		v.CopyTo(dstAspace.subnets[k])
+	}
+
+	return nil
+}
+
+func (aSpace *addrSpace) New() datastore.KVObject {
+	aSpace.Lock()
+	defer aSpace.Unlock()
+
+	return &addrSpace{
+		id:    aSpace.id,
+		ds:    aSpace.ds,
+		alloc: aSpace.alloc,
+		scope: aSpace.scope,
+	}
+}
+
+func (aSpace *addrSpace) updatePoolDBOnAdd(k SubnetKey, nw *net.IPNet, ipr *AddressRange) (func() error, error) {
+	aSpace.Lock()
+	defer aSpace.Unlock()
 
 	// Check if already allocated
-	if p, ok := cfg.subnets[k]; ok {
-		cfg.incRefCount(p, 1)
+	if p, ok := aSpace.subnets[k]; ok {
+		aSpace.incRefCount(p, 1)
 		return func() error { return nil }, nil
 	}
 
 	// If master pool, check for overlap
 	if ipr == nil {
-		if cfg.contains(k.AddressSpace, nw) {
+		if aSpace.contains(k.AddressSpace, nw) {
 			return nil, ipamapi.ErrPoolOverlap
 		}
 		// This is a new master pool, add it along with corresponding bitmask
-		cfg.subnets[k] = &PoolData{Pool: nw, RefCount: 1}
-		return func() error { return cfg.alloc.insertBitMask(cfg.ds, k, nw) }, nil
+		aSpace.subnets[k] = &PoolData{Pool: nw, RefCount: 1}
+		return func() error { return aSpace.alloc.insertBitMask(k, nw) }, nil
 	}
 
 	// This is a new non-master pool
@@ -234,38 +284,38 @@ func (cfg *PoolsConfig) updatePoolDBOnAdd(k SubnetKey, nw *net.IPNet, ipr *Addre
 		Range:     ipr,
 		RefCount:  1,
 	}
-	cfg.subnets[k] = p
+	aSpace.subnets[k] = p
 
 	// Look for parent pool
-	pp, ok := cfg.subnets[p.ParentKey]
+	pp, ok := aSpace.subnets[p.ParentKey]
 	if ok {
-		cfg.incRefCount(pp, 1)
+		aSpace.incRefCount(pp, 1)
 		return func() error { return nil }, nil
 	}
 
 	// Parent pool does not exist, add it along with corresponding bitmask
-	cfg.subnets[p.ParentKey] = &PoolData{Pool: nw, RefCount: 1}
-	return func() error { return cfg.alloc.insertBitMask(cfg.ds, p.ParentKey, nw) }, nil
+	aSpace.subnets[p.ParentKey] = &PoolData{Pool: nw, RefCount: 1}
+	return func() error { return aSpace.alloc.insertBitMask(p.ParentKey, nw) }, nil
 }
 
-func (cfg *PoolsConfig) updatePoolDBOnRemoval(k SubnetKey) (func() error, error) {
-	cfg.Lock()
-	defer cfg.Unlock()
+func (aSpace *addrSpace) updatePoolDBOnRemoval(k SubnetKey) (func() error, error) {
+	aSpace.Lock()
+	defer aSpace.Unlock()
 
-	p, ok := cfg.subnets[k]
+	p, ok := aSpace.subnets[k]
 	if !ok {
 		return nil, ipamapi.ErrBadPool
 	}
 
-	cfg.incRefCount(p, -1)
+	aSpace.incRefCount(p, -1)
 
 	c := p
 	for ok {
 		if c.RefCount == 0 {
-			delete(cfg.subnets, k)
+			delete(aSpace.subnets, k)
 			if c.Range == nil {
 				return func() error {
-					bm, err := cfg.alloc.retrieveBitmask(cfg.ds, k, c.Pool)
+					bm, err := aSpace.alloc.retrieveBitmask(k, c.Pool)
 					if err != nil {
 						return fmt.Errorf("could not find bitmask in datastore for pool %s removal: %v", k.String(), err)
 					}
@@ -274,24 +324,24 @@ func (cfg *PoolsConfig) updatePoolDBOnRemoval(k SubnetKey) (func() error, error)
 			}
 		}
 		k = c.ParentKey
-		c, ok = cfg.subnets[k]
+		c, ok = aSpace.subnets[k]
 	}
 
 	return func() error { return nil }, nil
 }
 
-func (cfg *PoolsConfig) incRefCount(p *PoolData, delta int) {
+func (aSpace *addrSpace) incRefCount(p *PoolData, delta int) {
 	c := p
 	ok := true
 	for ok {
 		c.RefCount += delta
-		c, ok = cfg.subnets[c.ParentKey]
+		c, ok = aSpace.subnets[c.ParentKey]
 	}
 }
 
 // Checks whether the passed subnet is a superset or subset of any of the subset in this config db
-func (cfg *PoolsConfig) contains(space string, nw *net.IPNet) bool {
-	for k, v := range cfg.subnets {
+func (aSpace *addrSpace) contains(space string, nw *net.IPNet) bool {
+	for k, v := range aSpace.subnets {
 		if space == k.AddressSpace && k.ChildSubnet == "" {
 			if nw.Contains(v.Pool.IP) || v.Pool.Contains(nw.IP) {
 				return true
@@ -300,3 +350,10 @@ func (cfg *PoolsConfig) contains(space string, nw *net.IPNet) bool {
 	}
 	return false
 }
+
+func (aSpace *addrSpace) store() datastore.DataStore {
+	aSpace.Lock()
+	defer aSpace.Unlock()
+
+	return aSpace.ds
+}