Browse Source

Datastore additions to bitmask management

Signed-off-by: Madhu Venugopal <madhu@docker.com>
Madhu Venugopal 10 years ago
parent
commit
c395cf2eb6

+ 16 - 7
libnetwork/bitseq/sequence.go

@@ -7,6 +7,7 @@ import (
 	"fmt"
 	"sync"
 
+	"github.com/docker/libnetwork/datastore"
 	"github.com/docker/libnetwork/netutils"
 )
 
@@ -21,21 +22,28 @@ const (
 
 // Handle contains the sequece representing the bitmask and its identifier
 type Handle struct {
-	ID   string
-	Head *Sequence
+	App     string
+	ID      string
+	Head    *Sequence
+	store   datastore.DataStore
+	dbIndex uint64
 	sync.Mutex
 }
 
 // NewHandle returns a thread-safe instance of the bitmask handler
-func NewHandle(id string, numElements uint32) *Handle {
-	return &Handle{
-		ID: id,
+func NewHandle(app string, ds datastore.DataStore, id string, numElements uint32) *Handle {
+	h := &Handle{
+		App:   app,
+		ID:    id,
+		store: ds,
 		Head: &Sequence{
 			Block: 0x0,
 			Count: getNumBlocks(numElements),
 			Next:  nil,
 		},
 	}
+	h.watchForChanges()
+	return h
 }
 
 // Sequence reresents a recurring sequence of 32 bits long bitmasks
@@ -151,10 +159,11 @@ func (h *Handle) CheckIfAvailable(ordinal int) (int, int, error) {
 }
 
 // PushReservation pushes the bit reservation inside the bitmask.
-func (h *Handle) PushReservation(bytePos, bitPos int, release bool) {
+func (h *Handle) PushReservation(bytePos, bitPos int, release bool) error {
 	h.Lock()
-	defer h.Unlock()
 	h.Head = PushReservation(bytePos, bitPos, h.Head, release)
+	h.Unlock()
+	return h.writeToStore()
 }
 
 // GetFirstAvailable looks for the first unset bit in passed mask

+ 100 - 0
libnetwork/bitseq/store.go

@@ -0,0 +1,100 @@
+package bitseq
+
+import (
+	"github.com/docker/libnetwork/datastore"
+	"github.com/docker/libnetwork/types"
+)
+
+// Key provides the Key to be used in KV Store
+func (h *Handle) Key() []string {
+	h.Lock()
+	defer h.Unlock()
+	return []string{h.App, h.ID}
+}
+
+// KeyPrefix returns the immediate parent key that can be used for tree walk
+func (h *Handle) KeyPrefix() []string {
+	h.Lock()
+	defer h.Unlock()
+	return []string{h.App}
+}
+
+// Value marshala the data to be stored in the KV store
+func (h *Handle) Value() []byte {
+	h.Lock()
+	defer h.Unlock()
+	head := h.Head
+	if head == nil {
+		return []byte{}
+	}
+	b, err := head.ToByteArray()
+	if err != nil {
+		return []byte{}
+	}
+	return b
+}
+
+// Index returns the latest DB Index as seen by this object
+func (h *Handle) Index() uint64 {
+	h.Lock()
+	defer h.Unlock()
+	return h.dbIndex
+}
+
+// SetIndex method allows the datastore to store the latest DB Index into this object
+func (h *Handle) SetIndex(index uint64) {
+	h.Lock()
+	h.dbIndex = index
+	h.Unlock()
+}
+
+func (h *Handle) watchForChanges() error {
+	h.Lock()
+	store := h.store
+	h.Unlock()
+
+	if store == nil {
+		return nil
+	}
+
+	kvpChan, err := store.KVStore().Watch(datastore.Key(h.Key()...), nil)
+	if err != nil {
+		return err
+	}
+	go func() {
+		for {
+			select {
+			case kvPair := <-kvpChan:
+				h.Lock()
+				h.dbIndex = kvPair.LastIndex
+				h.Head.FromByteArray(kvPair.Value)
+				h.Unlock()
+			}
+		}
+	}()
+	return nil
+}
+
+func (h *Handle) writeToStore() error {
+	h.Lock()
+	store := h.store
+	h.Unlock()
+	if store == nil {
+		return nil
+	}
+	err := store.PutObjectAtomic(h)
+	if err == datastore.ErrKeyModified {
+		return types.RetryErrorf("failed to perform atomic write (%v). retry might fix the error", err)
+	}
+	return err
+}
+
+func (h *Handle) deleteFromStore() error {
+	h.Lock()
+	store := h.store
+	h.Unlock()
+	if store == nil {
+		return nil
+	}
+	return store.DeleteObjectAtomic(h)
+}

+ 3 - 2
libnetwork/idm/idm.go

@@ -5,6 +5,7 @@ import (
 	"fmt"
 
 	"github.com/docker/libnetwork/bitseq"
+	"github.com/docker/libnetwork/datastore"
 )
 
 // Idm manages the reservation/release of numerical ids from a contiguos set
@@ -15,14 +16,14 @@ type Idm struct {
 }
 
 // New returns an instance of id manager for a set of [start-end] numerical ids
-func New(id string, start, end uint32) (*Idm, error) {
+func New(ds datastore.DataStore, id string, start, end uint32) (*Idm, error) {
 	if id == "" {
 		return nil, fmt.Errorf("Invalid id")
 	}
 	if end <= start {
 		return nil, fmt.Errorf("Invalid set range: [%d, %d]", start, end)
 	}
-	return &Idm{start: start, end: end, handle: bitseq.NewHandle(id, 1+end-start)}, nil
+	return &Idm{start: start, end: end, handle: bitseq.NewHandle("idm", ds, id, uint32(1+end-start))}, nil
 }
 
 // GetID returns the first available id in the set

+ 4 - 4
libnetwork/idm/idm_test.go

@@ -5,17 +5,17 @@ import (
 )
 
 func TestNew(t *testing.T) {
-	_, err := New("", 0, 1)
+	_, err := New(nil, "", 0, 1)
 	if err == nil {
 		t.Fatalf("Expected failure, but succeeded")
 	}
 
-	_, err = New("myset", 1<<10, 0)
+	_, err = New(nil, "myset", 1<<10, 0)
 	if err == nil {
 		t.Fatalf("Expected failure, but succeeded")
 	}
 
-	i, err := New("myset", 0, 10)
+	i, err := New(nil, "myset", 0, 10)
 	if err != nil {
 		t.Fatalf("Unexpected failure: %v", err)
 	}
@@ -31,7 +31,7 @@ func TestNew(t *testing.T) {
 }
 
 func TestAllocate(t *testing.T) {
-	i, err := New("myids", 50, 52)
+	i, err := New(nil, "myids", 50, 52)
 	if err != nil {
 		t.Fatal(err)
 	}

+ 6 - 2
libnetwork/ipam/allocator.go

@@ -6,6 +6,7 @@ import (
 	"sync"
 
 	"github.com/docker/libnetwork/bitseq"
+	"github.com/docker/libnetwork/datastore"
 )
 
 const (
@@ -26,15 +27,18 @@ type Allocator struct {
 	subnetsInfo map[subnetKey]*SubnetInfo
 	// Allocated addresses in each address space's internal subnet
 	addresses map[subnetKey]*bitmask
+	// Datastore
+	store datastore.DataStore
 	sync.Mutex
 }
 
 // NewAllocator returns an instance of libnetwork ipam
-func NewAllocator() *Allocator {
+func NewAllocator(ds datastore.DataStore) *Allocator {
 	a := &Allocator{}
 	a.subnetsInfo = make(map[subnetKey]*SubnetInfo)
 	a.addresses = make(map[subnetKey]*bitmask)
 	a.internalHostSize = defaultInternalHostSize
+	a.store = ds
 	return a
 }
 
@@ -102,7 +106,7 @@ func (a *Allocator) AddSubnet(addrSpace AddressSpace, subnetInfo *SubnetInfo) er
 		a.Lock()
 		a.addresses[smallKey] = &bitmask{
 			subnet:        sub,
-			addressMask:   bitseq.NewHandle(smallKey.String(), uint32(numAddresses)),
+			addressMask:   bitseq.NewHandle("ipam", a.store, smallKey.String(), uint32(numAddresses)),
 			freeAddresses: numAddresses,
 		}
 		a.Unlock()

+ 7 - 7
libnetwork/ipam/allocator_test.go

@@ -10,7 +10,7 @@ import (
 )
 
 func getAllocator(subnet *net.IPNet) *Allocator {
-	a := NewAllocator()
+	a := NewAllocator(nil)
 	a.AddSubnet("default", &SubnetInfo{Subnet: subnet})
 	return a
 }
@@ -58,7 +58,7 @@ func TestGetAddressVersion(t *testing.T) {
 }
 
 func TestAddSubnets(t *testing.T) {
-	a := NewAllocator()
+	a := NewAllocator(nil)
 
 	_, sub0, _ := net.ParseCIDR("10.0.0.0/8")
 	err := a.AddSubnet("default", &SubnetInfo{Subnet: sub0})
@@ -133,7 +133,7 @@ func TestAdjustAndCheckSubnet(t *testing.T) {
 }
 
 func TestRemoveSubnet(t *testing.T) {
-	a := NewAllocator()
+	a := NewAllocator(nil)
 
 	input := []struct {
 		addrSpace AddressSpace
@@ -247,7 +247,7 @@ func TestGetAddress(t *testing.T) {
 }
 
 func TestGetSubnetList(t *testing.T) {
-	a := NewAllocator()
+	a := NewAllocator(nil)
 	input := []struct {
 		addrSpace AddressSpace
 		subnet    string
@@ -295,7 +295,7 @@ func TestGetSubnetList(t *testing.T) {
 
 func TestRequestSyntaxCheck(t *testing.T) {
 	var (
-		a        = NewAllocator()
+		a        = NewAllocator(nil)
 		subnet   = "192.168.0.0/16"
 		addSpace = AddressSpace("green")
 	)
@@ -462,7 +462,7 @@ func assertGetAddress(t *testing.T, subnet string) {
 
 	bm := &bitmask{
 		subnet:        sub,
-		addressMask:   bitseq.NewHandle("default/192.168.0.0/24", uint32(numAddresses)),
+		addressMask:   bitseq.NewHandle("ipam_test", nil, "default/192.168.0.0/24", uint32(numAddresses)),
 		freeAddresses: numAddresses,
 	}
 	numBlocks := bm.addressMask.Head.Count
@@ -513,7 +513,7 @@ func assertNRequests(t *testing.T, subnet string, numReq int, lastExpectedIP str
 func benchmarkRequest(subnet *net.IPNet) {
 	var err error
 
-	a := NewAllocator()
+	a := NewAllocator(nil)
 	a.internalHostSize = 20
 	a.AddSubnet("default", &SubnetInfo{Subnet: subnet})