浏览代码

bitseq to provide atomic functions

- Also add validation for passed ordinal

Signed-off-by: Alessandro Boch <aboch@docker.com>
Alessandro Boch 10 年之前
父节点
当前提交
01d6585a31
共有 3 个文件被更改,包括 291 次插入85 次删除
  1. 164 25
      libnetwork/bitseq/sequence.go
  2. 126 59
      libnetwork/bitseq/sequence_test.go
  3. 1 1
      libnetwork/bitseq/store.go

+ 164 - 25
libnetwork/bitseq/sequence.go

@@ -9,6 +9,7 @@ import (
 	"sync"
 
 	"github.com/docker/libnetwork/datastore"
+	"github.com/docker/libnetwork/types"
 )
 
 // block sequence constants
@@ -21,6 +22,10 @@ const (
 	invalidPos    = blockMAX
 )
 
+var (
+	errNoBitAvailable = fmt.Errorf("no bit available")
+)
+
 // Handle contains the sequece representing the bitmask and its identifier
 type Handle struct {
 	bits       uint32
@@ -145,7 +150,7 @@ func (s *sequence) toByteArray() ([]byte, error) {
 	return bb, nil
 }
 
-// FromByteArray construct the sequence from the byte array
+// fromByteArray construct the sequence from the byte array
 func (s *sequence) fromByteArray(data []byte) error {
 	l := len(data)
 	if l%8 != 0 {
@@ -168,7 +173,21 @@ func (s *sequence) fromByteArray(data []byte) error {
 	return nil
 }
 
+func (h *Handle) getCopy() *Handle {
+	return &Handle{
+		bits:       h.bits,
+		unselected: h.unselected,
+		head:       h.head.getCopy(),
+		app:        h.app,
+		id:         h.id,
+		dbIndex:    h.dbIndex,
+		dbExists:   h.dbExists,
+		store:      h.store,
+	}
+}
+
 // GetFirstAvailable returns the byte and bit position of the first unset bit
+// @Deprecated Use SetAny() instead
 func (h *Handle) GetFirstAvailable() (uint32, uint32, error) {
 	h.Lock()
 	defer h.Unlock()
@@ -177,45 +196,157 @@ func (h *Handle) GetFirstAvailable() (uint32, uint32, error) {
 
 // CheckIfAvailable checks if the bit correspondent to the specified ordinal is unset
 // If the ordinal is beyond the sequence limits, a negative response is returned
+// @Deprecated Use IsSet() instead
 func (h *Handle) CheckIfAvailable(ordinal uint32) (uint32, uint32, error) {
+	if err := h.validateOrdinal(ordinal); err != nil {
+		return invalidPos, invalidPos, err
+	}
 	h.Lock()
 	defer h.Unlock()
 	return checkIfAvailable(h.head, ordinal)
 }
 
-// PushReservation pushes the bit reservation inside the bitmask.
-func (h *Handle) PushReservation(bytePos, bitPos uint32, release bool) error {
-	// Create a copy of the current handler
-	h.Lock()
-	nh := &Handle{
-		app:      h.app,
-		id:       h.id,
-		store:    h.store,
-		dbIndex:  h.dbIndex,
-		head:     h.head.GetCopy(),
-		dbExists: h.dbExists,
+// SetAny atomically sets the first unset bit in the sequence and returns the corresponding ordinal
+func (h *Handle) SetAny() (uint32, error) {
+	if h.Unselected() == 0 {
+		return invalidPos, errNoBitAvailable
+	}
+	return h.set(0, true, false)
+}
+
+// Set atomically sets the corresponding bit in the sequence
+func (h *Handle) Set(ordinal uint32) error {
+	if err := h.validateOrdinal(ordinal); err != nil {
+		return err
+	}
+	_, err := h.set(ordinal, false, false)
+	return err
+}
+
+// Unset atomically unsets the corresponding bit in the sequence
+func (h *Handle) Unset(ordinal uint32) error {
+	if err := h.validateOrdinal(ordinal); err != nil {
+		return err
 	}
+	_, err := h.set(ordinal, false, true)
+	return err
+}
+
+// IsSet atomically checks if the ordinal bit is set. In case ordinal
+// is outside of the bit sequence limits, false is returned.
+func (h *Handle) IsSet(ordinal uint32) bool {
+	if err := h.validateOrdinal(ordinal); err != nil {
+		return false
+	}
+	h.Lock()
+	_, _, err := checkIfAvailable(h.head, ordinal)
 	h.Unlock()
+	return err != nil
+}
 
-	nh.head = pushReservation(bytePos, bitPos, nh.head, release)
+// set/reset the bit
+func (h *Handle) set(ordinal uint32, any bool, release bool) (uint32, error) {
+	var (
+		bitPos  uint32
+		bytePos uint32
+		ret     uint32
+		err     error
+	)
 
-	err := nh.writeToStore()
-	if err == nil {
-		// Commit went through, save locally
+	for {
 		h.Lock()
-		h.head = nh.head
+		// Get position if available
 		if release {
-			h.unselected++
+			bytePos, bitPos = ordinalToPos(ordinal)
 		} else {
-			h.unselected--
+			if any {
+				bytePos, bitPos, err = getFirstAvailable(h.head)
+				ret = posToOrdinal(bytePos, bitPos)
+			} else {
+				bytePos, bitPos, err = checkIfAvailable(h.head, ordinal)
+				ret = ordinal
+			}
 		}
-		// Can't use SetIndex() since we're locked.
-		h.dbIndex = nh.Index()
-		h.dbExists = true
+		if err != nil {
+			h.Unlock()
+			return ret, err
+		}
+
+		// Create a private copy of h and work on it, also copy the current db index
+		nh := h.getCopy()
+		ci := h.dbIndex
 		h.Unlock()
+
+		nh.head = pushReservation(bytePos, bitPos, nh.head, release)
+		if release {
+			nh.unselected++
+		} else {
+			nh.unselected--
+		}
+
+		// Attempt to write private copy to store
+		if err := nh.writeToStore(); err != nil {
+			if _, ok := err.(types.RetryError); !ok {
+				return ret, fmt.Errorf("internal failure while setting the bit: %v", err)
+			}
+			// Retry
+			continue
+		}
+
+		// Unless unexpected error, save private copy to local copy
+		h.Lock()
+		defer h.Unlock()
+		if h.dbIndex != ci {
+			return ret, fmt.Errorf("unexected database index change")
+		}
+		h.unselected = nh.unselected
+		h.head = nh.head
+		h.dbExists = nh.dbExists
+		h.dbIndex = nh.dbIndex
+		return ret, nil
 	}
+}
 
-	return err
+// checks is needed because to cover the case where the number of bits is not a multiple of blockLen
+func (h *Handle) validateOrdinal(ordinal uint32) error {
+	if ordinal > h.bits {
+		return fmt.Errorf("bit does not belong to the sequence")
+	}
+	return nil
+}
+
+// PushReservation pushes the bit reservation inside the bitmask.
+// @Deprecated Use Set() instead
+func (h *Handle) PushReservation(bytePos, bitPos uint32, release bool) error {
+	// Create a private copy of h and work on it, also copy the current db index
+	h.Lock()
+	nh := h.getCopy()
+	ci := h.dbIndex
+	h.Unlock()
+
+	nh.head = pushReservation(bytePos, bitPos, nh.head, release)
+	if release {
+		nh.unselected++
+	} else {
+		nh.unselected--
+	}
+
+	// Attempt to write private copy to store
+	if err := nh.writeToStore(); err != nil {
+		return err
+	}
+
+	// Unless unexpected error, save private copy to local copy
+	h.Lock()
+	defer h.Unlock()
+	if h.dbIndex != ci {
+		return fmt.Errorf("unexected database index change")
+	}
+	h.unselected = nh.unselected
+	h.head = nh.head
+	h.dbExists = nh.dbExists
+	h.dbIndex = nh.dbIndex
+	return nil
 }
 
 // Destroy removes from the datastore the data belonging to this handle
@@ -285,7 +416,7 @@ func getFirstAvailable(head *sequence) (uint32, uint32, error) {
 		byteIndex += current.count * blockBytes
 		current = current.next
 	}
-	return invalidPos, invalidPos, fmt.Errorf("no bit available")
+	return invalidPos, invalidPos, errNoBitAvailable
 }
 
 // checkIfAvailable checks if the bit correspondent to the specified ordinal is unset
@@ -363,7 +494,7 @@ func pushReservation(bytePos, bitPos uint32, head *sequence, release bool) *sequ
 	}
 
 	// Construct updated block
-	bitSel := uint32(blockFirstBit >> uint(inBlockBytePos*8+bitPos))
+	bitSel := blockFirstBit >> (inBlockBytePos*8 + bitPos)
 	newBlock := current.block
 	if release {
 		newBlock &^= bitSel
@@ -447,3 +578,11 @@ func getNumBlocks(numBits uint32) uint32 {
 	}
 	return numBlocks
 }
+
+func ordinalToPos(ordinal uint32) (uint32, uint32) {
+	return ordinal / 8, ordinal % 8
+}
+
+func posToOrdinal(bytePos, bitPos uint32) uint32 {
+	return bytePos*8 + bitPos
+}

+ 126 - 59
libnetwork/bitseq/sequence_test.go

@@ -90,48 +90,7 @@ func TestSequenceEqual(t *testing.T) {
 }
 
 func TestSequenceCopy(t *testing.T) {
-	s := &sequence{
-		block: 0x0,
-		count: 8,
-		next: &sequence{
-			block: 0x0,
-			count: 8,
-			next: &sequence{
-				block: 0x0,
-				count: 0,
-				next: &sequence{
-					block: 0x0,
-					count: 0,
-					next: &sequence{
-						block: 0x0,
-						count: 2,
-						next: &sequence{
-							block: 0x0,
-							count: 1,
-							next: &sequence{
-								block: 0x0,
-								count: 1,
-								next: &sequence{
-									block: 0x0,
-									count: 2,
-									next: &sequence{
-										block: 0x1,
-										count: 1,
-										next: &sequence{
-											block: 0x0,
-											count: 2,
-											next:  nil,
-										},
-									},
-								},
-							},
-						},
-					},
-				},
-			},
-		},
-	}
-
+	s := getTestSequence()
 	n := s.getCopy()
 	if !s.equal(n) {
 		t.Fatalf("copy of s failed")
@@ -388,43 +347,151 @@ func TestPushReservation(t *testing.T) {
 }
 
 func TestSerializeDeserialize(t *testing.T) {
-	s := &sequence{
-		block: 0xffffffff,
-		count: 1,
+	s := getTestSequence()
+
+	data, err := s.toByteArray()
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	r := &sequence{}
+	err = r.fromByteArray(data)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	if !s.equal(r) {
+		t.Fatalf("Sequences are different: \n%v\n%v", s, r)
+	}
+}
+
+func getTestSequence() *sequence {
+	// Returns a custom sequence of 1024 * 32 bits
+	return &sequence{
+		block: 0XFFFFFFFF,
+		count: 100,
 		next: &sequence{
-			block: 0xFF000000,
+			block: 0xFFFFFFFE,
 			count: 1,
 			next: &sequence{
-				block: 0xffffffff,
-				count: 6,
+				block: 0xFF000000,
+				count: 10,
 				next: &sequence{
-					block: 0xffffffff,
-					count: 1,
+					block: 0XFFFFFFFF,
+					count: 50,
 					next: &sequence{
-						block: 0xFF800000,
+						block: 0XFFFFFFFC,
 						count: 1,
 						next: &sequence{
-							block: 0xffffffff,
-							count: 6,
+							block: 0xFF800000,
+							count: 1,
+							next: &sequence{
+								block: 0XFFFFFFFF,
+								count: 87,
+								next: &sequence{
+									block: 0x0,
+									count: 150,
+									next: &sequence{
+										block: 0XFFFFFFFF,
+										count: 200,
+										next: &sequence{
+											block: 0x0000FFFF,
+											count: 1,
+											next: &sequence{
+												block: 0x0,
+												count: 399,
+												next: &sequence{
+													block: 0XFFFFFFFF,
+													count: 23,
+													next: &sequence{
+														block: 0x1,
+														count: 1,
+													},
+												},
+											},
+										},
+									},
+								},
+							},
 						},
 					},
 				},
 			},
 		},
 	}
+}
 
-	data, err := s.toByteArray()
+func TestSet(t *testing.T) {
+	hnd, err := NewHandle("", nil, "", 1024*32)
 	if err != nil {
 		t.Fatal(err)
 	}
+	hnd.head = getTestSequence()
 
-	r := &sequence{}
-	err = r.fromByteArray(data)
+	firstAv := uint32(32*100 + 31)
+	last := uint32(1024*32 - 1)
+
+	if hnd.IsSet(100000) {
+		t.Fatal("IsSet() returned wrong result")
+	}
+
+	if !hnd.IsSet(0) {
+		t.Fatal("IsSet() returned wrong result")
+	}
+
+	if hnd.IsSet(firstAv) {
+		t.Fatal("IsSet() returned wrong result")
+	}
+
+	if !hnd.IsSet(last) {
+		t.Fatal("IsSet() returned wrong result")
+	}
+
+	if err := hnd.Set(0); err == nil {
+		t.Fatalf("Expected failure, but succeeded")
+	}
+
+	os, err := hnd.SetAny()
 	if err != nil {
-		t.Fatal(err)
+		t.Fatalf("Unexpected failure: %v", err)
+	}
+	if os != firstAv {
+		t.Fatalf("SetAny returned unexpected ordinal. Expected %d. Got %d.", firstAv, os)
+	}
+	if !hnd.IsSet(firstAv) {
+		t.Fatal("IsSet() returned wrong result")
 	}
 
-	if !s.equal(r) {
-		t.Fatalf("Sequences are different: \n%v\n%v", s, r)
+	if err := hnd.Unset(firstAv); err != nil {
+		t.Fatalf("Unexpected failure: %v", err)
+	}
+
+	if hnd.IsSet(firstAv) {
+		t.Fatal("IsSet() returned wrong result")
+	}
+
+	if err := hnd.Set(firstAv); err != nil {
+		t.Fatalf("Unexpected failure: %v", err)
+	}
+
+	if err := hnd.Set(last); err == nil {
+		t.Fatalf("Expected failure, but succeeded")
+	}
+}
+
+func TestSetUnset(t *testing.T) {
+	numBits := uint32(64 * 1024)
+	hnd, err := NewHandle("", nil, "", numBits)
+	if err != nil {
+		t.Fatal(err)
+	}
+	// set and unset all one by one
+	for hnd.Unselected() > 0 {
+		hnd.SetAny()
+	}
+	i := uint32(0)
+	for hnd.Unselected() < numBits {
+		hnd.Unset(i)
+		i++
 	}
 }

+ 1 - 1
libnetwork/bitseq/store.go

@@ -122,7 +122,7 @@ func (h *Handle) writeToStore() error {
 	}
 	err := store.PutObjectAtomic(h)
 	if err == datastore.ErrKeyModified {
-		return types.RetryErrorf("failed to perform atomic write (%v). retry might fix the error", err)
+		return types.RetryErrorf("failed to perform atomic write (%v). Retry might fix the error", err)
 	}
 	return err
 }