浏览代码

Fix datastore value handling in bitseq

- and do not discard errors

Signed-off-by: Alessandro Boch <aboch@docker.com>
Alessandro Boch 10 年之前
父节点
当前提交
cc6fb95c0c
共有 3 个文件被更改,包括 48 次插入16 次删除
  1. 17 10
      libnetwork/bitseq/sequence.go
  2. 30 5
      libnetwork/bitseq/store.go
  3. 1 1
      libnetwork/idm/idm.go

+ 17 - 10
libnetwork/bitseq/sequence.go

@@ -7,7 +7,6 @@ import (
 	"fmt"
 	"fmt"
 	"sync"
 	"sync"
 
 
-	"github.com/docker/libkv/store"
 	"github.com/docker/libnetwork/datastore"
 	"github.com/docker/libnetwork/datastore"
 	"github.com/docker/libnetwork/netutils"
 	"github.com/docker/libnetwork/netutils"
 )
 )
@@ -59,13 +58,16 @@ func NewHandle(app string, ds datastore.DataStore, id string, numElements uint32
 	// (GetObject() does not set it): It is ok for now,
 	// (GetObject() does not set it): It is ok for now,
 	// it will only cause the first allocation on this
 	// it will only cause the first allocation on this
 	// node to go through a retry.
 	// node to go through a retry.
-	var bs []byte
-	if err := h.store.GetObject(datastore.Key(h.Key()...), bs); err == nil {
-		h.FromByteArray(bs)
-	} else if err != store.ErrKeyNotFound {
-		return nil, err
+	var bah []byte
+	if err := h.store.GetObject(datastore.Key(h.Key()...), &bah); err != nil {
+		if err != datastore.ErrKeyNotFound {
+			return nil, err
+		}
+		return h, nil
 	}
 	}
-	return h, nil
+	err := h.FromByteArray(bah)
+
+	return h, err
 }
 }
 
 
 // Sequence reresents a recurring sequence of 32 bits long bitmasks
 // Sequence reresents a recurring sequence of 32 bits long bitmasks
@@ -159,7 +161,7 @@ func (s *Sequence) ToByteArray() ([]byte, error) {
 func (s *Sequence) FromByteArray(data []byte) error {
 func (s *Sequence) FromByteArray(data []byte) error {
 	l := len(data)
 	l := len(data)
 	if l%8 != 0 {
 	if l%8 != 0 {
-		return fmt.Errorf("cannot deserialize byte sequence of lenght %d", l)
+		return fmt.Errorf("cannot deserialize byte sequence of lenght %d (%v)", l, data)
 	}
 	}
 
 
 	p := s
 	p := s
@@ -212,6 +214,7 @@ func (h *Handle) PushReservation(bytePos, bitPos int, release bool) error {
 		} else {
 		} else {
 			h.unselected--
 			h.unselected--
 		}
 		}
+		h.dbIndex = nh.dbIndex
 		h.Unlock()
 		h.Unlock()
 	}
 	}
 
 
