Bladeren bron

Merge pull request #633 from mrjana/bugs

Separate endpoint count into a different object
Madhu Venugopal 9 jaren geleden
bovenliggende
commit
c454b1084d

+ 19 - 13
libnetwork/controller.go

@@ -355,24 +355,30 @@ func (c *controller) NewNetwork(networkType, name string, options ...NetworkOpti
 		}
 		}
 	}()
 	}()
 
 
-	// addNetwork can be called for local scope network lazily when
-	// an endpoint is created after a restart and the network was
-	// created in previous life. Make sure you wrap around the driver
-	// notification of network creation in once call so that the driver
-	// invoked only once in case both the network and endpoint creation
-	// happens in the same lifetime.
-	network.drvOnce.Do(func() {
-		err = c.addNetwork(network)
-	})
-	if err != nil {
+	if err := c.addNetwork(network); err != nil {
 		return nil, err
 		return nil, err
 	}
 	}
+	defer func() {
+		if err != nil {
+			if e := network.deleteNetwork(); e != nil {
+				log.Warnf("couldn't roll back driver network on network %s creation failure: %v", network.name, err)
+			}
+		}
+	}()
 
 
 	if err = c.updateToStore(network); err != nil {
 	if err = c.updateToStore(network); err != nil {
-		log.Warnf("couldnt create network %s: %v", network.name, err)
-		if e := network.Delete(); e != nil {
-			log.Warnf("couldnt cleanup network %s on network create failure (%v): %v", network.name, err, e)
+		return nil, err
+	}
+	defer func() {
+		if err != nil {
+			if e := c.deleteFromStore(network); e != nil {
+				log.Warnf("couldnt rollback from store, network %s on failure (%v): %v", network.name, err, e)
+			}
 		}
 		}
+	}()
+
+	network.epCnt = &endpointCnt{n: network}
+	if err = c.updateToStore(network.epCnt); err != nil {
 		return nil, err
 		return nil, err
 	}
 	}
 
 

+ 22 - 0
libnetwork/datastore/datastore.go

@@ -5,6 +5,7 @@ import (
 	"log"
 	"log"
 	"reflect"
 	"reflect"
 	"strings"
 	"strings"
+	"sync"
 
 
 	"github.com/docker/libkv"
 	"github.com/docker/libkv"
 	"github.com/docker/libkv/store"
 	"github.com/docker/libkv/store"
@@ -55,6 +56,7 @@ type datastore struct {
 	scope string
 	scope string
 	store store.Store
 	store store.Store
 	cache *cache
 	cache *cache
+	sync.Mutex
 }
 }
 
 
 // KVObject is the Key/Value interface used by objects to be part of the DataStore
 // KVObject is the Key/Value interface used by objects to be part of the DataStore
@@ -287,6 +289,8 @@ func (ds *datastore) PutObjectAtomic(kvObject KVObject) error {
 		pair     *store.KVPair
 		pair     *store.KVPair
 		err      error
 		err      error
 	)
 	)
