Quellcode durchsuchen

Add datastore to IPAM for configuration

- IPAM to use datastore for the subnets configurations

Signed-off-by: Alessandro Boch <aboch@docker.com>
Alessandro Boch vor 10 Jahren
Ursprung
Commit
b818ea981d

+ 87 - 29
libnetwork/bitseq/sequence.go

@@ -7,6 +7,7 @@ import (
 	"fmt"
 	"sync"
 
+	"github.com/docker/libkv/store"
 	"github.com/docker/libnetwork/datastore"
 	"github.com/docker/libnetwork/netutils"
 )
@@ -22,28 +23,32 @@ const (
 
 // Handle contains the sequece representing the bitmask and its identifier
 type Handle struct {
-	App     string
-	ID      string
-	Head    *Sequence
-	store   datastore.DataStore
-	dbIndex uint64
+	bits       uint32
+	unselected uint32
+	head       *Sequence
+	app        string
+	id         string
+	dbIndex    uint64
+	store      datastore.DataStore
 	sync.Mutex
 }
 
 // NewHandle returns a thread-safe instance of the bitmask handler
-func NewHandle(app string, ds datastore.DataStore, id string, numElements uint32) *Handle {
+func NewHandle(app string, ds datastore.DataStore, id string, numElements uint32) (*Handle, error) {
 	h := &Handle{
-		App:   app,
-		ID:    id,
-		store: ds,
-		Head: &Sequence{
+		app:        app,
+		id:         id,
+		store:      ds,
+		bits:       numElements,
+		unselected: numElements,
+		head: &Sequence{
 			Block: 0x0,
 			Count: getNumBlocks(numElements),
 		},
 	}
 
 	if h.store == nil {
-		return h
+		return h, nil
 	}
 
 	// Register for status changes
@@ -56,10 +61,11 @@ func NewHandle(app string, ds datastore.DataStore, id string, numElements uint32
 	// node to go through a retry.
 	var bs []byte
 	if err := h.store.GetObject(datastore.Key(h.Key()...), bs); err == nil {
-		h.Head.FromByteArray(bs)
+		h.FromByteArray(bs)
+	} else if err != store.ErrKeyNotFound {
+		return nil, err
 	}
-
-	return h
+	return h, nil
 }
 
 // Sequence reresents a recurring sequence of 32 bits long bitmasks
@@ -176,7 +182,7 @@ func (s *Sequence) FromByteArray(data []byte) error {
 func (h *Handle) GetFirstAvailable() (int, int, error) {
 	h.Lock()
 	defer h.Unlock()
-	return GetFirstAvailable(h.Head)
+	return GetFirstAvailable(h.head)
 }
 
 // CheckIfAvailable checks if the bit correspondent to the specified ordinal is unset
@@ -184,29 +190,91 @@ func (h *Handle) GetFirstAvailable() (int, int, error) {
 func (h *Handle) CheckIfAvailable(ordinal int) (int, int, error) {
 	h.Lock()
 	defer h.Unlock()
-	return CheckIfAvailable(h.Head, ordinal)
+	return CheckIfAvailable(h.head, ordinal)
 }
 
 // PushReservation pushes the bit reservation inside the bitmask.
 func (h *Handle) PushReservation(bytePos, bitPos int, release bool) error {
 	// Create a copy of the current handler
 	h.Lock()
-	nh := &Handle{App: h.App, ID: h.ID, store: h.store, dbIndex: h.dbIndex, Head: h.Head.GetCopy()}
+	nh := &Handle{app: h.app, id: h.id, store: h.store, dbIndex: h.dbIndex, head: h.head.GetCopy()}
 	h.Unlock()
 
-	nh.Head = PushReservation(bytePos, bitPos, nh.Head, release)
+	nh.head = PushReservation(bytePos, bitPos, nh.head, release)
 
 	err := nh.writeToStore()
 	if err == nil {
 		// Commit went through, save locally
 		h.Lock()
-		h.Head = nh.Head
+		h.head = nh.head
+		if release {
+			h.unselected++
+		} else {
+			h.unselected--
+		}
 		h.Unlock()
 	}
 
 	return err
 }
 
+// Destroy removes from the datastore the data belonging to this handle
+func (h *Handle) Destroy() {
+	h.deleteFromStore()
+}
+
+// ToByteArray converts this handle's data into a byte array
+func (h *Handle) ToByteArray() ([]byte, error) {
+	ba := make([]byte, 8)
+
+	h.Lock()
+	defer h.Unlock()
+	copy(ba[0:4], netutils.U32ToA(h.bits))
+	copy(ba[4:8], netutils.U32ToA(h.unselected))
+	bm, err := h.head.ToByteArray()
+	if err != nil {
+		return nil, err
+	}
+	ba = append(ba, bm...)
+
+	return ba, nil
+}
+
+// FromByteArray reads his handle's data from a byte array
+func (h *Handle) FromByteArray(ba []byte) error {
+	nh := &Sequence{}
+	err := nh.FromByteArray(ba[8:])
+	if err != nil {
+		return err
+	}
+
+	h.Lock()
+	h.head = nh
+	h.bits = netutils.ATo32(ba[0:4])
+	h.unselected = netutils.ATo32(ba[4:8])
+	h.Unlock()
+
+	return nil
+}
+
+// Bits returns the length of the bit sequence
+func (h *Handle) Bits() uint32 {
+	return h.bits
+}
+
+// Unselected returns the number of bits which are not selected
+func (h *Handle) Unselected() uint32 {
+	h.Lock()
+	defer h.Unlock()
+	return h.unselected
+}
+
+func (h *Handle) getDBIndex() uint64 {
+	h.Lock()
+	defer h.Unlock()
+	return h.dbIndex
+}
+
 // GetFirstAvailable looks for the first unset bit in passed mask
 func GetFirstAvailable(head *Sequence) (int, int, error) {
 	byteIndex := 0
@@ -375,16 +443,6 @@ func mergeSequences(seq *Sequence) {
 	}
 }
 
-// Serialize converts the sequence into a byte array
-func Serialize(head *Sequence) ([]byte, error) {
-	return nil, nil
-}
-
-// Deserialize decodes the byte array into a sequence
-func Deserialize(data []byte) (*Sequence, error) {
-	return nil, nil
-}
-
 func getNumBlocks(numBits uint32) uint32 {
 	numBlocks := numBits / blockLen
 	if numBits%blockLen != 0 {

+ 10 - 13
libnetwork/bitseq/store.go

@@ -9,25 +9,19 @@ import (
 func (h *Handle) Key() []string {
 	h.Lock()
 	defer h.Unlock()
-	return []string{h.App, h.ID}
+	return []string{h.app, h.id}
 }
 
 // KeyPrefix returns the immediate parent key that can be used for tree walk
 func (h *Handle) KeyPrefix() []string {
 	h.Lock()
 	defer h.Unlock()
-	return []string{h.App}
+	return []string{h.app}
 }
 
 // Value marshals the data to be stored in the KV store
 func (h *Handle) Value() []byte {
-	h.Lock()
-	defer h.Unlock()
-	head := h.Head
-	if head == nil {
-		return []byte{}
-	}
-	b, err := head.ToByteArray()
+	b, err := h.ToByteArray()
 	if err != nil {
 		return []byte{}
 	}
@@ -65,10 +59,13 @@ func (h *Handle) watchForChanges() error {
 		for {
 			select {
 			case kvPair := <-kvpChan:
-				h.Lock()
-				h.dbIndex = kvPair.LastIndex
-				h.Head.FromByteArray(kvPair.Value)
-				h.Unlock()
+				// Only process remote update
+				if kvPair != nil && (kvPair.LastIndex != h.getDBIndex()) {
+					h.Lock()
+					h.dbIndex = kvPair.LastIndex
+					h.Unlock()
+					h.FromByteArray(kvPair.Value)
+				}
 			}
 		}
 	}()

+ 14 - 0
libnetwork/datastore/datastore.go

@@ -15,6 +15,8 @@ import (
 type DataStore interface {
 	// GetObject gets data from datastore and unmarshals to the specified object
 	GetObject(key string, o interface{}) error
+	// GetUpdatedObject gets data from datastore along with its index and unmarshals to the specified object
+	GetUpdatedObject(key string, o interface{}) (uint64, error)
 	// PutObject adds a new Record based on an object into the datastore
 	PutObject(kvObject KV) error
 	// PutObjectAtomic provides an atomic add and update operation for a Record
@@ -152,6 +154,18 @@ func (ds *datastore) GetObject(key string, o interface{}) error {
 	return json.Unmarshal(kvPair.Value, o)
 }
 
+// GetUpdateObject returns a record matching the key
+func (ds *datastore) GetUpdatedObject(key string, o interface{}) (uint64, error) {
+	kvPair, err := ds.store.Get(key)
+	if err != nil {
+		return 0, err
+	}
+	if err := json.Unmarshal(kvPair.Value, o); err != nil {
+		return 0, err
+	}
+	return kvPair.LastIndex, nil
+}
+
 // DeleteObject unconditionally deletes a record from the store
 func (ds *datastore) DeleteObject(kvObject KV) error {
 	return ds.store.Delete(Key(kvObject.Key()...))

+ 37 - 16
libnetwork/idm/idm.go

@@ -6,6 +6,7 @@ import (
 
 	"github.com/docker/libnetwork/bitseq"
 	"github.com/docker/libnetwork/datastore"
+	"github.com/docker/libnetwork/types"
 )
 
 // Idm manages the reservation/release of numerical ids from a contiguos set
@@ -23,7 +24,13 @@ func New(ds datastore.DataStore, id string, start, end uint32) (*Idm, error) {
 	if end <= start {
 		return nil, fmt.Errorf("Invalid set range: [%d, %d]", start, end)
 	}
-	return &Idm{start: start, end: end, handle: bitseq.NewHandle("idm", ds, id, uint32(1+end-start))}, nil
+
+	h, err := bitseq.NewHandle("idm", ds, id, uint32(1+end-start))
+	if err != nil {
+		return nil, err
+	}
+
+	return &Idm{start: start, end: end, handle: h}, nil
 }
 
 // GetID returns the first available id in the set
@@ -32,20 +39,27 @@ func (i *Idm) GetID() (uint32, error) {
 		return 0, fmt.Errorf("ID set is not initialized")
 	}
 
-	bytePos, bitPos, err := i.handle.GetFirstAvailable()
-	if err != nil {
-		return 0, fmt.Errorf("no available ids")
-	}
-	id := i.start + uint32(bitPos+bytePos*8)
+	for {
+		bytePos, bitPos, err := i.handle.GetFirstAvailable()
+		if err != nil {
+			return 0, fmt.Errorf("no available ids")
+		}
+		id := i.start + uint32(bitPos+bytePos*8)
 
-	// for sets which length is non multiple of 32 this check is needed
-	if i.end < id {
-		return 0, fmt.Errorf("no available ids")
-	}
+		// for sets which length is non multiple of 32 this check is needed
+		if i.end < id {
+			return 0, fmt.Errorf("no available ids")
+		}
 
-	i.handle.PushReservation(bytePos, bitPos, false)
+		if err := i.handle.PushReservation(bytePos, bitPos, false); err != nil {
+			if _, ok := err.(types.RetryError); !ok {
+				return 0, fmt.Errorf("internal failure while reserving the id: %s", err.Error())
+			}
+			continue
+		}
 
-	return id, nil
+		return id, nil
+	}
 }
 
 // GetSpecificID tries to reserve the specified id
@@ -58,12 +72,19 @@ func (i *Idm) GetSpecificID(id uint32) error {
 		return fmt.Errorf("Requested id does not belong to the set")
 	}
 
-	if bytePos, bitPos, err := i.handle.CheckIfAvailable(int(id - i.start)); err == nil {
-		i.handle.PushReservation(bytePos, bitPos, false)
+	for {
+		bytePos, bitPos, err := i.handle.CheckIfAvailable(int(id - i.start))
+		if err != nil {
+			return fmt.Errorf("requested id is not available")
+		}
+		if err := i.handle.PushReservation(bytePos, bitPos, false); err != nil {
+			if _, ok := err.(types.RetryError); !ok {
+				return fmt.Errorf("internal failure while reserving the id: %s", err.Error())
+			}
+			continue
+		}
 		return nil
 	}
-
-	return fmt.Errorf("requested id is not available")
 }
 
 // Release releases the specified id

+ 178 - 75
libnetwork/ipam/allocator.go

@@ -7,6 +7,7 @@ import (
 	"sync"
 
 	log "github.com/Sirupsen/logrus"
+	"github.com/docker/libkv/store"
 	"github.com/docker/libnetwork/bitseq"
 	"github.com/docker/libnetwork/datastore"
 	"github.com/docker/libnetwork/types"
@@ -20,6 +21,9 @@ const (
 	minNetSizeV6Eff = 96
 	// The size of the host subnet used internally, it's the most granular sequence addresses
 	defaultInternalHostSize = 16
+	// datastore keyes for ipam obkects
+	dsConfigKey = "ipam-config" // ipam-config/<domain>/<map of subent configs>
+	dsDataKey   = "ipam-data"   // ipam-data/<domain>/<subnet>/<child-sudbnet>/<bitmask>
 )
 
 // Allocator provides per address space ipv4/ipv6 book keeping
@@ -29,20 +33,75 @@ type Allocator struct {
 	// Static subnet information
 	subnets map[subnetKey]*SubnetInfo
 	// Allocated addresses in each address space's internal subnet
-	addresses map[subnetKey]*bitmask
+	addresses map[subnetKey]*bitseq.Handle
 	// Datastore
-	store datastore.DataStore
+	store   datastore.DataStore
+	App     string
+	ID      string
+	dbIndex uint64
 	sync.Mutex
 }
 
 // NewAllocator returns an instance of libnetwork ipam
-func NewAllocator(ds datastore.DataStore) *Allocator {
+func NewAllocator(ds datastore.DataStore) (*Allocator, error) {
 	a := &Allocator{}
 	a.subnets = make(map[subnetKey]*SubnetInfo)
-	a.addresses = make(map[subnetKey]*bitmask)
+	a.addresses = make(map[subnetKey]*bitseq.Handle)
 	a.internalHostSize = defaultInternalHostSize
 	a.store = ds
-	return a
+	a.App = "ipam"
+	a.ID = dsConfigKey
+
+	if a.store == nil {
+		return a, nil
+	}
+
+	// Register for status changes
+	a.watchForChanges()
+
+	// Get the initial subnet configs status from the ds if present.
+	kvPair, err := a.store.KVStore().Get(datastore.Key(a.Key()...))
+	if err != nil {
+		if err != store.ErrKeyNotFound {
+			return nil, fmt.Errorf("failed to retrieve the ipam subnet configs from datastore: %v", err)
+		}
+		return a, nil
+	}
+	a.subnetConfigFromStore(kvPair)
+
+	// Now retrieve the list of small subnets
+	var inserterList []func() error
+	a.Lock()
+	for k, v := range a.subnets {
+		inserterList = append(inserterList,
+			func() error {
+				subnetList, err := getInternalSubnets(v.Subnet, a.internalHostSize)
+				if err != nil {
+					return fmt.Errorf("failed to load address bitmask for configured subnet %s because of %s", v.Subnet.String(), err.Error())
+				}
+				a.insertAddressMasks(k, subnetList)
+				return nil
+			})
+	}
+	a.Unlock()
+
+	// Add the bitmasks, data could come from datastore
+	for _, f := range inserterList {
+		if err := f(); err != nil {
+			return nil, err
+		}
+	}
+
+	return a, nil
+}
+
+func (a *Allocator) subnetConfigFromStore(kvPair *store.KVPair) {
+	a.Lock()
+	if a.dbIndex < kvPair.LastIndex {
+		a.subnets = byteArrayToSubnets(kvPair.Value)
+		a.dbIndex = kvPair.LastIndex
+	}
+	a.Unlock()
 }
 
 // Pointer to the configured subnets in each address space
@@ -78,12 +137,18 @@ func (s *subnetKey) FromString(str string) error {
 	return nil
 }
 
-// The structs containing the address allocation bitmask for the internal subnet.
-// The bitmask is stored a run-length encoded seq.Sequence of 4 bytes blcoks.
-type bitmask struct {
-	subnet        *net.IPNet
-	addressMask   *bitseq.Handle
-	freeAddresses int
+func (s *subnetKey) canonicalSubnet() *net.IPNet {
+	if _, sub, err := net.ParseCIDR(s.subnet); err == nil {
+		return sub
+	}
+	return nil
+}
+
+func (s *subnetKey) canonicalChildSubnet() *net.IPNet {
+	if _, sub, err := net.ParseCIDR(s.childSubnet); err == nil {
+		return sub
+	}
+	return nil
 }
 
 type ipVersion int
@@ -106,38 +171,59 @@ func (a *Allocator) AddSubnet(addrSpace AddressSpace, subnetInfo *SubnetInfo) er
 	if subnetInfo == nil || subnetInfo.Subnet == nil {
 		return ErrInvalidSubnet
 	}
-	if a.contains(addrSpace, subnetInfo) {
-		return ErrOverlapSubnet
-	}
-
 	// Convert to smaller internal subnets (if needed)
 	subnetList, err := getInternalSubnets(subnetInfo.Subnet, a.internalHostSize)
 	if err != nil {
 		return err
 	}
+retry:
+	if a.contains(addrSpace, subnetInfo) {
+		return ErrOverlapSubnet
+	}
 
-	// Store the configured subnet information
+	// Store the configured subnet and sync to datatstore
 	key := subnetKey{addrSpace, subnetInfo.Subnet.String(), ""}
 	a.Lock()
 	a.subnets[key] = subnetInfo
 	a.Unlock()
+	err = a.writeToStore()
+	if err != nil {
+		if _, ok := err.(types.RetryError); !ok {
+			return types.InternalErrorf("subnet configuration failed because of %s", err.Error())
+		}
+		// Update to latest
+		if erru := a.readFromStore(); erru != nil {
+			// Restore and bail out
+			a.Lock()
+			delete(a.addresses, key)
+			a.Unlock()
+			return fmt.Errorf("failed to get updated subnets config from datastore (%v) after (%v)", erru, err)
+		}
+		goto retry
+	}
+
+	// Insert respective bitmasks for this subnet
+	a.insertAddressMasks(key, subnetList)
+
+	return nil
+}
 
-	// Create and insert the internal subnet(s) addresses masks into the address database
-	for _, sub := range subnetList {
-		ones, bits := sub.Mask.Size()
+// Create and insert the internal subnet(s) addresses masks into the address database. Mask data may come from the bitseq datastore.
+func (a *Allocator) insertAddressMasks(parentKey subnetKey, internalSubnetList []*net.IPNet) error {
+	for _, intSub := range internalSubnetList {
+		var err error
+		ones, bits := intSub.Mask.Size()
 		numAddresses := 1 << uint(bits-ones)
-		smallKey := subnetKey{addrSpace, key.subnet, sub.String()}
+		smallKey := subnetKey{parentKey.addressSpace, parentKey.subnet, intSub.String()}
 
-		// Add the new address masks
+		// Insert the new address masks. AddressMask content may come from datastore
 		a.Lock()
-		a.addresses[smallKey] = &bitmask{
-			subnet:        sub,
-			addressMask:   bitseq.NewHandle("ipam", a.store, smallKey.String(), uint32(numAddresses)),
-			freeAddresses: numAddresses,
-		}
+		a.addresses[smallKey], err = bitseq.NewHandle(dsDataKey, a.store, smallKey.String(), uint32(numAddresses))
 		a.Unlock()
+		if err != nil {
+			return err
+		}
 	}
-
 	return nil
 }
 
@@ -224,17 +310,37 @@ func (a *Allocator) RemoveSubnet(addrSpace AddressSpace, subnet *net.IPNet) erro
 	if subnet == nil {
 		return ErrInvalidSubnet
 	}
-
+retry:
 	// Look for the respective subnet configuration data
 	// Remove it along with the internal subnets
 	subKey := subnetKey{addrSpace, subnet.String(), ""}
 	a.Lock()
-	_, ok := a.subnets[subKey]
+	current, ok := a.subnets[subKey]
 	a.Unlock()
 	if !ok {
 		return ErrSubnetNotFound
 	}
 
+	// Remove config and sync to datastore
+	a.Lock()
+	delete(a.subnets, subKey)
+	a.Unlock()
+	err := a.writeToStore()
+	if err != nil {
+		if _, ok := err.(types.RetryError); !ok {
+			return types.InternalErrorf("subnet removal failed because of %s", err.Error())
+		}
+		// Update to latest
+		if erru := a.readFromStore(); erru != nil {
+			// Restore and bail out
+			a.Lock()
+			a.subnets[subKey] = current
+			a.Unlock()
+			return fmt.Errorf("failed to get updated subnets config from datastore (%v) after (%v)", erru, err)
+		}
+		goto retry
+	}
+
 	// Get the list of smaller internal subnets
 	subnetList, err := getInternalSubnets(subnet, a.internalHostSize)
 	if err != nil {
@@ -242,15 +348,15 @@ func (a *Allocator) RemoveSubnet(addrSpace AddressSpace, subnet *net.IPNet) erro
 	}
 
 	for _, s := range subnetList {
+		sk := subnetKey{addrSpace, subKey.subnet, s.String()}
 		a.Lock()
-		delete(a.addresses, subnetKey{addrSpace, subKey.subnet, s.String()})
+		if bm, ok := a.addresses[sk]; ok {
+			bm.Destroy()
+		}
+		delete(a.addresses, sk)
 		a.Unlock()
 	}
 
-	a.Lock()
-	delete(a.subnets, subKey)
-	a.Unlock()
-
 	return nil
 
 }
@@ -320,14 +426,14 @@ func (a *Allocator) Release(addrSpace AddressSpace, address net.IP) {
 		a.Lock()
 		space := a.addresses[subKey]
 		a.Unlock()
-		sub := space.subnet
+		sub := subKey.canonicalChildSubnet()
 		if sub.Contains(address) {
 			// Retrieve correspondent ordinal in the subnet
 			ordinal := ipToInt(getHostPortionIP(address, sub))
 			// Release it
 			for {
 				var err error
-				if err = space.addressMask.PushReservation(ordinal/8, ordinal%8, true); err == nil {
+				if err = space.PushReservation(ordinal/8, ordinal%8, true); err == nil {
 					break
 				}
 				if _, ok := err.(types.RetryError); ok {
@@ -337,7 +443,6 @@ func (a *Allocator) Release(addrSpace AddressSpace, address net.IP) {
 				log.Warnf("Failed to release address %s because of internal error: %s", address.String(), err.Error())
 				return
 			}
-			space.freeAddresses++
 			return
 		}
 
@@ -368,9 +473,13 @@ func (a *Allocator) reserveAddress(addrSpace AddressSpace, subnet *net.IPNet, pr
 
 	for _, key := range keyList {
 		a.Lock()
-		smallSubnet := a.addresses[key]
+		bitmask, ok := a.addresses[key]
 		a.Unlock()
-		address, err := a.getAddress(smallSubnet, prefAddress, ver)
+		if !ok {
+			fmt.Printf("\nDid not find a bitmask for subnet key: %s", key.String())
+			continue
+		}
+		address, err := a.getAddress(key.canonicalChildSubnet(), bitmask, prefAddress, ver)
 		if err == nil {
 			return address, subnet, nil
 		}
@@ -385,7 +494,7 @@ func (a *Allocator) getSubnetList(addrSpace AddressSpace, ver ipVersion) []subne
 	ind := 0
 	a.Lock()
 	for subKey := range a.addresses {
-		_, s, _ := net.ParseCIDR(subKey.subnet)
+		s := subKey.canonicalSubnet()
 		subVer := getAddressVersion(s.IP)
 		if subKey.addressSpace == addrSpace && subVer == ver {
 			list[ind] = subKey
@@ -396,54 +505,48 @@ func (a *Allocator) getSubnetList(addrSpace AddressSpace, ver ipVersion) []subne
 	return list[0:ind]
 }
 
-func (a *Allocator) getAddress(smallSubnet *bitmask, prefAddress net.IP, ver ipVersion) (net.IP, error) {
+func (a *Allocator) getAddress(subnet *net.IPNet, bitmask *bitseq.Handle, prefAddress net.IP, ver ipVersion) (net.IP, error) {
 	var (
 		bytePos, bitPos int
 		ordinal         int
 		err             error
 	)
-	// Look for free IP, skip .0 and .255, they will be automatically reserved
-again:
-	if smallSubnet.freeAddresses <= 0 {
-		return nil, ErrNoAvailableIPs
-	}
-	if prefAddress == nil {
-		bytePos, bitPos, err = smallSubnet.addressMask.GetFirstAvailable()
-	} else {
-		ordinal = ipToInt(getHostPortionIP(prefAddress, smallSubnet.subnet))
-		bytePos, bitPos, err = smallSubnet.addressMask.CheckIfAvailable(ordinal)
-	}
-	if err != nil {
-		return nil, ErrNoAvailableIPs
-	}
 
-pushsame:
-	// Lock it
-	if err = smallSubnet.addressMask.PushReservation(bytePos, bitPos, false); err != nil {
-		if _, ok := err.(types.RetryError); !ok {
-			return nil, fmt.Errorf("internal failure while reserving the address: %s", err.Error())
+	// Look for free IP, skip .0 and .255, they will be automatically reserved
+	for {
+		if bitmask.Unselected() <= 0 {
+			return nil, ErrNoAvailableIPs
 		}
-		// bitmask view must have changed. Selected address may or may no longer be available
-		if prefAddress != nil {
-			if _, _, err = smallSubnet.addressMask.CheckIfAvailable(ordinal); err == nil {
-				//still available
-				goto pushsame
+		if prefAddress == nil {
+			bytePos, bitPos, err = bitmask.GetFirstAvailable()
+		} else {
+			ordinal = ipToInt(getHostPortionIP(prefAddress, subnet))
+			bytePos, bitPos, err = bitmask.CheckIfAvailable(ordinal)
+		}
+		if err != nil {
+			return nil, ErrNoAvailableIPs
+		}
+
+		// Lock it
+		if err = bitmask.PushReservation(bytePos, bitPos, false); err != nil {
+			if _, ok := err.(types.RetryError); !ok {
+				return nil, fmt.Errorf("internal failure while reserving the address: %s", err.Error())
 			}
-			goto again
+			continue
 		}
-	}
-	smallSubnet.freeAddresses--
 
-	// Build IP ordinal
-	ordinal = bitPos + bytePos*8
+		// Build IP ordinal
+		ordinal = bitPos + bytePos*8
 
-	// For v4, let reservation of .0 and .255 happen automatically
-	if ver == v4 && !isValidIP(ordinal) {
-		goto again
+		// For v4, let reservation of .0 and .255 happen automatically
+		if ver == v4 && !isValidIP(ordinal) {
+			continue
+		}
+		break
 	}
 
 	// Convert IP ordinal for this subnet into IP address
-	return generateAddress(ordinal, smallSubnet.subnet), nil
+	return generateAddress(ordinal, subnet), nil
 }
 
 // DumpDatabase dumps the internal info
@@ -456,7 +559,7 @@ func (a *Allocator) DumpDatabase() {
 		for _, s := range subnetList {
 			internKey := subnetKey{k.addressSpace, config.Subnet.String(), s.String()}
 			bm := a.addresses[internKey]
-			fmt.Printf("\n\t%s: %s\n\t%d", bm.subnet, bm.addressMask, bm.freeAddresses)
+			fmt.Printf("\n\t%s: %s\n\t%d", internKey.childSubnet, bm, bm.Unselected())
 		}
 	}
 }

+ 41 - 29
libnetwork/ipam/allocator_test.go

@@ -9,8 +9,11 @@ import (
 	"github.com/docker/libnetwork/bitseq"
 )
 
-func getAllocator(subnet *net.IPNet) *Allocator {
-	a := NewAllocator(nil)
+func getAllocator(t *testing.T, subnet *net.IPNet) *Allocator {
+	a, err := NewAllocator(nil)
+	if err != nil {
+		t.Fatal(err)
+	}
 	a.AddSubnet("default", &SubnetInfo{Subnet: subnet})
 	return a
 }
@@ -90,10 +93,13 @@ func TestKeyString(t *testing.T) {
 }
 
 func TestAddSubnets(t *testing.T) {
-	a := NewAllocator(nil)
+	a, err := NewAllocator(nil)
+	if err != nil {
+		t.Fatal(err)
+	}
 
 	_, sub0, _ := net.ParseCIDR("10.0.0.0/8")
-	err := a.AddSubnet("default", &SubnetInfo{Subnet: sub0})
+	err = a.AddSubnet("default", &SubnetInfo{Subnet: sub0})
 	if err != nil {
 		t.Fatalf("Unexpected failure in adding subent")
 	}
@@ -165,7 +171,10 @@ func TestAdjustAndCheckSubnet(t *testing.T) {
 }
 
 func TestRemoveSubnet(t *testing.T) {
-	a := NewAllocator(nil)
+	a, err := NewAllocator(nil)
+	if err != nil {
+		t.Fatal(err)
+	}
 
 	input := []struct {
 		addrSpace AddressSpace
@@ -279,7 +288,10 @@ func TestGetAddress(t *testing.T) {
 }
 
 func TestGetSubnetList(t *testing.T) {
-	a := NewAllocator(nil)
+	a, err := NewAllocator(nil)
+	if err != nil {
+		t.Fatal(err)
+	}
 	input := []struct {
 		addrSpace AddressSpace
 		subnet    string
@@ -327,18 +339,22 @@ func TestGetSubnetList(t *testing.T) {
 
 func TestRequestSyntaxCheck(t *testing.T) {
 	var (
-		a        = NewAllocator(nil)
 		subnet   = "192.168.0.0/16"
 		addSpace = AddressSpace("green")
 	)
 
+	a, err := NewAllocator(nil)
+	if err != nil {
+		t.Fatal(err)
+	}
+
 	// Add subnet and create base request
 	_, sub, _ := net.ParseCIDR(subnet)
 	a.AddSubnet(addSpace, &SubnetInfo{Subnet: sub})
 	req := &AddressRequest{Subnet: *sub}
 
 	// Empty address space request
-	_, err := a.Request("", req)
+	_, err = a.Request("", req)
 	if err == nil {
 		t.Fatalf("Failed to detect wrong request: empty address space")
 	}
@@ -397,7 +413,7 @@ func TestRelease(t *testing.T) {
 	)
 
 	_, sub, _ := net.ParseCIDR(subnet)
-	a := getAllocator(sub)
+	a := getAllocator(t, sub)
 	req = &AddressRequest{Subnet: *sub}
 	bm := a.addresses[subnetKey{"default", subnet, subnet}]
 
@@ -438,8 +454,8 @@ func TestRelease(t *testing.T) {
 	for i, inp := range toRelease {
 		address := net.ParseIP(inp.address)
 		a.Release("default", address)
-		if bm.freeAddresses != 1 {
-			t.Fatalf("Failed to update free address count after release. Expected %d, Found: %d", i+1, bm.freeAddresses)
+		if bm.Unselected() != 1 {
+			t.Fatalf("Failed to update free address count after release. Expected %d, Found: %d", i+1, bm.Unselected())
 		}
 
 		rsp, err := a.Request("default", req)
@@ -485,33 +501,29 @@ func assertGetAddress(t *testing.T, subnet string) {
 	zeroes := bits - ones
 	numAddresses := 1 << uint(zeroes)
 
-	var expectedMax uint32
-	if numAddresses >= 32 {
-		expectedMax = uint32(1<<32 - 1)
-	} else {
-		expectedMax = (1<<uint(numAddresses) - 1) << uint(32-numAddresses)
-	}
-
-	bm := &bitmask{
-		subnet:        sub,
-		addressMask:   bitseq.NewHandle("ipam_test", nil, "default/192.168.0.0/24", uint32(numAddresses)),
-		freeAddresses: numAddresses,
+	bm, err := bitseq.NewHandle("ipam_test", nil, "default/192.168.0.0/24", uint32(numAddresses))
+	if err != nil {
+		t.Fatal(err)
 	}
-	numBlocks := bm.addressMask.Head.Count
 
 	start := time.Now()
 	run := 0
 	for err != ErrNoAvailableIPs {
-		_, err = a.getAddress(bm, nil, v4)
+		_, err = a.getAddress(sub, bm, nil, v4)
 		run++
 	}
 	if printTime {
 		fmt.Printf("\nTaken %v, to allocate all addresses on %s. (nemAddresses: %d. Runs: %d)", time.Since(start), subnet, numAddresses, run)
 	}
-	if bm.addressMask.Head.Block != expectedMax || bm.addressMask.Head.Count != numBlocks {
-		t.Fatalf("Failed to effectively reserve all addresses on %s. Expected (0x%x, %d) as first sequence. Found (0x%x,%d)",
-			subnet, expectedMax, numBlocks, bm.addressMask.Head.Block, bm.addressMask.Head.Count)
+	if bm.Unselected() != 0 {
+		t.Fatalf("Unexpected free count after reserving all addresses: %d", bm.Unselected())
 	}
+	/*
+		if bm.Head.Block != expectedMax || bm.Head.Count != numBlocks {
+			t.Fatalf("Failed to effectively reserve all addresses on %s. Expected (0x%x, %d) as first sequence. Found (0x%x,%d)",
+				subnet, expectedMax, numBlocks, bm.Head.Block, bm.Head.Count)
+		}
+	*/
 }
 
 func assertNRequests(t *testing.T, subnet string, numReq int, lastExpectedIP string) {
@@ -525,7 +537,7 @@ func assertNRequests(t *testing.T, subnet string, numReq int, lastExpectedIP str
 	_, sub, _ := net.ParseCIDR(subnet)
 	lastIP := net.ParseIP(lastExpectedIP)
 
-	a := getAllocator(sub)
+	a := getAllocator(t, sub)
 	req = &AddressRequest{Subnet: *sub}
 
 	i := 0
@@ -545,7 +557,7 @@ func assertNRequests(t *testing.T, subnet string, numReq int, lastExpectedIP str
 func benchmarkRequest(subnet *net.IPNet) {
 	var err error
 
-	a := NewAllocator(nil)
+	a, _ := NewAllocator(nil)
 	a.internalHostSize = 20
 	a.AddSubnet("default", &SubnetInfo{Subnet: subnet})
 

+ 164 - 0
libnetwork/ipam/store.go

@@ -0,0 +1,164 @@
+package ipam
+
+import (
+	"encoding/json"
+	"net"
+
+	log "github.com/Sirupsen/logrus"
+	"github.com/docker/libnetwork/datastore"
+	"github.com/docker/libnetwork/types"
+)
+
+// Key provides the Key to be used in KV Store
+func (a *Allocator) Key() []string {
+	a.Lock()
+	defer a.Unlock()
+	return []string{a.App, a.ID}
+}
+
+// KeyPrefix returns the immediate parent key that can be used for tree walk
+func (a *Allocator) KeyPrefix() []string {
+	a.Lock()
+	defer a.Unlock()
+	return []string{a.App}
+}
+
+// Value marshals the data to be stored in the KV store
+func (a *Allocator) Value() []byte {
+	a.Lock()
+	defer a.Unlock()
+
+	if a.subnets == nil {
+		return []byte{}
+	}
+
+	b, err := subnetsToByteArray(a.subnets)
+	if err != nil {
+		return nil
+	}
+	return b
+}
+
+func subnetsToByteArray(m map[subnetKey]*SubnetInfo) ([]byte, error) {
+	if m == nil {
+		return nil, nil
+	}
+
+	mm := make(map[string]string, len(m))
+	for k, v := range m {
+		mm[k.String()] = v.Subnet.String()
+	}
+
+	return json.Marshal(mm)
+}
+
+func byteArrayToSubnets(ba []byte) map[subnetKey]*SubnetInfo {
+	m := map[subnetKey]*SubnetInfo{}
+
+	if ba == nil || len(ba) == 0 {
+		return m
+	}
+
+	var mm map[string]string
+	err := json.Unmarshal(ba, &mm)
+	if err != nil {
+		log.Warnf("Failed to decode subnets byte array: %v", err)
+		return m
+	}
+	for ks, vs := range mm {
+		sk := subnetKey{}
+		if err := sk.FromString(ks); err != nil {
+			log.Warnf("Failed to decode subnets map entry: (%s, %s)", ks, vs)
+			continue
+		}
+		si := &SubnetInfo{}
+		_, nw, err := net.ParseCIDR(vs)
+		if err != nil {
+			log.Warnf("Failed to decode subnets map entry value: (%s, %s)", ks, vs)
+			continue
+		}
+		si.Subnet = nw
+		m[sk] = si
+	}
+	return m
+}
+
+// Index returns the latest DB Index as seen by this object
+func (a *Allocator) Index() uint64 {
+	a.Lock()
+	defer a.Unlock()
+	return a.dbIndex
+}
+
+// SetIndex method allows the datastore to store the latest DB Index into this object
+func (a *Allocator) SetIndex(index uint64) {
+	a.Lock()
+	a.dbIndex = index
+	a.Unlock()
+}
+
+func (a *Allocator) watchForChanges() error {
+	if a.store == nil {
+		return nil
+	}
+
+	kvpChan, err := a.store.KVStore().Watch(datastore.Key(a.Key()...), nil)
+	if err != nil {
+		return err
+	}
+	go func() {
+		for {
+			select {
+			case kvPair := <-kvpChan:
+				if kvPair != nil {
+					log.Debugf("Got notification for key %v: %v", kvPair.Key, kvPair.Value)
+					a.subnetConfigFromStore(kvPair)
+				}
+			}
+		}
+	}()
+	return nil
+}
+
+func (a *Allocator) readFromStore() error {
+	a.Lock()
+	store := a.store
+	a.Unlock()
+
+	if store == nil {
+		return nil
+	}
+
+	kvPair, err := a.store.KVStore().Get(datastore.Key(a.Key()...))
+	if err != nil {
+		return err
+	}
+
+	a.subnetConfigFromStore(kvPair)
+
+	return nil
+}
+
+func (a *Allocator) writeToStore() error {
+	a.Lock()
+	store := a.store
+	a.Unlock()
+	if store == nil {
+		return nil
+	}
+	err := store.PutObjectAtomic(a)
+	if err == datastore.ErrKeyModified {
+		return types.RetryErrorf("failed to perform atomic write (%v). retry might fix the error", err)
+	}
+	return err
+}
+
+func (a *Allocator) deleteFromStore() error {
+	a.Lock()
+	store := a.store
+	a.Unlock()
+	if store == nil {
+		return nil
+	}
+	return store.DeleteObjectAtomic(a)
+}