@@ -233,7 +236,7 @@ func (h *Handle) ToByteArray() ([]byte, error) {
 	copy(ba[4:8], netutils.U32ToA(h.unselected))
 	copy(ba[4:8], netutils.U32ToA(h.unselected))
 	bm, err := h.head.ToByteArray()
 	bm, err := h.head.ToByteArray()
 	if err != nil {
 	if err != nil {
-		return nil, err
+		return nil, fmt.Errorf("failed to serialize head: %s", err.Error())
 	}
 	}
 	ba = append(ba, bm...)
 	ba = append(ba, bm...)
 
 
@@ -242,10 +245,14 @@ func (h *Handle) ToByteArray() ([]byte, error) {
 
 
 // FromByteArray reads his handle's data from a byte array
 // FromByteArray reads his handle's data from a byte array
 func (h *Handle) FromByteArray(ba []byte) error {
 func (h *Handle) FromByteArray(ba []byte) error {
+	if ba == nil {
+		return fmt.Errorf("nil byte array")
+	}
+
 	nh := &Sequence{}
 	nh := &Sequence{}
 	err := nh.FromByteArray(ba[8:])
 	err := nh.FromByteArray(ba[8:])
 	if err != nil {
 	if err != nil {
-		return err
+		return fmt.Errorf("failed to deserialize head: %s", err.Error())
 	}
 	}
 
 
 	h.Lock()
 	h.Lock()

+ 30 - 5
libnetwork/bitseq/store.go

@@ -1,6 +1,10 @@
 package bitseq
 package bitseq
 
 
 import (
 import (
+	"encoding/json"
+	"fmt"
+
+	log "github.com/Sirupsen/logrus"
 	"github.com/docker/libnetwork/datastore"
 	"github.com/docker/libnetwork/datastore"
 	"github.com/docker/libnetwork/types"
 	"github.com/docker/libnetwork/types"
 )
 )
@@ -23,9 +27,15 @@ func (h *Handle) KeyPrefix() []string {
 func (h *Handle) Value() []byte {
 func (h *Handle) Value() []byte {
 	b, err := h.ToByteArray()
 	b, err := h.ToByteArray()
 	if err != nil {
 	if err != nil {
+		log.Warnf("Failed to serialize Handle: %v", err)
+		b = []byte{}
+	}
+	jv, err := json.Marshal(b)
+	if err != nil {
+		log.Warnf("Failed to json encode bitseq handler byte array: %v", err)
 		return []byte{}
 		return []byte{}
 	}
 	}
-	return b
+	return jv
 }
 }
 
 
 // Index returns the latest DB Index as seen by this object
 // Index returns the latest DB Index as seen by this object
@@ -61,10 +71,14 @@ func (h *Handle) watchForChanges() error {
 			case kvPair := <-kvpChan:
 			case kvPair := <-kvpChan:
 				// Only process remote update
 				// Only process remote update
 				if kvPair != nil && (kvPair.LastIndex != h.getDBIndex()) {
 				if kvPair != nil && (kvPair.LastIndex != h.getDBIndex()) {
-					h.Lock()
-					h.dbIndex = kvPair.LastIndex
-					h.Unlock()
-					h.FromByteArray(kvPair.Value)
+					err := h.fromDsValue(kvPair.Value)
+					if err != nil {
+						log.Warnf("Failed to reconstruct bitseq handle from ds watch: %s", err.Error())
+					} else {
+						h.Lock()
+						h.dbIndex = kvPair.LastIndex
+						h.Unlock()
+					}
 				}
 				}
 			}
 			}
 		}
 		}
@@ -72,6 +86,17 @@ func (h *Handle) watchForChanges() error {
 	return nil
 	return nil
 }
 }
 
 
+func (h *Handle) fromDsValue(value []byte) error {
+	var ba []byte
+	if err := json.Unmarshal(value, &ba); err != nil {
+		return fmt.Errorf("failed to decode json: %s", err.Error())
+	}
+	if err := h.FromByteArray(ba); err != nil {
+		return fmt.Errorf("failed to decode handle: %s", err.Error())
+	}
+	return nil
+}
+
 func (h *Handle) writeToStore() error {
 func (h *Handle) writeToStore() error {
 	h.Lock()
 	h.Lock()
 	store := h.store
 	store := h.store

+ 1 - 1
libnetwork/idm/idm.go

@@ -27,7 +27,7 @@ func New(ds datastore.DataStore, id string, start, end uint32) (*Idm, error) {
 
 
 	h, err := bitseq.NewHandle("idm", ds, id, uint32(1+end-start))
 	h, err := bitseq.NewHandle("idm", ds, id, uint32(1+end-start))
 	if err != nil {
 	if err != nil {
-		return nil, err
+		return nil, fmt.Errorf("failed to initialize bit sequence handler: %s", err.Error())
 	}
 	}
 
 
 	return &Idm{start: start, end: end, handle: h}, nil
 	return &Idm{start: start, end: end, handle: h}, nil