Browse Source

Libnetwork Host Discovery using Swarm Discovery pkg

Signed-off-by: Madhu Venugopal <madhu@docker.com>
Madhu Venugopal 10 years ago
parent
commit
ae8643748d

+ 2 - 2
libnetwork/cmd/test/libnetwork.toml

@@ -3,8 +3,8 @@ title = "LibNetwork Configuration file"
 [daemon]
   debug = false
 [cluster]
-  discovery = "token://swarm-discovery-token"
-  Address = "Cluster-wide reachable Host IP"
+  discovery = "token://ce5b9756aeab50fe8fda02624f093d1c"
+  Address = "1.1.1.1:90"
 [datastore]
   embedded = false
 [datastore.client]

+ 3 - 1
libnetwork/config/config_test.go

@@ -2,6 +2,8 @@ package config
 
 import (
 	"testing"
+
+	_ "github.com/docker/libnetwork/netutils"
 )
 
 func TestInvalidConfig(t *testing.T) {
@@ -12,7 +14,7 @@ func TestInvalidConfig(t *testing.T) {
 }
 
 func TestConfig(t *testing.T) {
-	cfg, err := ParseConfig("libnetwork.toml")
+	_, err := ParseConfig("libnetwork.toml")
 	if err != nil {
 		t.Fatal("Error parsing a valid configuration file :", err)
 	}

+ 41 - 5
libnetwork/controller.go

@@ -47,6 +47,8 @@ package libnetwork
 
 import (
 	"encoding/json"
+	"fmt"
+	"net"
 	"os"
 	"strings"
 	"sync"
@@ -57,6 +59,7 @@ import (
 	"github.com/docker/libnetwork/config"
 	"github.com/docker/libnetwork/datastore"
 	"github.com/docker/libnetwork/driverapi"
+	"github.com/docker/libnetwork/hostdiscovery"
 	"github.com/docker/libnetwork/sandbox"
 	"github.com/docker/libnetwork/types"
 	"github.com/docker/swarm/pkg/store"
@@ -123,6 +126,11 @@ func New(configFile string) (NetworkController, error) {
 			// But it cannot fail creating the Controller
 			log.Warnf("Failed to Initialize Datastore due to %v. Operating in non-clustered mode", err)
 		}
+		if err := c.initDiscovery(); err != nil {
+			// Failing to initalize discovery is a bad situation to be in.
+			// But it cannot fail creating the Controller
+			log.Warnf("Failed to Initialize Discovery : %v", err)
+		}
 	} else {
 		// Missing Configuration file is not a failure scenario
 		// But without that, datastore cannot be initialized.
@@ -156,6 +164,10 @@ func (c *controller) initConfig(configFile string) error {
 }
 
 func (c *controller) initDataStore() error {
+	if c.cfg == nil {
+		return fmt.Errorf("datastore initialization requires a valid configuration")
+	}
+
 	store, err := datastore.NewDataStore(&c.cfg.Datastore)
 	if err != nil {
 		return err
@@ -168,6 +180,21 @@ func (c *controller) initDataStore() error {
 	return nil
 }
 
+func (c *controller) initDiscovery() error {
+	if c.cfg == nil {
+		return fmt.Errorf("discovery initialization requires a valid configuration")
+	}
+
+	hostDiscovery := hostdiscovery.NewHostDiscovery()
+	return hostDiscovery.StartDiscovery(&c.cfg.Cluster, c.hostJoinCallback, c.hostLeaveCallback)
+}
+
+func (c *controller) hostJoinCallback(hosts []net.IP) {
+}
+
+func (c *controller) hostLeaveCallback(hosts []net.IP) {
+}
+
 func (c *controller) ConfigureNetworkDriver(networkType string, options map[string]interface{}) error {
 	c.Lock()
 	d, ok := c.drivers[networkType]
@@ -260,18 +287,22 @@ func (c *controller) addNetworkToStore(n *network) error {
 	if isReservedNetwork(n.Name()) {
 		return nil
 	}
-	if c.store == nil {
-		return ErrInvalidDatastore
+	c.Lock()
+	cs := c.store
+	c.Unlock()
+	if cs == nil {
+		log.Debugf("datastore not initialized. Network %s is not added to the store", n.Name())
+		return nil
 	}
-	return c.store.PutObjectAtomic(n)
+	return cs.PutObjectAtomic(n)
 }
 
 func (c *controller) watchNewNetworks() {
 	c.Lock()
-	store = c.store
+	cs := c.store
 	c.Unlock()
 
-	store.KVStore().WatchRange(datastore.Key("network"), "", 0, func(kvi []store.KVEntry) {
+	cs.KVStore().WatchRange(datastore.Key(datastore.NetworkKeyPrefix), "", 0, func(kvi []store.KVEntry) {
 		for _, kve := range kvi {
 			var n network
 			err := json.Unmarshal(kve.Value(), &n)
@@ -286,7 +317,12 @@ func (c *controller) watchNewNetworks() {
 			if ok && existing.dbIndex == n.dbIndex {
 				// Skip any watch notification for a network that has not changed
 				continue
+			} else if ok {
+				// Received an update for an existing network object
+				log.Debugf("Skipping network update for %s (%s)", n.name, n.id)
+				continue
 			}
+
 			c.newNetworkFromStore(&n)
 		}
 	})

+ 9 - 0
libnetwork/datastore/datastore.go

@@ -24,12 +24,21 @@ type datastore struct {
 
 //KV Key Value interface used by objects to be part of the DataStore
 type KV interface {
+	// Key method lets an object to provide the Key to be used in KV Store
 	Key() []string
+	// Value method lets an object to marshal its content to be stored in the KV store
 	Value() []byte
+	// Index method returns the latest DB Index as seen by the object
 	Index() uint64
+	// SetIndex method allows the datastore to store the latest DB Index into the object
 	SetIndex(uint64)
 }
 
+const (
+	// NetworkKeyPrefix is the prefix for network key in the kv store
+	NetworkKeyPrefix = "network"
+)
+
 //Key provides convenient method to create a Key
 func Key(key ...string) string {
 	keychain := []string{"docker", "libnetwork"}

+ 164 - 0
libnetwork/hostdiscovery/hostdiscovery.go

@@ -0,0 +1,164 @@
+package hostdiscovery
+
+import (
+	"errors"
+	"fmt"
+	"net"
+	"sync"
+	"time"
+
+	log "github.com/Sirupsen/logrus"
+
+	mapset "github.com/deckarep/golang-set"
+	"github.com/docker/libnetwork/config"
+	"github.com/docker/swarm/discovery"
+	// Anonymous import will be removed after we upgrade to latest swarm
+	_ "github.com/docker/swarm/discovery/file"
+	// Anonymous import will be removed after we upgrade to latest swarm
+	_ "github.com/docker/swarm/discovery/kv"
+	// Anonymous import will be removed after we upgrade to latest swarm
+	_ "github.com/docker/swarm/discovery/nodes"
+	// Anonymous import will be removed after we upgrade to latest swarm
+	_ "github.com/docker/swarm/discovery/token"
+)
+
+const defaultHeartbeat = 10
+
+// JoinCallback provides a callback event for new node joining the cluster
+type JoinCallback func(entries []net.IP)
+
+// LeaveCallback provides a callback event for node leaving the cluster
+type LeaveCallback func(entries []net.IP)
+
+// HostDiscovery primary interface
+type HostDiscovery interface {
+	// StartDiscovery initiates the discovery process and provides appropriate callbacks
+	StartDiscovery(*config.ClusterCfg, JoinCallback, LeaveCallback) error
+	// StopDiscovery stops the discovery perocess
+	StopDiscovery() error
+	// Fetch returns a list of host IPs that are currently discovered
+	Fetch() ([]net.IP, error)
+}
+
+type hostDiscovery struct {
+	discovery discovery.Discovery
+	nodes     mapset.Set
+	stopChan  chan struct{}
+	sync.Mutex
+}
+
+// NewHostDiscovery function creates a host discovery object
+func NewHostDiscovery() HostDiscovery {
+	return &hostDiscovery{nodes: mapset.NewSet(), stopChan: make(chan struct{})}
+}
+
+func (h *hostDiscovery) StartDiscovery(cfg *config.ClusterCfg, joinCallback JoinCallback, leaveCallback LeaveCallback) error {
+	if cfg == nil {
+		return fmt.Errorf("discovery requires a valid configuration")
+	}
+
+	hb := cfg.Heartbeat
+	if hb == 0 {
+		hb = defaultHeartbeat
+	}
+	d, err := discovery.New(cfg.Discovery, hb)
+	if err != nil {
+		return err
+	}
+
+	if ip := net.ParseIP(cfg.Address); ip == nil {
+		return errors.New("Address config should be either ipv4 or ipv6 address")
+	}
+
+	if err := d.Register(cfg.Address + ":0"); err != nil {
+		return err
+	}
+
+	h.Lock()
+	h.discovery = d
+	h.Unlock()
+
+	go d.Watch(func(entries []*discovery.Entry) {
+		h.processCallback(entries, joinCallback, leaveCallback)
+	})
+
+	go sustainHeartbeat(d, hb, cfg, h.stopChan)
+	return nil
+}
+
+func (h *hostDiscovery) StopDiscovery() error {
+	h.Lock()
+	stopChan := h.stopChan
+	h.discovery = nil
+	h.Unlock()
+
+	close(stopChan)
+	return nil
+}
+
+func sustainHeartbeat(d discovery.Discovery, hb uint64, config *config.ClusterCfg, stopChan chan struct{}) {
+	for {
+		select {
+		case <-stopChan:
+			return
+		case <-time.After(time.Duration(hb) * time.Second):
+			if err := d.Register(config.Address + ":0"); err != nil {
+				log.Warn(err)
+			}
+		}
+	}
+}
+
+func (h *hostDiscovery) processCallback(entries []*discovery.Entry, joinCallback JoinCallback, leaveCallback LeaveCallback) {
+	updated := hosts(entries)
+	h.Lock()
+	existing := h.nodes
+	added, removed := diff(existing, updated)
+	h.nodes = updated
+	h.Unlock()
+
+	if len(added) > 0 {
+		joinCallback(added)
+	}
+	if len(removed) > 0 {
+		leaveCallback(removed)
+	}
+}
+
+func diff(existing mapset.Set, updated mapset.Set) (added []net.IP, removed []net.IP) {
+	addSlice := updated.Difference(existing).ToSlice()
+	removeSlice := existing.Difference(updated).ToSlice()
+	for _, ip := range addSlice {
+		added = append(added, net.ParseIP(ip.(string)))
+	}
+	for _, ip := range removeSlice {
+		removed = append(removed, net.ParseIP(ip.(string)))
+	}
+	return
+}
+
+func (h *hostDiscovery) Fetch() ([]net.IP, error) {
+	h.Lock()
+	hd := h.discovery
+	h.Unlock()
+	if hd == nil {
+		return nil, errors.New("No Active Discovery")
+	}
+	entries, err := hd.Fetch()
+	if err != nil {
+		return nil, err
+	}
+	ips := []net.IP{}
+	for _, entry := range entries {
+		ips = append(ips, net.ParseIP(entry.Host))
+	}
+	return ips, nil
+}
+
+func hosts(entries []*discovery.Entry) mapset.Set {
+	hosts := mapset.NewSet()
+	for _, entry := range entries {
+		hosts.Add(entry.Host)
+	}
+	return hosts
+}

+ 140 - 0
libnetwork/hostdiscovery/hostdiscovery_test.go

@@ -0,0 +1,140 @@
+package hostdiscovery
+
+import (
+	"net"
+	"testing"
+	"time"
+
+	mapset "github.com/deckarep/golang-set"
+	_ "github.com/docker/libnetwork/netutils"
+
+	"github.com/docker/libnetwork/config"
+	"github.com/docker/swarm/discovery"
+)
+
+func TestDiscovery(t *testing.T) {
+	_, err := net.Dial("tcp", "discovery-stage.hub.docker.com:80")
+	if err != nil {
+		t.Skip("Skipping Discovery test which need connectivity to discovery-stage.hub.docker.com")
+	}
+
+	hd := NewHostDiscovery()
+	config, err := config.ParseConfig("libnetwork.toml")
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	err = hd.StartDiscovery(&config.Cluster, func(hosts []net.IP) {}, func(hosts []net.IP) {})
+	if err != nil {
+		t.Fatal(err)
+	}
+	time.Sleep(time.Duration(config.Cluster.Heartbeat*2) * time.Second)
+	hosts, err := hd.Fetch()
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	found := false
+	for _, ip := range hosts {
+		if ip.Equal(net.ParseIP(config.Cluster.Address)) {
+			found = true
+		}
+	}
+	if !found {
+		t.Fatalf("Expecting hosts. But none discovered ")
+	}
+	err = hd.StopDiscovery()
+	if err != nil {
+		t.Fatal(err)
+	}
+}
+
+func TestBadDiscovery(t *testing.T) {
+	_, err := net.Dial("tcp", "discovery-stage.hub.docker.com:80")
+	if err != nil {
+		t.Skip("Skipping Discovery test which need connectivity to discovery-stage.hub.docker.com")
+	}
+
+	hd := NewHostDiscovery()
+	cfg := &config.Config{}
+	cfg.Cluster.Discovery = ""
+	err = hd.StartDiscovery(&cfg.Cluster, func(hosts []net.IP) {}, func(hosts []net.IP) {})
+	if err == nil {
+		t.Fatal("Invalid discovery configuration must fail")
+	}
+	cfg, err = config.ParseConfig("libnetwork.toml")
+	if err != nil {
+		t.Fatal(err)
+	}
+	cfg.Cluster.Address = "invalid"
+	err = hd.StartDiscovery(&cfg.Cluster, func(hosts []net.IP) {}, func(hosts []net.IP) {})
+	if err == nil {
+		t.Fatal("Invalid discovery address configuration must fail")
+	}
+}
+
+func TestDiff(t *testing.T) {
+	existing := mapset.NewSetFromSlice([]interface{}{"1.1.1.1", "2.2.2.2"})
+	addedIP := "3.3.3.3"
+	updated := existing.Clone()
+	updated.Add(addedIP)
+
+	added, removed := diff(existing, updated)
+	if len(added) != 1 {
+		t.Fatalf("Diff failed for an Add update. Expecting 1 element, but got %d elements", len(added))
+	}
+	if added[0].String() != addedIP {
+		t.Fatalf("Expecting : %v, Got : %v", addedIP, added[0])
+	}
+	if len(removed) > 0 {
+		t.Fatalf("Diff failed for remove use-case. Expecting 0 element, but got %d elements", len(removed))
+	}
+
+	updated = mapset.NewSetFromSlice([]interface{}{addedIP})
+	added, removed = diff(existing, updated)
+	if len(removed) != 2 {
+		t.Fatalf("Diff failed for an remove update. Expecting 2 element, but got %d elements", len(removed))
+	}
+	if len(added) != 1 {
+		t.Fatalf("Diff failed for add use-case. Expecting 1 element, but got %d elements", len(added))
+	}
+}
+
+func TestAddedCallback(t *testing.T) {
+	hd := hostDiscovery{}
+	hd.nodes = mapset.NewSetFromSlice([]interface{}{"1.1.1.1"})
+	update := []*discovery.Entry{&discovery.Entry{Host: "1.1.1.1", Port: "0"}, &discovery.Entry{Host: "2.2.2.2", Port: "0"}}
+
+	added := false
+	removed := false
+	hd.processCallback(update, func(hosts []net.IP) { added = true }, func(hosts []net.IP) { removed = true })
+	if !added {
+		t.Fatalf("Expecting a Added callback notification. But none received")
+	}
+}
+
+func TestRemovedCallback(t *testing.T) {
+	hd := hostDiscovery{}
+	hd.nodes = mapset.NewSetFromSlice([]interface{}{"1.1.1.1", "2.2.2.2"})
+	update := []*discovery.Entry{&discovery.Entry{Host: "1.1.1.1", Port: "0"}}
+
+	added := false
+	removed := false
+	hd.processCallback(update, func(hosts []net.IP) { added = true }, func(hosts []net.IP) { removed = true })
+	if !removed {
+		t.Fatalf("Expecting a Removed callback notification. But none received")
+	}
+}
+
+func TestNoCallback(t *testing.T) {
+	hd := hostDiscovery{}
+	hd.nodes = mapset.NewSetFromSlice([]interface{}{"1.1.1.1", "2.2.2.2"})
+	update := []*discovery.Entry{&discovery.Entry{Host: "1.1.1.1", Port: "0"}, &discovery.Entry{Host: "2.2.2.2", Port: "0"}}
+
+	added := false
+	removed := false
+	hd.processCallback(update, func(hosts []net.IP) { added = true }, func(hosts []net.IP) { removed = true })
+	if added || removed {
+		t.Fatalf("Not expecting any callback notification. But received a callback")
+	}
+}

+ 6 - 0
libnetwork/hostdiscovery/libnetwork.toml

@@ -0,0 +1,6 @@
+title = "LibNetwork Configuration file"
+
+[cluster]
+  discovery = "token://08469efb104bce980931ed24c8eb03a2"
+  Address = "1.1.1.1"
+  Heartbeat = 3

+ 2 - 1
libnetwork/network.go

@@ -6,6 +6,7 @@ import (
 	"sync"
 
 	"github.com/docker/docker/pkg/stringid"
+	"github.com/docker/libnetwork/datastore"
 	"github.com/docker/libnetwork/driverapi"
 	"github.com/docker/libnetwork/netlabel"
 	"github.com/docker/libnetwork/options"
@@ -79,7 +80,7 @@ func (n *network) Type() string {
 }
 
 func (n *network) Key() []string {
-	return []string{"network", string(n.id)}
+	return []string{datastore.NetworkKeyPrefix, string(n.id)}
 }
 
 func (n *network) Value() []byte {