Просмотр исходного кода

Rework push reservation w/ datastore

- At Handle creation, first check if an instance of the
  the respective object is already present in the datastore.
- Handle sequence must be saved only if commit
  to datastore is succesfull
- Caller (ipam) needs to manage the retry

Signed-off-by: Alessandro Boch <aboch@docker.com>
Alessandro Boch 10 лет назад
Родитель
Сommit
e39fc16c55
4 измененных файлов с 129 добавлено и 8 удалено
  1. 44 3
      libnetwork/bitseq/sequence.go
  2. 52 0
      libnetwork/bitseq/sequence_test.go
  3. 1 1
      libnetwork/bitseq/store.go
  4. 32 4
      libnetwork/ipam/allocator.go

+ 44 - 3
libnetwork/bitseq/sequence.go

@@ -39,10 +39,26 @@ func NewHandle(app string, ds datastore.DataStore, id string, numElements uint32
 		Head: &Sequence{
 		Head: &Sequence{
 			Block: 0x0,
 			Block: 0x0,
 			Count: getNumBlocks(numElements),
 			Count: getNumBlocks(numElements),
-			Next:  nil,
 		},
 		},
 	}
 	}
+
+	if h.store == nil {
+		return h
+	}
+
+	// Register for status changes
 	h.watchForChanges()
 	h.watchForChanges()
+
+	// Get the initial status from the ds if present.
+	// We will be getting an instance without a dbIndex
+	// (GetObject() does not set it): It is ok for now,
+	// it will only cause the first allocation on this
+	// node to go through a retry.
+	var bs []byte
+	if err := h.store.GetObject(datastore.Key(h.Key()...), bs); err == nil {
+		h.Head.FromByteArray(bs)
+	}
+
 	return h
 	return h
 }
 }
 
 
@@ -83,6 +99,19 @@ func (s *Sequence) GetAvailableBit() (bytePos, bitPos int) {
 	return bits / 8, bits % 8
 	return bits / 8, bits % 8
 }
 }
 
 
+// GetCopy returns a copy of the linked list rooted at this node
+func (s *Sequence) GetCopy() *Sequence {
+	n := &Sequence{Block: s.Block, Count: s.Count}
+	pn := n
+	ps := s.Next
+	for ps != nil {
+		pn.Next = &Sequence{Block: ps.Block, Count: ps.Count}
+		pn = pn.Next
+		ps = ps.Next
+	}
+	return n
+}
+
 // Equal checks if this sequence is equal to the passed one
 // Equal checks if this sequence is equal to the passed one
 func (s *Sequence) Equal(o *Sequence) bool {
 func (s *Sequence) Equal(o *Sequence) bool {
 	this := s
 	this := s
@@ -160,10 +189,22 @@ func (h *Handle) CheckIfAvailable(ordinal int) (int, int, error) {
 
 
 // PushReservation pushes the bit reservation inside the bitmask.
 // PushReservation pushes the bit reservation inside the bitmask.
 func (h *Handle) PushReservation(bytePos, bitPos int, release bool) error {
 func (h *Handle) PushReservation(bytePos, bitPos int, release bool) error {
+	// Create a copy of the current handler
 	h.Lock()
 	h.Lock()
-	h.Head = PushReservation(bytePos, bitPos, h.Head, release)
+	nh := &Handle{App: h.App, ID: h.ID, store: h.store, dbIndex: h.dbIndex, Head: h.Head.GetCopy()}
 	h.Unlock()
 	h.Unlock()
-	return h.writeToStore()
+
+	nh.Head = PushReservation(bytePos, bitPos, nh.Head, release)
+
+	err := nh.writeToStore()
+	if err == nil {
+		// Commit went through, save locally
+		h.Lock()
+		h.Head = nh.Head
+		h.Unlock()
+	}
+
+	return err
 }
 }
 
 
 // GetFirstAvailable looks for the first unset bit in passed mask
 // GetFirstAvailable looks for the first unset bit in passed mask

+ 52 - 0
libnetwork/bitseq/sequence_test.go

@@ -87,6 +87,58 @@ func TestSequenceEqual(t *testing.T) {
 	}
 	}
 }
 }
 
 
