Procházet zdrojové kódy

Merge pull request #310 from mavenugo/bootup

Reading the top level element (network) from datastore on init
Jana Radhakrishnan před 10 roky
rodič
revize
433bfc7b0c
2 změnil soubory, kde provedl 51 přidání a 30 odebrání
  1. 3 2
      libnetwork/cmd/dnet/libnetwork.toml
  2. 48 28
      libnetwork/store.go

+ 3 - 2
libnetwork/cmd/dnet/libnetwork.toml

@@ -2,10 +2,11 @@ title = "LibNetwork Configuration file"
 
 
 [daemon]
 [daemon]
   debug = false
   debug = false
-  DefaultNetwork = "bridge"
-  DefaultDriver = "bridge"
 [cluster]
 [cluster]
   discovery = "token://22aa23948f4f6b31230687689636959e"
   discovery = "token://22aa23948f4f6b31230687689636959e"
   Address = "1.1.1.1"
   Address = "1.1.1.1"
 [datastore]
 [datastore]
   embedded = false
   embedded = false
+[datastore.client]
+  provider = "consul"
+  Address = "localhost:8500"

+ 48 - 28
libnetwork/store.go

@@ -5,6 +5,7 @@ import (
 	"fmt"
 	"fmt"
 
 
 	log "github.com/Sirupsen/logrus"
 	log "github.com/Sirupsen/logrus"
+	"github.com/docker/libkv/store"
 	"github.com/docker/libnetwork/datastore"
 	"github.com/docker/libnetwork/datastore"
 	"github.com/docker/libnetwork/types"
 	"github.com/docker/libnetwork/types"
 )
 )
@@ -31,9 +32,23 @@ func (c *controller) initDataStore() error {
 	c.Lock()
 	c.Lock()
 	c.store = store
 	c.store = store
 	c.Unlock()
 	c.Unlock()
+
+	nws, err := c.getNetworksFromStore()
+	if err == nil {
+		c.processNetworkUpdate(nws, nil)
+	} else if err != datastore.ErrKeyNotFound {
+		log.Warnf("failed to read networks from datastore during init : %v", err)
+	}
 	return c.watchNetworks()
 	return c.watchNetworks()
 }
 }
 
 
+func (c *controller) getNetworksFromStore() ([]*store.KVPair, error) {
+	c.Lock()
+	cs := c.store
+	c.Unlock()
+	return cs.KVStore().List(datastore.Key(datastore.NetworkKeyPrefix))
+}
+
 func (c *controller) newNetworkFromStore(n *network) error {
 func (c *controller) newNetworkFromStore(n *network) error {
 	n.Lock()
 	n.Lock()
 	n.ctrlr = c
 	n.ctrlr = c
@@ -178,34 +193,7 @@ func (c *controller) watchNetworks() error {
 				for k, v := range lview {
 				for k, v := range lview {
 					tmpview[k] = v
 					tmpview[k] = v
 				}
 				}
-				for _, kve := range nws {
-					var n network
-					err := json.Unmarshal(kve.Value, &n)
-					if err != nil {
-						log.Error(err)
-						continue
-					}
-					delete(tmpview, n.id)
-					n.dbIndex = kve.LastIndex
-					c.Lock()
-					existing, ok := c.networks[n.id]
-					c.Unlock()
-					if ok {
-						existing.Lock()
-						// Skip existing network update
-						if existing.dbIndex != n.dbIndex {
-							existing.dbIndex = n.dbIndex
-							existing.endpointCnt = n.endpointCnt
-						}
-						existing.Unlock()
-						continue
-					}
-
-					if err = c.newNetworkFromStore(&n); err != nil {
-						log.Error(err)
-					}
-
-				}
+				c.processNetworkUpdate(nws, &tmpview)
 				// Delete processing
 				// Delete processing
 				for k := range tmpview {
 				for k := range tmpview {
 					c.Lock()
 					c.Lock()
@@ -305,6 +293,38 @@ func (n *network) stopWatch() {
 	n.Unlock()
 	n.Unlock()
 }
 }
 
 
+func (c *controller) processNetworkUpdate(nws []*store.KVPair, prune *networkTable) {
+	for _, kve := range nws {
+		var n network
+		err := json.Unmarshal(kve.Value, &n)
+		if err != nil {
+			log.Error(err)
+			continue
+		}
+		if prune != nil {
+			delete(*prune, n.id)
+		}
+		n.dbIndex = kve.LastIndex
+		c.Lock()
+		existing, ok := c.networks[n.id]
+		c.Unlock()
+		if ok {
+			existing.Lock()
+			// Skip existing network update
+			if existing.dbIndex != n.dbIndex {
+				existing.dbIndex = n.dbIndex
+				existing.endpointCnt = n.endpointCnt
+			}
+			existing.Unlock()
+			continue
+		}
+
+		if err = c.newNetworkFromStore(&n); err != nil {
+			log.Error(err)
+		}
+	}
+}
+
 func (c *controller) processEndpointUpdate(ep *endpoint) bool {
 func (c *controller) processEndpointUpdate(ep *endpoint) bool {
 	nw := ep.network
 	nw := ep.network
 	if nw == nil {
 	if nw == nil {