فهرست منبع

Separate endpoint count data from network object

Currently endpoint count is maintained as part of
network object and the endpoint count gets updated
frequently while the rest of network is quite stable.
Because of the frequent updates to endpoint count the
network object is getting marshalled and unmarshalled
ferquently. This is causing a lot of churn and transient
memory usage. Fix this by creating a deparate object of
endpoint count so that only that gets updated.

Signed-off-by: Jana Radhakrishnan <mrjana@docker.com>
Jana Radhakrishnan 9 سال پیش
والد
کامیت
ab8dfb54fe
6فایلهای تغییر یافته به همراه199 افزوده شده و 81 حذف شده
  1. 19 13
      libnetwork/controller.go
  2. 2 2
      libnetwork/endpoint.go
  3. 147 0
      libnetwork/endpoint_cnt.go
  4. 1 2
      libnetwork/libnetwork_internal_test.go
  5. 10 59
      libnetwork/network.go
  6. 20 5
      libnetwork/store.go

+ 19 - 13
libnetwork/controller.go

@@ -355,24 +355,30 @@ func (c *controller) NewNetwork(networkType, name string, options ...NetworkOpti
 		}
 	}()
 
-	// addNetwork can be called for local scope network lazily when
-	// an endpoint is created after a restart and the network was
-	// created in previous life. Make sure you wrap around the driver
-	// notification of network creation in once call so that the driver
-	// invoked only once in case both the network and endpoint creation
-	// happens in the same lifetime.
-	network.drvOnce.Do(func() {
-		err = c.addNetwork(network)
-	})
-	if err != nil {
+	if err := c.addNetwork(network); err != nil {
 		return nil, err
 	}
+	defer func() {
+		if err != nil {
+			if e := network.deleteNetwork(); e != nil {
+				log.Warnf("couldn't roll back driver network on network %s creation failure: %v", network.name, err)
+			}
+		}
+	}()
 
 	if err = c.updateToStore(network); err != nil {
-		log.Warnf("couldnt create network %s: %v", network.name, err)
-		if e := network.Delete(); e != nil {
-			log.Warnf("couldnt cleanup network %s on network create failure (%v): %v", network.name, err, e)
+		return nil, err
+	}
+	defer func() {
+		if err != nil {
+			if e := c.deleteFromStore(network); e != nil {
+				log.Warnf("couldnt rollback from store, network %s on failure (%v): %v", network.name, err, e)
+			}
 		}
+	}()
+
+	network.epCnt = &endpointCnt{n: network}
+	if err = c.updateToStore(network.epCnt); err != nil {
 		return nil, err
 	}
 

+ 2 - 2
libnetwork/endpoint.go

@@ -484,12 +484,12 @@ func (ep *endpoint) Delete() error {
 	}
 	ep.Unlock()
 
-	if err = n.DecEndpointCnt(); err != nil {
+	if err = n.getEpCnt().DecEndpointCnt(); err != nil {
 		return err
 	}
 	defer func() {
 		if err != nil {
-			if e := n.IncEndpointCnt(); e != nil {
+			if e := n.getEpCnt().IncEndpointCnt(); e != nil {
 				log.Warnf("failed to update network %s : %v", n.name, e)
 			}
 		}

+ 147 - 0
libnetwork/endpoint_cnt.go

@@ -0,0 +1,147 @@
+package libnetwork
+
+import (
+	"encoding/json"
+	"fmt"
+	"sync"
+
+	"github.com/docker/libnetwork/datastore"
+)
+
+type endpointCnt struct {
+	n        *network
+	Count    uint64
+	dbIndex  uint64
+	dbExists bool
+	sync.Mutex
+}
+
+const epCntKeyPrefix = "endpoint_count"
+
+func (ec *endpointCnt) Key() []string {
+	ec.Lock()
+	defer ec.Unlock()
+
+	return []string{epCntKeyPrefix, ec.n.id}
+}
+
+func (ec *endpointCnt) KeyPrefix() []string {
+	ec.Lock()
+	defer ec.Unlock()
+
+	return []string{epCntKeyPrefix, ec.n.id}
+}
+
+func (ec *endpointCnt) Value() []byte {
+	ec.Lock()
+	defer ec.Unlock()
+
+	b, err := json.Marshal(ec)
+	if err != nil {
+		return nil
+	}
+	return b
+}
+
+func (ec *endpointCnt) SetValue(value []byte) error {
+	ec.Lock()
+	defer ec.Unlock()
+
+	return json.Unmarshal(value, &ec)
+}
+
+func (ec *endpointCnt) Index() uint64 {
+	ec.Lock()
+	defer ec.Unlock()
+	return ec.dbIndex
+}
+
+func (ec *endpointCnt) SetIndex(index uint64) {
+	ec.Lock()
+	ec.dbIndex = index
+	ec.dbExists = true
+	ec.Unlock()
+}
+
+func (ec *endpointCnt) Exists() bool {
+	ec.Lock()
+	defer ec.Unlock()
+	return ec.dbExists
+}
+
+func (ec *endpointCnt) Skip() bool {
+	ec.Lock()
+	defer ec.Unlock()
+	return !ec.n.persist
+}
+
+func (ec *endpointCnt) New() datastore.KVObject {
+	ec.Lock()
+	defer ec.Unlock()
+
+	return &endpointCnt{
+		n: ec.n,
+	}
+}
+
+func (ec *endpointCnt) CopyTo(o datastore.KVObject) error {
+	ec.Lock()
+	defer ec.Unlock()
+
+	dstEc := o.(*endpointCnt)
+	dstEc.n = ec.n
+	dstEc.Count = ec.Count
+	dstEc.dbExists = ec.dbExists
+	dstEc.dbIndex = ec.dbIndex
+
+	return nil
+}
+
+func (ec *endpointCnt) DataScope() string {
+	return ec.n.DataScope()
+}
+
+func (ec *endpointCnt) EndpointCnt() uint64 {
+	ec.Lock()
+	defer ec.Unlock()
+
+	return ec.Count
+}
+
+func (ec *endpointCnt) atomicIncDecEpCnt(inc bool) error {
+retry:
+	ec.Lock()
+	if inc {
+		ec.Count++
+	} else {
+		ec.Count--
+	}
+	ec.Unlock()
+
+	store := ec.n.getController().getStore(ec.DataScope())
+	if store == nil {
+		return fmt.Errorf("store not found for scope %s", ec.DataScope())
+	}
+
+	if err := ec.n.getController().updateToStore(ec); err != nil {
+		if err == datastore.ErrKeyModified {
+			if err := store.GetObject(datastore.Key(ec.Key()...), ec); err != nil {
+				return fmt.Errorf("could not update the kvobject to latest when trying to atomic add endpoint count: %v", err)
+			}
+
+			goto retry
+		}
+
+		return err
+	}
+
+	return nil
+}
+
+func (ec *endpointCnt) IncEndpointCnt() error {
+	return ec.atomicIncDecEpCnt(true)
+}
+
+func (ec *endpointCnt) DecEndpointCnt() error {
+	return ec.atomicIncDecEpCnt(false)
+}

+ 1 - 2
libnetwork/libnetwork_internal_test.go

@@ -39,7 +39,6 @@ func TestNetworkMarshalling(t *testing.T) {
 		ipamType:    "default",
 		addrSpace:   "viola",
 		networkType: "bridge",
-		endpointCnt: 27,
 		enableIPv6:  true,
 		persist:     true,
 		ipamV4Config: []*IpamConf{
@@ -142,7 +141,7 @@ func TestNetworkMarshalling(t *testing.T) {
 	}
 
 	if n.name != nn.name || n.id != nn.id || n.networkType != nn.networkType || n.ipamType != nn.ipamType ||
-		n.addrSpace != nn.addrSpace || n.endpointCnt != nn.endpointCnt || n.enableIPv6 != nn.enableIPv6 ||
+		n.addrSpace != nn.addrSpace || n.enableIPv6 != nn.enableIPv6 ||
 		n.persist != nn.persist || !compareIpamConfList(n.ipamV4Config, nn.ipamV4Config) ||
 		!compareIpamInfoList(n.ipamV4Info, nn.ipamV4Info) || !compareIpamConfList(n.ipamV6Config, nn.ipamV6Config) ||
 		!compareIpamInfoList(n.ipamV6Info, nn.ipamV6Info) {

+ 10 - 59
libnetwork/network.go

@@ -152,7 +152,7 @@ type network struct {
 	ipamV4Info   []*IpamInfo
 	ipamV6Info   []*IpamInfo
 	enableIPv6   bool
-	endpointCnt  uint64
+	epCnt        *endpointCnt
 	generic      options.Generic
 	dbIndex      uint64
 	svcRecords   svcMap
@@ -296,7 +296,6 @@ func (n *network) CopyTo(o datastore.KVObject) error {
 	dstN.id = n.id
 	dstN.networkType = n.networkType
 	dstN.ipamType = n.ipamType
-	dstN.endpointCnt = n.endpointCnt
 	dstN.enableIPv6 = n.enableIPv6
 	dstN.persist = n.persist
 	dstN.dbIndex = n.dbIndex
@@ -339,48 +338,11 @@ func (n *network) DataScope() string {
 	return n.driverScope()
 }
 
-func (n *network) EndpointCnt() uint64 {
+func (n *network) getEpCnt() *endpointCnt {
 	n.Lock()
 	defer n.Unlock()
-	return n.endpointCnt
-}
-
-func (n *network) atomicIncDecEpCnt(inc bool) error {
-retry:
-	n.Lock()
-	if inc {
-		n.endpointCnt++
-	} else {
-		n.endpointCnt--
-	}
-	n.Unlock()
-
-	store := n.getController().getStore(n.DataScope())
-	if store == nil {
-		return fmt.Errorf("store not found for scope %s", n.DataScope())
-	}
-
-	if err := n.getController().updateToStore(n); err != nil {
-		if err == datastore.ErrKeyModified {
-			if err := store.GetObject(datastore.Key(n.Key()...), n); err != nil {
-				return fmt.Errorf("could not update the kvobject to latest when trying to atomic add endpoint count: %v", err)
-			}
-
-			goto retry
-		}
-
-		return err
-	}
 
-	return nil
-}
-
-func (n *network) IncEndpointCnt() error {
-	return n.atomicIncDecEpCnt(true)
-}
-
-func (n *network) DecEndpointCnt() error {
-	return n.atomicIncDecEpCnt(false)
+	return n.epCnt
 }
 
 // TODO : Can be made much more generic with the help of reflection (but has some golang limitations)
@@ -391,7 +353,6 @@ func (n *network) MarshalJSON() ([]byte, error) {
 	netMap["networkType"] = n.networkType
 	netMap["ipamType"] = n.ipamType
 	netMap["addrSpace"] = n.addrSpace
-	netMap["endpointCnt"] = n.endpointCnt
 	netMap["enableIPv6"] = n.enableIPv6
 	if n.generic != nil {
 		netMap["generic"] = n.generic
@@ -437,7 +398,6 @@ func (n *network) UnmarshalJSON(b []byte) (err error) {
 	n.name = netMap["name"].(string)
 	n.id = netMap["id"].(string)
 	n.networkType = netMap["networkType"].(string)
-	n.endpointCnt = uint64(netMap["endpointCnt"].(float64))
 	n.enableIPv6 = netMap["enableIPv6"].(bool)
 
 	if v, ok := netMap["generic"]; ok {
@@ -604,7 +564,7 @@ func (n *network) Delete() error {
 		return &UnknownNetworkError{name: name, id: id}
 	}
 
-	numEps := n.EndpointCnt()
+	numEps := n.getEpCnt().EndpointCnt()
 	if numEps != 0 {
 		return &ActiveEndpointsError{name: n.name, id: n.id}
 	}
@@ -622,24 +582,15 @@ func (n *network) Delete() error {
 	}()
 
 	// deleteFromStore performs an atomic delete operation and the
-	// network.endpointCnt field will help prevent any possible
+	// network.epCnt will help prevent any possible
 	// race between endpoint join and network delete
+	if err = n.getController().deleteFromStore(n.getEpCnt()); err != nil {
+		return fmt.Errorf("error deleting network endpoint count from store: %v", err)
+	}
 	if err = n.getController().deleteFromStore(n); err != nil {
-		if err == datastore.ErrKeyModified {
-			return types.InternalErrorf("operation in progress. delete failed for network %s. Please try again.")
-		}
-		return err
+		return fmt.Errorf("error deleting network from store: %v", err)
 	}
 
-	defer func() {
-		if err != nil {
-			n.dbExists = false
-			if e := n.getController().updateToStore(n); e != nil {
-				log.Warnf("failed to recreate network in store %s : %v", n.name, e)
-			}
-		}
-	}()
-
 	n.ipamRelease()
 
 	return nil
@@ -736,7 +687,7 @@ func (n *network) CreateEndpoint(name string, options ...EndpointOption) (Endpoi
 	}()
 
 	// Increment endpoint count to indicate completion of endpoint addition
-	if err = n.IncEndpointCnt(); err != nil {
+	if err = n.getEpCnt().IncEndpointCnt(); err != nil {
 		return nil, err
 	}
 

+ 20 - 5
libnetwork/store.go

@@ -69,6 +69,13 @@ func (c *controller) getNetworkFromStore(nid string) (*network, error) {
 			continue
 		}
 
+		ec := &endpointCnt{n: n}
+		err = store.GetObject(datastore.Key(ec.Key()...), ec)
+		if err != nil {
+			return nil, fmt.Errorf("could not find endpoint count for network %s: %v", n.Name(), err)
+		}
+
+		n.epCnt = ec
 		return n, nil
 	}
 
@@ -94,6 +101,14 @@ func (c *controller) getNetworksFromStore() ([]*network, error) {
 		for _, kvo := range kvol {
 			n := kvo.(*network)
 			n.ctrlr = c
+
+			ec := &endpointCnt{n: n}
+			err = store.GetObject(datastore.Key(ec.Key()...), ec)
+			if err != nil {
+				return nil, fmt.Errorf("could not find endpoint count key %s for network %s while listing: %v", datastore.Key(ec.Key()...), n.Name(), err)
+			}
+
+			n.epCnt = ec
 			nl = append(nl, n)
 		}
 	}
@@ -211,15 +226,15 @@ func (c *controller) unWatchSvcRecord(ep *endpoint) {
 	c.unWatchCh <- ep
 }
 
-func (c *controller) networkWatchLoop(nw *netWatch, ep *endpoint, nCh <-chan datastore.KVObject) {
+func (c *controller) networkWatchLoop(nw *netWatch, ep *endpoint, ecCh <-chan datastore.KVObject) {
 	for {
 		select {
 		case <-nw.stopCh:
 			return
-		case o := <-nCh:
-			n := o.(*network)
+		case o := <-ecCh:
+			ec := o.(*endpointCnt)
 
-			epl, err := n.getEndpointsFromStore()
+			epl, err := ec.n.getEndpointsFromStore()
 			if err != nil {
 				break
 			}
@@ -300,7 +315,7 @@ func (c *controller) processEndpointCreate(nmap map[string]*netWatch, ep *endpoi
 		return
 	}
 
-	ch, err := store.Watch(ep.getNetwork(), nw.stopCh)
+	ch, err := store.Watch(ep.getNetwork().getEpCnt(), nw.stopCh)
 	if err != nil {
 		log.Warnf("Error creating watch for network: %v", err)
 		return