Browse Source

datastore support for Endpoint

Signed-off-by: Madhu Venugopal <madhu@docker.com>
Madhu Venugopal 10 years ago
parent
commit
dca35085f5

+ 1 - 1
libnetwork/cmd/test/libnetwork.toml

@@ -4,7 +4,7 @@ title = "LibNetwork Configuration file"
   debug = false
 [cluster]
   discovery = "token://22aa23948f4f6b31230687689636959e"
-  Address = "2.1.1.1"
+  Address = "1.1.1.1"
 [datastore]
   embedded = false
 [datastore.client]

+ 11 - 0
libnetwork/cmd/test/main.go

@@ -35,6 +35,17 @@ func main() {
 		} else {
 			fmt.Println("Network Created Successfully :", netw)
 		}
+		netw, _ = controller.NetworkByName(fmt.Sprintf("Gordon-%d", i))
+		_, err = netw.CreateEndpoint(fmt.Sprintf("Gordon-Ep-%d", i), nil)
+		if err != nil {
+			log.Fatalf("Error creating endpoint 1 %v", err)
+		}
+
+		_, err = netw.CreateEndpoint(fmt.Sprintf("Gordon-Ep2-%d", i), nil)
+		if err != nil {
+			log.Fatalf("Error creating endpoint 2 %v", err)
+		}
+
 		time.Sleep(10 * time.Second)
 	}
 }

+ 18 - 98
libnetwork/controller.go