+func TestSequenceCopy(t *testing.T) {
+	s := &Sequence{
+		Block: 0x0,
+		Count: 8,
+		Next: &Sequence{
+			Block: 0x0,
+			Count: 8,
+			Next: &Sequence{
+				Block: 0x0,
+				Count: 0,
+				Next: &Sequence{
+					Block: 0x0,
+					Count: 0,
+					Next: &Sequence{
+						Block: 0x0,
+						Count: 2,
+						Next: &Sequence{
+							Block: 0x0,
+							Count: 1,
+							Next: &Sequence{
+								Block: 0x0,
+								Count: 1,
+								Next: &Sequence{
+									Block: 0x0,
+									Count: 2,
+									Next: &Sequence{
+										Block: 0x1,
+										Count: 1,
+										Next: &Sequence{
+											Block: 0x0,
+											Count: 2,
+											Next:  nil,
+										},
+									},
+								},
+							},
+						},
+					},
+				},
+			},
+		},
+	}
+
+	n := s.GetCopy()
+	if !s.Equal(n) {
+		t.Fatalf("copy of s failed")
+	}
+	if n == s {
+		t.Fatalf("not true copy of s")
+	}
+}
+
 func TestGetFirstAvailable(t *testing.T) {
 func TestGetFirstAvailable(t *testing.T) {
 	input := []struct {
 	input := []struct {
 		mask    *Sequence
 		mask    *Sequence

+ 1 - 1
libnetwork/bitseq/store.go

@@ -19,7 +19,7 @@ func (h *Handle) KeyPrefix() []string {
 	return []string{h.App}
 	return []string{h.App}
 }
 }
 
 
-// Value marshala the data to be stored in the KV store
+// Value marshals the data to be stored in the KV store
 func (h *Handle) Value() []byte {
 func (h *Handle) Value() []byte {
 	h.Lock()
 	h.Lock()
 	defer h.Unlock()
 	defer h.Unlock()

+ 32 - 4
libnetwork/ipam/allocator.go

@@ -6,8 +6,10 @@ import (
 	"strings"
 	"strings"
 	"sync"
 	"sync"
 
 
+	log "github.com/Sirupsen/logrus"
 	"github.com/docker/libnetwork/bitseq"
 	"github.com/docker/libnetwork/bitseq"
 	"github.com/docker/libnetwork/datastore"
 	"github.com/docker/libnetwork/datastore"
+	"github.com/docker/libnetwork/types"
 )
 )
 
 
 const (
 const (
@@ -323,10 +325,22 @@ func (a *Allocator) Release(addrSpace AddressSpace, address net.IP) {
 			// Retrieve correspondent ordinal in the subnet
 			// Retrieve correspondent ordinal in the subnet
 			ordinal := ipToInt(getHostPortionIP(address, sub))
 			ordinal := ipToInt(getHostPortionIP(address, sub))
 			// Release it
 			// Release it
-			space.addressMask.PushReservation(ordinal/8, ordinal%8, true)
+			for {
+				var err error
+				if err = space.addressMask.PushReservation(ordinal/8, ordinal%8, true); err == nil {
+					break
+				}
+				if _, ok := err.(types.RetryError); ok {
+					// bitmask must have changed, retry delete
+					continue
+				}
+				log.Warnf("Failed to release address %s because of internal error: %s", address.String(), err.Error())
+				return
+			}
 			space.freeAddresses++
 			space.freeAddresses++
 			return
 			return
 		}
 		}
+
 	}
 	}
 }
 }
 
 
@@ -385,6 +399,7 @@ func (a *Allocator) getSubnetList(addrSpace AddressSpace, ver ipVersion) []subne
 func (a *Allocator) getAddress(smallSubnet *bitmask, prefAddress net.IP, ver ipVersion) (net.IP, error) {
 func (a *Allocator) getAddress(smallSubnet *bitmask, prefAddress net.IP, ver ipVersion) (net.IP, error) {
 	var (
 	var (
 		bytePos, bitPos int
 		bytePos, bitPos int
+		ordinal         int
 		err             error
 		err             error
 	)
 	)
 	// Look for free IP, skip .0 and .255, they will be automatically reserved
 	// Look for free IP, skip .0 and .255, they will be automatically reserved
@@ -395,19 +410,32 @@ again:
 	if prefAddress == nil {
 	if prefAddress == nil {
 		bytePos, bitPos, err = smallSubnet.addressMask.GetFirstAvailable()
 		bytePos, bitPos, err = smallSubnet.addressMask.GetFirstAvailable()
 	} else {
 	} else {
-		ordinal := ipToInt(getHostPortionIP(prefAddress, smallSubnet.subnet))
+		ordinal = ipToInt(getHostPortionIP(prefAddress, smallSubnet.subnet))
 		bytePos, bitPos, err = smallSubnet.addressMask.CheckIfAvailable(ordinal)
 		bytePos, bitPos, err = smallSubnet.addressMask.CheckIfAvailable(ordinal)
 	}
 	}
 	if err != nil {
 	if err != nil {
 		return nil, ErrNoAvailableIPs
 		return nil, ErrNoAvailableIPs
 	}
 	}
 
 
+pushsame:
 	// Lock it
 	// Lock it
-	smallSubnet.addressMask.PushReservation(bytePos, bitPos, false)
+	if err = smallSubnet.addressMask.PushReservation(bytePos, bitPos, false); err != nil {
+		if _, ok := err.(types.RetryError); !ok {
+			return nil, fmt.Errorf("internal failure while reserving the address: %s", err.Error())
+		}
+		// bitmask view must have changed. Selected address may or may no longer be available
+		if prefAddress != nil {
+			if _, _, err = smallSubnet.addressMask.CheckIfAvailable(ordinal); err == nil {
+				//still available
+				goto pushsame
+			}
+			goto again
+		}
+	}
 	smallSubnet.freeAddresses--
 	smallSubnet.freeAddresses--
 
 
 	// Build IP ordinal
 	// Build IP ordinal
-	ordinal := bitPos + bytePos*8
+	ordinal = bitPos + bytePos*8
 
 
 	// For v4, let reservation of .0 and .255 happen automatically
 	// For v4, let reservation of .0 and .255 happen automatically
 	if ver == v4 && !isValidIP(ordinal) {
 	if ver == v4 && !isValidIP(ordinal) {