Datastore handles creating objects atomically.
In that commit, AtomicPutCreate takes previous = nil to Atomically create keys that don't exist. We need a create operation that is atomic to prevent races between multiple libnetworks creating the same object. Previously, we just created new KVs with an index of 0 and wrote them to the datastore. Consul accepts this behaviour and interprets index of 0 as non-existing, but other data backends do no. - Add Exists() to the KV interface. SetIndex() should also modify a KV so that it exists. - Call SetIndex() from within the GetObject() method on DataStore interface. - This ensures objects have the updated values for exists and index. - Add SetValue() to the KV interface. This allows implementers to define their own method to marshall and unmarshall (as bitseq and allocator have). - Update existing users of the DataStore (endpoint, network, bitseq, allocator, ov_network) to new interfaces. - Fix UTs.
This commit is contained in:
parent
ce88039f44
commit
04bd8f67ad
14 changed files with 186 additions and 67 deletions
2
libnetwork/Godeps/Godeps.json
generated
2
libnetwork/Godeps/Godeps.json
generated
|
@ -76,7 +76,7 @@
|
|||
},
|
||||
{
|
||||
"ImportPath": "github.com/docker/libkv",
|
||||
"Rev": "ab16c3d4a8785a9877c62d0b11ea4441cf09120c"
|
||||
"Rev": "60c7c881345b3c67defc7f93a8297debf041d43c"
|
||||
},
|
||||
{
|
||||
"ImportPath": "github.com/godbus/dbus",
|
||||
|
|
|
@ -28,6 +28,7 @@ type Handle struct {
|
|||
app string
|
||||
id string
|
||||
dbIndex uint64
|
||||
dbExists bool
|
||||
store datastore.DataStore
|
||||
sync.Mutex
|
||||
}
|
||||
|
@ -54,18 +55,10 @@ func NewHandle(app string, ds datastore.DataStore, id string, numElements uint32
|
|||
h.watchForChanges()
|
||||
|
||||
// Get the initial status from the ds if present.
|
||||
// We will be getting an instance without a dbIndex
|
||||
// (GetObject() does not set it): It is ok for now,
|
||||
// it will only cause the first allocation on this
|
||||
// node to go through a retry.
|
||||
var bah []byte
|
||||
if err := h.store.GetObject(datastore.Key(h.Key()...), &bah); err != nil {
|
||||
if err != datastore.ErrKeyNotFound {
|
||||
return nil, err
|
||||
}
|
||||
return h, nil
|
||||
err := h.store.GetObject(datastore.Key(h.Key()...), h)
|
||||
if err != datastore.ErrKeyNotFound {
|
||||
return nil, err
|
||||
}
|
||||
err := h.FromByteArray(bah)
|
||||
|
||||
return h, err
|
||||
}
|
||||
|
@ -199,7 +192,14 @@ func (h *Handle) CheckIfAvailable(ordinal int) (int, int, error) {
|
|||
func (h *Handle) PushReservation(bytePos, bitPos int, release bool) error {
|
||||
// Create a copy of the current handler
|
||||
h.Lock()
|
||||
nh := &Handle{app: h.app, id: h.id, store: h.store, dbIndex: h.dbIndex, head: h.head.GetCopy()}
|
||||
nh := &Handle{
|
||||
app: h.app,
|
||||
id: h.id,
|
||||
store: h.store,
|
||||
dbIndex: h.dbIndex,
|
||||
head: h.head.GetCopy(),
|
||||
dbExists: h.dbExists,
|
||||
}
|
||||
h.Unlock()
|
||||
|
||||
nh.head = PushReservation(bytePos, bitPos, nh.head, release)
|
||||
|
@ -214,7 +214,9 @@ func (h *Handle) PushReservation(bytePos, bitPos int, release bool) error {
|
|||
} else {
|
||||
h.unselected--
|
||||
}
|
||||
h.dbIndex = nh.dbIndex
|
||||
// Can't use SetIndex() since we're locked.
|
||||
h.dbIndex = nh.Index()
|
||||
h.dbExists = true
|
||||
h.Unlock()
|
||||
}
|
||||
|
||||
|
@ -276,12 +278,6 @@ func (h *Handle) Unselected() uint32 {
|
|||
return h.unselected
|
||||
}
|
||||
|
||||
func (h *Handle) getDBIndex() uint64 {
|
||||
h.Lock()
|
||||
defer h.Unlock()
|
||||
return h.dbIndex
|
||||
}
|
||||
|
||||
// GetFirstAvailable looks for the first unset bit in passed mask
|
||||
func GetFirstAvailable(head *Sequence) (int, int, error) {
|
||||
byteIndex := 0
|
||||
|
|
|
@ -38,6 +38,11 @@ func (h *Handle) Value() []byte {
|
|||
return jv
|
||||
}
|
||||
|
||||
// SetValue unmarshals the data from the KV store
|
||||
func (h *Handle) SetValue(value []byte) error {
|
||||
return h.FromByteArray(value)
|
||||
}
|
||||
|
||||
// Index returns the latest DB Index as seen by this object
|
||||
func (h *Handle) Index() uint64 {
|
||||
h.Lock()
|
||||
|
@ -49,9 +54,17 @@ func (h *Handle) Index() uint64 {
|
|||
func (h *Handle) SetIndex(index uint64) {
|
||||
h.Lock()
|
||||
h.dbIndex = index
|
||||
h.dbExists = true
|
||||
h.Unlock()
|
||||
}
|
||||
|
||||
// Exists method is true if this object has been stored in the DB.
|
||||
func (h *Handle) Exists() bool {
|
||||
h.Lock()
|
||||
defer h.Unlock()
|
||||
return h.dbExists
|
||||
}
|
||||
|
||||
func (h *Handle) watchForChanges() error {
|
||||
h.Lock()
|
||||
store := h.store
|
||||
|
@ -70,14 +83,12 @@ func (h *Handle) watchForChanges() error {
|
|||
select {
|
||||
case kvPair := <-kvpChan:
|
||||
// Only process remote update
|
||||
if kvPair != nil && (kvPair.LastIndex != h.getDBIndex()) {
|
||||
if kvPair != nil && (kvPair.LastIndex != h.Index()) {
|
||||
err := h.fromDsValue(kvPair.Value)
|
||||
if err != nil {
|
||||
log.Warnf("Failed to reconstruct bitseq handle from ds watch: %s", err.Error())
|
||||
} else {
|
||||
h.Lock()
|
||||
h.dbIndex = kvPair.LastIndex
|
||||
h.Unlock()
|
||||
h.SetIndex(kvPair.LastIndex)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -4,6 +4,7 @@ import (
|
|||
"strings"
|
||||
|
||||
"github.com/BurntSushi/toml"
|
||||
log "github.com/Sirupsen/logrus"
|
||||
"github.com/docker/libnetwork/netlabel"
|
||||
)
|
||||
|
||||
|
@ -57,6 +58,7 @@ type Option func(c *Config)
|
|||
// OptionDefaultNetwork function returns an option setter for a default network
|
||||
func OptionDefaultNetwork(dn string) Option {
|
||||
return func(c *Config) {
|
||||
log.Infof("Option DefaultNetwork: %s", dn)
|
||||
c.Daemon.DefaultNetwork = strings.TrimSpace(dn)
|
||||
}
|
||||
}
|
||||
|
@ -64,6 +66,7 @@ func OptionDefaultNetwork(dn string) Option {
|
|||
// OptionDefaultDriver function returns an option setter for default driver
|
||||
func OptionDefaultDriver(dd string) Option {
|
||||
return func(c *Config) {
|
||||
log.Infof("Option DefaultDriver: %s", dd)
|
||||
c.Daemon.DefaultDriver = strings.TrimSpace(dd)
|
||||
}
|
||||
}
|
||||
|
@ -82,6 +85,7 @@ func OptionLabels(labels []string) Option {
|
|||
// OptionKVProvider function returns an option setter for kvstore provider
|
||||
func OptionKVProvider(provider string) Option {
|
||||
return func(c *Config) {
|
||||
log.Infof("Option OptionKVProvider: %s", provider)
|
||||
c.Datastore.Client.Provider = strings.TrimSpace(provider)
|
||||
}
|
||||
}
|
||||
|
@ -89,6 +93,7 @@ func OptionKVProvider(provider string) Option {
|
|||
// OptionKVProviderURL function returns an option setter for kvstore url
|
||||
func OptionKVProviderURL(url string) Option {
|
||||
return func(c *Config) {
|
||||
log.Infof("Option OptionKVProviderURL: %s", url)
|
||||
c.Datastore.Client.Address = strings.TrimSpace(url)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -262,6 +262,7 @@ func (c *controller) NewNetwork(networkType, name string, options ...NetworkOpti
|
|||
}
|
||||
|
||||
if err := c.updateNetworkToStore(network); err != nil {
|
||||
log.Warnf("couldnt create network %s: %v", network.name, err)
|
||||
if e := network.Delete(); e != nil {
|
||||
log.Warnf("couldnt cleanup network %s: %v", network.name, err)
|
||||
}
|
||||
|
|
|
@ -1,7 +1,6 @@
|
|||
package datastore
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"reflect"
|
||||
"strings"
|
||||
|
||||
|
@ -14,9 +13,7 @@ import (
|
|||
//DataStore exported
|
||||
type DataStore interface {
|
||||
// GetObject gets data from datastore and unmarshals to the specified object
|
||||
GetObject(key string, o interface{}) error
|
||||
// GetUpdatedObject gets data from datastore along with its index and unmarshals to the specified object
|
||||
GetUpdatedObject(key string, o interface{}) (uint64, error)
|
||||
GetObject(key string, o KV) error
|
||||
// PutObject adds a new Record based on an object into the datastore
|
||||
PutObject(kvObject KV) error
|
||||
// PutObjectAtomic provides an atomic add and update operation for a Record
|
||||
|
@ -49,10 +46,15 @@ type KV interface {
|
|||
KeyPrefix() []string
|
||||
// Value method lets an object to marshal its content to be stored in the KV store
|
||||
Value() []byte
|
||||
// SetValue is used by the datastore to set the object's value when loaded from the data store.
|
||||
SetValue([]byte) error
|
||||
// Index method returns the latest DB Index as seen by the object
|
||||
Index() uint64
|
||||
// SetIndex method allows the datastore to store the latest DB Index into the object
|
||||
SetIndex(uint64)
|
||||
// True if the object exists in the datastore, false if it hasn't been stored yet.
|
||||
// When SetIndex() is called, the object has been stored.
|
||||
Exists() bool
|
||||
}
|
||||
|
||||
const (
|
||||
|
@ -121,7 +123,12 @@ func (ds *datastore) PutObjectAtomic(kvObject KV) error {
|
|||
return types.BadRequestErrorf("invalid KV Object with a nil Value for key %s", Key(kvObject.Key()...))
|
||||
}
|
||||
|
||||
previous := &store.KVPair{Key: Key(kvObject.Key()...), LastIndex: kvObject.Index()}
|
||||
var previous *store.KVPair
|
||||
if kvObject.Exists() {
|
||||
previous = &store.KVPair{Key: Key(kvObject.Key()...), LastIndex: kvObject.Index()}
|
||||
} else {
|
||||
previous = nil
|
||||
}
|
||||
_, pair, err := ds.store.AtomicPut(Key(kvObject.Key()...), kvObjValue, previous, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -149,24 +156,20 @@ func (ds *datastore) putObjectWithKey(kvObject KV, key ...string) error {
|
|||
}
|
||||
|
||||
// GetObject returns a record matching the key
|
||||
func (ds *datastore) GetObject(key string, o interface{}) error {
|
||||
func (ds *datastore) GetObject(key string, o KV) error {
|
||||
kvPair, err := ds.store.Get(key)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return json.Unmarshal(kvPair.Value, o)
|
||||
}
|
||||
|
||||
// GetUpdateObject returns a record matching the key
|
||||
func (ds *datastore) GetUpdatedObject(key string, o interface{}) (uint64, error) {
|
||||
kvPair, err := ds.store.Get(key)
|
||||
err = o.SetValue(kvPair.Value)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
return err
|
||||
}
|
||||
if err := json.Unmarshal(kvPair.Value, o); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return kvPair.LastIndex, nil
|
||||
|
||||
// Make sure the object has a correct view of the DB index in case we need to modify it
|
||||
// and update the DB.
|
||||
o.SetIndex(kvPair.LastIndex)
|
||||
return nil
|
||||
}
|
||||
|
||||
// DeleteObject unconditionally deletes a record from the store
|
||||
|
|
|
@ -8,6 +8,7 @@ import (
|
|||
"github.com/docker/libnetwork/config"
|
||||
_ "github.com/docker/libnetwork/netutils"
|
||||
"github.com/docker/libnetwork/options"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
var dummyKey = "dummy"
|
||||
|
@ -69,16 +70,18 @@ func TestKVObjectFlatKey(t *testing.T) {
|
|||
func TestAtomicKVObjectFlatKey(t *testing.T) {
|
||||
store := NewTestDataStore()
|
||||
expected := dummyKVObject("1111", true)
|
||||
assert.False(t, expected.Exists())
|
||||
err := store.PutObjectAtomic(expected)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
assert.True(t, expected.Exists())
|
||||
|
||||
// PutObjectAtomic automatically sets the Index again. Hence the following must pass.
|
||||
|
||||
err = store.PutObjectAtomic(expected)
|
||||
if err != nil {
|
||||
t.Fatal("Atomic update with an older Index must fail")
|
||||
t.Fatal("Atomic update should succeed.")
|
||||
}
|
||||
|
||||
// Get the latest index and try PutObjectAtomic again for the same Key
|
||||
|
@ -90,12 +93,22 @@ func TestAtomicKVObjectFlatKey(t *testing.T) {
|
|||
n := dummyObject{}
|
||||
json.Unmarshal(data.Value, &n)
|
||||
n.ID = "1111"
|
||||
n.DBIndex = data.LastIndex
|
||||
n.SetIndex(data.LastIndex)
|
||||
n.ReturnValue = true
|
||||
err = store.PutObjectAtomic(&n)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// Get the Object using GetObject, then set again.
|
||||
newObj := dummyObject{}
|
||||
err = store.GetObject(Key(expected.Key()...), &newObj)
|
||||
assert.True(t, newObj.Exists())
|
||||
err = store.PutObjectAtomic(&n)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// dummy data used to test the datastore
|
||||
|
@ -108,6 +121,7 @@ type dummyObject struct {
|
|||
Generic options.Generic `kv:"iterative"`
|
||||
ID string
|
||||
DBIndex uint64
|
||||
DBExists bool
|
||||
ReturnValue bool
|
||||
}
|
||||
|
||||
|
@ -131,12 +145,21 @@ func (n *dummyObject) Value() []byte {
|
|||
return b
|
||||
}
|
||||
|
||||
func (n *dummyObject) SetValue(value []byte) error {
|
||||
return json.Unmarshal(value, n)
|
||||
}
|
||||
|
||||
func (n *dummyObject) Index() uint64 {
|
||||
return n.DBIndex
|
||||
}
|
||||
|
||||
func (n *dummyObject) SetIndex(index uint64) {
|
||||
n.DBIndex = index
|
||||
n.DBExists = true
|
||||
}
|
||||
|
||||
func (n *dummyObject) Exists() bool {
|
||||
return n.DBExists
|
||||
}
|
||||
|
||||
func (n *dummyObject) MarshalJSON() ([]byte, error) {
|
||||
|
@ -162,10 +185,11 @@ func (n *dummyObject) UnmarshalJSON(b []byte) (err error) {
|
|||
|
||||
// dummy structure to test "recursive" cases
|
||||
type recStruct struct {
|
||||
Name string `kv:"leaf"`
|
||||
Field1 int `kv:"leaf"`
|
||||
Dict map[string]string `kv:"iterative"`
|
||||
DBIndex uint64
|
||||
Name string `kv:"leaf"`
|
||||
Field1 int `kv:"leaf"`
|
||||
Dict map[string]string `kv:"iterative"`
|
||||
DBIndex uint64
|
||||
DBExists bool
|
||||
}
|
||||
|
||||
func (r *recStruct) Key() []string {
|
||||
|
@ -179,12 +203,21 @@ func (r *recStruct) Value() []byte {
|
|||
return b
|
||||
}
|
||||
|
||||
func (r *recStruct) SetValue(value []byte) error {
|
||||
return json.Unmarshal(value, r)
|
||||
}
|
||||
|
||||
func (r *recStruct) Index() uint64 {
|
||||
return r.DBIndex
|
||||
}
|
||||
|
||||
func (r *recStruct) SetIndex(index uint64) {
|
||||
r.DBIndex = index
|
||||
r.DBExists = true
|
||||
}
|
||||
|
||||
func (r *recStruct) Exists() bool {
|
||||
return r.DBExists
|
||||
}
|
||||
|
||||
func dummyKVObject(id string, retValue bool) *dummyObject {
|
||||
|
@ -195,12 +228,13 @@ func dummyKVObject(id string, retValue bool) *dummyObject {
|
|||
Name: "testNw",
|
||||
NetworkType: "bridge",
|
||||
EnableIPv6: true,
|
||||
Rec: &recStruct{"gen", 5, cDict, 0},
|
||||
Rec: &recStruct{"gen", 5, cDict, 0, false},
|
||||
ID: id,
|
||||
DBIndex: 0,
|
||||
ReturnValue: retValue}
|
||||
ReturnValue: retValue,
|
||||
DBExists: false}
|
||||
generic := make(map[string]interface{})
|
||||
generic["label1"] = &recStruct{"value1", 1, cDict, 0}
|
||||
generic["label1"] = &recStruct{"value1", 1, cDict, 0, false}
|
||||
generic["label2"] = "subnet=10.1.1.0/16"
|
||||
n.Generic = generic
|
||||
return &n
|
||||
|
|
|
@ -93,8 +93,18 @@ func (s *MockStore) NewLock(key string, options *store.LockOptions) (store.Locke
|
|||
// modified in the meantime, throws an error if this is the case
|
||||
func (s *MockStore) AtomicPut(key string, newValue []byte, previous *store.KVPair, options *store.WriteOptions) (bool, *store.KVPair, error) {
|
||||
mData := s.db[key]
|
||||
if mData != nil && mData.Index != previous.LastIndex {
|
||||
return false, nil, types.BadRequestErrorf("atomic put failed due to mismatched Index")
|
||||
|
||||
if previous == nil {
|
||||
if mData != nil {
|
||||
return false, nil, types.BadRequestErrorf("atomic put failed because key exists")
|
||||
} // Else OK.
|
||||
} else {
|
||||
if mData == nil {
|
||||
return false, nil, types.BadRequestErrorf("atomic put failed because key exists")
|
||||
}
|
||||
if mData != nil && mData.Index != previous.LastIndex {
|
||||
return false, nil, types.BadRequestErrorf("atomic put failed due to mismatched Index")
|
||||
} // Else OK.
|
||||
}
|
||||
err := s.Put(key, newValue, nil)
|
||||
if err != nil {
|
||||
|
|
|
@ -22,6 +22,7 @@ type network struct {
|
|||
id types.UUID
|
||||
vni uint32
|
||||
dbIndex uint64
|
||||
dbExists bool
|
||||
sbox sandbox.Sandbox
|
||||
endpoints endpointTable
|
||||
ipAllocator *ipallocator.IPAllocator
|
||||
|
@ -260,6 +261,20 @@ func (n *network) Index() uint64 {
|
|||
|
||||
func (n *network) SetIndex(index uint64) {
|
||||
n.dbIndex = index
|
||||
n.dbExists = true
|
||||
}
|
||||
|
||||
func (n *network) Exists() bool {
|
||||
return n.dbExists
|
||||
}
|
||||
|
||||
func (n *network) SetValue(value []byte) error {
|
||||
var vni uint32
|
||||
err := json.Unmarshal(value, &vni)
|
||||
if err == nil {
|
||||
n.setVxlanID(vni)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (n *network) writeToStore() error {
|
||||
|
@ -297,8 +312,7 @@ func (n *network) obtainVxlanID() error {
|
|||
|
||||
for {
|
||||
var vxlanID uint32
|
||||
if err := n.driver.store.GetObject(datastore.Key(n.Key()...),
|
||||
&vxlanID); err != nil {
|
||||
if err := n.driver.store.GetObject(datastore.Key(n.Key()...), n); err != nil {
|
||||
if err == datastore.ErrKeyNotFound {
|
||||
vxlanID, err = n.driver.vxlanIdm.GetID()
|
||||
if err != nil {
|
||||
|
@ -318,8 +332,6 @@ func (n *network) obtainVxlanID() error {
|
|||
}
|
||||
return fmt.Errorf("failed to obtain vxlan id from data store: %v", err)
|
||||
}
|
||||
|
||||
n.setVxlanID(vxlanID)
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
|
|
@ -124,6 +124,7 @@ type endpoint struct {
|
|||
generic map[string]interface{}
|
||||
joinLeaveDone chan struct{}
|
||||
dbIndex uint64
|
||||
dbExists bool
|
||||
sync.Mutex
|
||||
}
|
||||
|
||||
|
@ -258,6 +259,10 @@ func (ep *endpoint) Value() []byte {
|
|||
return b
|
||||
}
|
||||
|
||||
func (ep *endpoint) SetValue(value []byte) error {
|
||||
return json.Unmarshal(value, ep)
|
||||
}
|
||||
|
||||
func (ep *endpoint) Index() uint64 {
|
||||
ep.Lock()
|
||||
defer ep.Unlock()
|
||||
|
@ -268,6 +273,13 @@ func (ep *endpoint) SetIndex(index uint64) {
|
|||
ep.Lock()
|
||||
defer ep.Unlock()
|
||||
ep.dbIndex = index
|
||||
ep.dbExists = true
|
||||
}
|
||||
|
||||
func (ep *endpoint) Exists() bool {
|
||||
ep.Lock()
|
||||
defer ep.Unlock()
|
||||
return ep.dbExists
|
||||
}
|
||||
|
||||
func (ep *endpoint) processOptions(options ...EndpointOption) {
|
||||
|
|
|
@ -35,10 +35,11 @@ type Allocator struct {
|
|||
// Allocated addresses in each address space's internal subnet
|
||||
addresses map[subnetKey]*bitseq.Handle
|
||||
// Datastore
|
||||
store datastore.DataStore
|
||||
App string
|
||||
ID string
|
||||
dbIndex uint64
|
||||
store datastore.DataStore
|
||||
App string
|
||||
ID string
|
||||
dbIndex uint64
|
||||
dbExists bool
|
||||
sync.Mutex
|
||||
}
|
||||
|
||||
|
@ -100,6 +101,7 @@ func (a *Allocator) subnetConfigFromStore(kvPair *store.KVPair) {
|
|||
if a.dbIndex < kvPair.LastIndex {
|
||||
a.subnets = byteArrayToSubnets(kvPair.Value)
|
||||
a.dbIndex = kvPair.LastIndex
|
||||
a.dbExists = true
|
||||
}
|
||||
a.Unlock()
|
||||
}
|
||||
|
|
|
@ -39,6 +39,12 @@ func (a *Allocator) Value() []byte {
|
|||
return b
|
||||
}
|
||||
|
||||
// SetValue unmarshalls the data from the KV store.
|
||||
func (a *Allocator) SetValue(value []byte) error {
|
||||
a.subnets = byteArrayToSubnets(value)
|
||||
return nil
|
||||
}
|
||||
|
||||
func subnetsToByteArray(m map[subnetKey]*SubnetInfo) ([]byte, error) {
|
||||
if m == nil {
|
||||
return nil, nil
|
||||
|
@ -94,9 +100,17 @@ func (a *Allocator) Index() uint64 {
|
|||
func (a *Allocator) SetIndex(index uint64) {
|
||||
a.Lock()
|
||||
a.dbIndex = index
|
||||
a.dbExists = true
|
||||
a.Unlock()
|
||||
}
|
||||
|
||||
// Exists method is true if this object has been stored in the DB.
|
||||
func (a *Allocator) Exists() bool {
|
||||
a.Lock()
|
||||
defer a.Unlock()
|
||||
return a.dbExists
|
||||
}
|
||||
|
||||
func (a *Allocator) watchForChanges() error {
|
||||
if a.store == nil {
|
||||
return nil
|
||||
|
|
|
@ -67,6 +67,7 @@ type network struct {
|
|||
generic options.Generic
|
||||
dbIndex uint64
|
||||
svcRecords svcMap
|
||||
dbExists bool
|
||||
stopWatchCh chan struct{}
|
||||
sync.Mutex
|
||||
}
|
||||
|
@ -116,6 +117,10 @@ func (n *network) Value() []byte {
|
|||
return b
|
||||
}
|
||||
|
||||
func (n *network) SetValue(value []byte) error {
|
||||
return json.Unmarshal(value, n)
|
||||
}
|
||||
|
||||
func (n *network) Index() uint64 {
|
||||
n.Lock()
|
||||
defer n.Unlock()
|
||||
|
@ -125,9 +130,16 @@ func (n *network) Index() uint64 {
|
|||
func (n *network) SetIndex(index uint64) {
|
||||
n.Lock()
|
||||
n.dbIndex = index
|
||||
n.dbExists = true
|
||||
n.Unlock()
|
||||
}
|
||||
|
||||
func (n *network) Exists() bool {
|
||||
n.Lock()
|
||||
defer n.Unlock()
|
||||
return n.dbExists
|
||||
}
|
||||
|
||||
func (n *network) EndpointCnt() uint64 {
|
||||
n.Lock()
|
||||
defer n.Unlock()
|
||||
|
@ -292,7 +304,9 @@ func (n *network) CreateEndpoint(name string, options ...EndpointOption) (Endpoi
|
|||
return nil, types.ForbiddenErrorf("service endpoint with name %s already exists", name)
|
||||
}
|
||||
|
||||
ep := &endpoint{name: name, iFaces: []*endpointInterface{}, generic: make(map[string]interface{})}
|
||||
ep := &endpoint{name: name,
|
||||
iFaces: []*endpointInterface{},
|
||||
generic: make(map[string]interface{})}
|
||||
ep.id = types.UUID(stringid.GenerateRandomID())
|
||||
ep.network = n
|
||||
ep.processOptions(options...)
|
||||
|
|
|
@ -197,6 +197,7 @@ func (c *controller) watchNetworks() error {
|
|||
}
|
||||
}
|
||||
c.processNetworkUpdate(nws, &tmpview)
|
||||
|
||||
// Delete processing
|
||||
for k := range tmpview {
|
||||
c.Lock()
|
||||
|
@ -259,7 +260,7 @@ func (n *network) watchEndpoints() error {
|
|||
continue
|
||||
}
|
||||
delete(tmpview, ep.id)
|
||||
ep.dbIndex = epe.LastIndex
|
||||
ep.SetIndex(epe.LastIndex)
|
||||
ep.network = n
|
||||
if n.ctrlr.processEndpointUpdate(&ep) {
|
||||
err = n.ctrlr.newEndpointFromStore(epe.Key, &ep)
|
||||
|
@ -310,15 +311,17 @@ func (c *controller) processNetworkUpdate(nws []*store.KVPair, prune *networkTab
|
|||
if prune != nil {
|
||||
delete(*prune, n.id)
|
||||
}
|
||||
n.dbIndex = kve.LastIndex
|
||||
n.SetIndex(kve.LastIndex)
|
||||
c.Lock()
|
||||
existing, ok := c.networks[n.id]
|
||||
c.Unlock()
|
||||
if ok {
|
||||
existing.Lock()
|
||||
// Skip existing network update
|
||||
if existing.dbIndex != n.dbIndex {
|
||||
existing.dbIndex = n.dbIndex
|
||||
if existing.dbIndex != n.Index() {
|
||||
// Can't use SetIndex() since existing is locked.
|
||||
existing.dbIndex = n.Index()
|
||||
existing.dbExists = true
|
||||
existing.endpointCnt = n.endpointCnt
|
||||
}
|
||||
existing.Unlock()
|
||||
|
@ -353,8 +356,10 @@ func (c *controller) processEndpointUpdate(ep *endpoint) bool {
|
|||
|
||||
ee := existing.(*endpoint)
|
||||
ee.Lock()
|
||||
if ee.dbIndex != ep.dbIndex {
|
||||
ee.dbIndex = ep.dbIndex
|
||||
if ee.dbIndex != ep.Index() {
|
||||
// Can't use SetIndex() because ee is locked.
|
||||
ee.dbIndex = ep.Index()
|
||||
ee.dbExists = true
|
||||
if ee.container != nil && ep.container != nil {
|
||||
// we care only about the container id
|
||||
ee.container.id = ep.container.id
|
||||
|
|
Loading…
Add table
Reference in a new issue