@@ -45,7 +45,6 @@ create network namespaces and allocate interfaces for containers to use.
 package libnetwork
 
 import (
-	"encoding/json"
 	"fmt"
 	"net"
 	"os"
@@ -161,21 +160,6 @@ func (c *controller) initConfig(configFile string) error {
 	return nil
 }
 
-func (c *controller) initDataStore() error {
-	if c.cfg == nil {
-		return fmt.Errorf("datastore initialization requires a valid configuration")
-	}
-
-	store, err := datastore.NewDataStore(&c.cfg.Datastore)
-	if err != nil {
-		return err
-	}
-	c.Lock()
-	c.store = store
-	c.Unlock()
-	return c.watchNewNetworks()
-}
-
 func (c *controller) initDiscovery() error {
 	if c.cfg == nil {
 		return fmt.Errorf("discovery initialization requires a valid configuration")
@@ -217,18 +201,6 @@ func (c *controller) NewNetwork(networkType, name string, options ...NetworkOpti
 	if name == "" {
 		return nil, ErrInvalidName(name)
 	}
-	// Check if a driver for the specified network type is available
-	c.Lock()
-	d, ok := c.drivers[networkType]
-	c.Unlock()
-	if !ok {
-		var err error
-		d, err = c.loadDriver(networkType)
-		if err != nil {
-			return nil, err
-		}
-	}
-
 	// Check if a network already exists with the specified network name
 	c.Lock()
 	for _, n := range c.networks {
@@ -245,21 +217,15 @@ func (c *controller) NewNetwork(networkType, name string, options ...NetworkOpti
 		networkType: networkType,
 		id:          types.UUID(stringid.GenerateRandomID()),
 		ctrlr:       c,
-		driver:      d,
 		endpoints:   endpointTable{},
 	}
 
 	network.processOptions(options...)
-	// Create the network
-	if err := d.CreateNetwork(network.id, network.generic); err != nil {
+
+	if err := c.addNetwork(network); err != nil {
 		return nil, err
 	}
 
-	// Store the network handler in controller
-	c.Lock()
-	c.networks[network.id] = network
-	c.Unlock()
-
 	if err := c.addNetworkToStore(network); err != nil {
 		return nil, err
 	}
@@ -267,77 +233,31 @@ func (c *controller) NewNetwork(networkType, name string, options ...NetworkOpti
 	return network, nil
 }
 
-func (c *controller) newNetworkFromStore(n *network) {
-	c.Lock()
-	defer c.Unlock()
-
-	if _, ok := c.drivers[n.networkType]; !ok {
-		log.Warnf("Network driver unavailable for type=%s. ignoring network updates for %s", n.Type(), n.Name())
-		return
-	}
-	n.ctrlr = c
-	n.driver = c.drivers[n.networkType]
-	c.networks[n.id] = n
-	// TODO : Populate n.endpoints back from endpoint dbstore
-}
+func (c *controller) addNetwork(n *network) error {
 
-func (c *controller) addNetworkToStore(n *network) error {
-	if isReservedNetwork(n.Name()) {
-		return nil
-	}
 	c.Lock()
-	cs := c.store
+	// Check if a driver for the specified network type is available
+	d, ok := c.drivers[n.networkType]
 	c.Unlock()
-	if cs == nil {
-		log.Debugf("datastore not initialized. Network %s is not added to the store", n.Name())
-		return nil
-	}
 
-	// Commenting out AtomicPut due to https://github.com/docker/swarm/issues/875,
-	// Also Network object is Keyed with UUID & hence an Atomic put is not mandatory.
-	// return cs.PutObjectAtomic(n)
+	if !ok {
+		var err error
+		d, err = c.loadDriver(n.networkType)
+		if err != nil {
+			return err
+		}
+	}
 
-	return cs.PutObject(n)
-}
+	n.driver = d
 
-func (c *controller) watchNewNetworks() error {
+	// Create the network
+	if err := d.CreateNetwork(n.id, n.generic); err != nil {
+		return err
+	}
 	c.Lock()
-	cs := c.store
+	c.networks[n.id] = n
 	c.Unlock()
 
-	kvPairs, err := cs.KVStore().WatchTree(datastore.Key(datastore.NetworkKeyPrefix), c.stopChan)
-	if err != nil {
-		return err
-	}
-	go func() {
-		for {
-			select {
-			case kvs := <-kvPairs:
-				for _, kve := range kvs {
-					var n network
-					err := json.Unmarshal(kve.Value, &n)
-					if err != nil {
-						log.Error(err)
-						continue
-					}
-					n.dbIndex = kve.LastIndex
-					c.Lock()
-					existing, ok := c.networks[n.id]
-					c.Unlock()
-					if ok && existing.dbIndex == n.dbIndex {
-						// Skip any watch notification for a network that has not changed
-						continue
-					} else if ok {
-						// Received an update for an existing network object
-						log.Debugf("Skipping network update for %s (%s)", n.name, n.id)
-						continue
-					}
-
-					c.newNetworkFromStore(&n)
-				}
-			}
-		}
-	}()
 	return nil
 }
 

+ 2 - 0
libnetwork/datastore/datastore.go

@@ -37,6 +37,8 @@ type KV interface {
 const (
 	// NetworkKeyPrefix is the prefix for network key in the kv store
 	NetworkKeyPrefix = "network"
+	// EndpointKeyPrefix is the prefix for endpoint key in the kv store
+	EndpointKeyPrefix = "endpoint"
 )
 
 //Key provides convenient method to create a Key

+ 66 - 0
libnetwork/endpoint.go

@@ -2,6 +2,7 @@ package libnetwork
 
 import (
 	"bytes"
+	"encoding/json"
 	"io/ioutil"
 	"os"
 	"path"
@@ -10,6 +11,7 @@ import (
 
 	log "github.com/Sirupsen/logrus"
 	"github.com/docker/docker/pkg/ioutils"
+	"github.com/docker/libnetwork/datastore"
 	"github.com/docker/libnetwork/etchosts"
 	"github.com/docker/libnetwork/netlabel"
 	"github.com/docker/libnetwork/resolvconf"
@@ -108,9 +110,53 @@ type endpoint struct {
 	exposedPorts  []types.TransportPort
 	generic       map[string]interface{}
 	joinLeaveDone chan struct{}
+	dbIndex       uint64
 	sync.Mutex
 }
 
+func (ep *endpoint) MarshalJSON() ([]byte, error) {
+	epMap := make(map[string]interface{})
+	epMap["name"] = ep.name
+	epMap["id"] = string(ep.id)
+	epMap["network"] = ep.network
+	epMap["ep_iface"] = ep.iFaces
+	epMap["exposed_ports"] = ep.exposedPorts
+	epMap["generic"] = ep.generic
+	return json.Marshal(epMap)
+}
+
+func (ep *endpoint) UnmarshalJSON(b []byte) (err error) {
+	var epMap map[string]interface{}
+	if err := json.Unmarshal(b, &epMap); err != nil {
+		return err
+	}
+	ep.name = epMap["name"].(string)
+	ep.id = types.UUID(epMap["id"].(string))
+
+	nb, _ := json.Marshal(epMap["network"])
+	var n network
+	json.Unmarshal(nb, &n)
+	ep.network = &n
+
+	ib, _ := json.Marshal(epMap["ep_iface"])
+	var ifaces []endpointInterface
+	json.Unmarshal(ib, &ifaces)
+	ep.iFaces = make([]*endpointInterface, 0)
+	for _, iface := range ifaces {
+		ep.iFaces = append(ep.iFaces, &iface)
+	}
+
+	tb, _ := json.Marshal(epMap["exposed_ports"])
+	var tPorts []types.TransportPort
+	json.Unmarshal(tb, &tPorts)
+	ep.exposedPorts = tPorts
+
+	if epMap["generic"] != nil {
+		ep.generic = epMap["generic"].(map[string]interface{})
+	}
+	return nil
+}
+
 const defaultPrefix = "/var/lib/docker/network/files"
 
 func (ep *endpoint) ID() string {
@@ -134,6 +180,26 @@ func (ep *endpoint) Network() string {
 	return ep.network.name
 }
 
+func (ep *endpoint) Key() []string {
+	return []string{datastore.EndpointKeyPrefix, string(ep.network.id), string(ep.id)}
+}
+
+func (ep *endpoint) Value() []byte {
+	b, err := json.Marshal(ep)
+	if err != nil {
+		return nil
+	}
+	return b
+}
+
+func (ep *endpoint) Index() uint64 {
+	return ep.dbIndex
+}
+
+func (ep *endpoint) SetIndex(index uint64) {
+	ep.dbIndex = index
+}
+
 func (ep *endpoint) processOptions(options ...EndpointOption) {
 	ep.Lock()
 	defer ep.Unlock()

+ 27 - 6
libnetwork/network.go

@@ -191,6 +191,28 @@ func (n *network) Delete() error {
 	return nil
 }
 
+func (n *network) addEndpoint(ep *endpoint) error {
+	var err error
+	n.Lock()
+	n.endpoints[ep.id] = ep
+	d := n.driver
+	n.Unlock()
+
+	defer func() {
+		if err != nil {
+			n.Lock()
+			delete(n.endpoints, ep.id)
+			n.Unlock()
+		}
+	}()
+
+	err = d.CreateEndpoint(n.id, ep.id, ep, ep.generic)
+	if err != nil {
+		return err
+	}
+	return nil
+}
+
 func (n *network) CreateEndpoint(name string, options ...EndpointOption) (Endpoint, error) {
 	if name == "" {
 		return nil, ErrInvalidName(name)
@@ -205,15 +227,14 @@ func (n *network) CreateEndpoint(name string, options ...EndpointOption) (Endpoi
 	ep.network = n
 	ep.processOptions(options...)
 
-	d := n.driver
-	err := d.CreateEndpoint(n.id, ep.id, ep, ep.generic)
-	if err != nil {
+	if err := n.addEndpoint(ep); err != nil {
+		return nil, err
+	}
+
+	if err := n.ctrlr.addEndpointToStore(ep); err != nil {
 		return nil, err
 	}
 
-	n.Lock()
-	n.endpoints[ep.id] = ep
-	n.Unlock()
 	return ep, nil
 }
 

+ 157 - 0
libnetwork/store.go

@@ -0,0 +1,157 @@
+package libnetwork
+
+import (
+	"encoding/json"
+	"fmt"
+
+	log "github.com/Sirupsen/logrus"
+	"github.com/docker/libnetwork/datastore"
+)
+
+func (c *controller) initDataStore() error {
+	if c.cfg == nil {
+		return fmt.Errorf("datastore initialization requires a valid configuration")
+	}
+
+	store, err := datastore.NewDataStore(&c.cfg.Datastore)
+	if err != nil {
+		return err
+	}
+	c.Lock()
+	c.store = store
+	c.Unlock()
+	return c.watchStore()
+}
+
+func (c *controller) newNetworkFromStore(n *network) {
+	c.Lock()
+	n.ctrlr = c
+	c.Unlock()
+	n.endpoints = endpointTable{}
+
+	c.addNetwork(n)
+}
+
+func (c *controller) addNetworkToStore(n *network) error {
+	if isReservedNetwork(n.Name()) {
+		return nil
+	}
+	c.Lock()
+	cs := c.store
+	c.Unlock()
+	if cs == nil {
+		log.Debugf("datastore not initialized. Network %s is not added to the store", n.Name())
+		return nil
+	}
+
+	// Commenting out AtomicPut due to https://github.com/docker/swarm/issues/875,
+	// Also Network object is Keyed with UUID & hence an Atomic put is not mandatory.
+	// return cs.PutObjectAtomic(n)
+
+	return cs.PutObject(n)
+}
+
+func (c *controller) newEndpointFromStore(ep *endpoint) {
+	c.Lock()
+	defer c.Unlock()
+
+	n, ok := c.networks[ep.network.id]
+	if !ok {
+		log.Warnf("Network (%s) unavailable for endpoint=%s. ignoring endpoint update", ep.network.id, ep.name)
+		// TODO : Get Network from Store and call newNetworkFromStore
+		return
+	}
+	ep.network = n
+	_, err := n.EndpointByID(string(ep.id))
+	if _, ok := err.(ErrNoSuchEndpoint); ok {
+		n.addEndpoint(ep)
+	}
+}
+
+func (c *controller) addEndpointToStore(ep *endpoint) error {
+	if isReservedNetwork(ep.network.name) {
+		return nil
+	}
+	c.Lock()
+	cs := c.store
+	c.Unlock()
+	if cs == nil {
+		log.Debugf("datastore not initialized. endpoint %s is not added to the store", ep.name)
+		return nil
+	}
+
+	// Commenting out AtomicPut due to https://github.com/docker/swarm/issues/875,
+	// Also Network object is Keyed with UUID & hence an Atomic put is not mandatory.
+	// return cs.PutObjectAtomic(ep)
+
+	return cs.PutObject(ep)
+}
+
+func (c *controller) watchStore() error {
+	c.Lock()
+	cs := c.store
+	c.Unlock()
+
+	nwPairs, err := cs.KVStore().WatchTree(datastore.Key(datastore.NetworkKeyPrefix), c.stopChan)
+	if err != nil {
+		return err
+	}
+	epPairs, err := cs.KVStore().WatchTree(datastore.Key(datastore.EndpointKeyPrefix), c.stopChan)
+	if err != nil {
+		return err
+	}
+	go func() {
+		for {
+			select {
+			case nws := <-nwPairs:
+				for _, kve := range nws {
+					var n network
+					err := json.Unmarshal(kve.Value, &n)
+					if err != nil {
+						log.Error(err)
+						continue
+					}
+					n.dbIndex = kve.LastIndex
+					c.Lock()
+					existing, ok := c.networks[n.id]
+					c.Unlock()
+					if ok {
+						// Skip existing network update
+						if existing.dbIndex != n.dbIndex {
+							log.Debugf("Skipping network update for %s (%s)", n.name, n.id)
+						}
+						continue
+					}
+
+					c.newNetworkFromStore(&n)
+				}
+			case eps := <-epPairs:
+				for _, epe := range eps {
+					var ep endpoint
+					err := json.Unmarshal(epe.Value, &ep)
+					if err != nil {
+						log.Error(err)
+						continue
+					}
+					ep.dbIndex = epe.LastIndex
+					c.Lock()
+					n, ok := c.networks[ep.network.id]
+					c.Unlock()
+					if ok {
+						existing, _ := n.EndpointByID(string(ep.id))
+						if existing != nil {
+							// Skip existing endpoint update
+							if existing.(*endpoint).dbIndex != ep.dbIndex {
+								log.Debugf("Skipping endpoint update for %s (%s)", ep.name, ep.id)
+							}
+							continue
+						}
+					}
+
+					c.newEndpointFromStore(&ep)
+				}
+			}
+		}
+	}()
+	return nil
+}