+	ds.Lock()
+	defer ds.Unlock()
 
 
 	if kvObject == nil {
 	if kvObject == nil {
 		return types.BadRequestErrorf("invalid KV Object : nil")
 		return types.BadRequestErrorf("invalid KV Object : nil")
@@ -325,6 +329,9 @@ add_cache:
 
 
 // PutObject adds a new Record based on an object into the datastore
 // PutObject adds a new Record based on an object into the datastore
 func (ds *datastore) PutObject(kvObject KVObject) error {
 func (ds *datastore) PutObject(kvObject KVObject) error {
+	ds.Lock()
+	defer ds.Unlock()
+
 	if kvObject == nil {
 	if kvObject == nil {
 		return types.BadRequestErrorf("invalid KV Object : nil")
 		return types.BadRequestErrorf("invalid KV Object : nil")
 	}
 	}
@@ -356,6 +363,9 @@ func (ds *datastore) putObjectWithKey(kvObject KVObject, key ...string) error {
 
 
 // GetObject returns a record matching the key
 // GetObject returns a record matching the key
 func (ds *datastore) GetObject(key string, o KVObject) error {
 func (ds *datastore) GetObject(key string, o KVObject) error {
+	ds.Lock()
+	defer ds.Unlock()
+
 	if ds.cache != nil {
 	if ds.cache != nil {
 		return ds.cache.get(key, o)
 		return ds.cache.get(key, o)
 	}
 	}
@@ -387,6 +397,9 @@ func (ds *datastore) ensureKey(key string) error {
 }
 }
 
 
 func (ds *datastore) List(key string, kvObject KVObject) ([]KVObject, error) {
 func (ds *datastore) List(key string, kvObject KVObject) ([]KVObject, error) {
+	ds.Lock()
+	defer ds.Unlock()
+
 	if ds.cache != nil {
 	if ds.cache != nil {
 		return ds.cache.list(kvObject)
 		return ds.cache.list(kvObject)
 	}
 	}
@@ -430,6 +443,9 @@ func (ds *datastore) List(key string, kvObject KVObject) ([]KVObject, error) {
 
 
 // DeleteObject unconditionally deletes a record from the store
 // DeleteObject unconditionally deletes a record from the store
 func (ds *datastore) DeleteObject(kvObject KVObject) error {
 func (ds *datastore) DeleteObject(kvObject KVObject) error {
+	ds.Lock()
+	defer ds.Unlock()
+
 	// clean up the cache first
 	// clean up the cache first
 	if ds.cache != nil {
 	if ds.cache != nil {
 		ds.cache.del(kvObject)
 		ds.cache.del(kvObject)
@@ -444,6 +460,9 @@ func (ds *datastore) DeleteObject(kvObject KVObject) error {
 
 
 // DeleteObjectAtomic performs atomic delete on a record
 // DeleteObjectAtomic performs atomic delete on a record
 func (ds *datastore) DeleteObjectAtomic(kvObject KVObject) error {
 func (ds *datastore) DeleteObjectAtomic(kvObject KVObject) error {
+	ds.Lock()
+	defer ds.Unlock()
+
 	if kvObject == nil {
 	if kvObject == nil {
 		return types.BadRequestErrorf("invalid KV Object : nil")
 		return types.BadRequestErrorf("invalid KV Object : nil")
 	}
 	}
@@ -469,6 +488,9 @@ del_cache:
 
 
 // DeleteTree unconditionally deletes a record from the store
 // DeleteTree unconditionally deletes a record from the store
 func (ds *datastore) DeleteTree(kvObject KVObject) error {
 func (ds *datastore) DeleteTree(kvObject KVObject) error {
+	ds.Lock()
+	defer ds.Unlock()
+
 	// clean up the cache first
 	// clean up the cache first
 	if ds.cache != nil {
 	if ds.cache != nil {
 		ds.cache.del(kvObject)
 		ds.cache.del(kvObject)

+ 2 - 2
libnetwork/endpoint.go

@@ -484,12 +484,12 @@ func (ep *endpoint) Delete() error {
 	}
 	}
 	ep.Unlock()
 	ep.Unlock()
 
 
-	if err = n.DecEndpointCnt(); err != nil {
+	if err = n.getEpCnt().DecEndpointCnt(); err != nil {
 		return err
 		return err
 	}
 	}
 	defer func() {
 	defer func() {
 		if err != nil {
 		if err != nil {
-			if e := n.IncEndpointCnt(); e != nil {
+			if e := n.getEpCnt().IncEndpointCnt(); e != nil {
 				log.Warnf("failed to update network %s : %v", n.name, e)
 				log.Warnf("failed to update network %s : %v", n.name, e)
 			}
 			}
 		}
 		}

+ 147 - 0
libnetwork/endpoint_cnt.go

@@ -0,0 +1,147 @@
+package libnetwork
+
+import (
+	"encoding/json"
+	"fmt"
+	"sync"
+
+	"github.com/docker/libnetwork/datastore"
+)
+
+type endpointCnt struct {
+	n        *network
+	Count    uint64
+	dbIndex  uint64
+	dbExists bool
+	sync.Mutex
+}
+
+const epCntKeyPrefix = "endpoint_count"
+
+func (ec *endpointCnt) Key() []string {
+	ec.Lock()
+	defer ec.Unlock()
+
+	return []string{epCntKeyPrefix, ec.n.id}
+}
+
+func (ec *endpointCnt) KeyPrefix() []string {
+	ec.Lock()
+	defer ec.Unlock()
+
+	return []string{epCntKeyPrefix, ec.n.id}
+}
+
+func (ec *endpointCnt) Value() []byte {
+	ec.Lock()
+	defer ec.Unlock()
+
+	b, err := json.Marshal(ec)
+	if err != nil {
+		return nil
+	}
+	return b
+}
+
+func (ec *endpointCnt) SetValue(value []byte) error {
+	ec.Lock()
+	defer ec.Unlock()
+
+	return json.Unmarshal(value, &ec)
+}
+
+func (ec *endpointCnt) Index() uint64 {
+	ec.Lock()
+	defer ec.Unlock()
+	return ec.dbIndex
+}
+
+func (ec *endpointCnt) SetIndex(index uint64) {
+	ec.Lock()
+	ec.dbIndex = index
+	ec.dbExists = true
+	ec.Unlock()
+}
+
+func (ec *endpointCnt) Exists() bool {
+	ec.Lock()
+	defer ec.Unlock()
+	return ec.dbExists
+}
+
+func (ec *endpointCnt) Skip() bool {
+	ec.Lock()
+	defer ec.Unlock()
+	return !ec.n.persist
+}
+
+func (ec *endpointCnt) New() datastore.KVObject {
+	ec.Lock()
+	defer ec.Unlock()
+
+	return &endpointCnt{
+		n: ec.n,
+	}
+}
+
+func (ec *endpointCnt) CopyTo(o datastore.KVObject) error {
+	ec.Lock()
+	defer ec.Unlock()
+
+	dstEc := o.(*endpointCnt)
+	dstEc.n = ec.n
+	dstEc.Count = ec.Count
+	dstEc.dbExists = ec.dbExists
+	dstEc.dbIndex = ec.dbIndex
+
+	return nil
+}
+
+func (ec *endpointCnt) DataScope() string {
+	return ec.n.DataScope()
+}
+
+func (ec *endpointCnt) EndpointCnt() uint64 {
+	ec.Lock()
+	defer ec.Unlock()
+
+	return ec.Count
+}
+
+func (ec *endpointCnt) atomicIncDecEpCnt(inc bool) error {
+retry:
+	ec.Lock()
+	if inc {
+		ec.Count++
+	} else {
+		ec.Count--
+	}
+	ec.Unlock()
+
+	store := ec.n.getController().getStore(ec.DataScope())
+	if store == nil {
+		return fmt.Errorf("store not found for scope %s", ec.DataScope())
+	}
+
+	if err := ec.n.getController().updateToStore(ec); err != nil {
+		if err == datastore.ErrKeyModified {
+			if err := store.GetObject(datastore.Key(ec.Key()...), ec); err != nil {
+				return fmt.Errorf("could not update the kvobject to latest when trying to atomic add endpoint count: %v", err)
+			}
+
+			goto retry
+		}
+
+		return err
+	}
+
+	return nil
+}
+
+func (ec *endpointCnt) IncEndpointCnt() error {
+	return ec.atomicIncDecEpCnt(true)
+}
+
+func (ec *endpointCnt) DecEndpointCnt() error {
+	return ec.atomicIncDecEpCnt(false)
+}

+ 1 - 2
libnetwork/libnetwork_internal_test.go

@@ -39,7 +39,6 @@ func TestNetworkMarshalling(t *testing.T) {
 		ipamType:    "default",
 		ipamType:    "default",
 		addrSpace:   "viola",
 		addrSpace:   "viola",
 		networkType: "bridge",
 		networkType: "bridge",
-		endpointCnt: 27,
 		enableIPv6:  true,
 		enableIPv6:  true,
 		persist:     true,
 		persist:     true,
 		ipamV4Config: []*IpamConf{
 		ipamV4Config: []*IpamConf{
@@ -142,7 +141,7 @@ func TestNetworkMarshalling(t *testing.T) {
 	}
 	}
 
 
 	if n.name != nn.name || n.id != nn.id || n.networkType != nn.networkType || n.ipamType != nn.ipamType ||
 	if n.name != nn.name || n.id != nn.id || n.networkType != nn.networkType || n.ipamType != nn.ipamType ||
-		n.addrSpace != nn.addrSpace || n.endpointCnt != nn.endpointCnt || n.enableIPv6 != nn.enableIPv6 ||
+		n.addrSpace != nn.addrSpace || n.enableIPv6 != nn.enableIPv6 ||
 		n.persist != nn.persist || !compareIpamConfList(n.ipamV4Config, nn.ipamV4Config) ||
 		n.persist != nn.persist || !compareIpamConfList(n.ipamV4Config, nn.ipamV4Config) ||
 		!compareIpamInfoList(n.ipamV4Info, nn.ipamV4Info) || !compareIpamConfList(n.ipamV6Config, nn.ipamV6Config) ||
 		!compareIpamInfoList(n.ipamV4Info, nn.ipamV4Info) || !compareIpamConfList(n.ipamV6Config, nn.ipamV6Config) ||
 		!compareIpamInfoList(n.ipamV6Info, nn.ipamV6Info) {
 		!compareIpamInfoList(n.ipamV6Info, nn.ipamV6Info) {

+ 10 - 59
libnetwork/network.go

@@ -152,7 +152,7 @@ type network struct {
 	ipamV4Info   []*IpamInfo
 	ipamV4Info   []*IpamInfo
 	ipamV6Info   []*IpamInfo
 	ipamV6Info   []*IpamInfo
 	enableIPv6   bool
 	enableIPv6   bool
-	endpointCnt  uint64
+	epCnt        *endpointCnt
 	generic      options.Generic
 	generic      options.Generic
 	dbIndex      uint64
 	dbIndex      uint64
 	svcRecords   svcMap
 	svcRecords   svcMap
@@ -296,7 +296,6 @@ func (n *network) CopyTo(o datastore.KVObject) error {
 	dstN.id = n.id
 	dstN.id = n.id
 	dstN.networkType = n.networkType
 	dstN.networkType = n.networkType
 	dstN.ipamType = n.ipamType
 	dstN.ipamType = n.ipamType
-	dstN.endpointCnt = n.endpointCnt
 	dstN.enableIPv6 = n.enableIPv6
 	dstN.enableIPv6 = n.enableIPv6
 	dstN.persist = n.persist
 	dstN.persist = n.persist
 	dstN.dbIndex = n.dbIndex
 	dstN.dbIndex = n.dbIndex
@@ -339,48 +338,11 @@ func (n *network) DataScope() string {
 	return n.driverScope()
 	return n.driverScope()
 }
 }
 
 
-func (n *network) EndpointCnt() uint64 {
+func (n *network) getEpCnt() *endpointCnt {
 	n.Lock()
 	n.Lock()
 	defer n.Unlock()
 	defer n.Unlock()
-	return n.endpointCnt
-}
-
-func (n *network) atomicIncDecEpCnt(inc bool) error {
-retry:
-	n.Lock()
-	if inc {
-		n.endpointCnt++
-	} else {
-		n.endpointCnt--
-	}
-	n.Unlock()
-
-	store := n.getController().getStore(n.DataScope())
-	if store == nil {
-		return fmt.Errorf("store not found for scope %s", n.DataScope())
-	}
-
-	if err := n.getController().updateToStore(n); err != nil {
-		if err == datastore.ErrKeyModified {
-			if err := store.GetObject(datastore.Key(n.Key()...), n); err != nil {
-				return fmt.Errorf("could not update the kvobject to latest when trying to atomic add endpoint count: %v", err)
-			}
-
-			goto retry
-		}
-
-		return err
-	}
 
 
-	return nil
-}
-
-func (n *network) IncEndpointCnt() error {
-	return n.atomicIncDecEpCnt(true)
-}
-
-func (n *network) DecEndpointCnt() error {
-	return n.atomicIncDecEpCnt(false)
+	return n.epCnt
 }
 }
 
 
 // TODO : Can be made much more generic with the help of reflection (but has some golang limitations)
 // TODO : Can be made much more generic with the help of reflection (but has some golang limitations)
@@ -391,7 +353,6 @@ func (n *network) MarshalJSON() ([]byte, error) {
 	netMap["networkType"] = n.networkType
 	netMap["networkType"] = n.networkType
 	netMap["ipamType"] = n.ipamType
 	netMap["ipamType"] = n.ipamType
 	netMap["addrSpace"] = n.addrSpace
 	netMap["addrSpace"] = n.addrSpace
-	netMap["endpointCnt"] = n.endpointCnt
 	netMap["enableIPv6"] = n.enableIPv6
 	netMap["enableIPv6"] = n.enableIPv6
 	if n.generic != nil {
 	if n.generic != nil {
 		netMap["generic"] = n.generic
 		netMap["generic"] = n.generic
@@ -437,7 +398,6 @@ func (n *network) UnmarshalJSON(b []byte) (err error) {
 	n.name = netMap["name"].(string)
 	n.name = netMap["name"].(string)
 	n.id = netMap["id"].(string)
 	n.id = netMap["id"].(string)
 	n.networkType = netMap["networkType"].(string)
 	n.networkType = netMap["networkType"].(string)
-	n.endpointCnt = uint64(netMap["endpointCnt"].(float64))
 	n.enableIPv6 = netMap["enableIPv6"].(bool)
 	n.enableIPv6 = netMap["enableIPv6"].(bool)
 
 
 	if v, ok := netMap["generic"]; ok {
 	if v, ok := netMap["generic"]; ok {
@@ -604,7 +564,7 @@ func (n *network) Delete() error {
 		return &UnknownNetworkError{name: name, id: id}
 		return &UnknownNetworkError{name: name, id: id}
 	}
 	}
 
 
-	numEps := n.EndpointCnt()
+	numEps := n.getEpCnt().EndpointCnt()
 	if numEps != 0 {
 	if numEps != 0 {
 		return &ActiveEndpointsError{name: n.name, id: n.id}
 		return &ActiveEndpointsError{name: n.name, id: n.id}
 	}
 	}
@@ -622,24 +582,15 @@ func (n *network) Delete() error {
 	}()
 	}()
 
 
 	// deleteFromStore performs an atomic delete operation and the
 	// deleteFromStore performs an atomic delete operation and the
-	// network.endpointCnt field will help prevent any possible
+	// network.epCnt will help prevent any possible
 	// race between endpoint join and network delete
 	// race between endpoint join and network delete
+	if err = n.getController().deleteFromStore(n.getEpCnt()); err != nil {
+		return fmt.Errorf("error deleting network endpoint count from store: %v", err)
+	}
 	if err = n.getController().deleteFromStore(n); err != nil {
 	if err = n.getController().deleteFromStore(n); err != nil {
-		if err == datastore.ErrKeyModified {
-			return types.InternalErrorf("operation in progress. delete failed for network %s. Please try again.")
-		}
-		return err
+		return fmt.Errorf("error deleting network from store: %v", err)
 	}
 	}
 
 
-	defer func() {
-		if err != nil {
-			n.dbExists = false
-			if e := n.getController().updateToStore(n); e != nil {
-				log.Warnf("failed to recreate network in store %s : %v", n.name, e)
-			}
-		}
-	}()
-
 	n.ipamRelease()
 	n.ipamRelease()
 
 
 	return nil
 	return nil
@@ -736,7 +687,7 @@ func (n *network) CreateEndpoint(name string, options ...EndpointOption) (Endpoi
 	}()
 	}()
 
 
 	// Increment endpoint count to indicate completion of endpoint addition
 	// Increment endpoint count to indicate completion of endpoint addition
-	if err = n.IncEndpointCnt(); err != nil {
+	if err = n.getEpCnt().IncEndpointCnt(); err != nil {
 		return nil, err
 		return nil, err
 	}
 	}
 
 

+ 20 - 5
libnetwork/store.go

@@ -69,6 +69,13 @@ func (c *controller) getNetworkFromStore(nid string) (*network, error) {
 			continue
 			continue
 		}
 		}
 
 
+		ec := &endpointCnt{n: n}
+		err = store.GetObject(datastore.Key(ec.Key()...), ec)
+		if err != nil {
+			return nil, fmt.Errorf("could not find endpoint count for network %s: %v", n.Name(), err)
+		}
+
+		n.epCnt = ec
 		return n, nil
 		return n, nil
 	}
 	}
 
 
@@ -94,6 +101,14 @@ func (c *controller) getNetworksFromStore() ([]*network, error) {
 		for _, kvo := range kvol {
 		for _, kvo := range kvol {
 			n := kvo.(*network)
 			n := kvo.(*network)
 			n.ctrlr = c
 			n.ctrlr = c
+
+			ec := &endpointCnt{n: n}
+			err = store.GetObject(datastore.Key(ec.Key()...), ec)
+			if err != nil {
+				return nil, fmt.Errorf("could not find endpoint count key %s for network %s while listing: %v", datastore.Key(ec.Key()...), n.Name(), err)
+			}
+
+			n.epCnt = ec
 			nl = append(nl, n)
 			nl = append(nl, n)
 		}
 		}
 	}
 	}
@@ -211,15 +226,15 @@ func (c *controller) unWatchSvcRecord(ep *endpoint) {
 	c.unWatchCh <- ep
 	c.unWatchCh <- ep
 }
 }
 
 
-func (c *controller) networkWatchLoop(nw *netWatch, ep *endpoint, nCh <-chan datastore.KVObject) {
+func (c *controller) networkWatchLoop(nw *netWatch, ep *endpoint, ecCh <-chan datastore.KVObject) {
 	for {
 	for {
 		select {
 		select {
 		case <-nw.stopCh:
 		case <-nw.stopCh:
 			return
 			return
-		case o := <-nCh:
-			n := o.(*network)
+		case o := <-ecCh:
+			ec := o.(*endpointCnt)
 
 
-			epl, err := n.getEndpointsFromStore()
+			epl, err := ec.n.getEndpointsFromStore()
 			if err != nil {
 			if err != nil {
 				break
 				break
 			}
 			}
@@ -300,7 +315,7 @@ func (c *controller) processEndpointCreate(nmap map[string]*netWatch, ep *endpoi
 		return
 		return
 	}
 	}
 
 
-	ch, err := store.Watch(ep.getNetwork(), nw.stopCh)
+	ch, err := store.Watch(ep.getNetwork().getEpCnt(), nw.stopCh)
 	if err != nil {
 	if err != nil {
 		log.Warnf("Error creating watch for network: %v", err)
 		log.Warnf("Error creating watch for network: %v", err)
 		return
 		return