فهرست منبع

Add libnetwork agent mode support

libnetwork agent mode is a mode where libnetwork can act as a local
agent for network and discovery plumbing alone while the state
management is done elsewhere. This completes the support for making
libnetwork and its associated drivers completely independent of a
k/v store (if needed) and able to work purely based on the state
information passed along by some external controller or manager. This
does not mean that libnetwork support for decentralized state
management via a k/v store is removed.

Signed-off-by: Jana Radhakrishnan <mrjana@docker.com>
Jana Radhakrishnan 9 سال پیش
والد
کامیت
0580043718

+ 343 - 0
libnetwork/agent.go

@@ -0,0 +1,343 @@
+package libnetwork
+
+import (
+	"fmt"
+	"net"
+	"os"
+	"strings"
+
+	"github.com/Sirupsen/logrus"
+	"github.com/docker/go-events"
+	"github.com/docker/libnetwork/datastore"
+	"github.com/docker/libnetwork/discoverapi"
+	"github.com/docker/libnetwork/driverapi"
+	"github.com/docker/libnetwork/networkdb"
+)
+
+// agent bundles the state the controller keeps while running in agent mode.
+type agent struct {
+	// networkDB is the NetworkDB instance created by agentInit.
+	networkDB         *networkdb.NetworkDB
+	// bindAddr is the resolved address handed to NetworkDB and reported to
+	// drivers through agentDriverNotify.
+	bindAddr          string
+	// epTblCancel cancels the "endpoint_table" watch started in agentInit.
+	epTblCancel       func()
+	// driverCancelFuncs maps a network ID to the cancel functions of the
+	// driver table watches registered for that network.
+	driverCancelFuncs map[string][]func()
+}
+
+// getBindAddr returns, as a string, the first address of the interface named
+// ifaceName that is an IP network address and not link-local unicast.
+// It fails when the interface cannot be found, when its addresses cannot be
+// listed, or when none of them qualifies.
+func getBindAddr(ifaceName string) (string, error) {
+	nic, err := net.InterfaceByName(ifaceName)
+	if err != nil {
+		return "", fmt.Errorf("failed to find interface %s: %v", ifaceName, err)
+	}
+
+	candidates, err := nic.Addrs()
+	if err != nil {
+		return "", fmt.Errorf("failed to get interface addresses: %v", err)
+	}
+
+	for _, candidate := range candidates {
+		// Only *net.IPNet entries are considered; link-local unicast
+		// addresses are passed over.
+		if ipNet, isIPNet := candidate.(*net.IPNet); isIPNet && !ipNet.IP.IsLinkLocalUnicast() {
+			return ipNet.IP.String(), nil
+		}
+	}
+
+	return "", fmt.Errorf("failed to get bind address")
+}
+
+// resolveAddr turns addrOrInterface into a bind address. A string that parses
+// as an IP address is returned unchanged; anything else is treated as an
+// interface name and resolved through getBindAddr.
+func resolveAddr(addrOrInterface string) (string, error) {
+	if net.ParseIP(addrOrInterface) == nil {
+		// Not an IP literal, so it has to name an interface.
+		return getBindAddr(addrOrInterface)
+	}
+
+	return addrOrInterface, nil
+}
+
+// agentInit sets up the controller's agent when the daemon configuration has
+// agent mode enabled; otherwise it does nothing and returns nil.
+//
+// bindAddrOrInterface is resolved with resolveAddr, a NetworkDB is created on
+// the resulting address, and a watch on "endpoint_table" is started whose
+// events are fed to handleEpTableEvent from a new goroutine.
+func (c *controller) agentInit(bindAddrOrInterface string) error {
+	if !c.cfg.Daemon.IsAgent {
+		return nil
+	}
+
+	addr, err := resolveAddr(bindAddrOrInterface)
+	if err != nil {
+		return err
+	}
+
+	// The hostname lookup error is deliberately dropped here; whatever
+	// os.Hostname returned is used as the node name.
+	nodeName, _ := os.Hostname()
+	dbConfig := &networkdb.Config{
+		BindAddr: addr,
+		NodeName: nodeName,
+	}
+
+	db, err := networkdb.New(dbConfig)
+	if err != nil {
+		return err
+	}
+
+	epEvents, stopEpWatch := db.Watch("endpoint_table", "", "")
+	c.agent = &agent{
+		networkDB:         db,
+		bindAddr:          addr,
+		epTblCancel:       stopEpWatch,
+		driverCancelFuncs: make(map[string][]func()),
+	}
+
+	go c.handleTableEvents(epEvents, c.handleEpTableEvent)
+	return nil
+}
+
+// agentJoin asks the agent's NetworkDB to join the given remotes. It is a
+// no-op returning nil when the controller has no agent.
+func (c *controller) agentJoin(remotes []string) error {
+	if a := c.agent; a != nil {
+		return a.networkDB.Join(remotes)
+	}
+
+	return nil
+}
+
+// agentDriverNotify sends driver d a node discovery notification carrying the
+// agent's bind address, marked as this node itself. Nothing is sent when the
+// controller has no agent. The value returned by DiscoverNew is not inspected.
+func (c *controller) agentDriverNotify(d driverapi.Driver) {
+	if c.agent == nil {
+		return
+	}
+
+	self := discoverapi.NodeDiscoveryData{
+		Address: c.agent.bindAddr,
+		Self:    true,
+	}
+	d.DiscoverNew(discoverapi.NodeDiscovery, self)
+}
+
+// agentClose tears the agent down: every registered driver table watch is
+// cancelled, then the endpoint table watch, and finally the NetworkDB is
+// closed. It does nothing when the controller has no agent.
+func (c *controller) agentClose() {
+	a := c.agent
+	if a == nil {
+		return
+	}
+
+	for nid := range a.driverCancelFuncs {
+		for _, stop := range a.driverCancelFuncs[nid] {
+			stop()
+		}
+	}
+
+	a.epTblCancel()
+	a.networkDB.Close()
+}
+
+// isClusterEligible reports whether the network takes part in the agent
+// cluster: its driver scope must be global and its controller must have an
+// agent.
+func (n *network) isClusterEligible() bool {
+	if n.driverScope() != datastore.GlobalScope {
+		return false
+	}
+
+	return n.getController().agent != nil
+}
+
+// joinCluster joins the network, by ID, in the agent's NetworkDB. Networks
+// that are not cluster eligible are left alone and nil is returned.
+func (n *network) joinCluster() error {
+	if n.isClusterEligible() {
+		return n.getController().agent.networkDB.JoinNetwork(n.ID())
+	}
+
+	return nil
+}
+
+// leaveCluster makes the agent's NetworkDB leave the network identified by
+// this network's ID. Networks that are not cluster eligible are left alone
+// and nil is returned.
+func (n *network) leaveCluster() error {
+	if eligible := n.isClusterEligible(); !eligible {
+		return nil
+	}
+
+	db := n.getController().agent.networkDB
+	return db.LeaveNetwork(n.ID())
+}
+
+// addToCluster publishes the endpoint's state into the agent's NetworkDB.
+//
+// For a non-anonymous endpoint that has an interface address, a "name=ip"
+// record is created in "endpoint_table" under the endpoint ID. After that,
+// every driver table entry queued on the endpoint's join info is created in
+// its own table. Nothing happens (and nil is returned) when the network is
+// not cluster eligible. The first failing CreateEntry aborts the operation
+// and its error is returned.
+func (ep *endpoint) addToCluster() error {
+	n := ep.getNetwork()
+	if !n.isClusterEligible() {
+		return nil
+	}
+
+	db := n.getController().agent.networkDB
+	if !ep.isAnonymous() {
+		if addr := ep.Iface().Address(); addr != nil {
+			record := []byte(fmt.Sprintf("%s=%s", ep.Name(), addr.IP))
+			if err := db.CreateEntry("endpoint_table", n.ID(), ep.ID(), record); err != nil {
+				return err
+			}
+		}
+	}
+
+	// Mirror deleteFromCluster: an endpoint without join info has no driver
+	// table entries to publish, and must not be dereferenced.
+	if ep.joinInfo == nil {
+		return nil
+	}
+
+	for _, te := range ep.joinInfo.driverTableEntries {
+		if err := db.CreateEntry(te.tableName, n.ID(), te.key, te.value); err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+// deleteFromCluster withdraws the endpoint's state from the agent's
+// NetworkDB: the "endpoint_table" entry of a non-anonymous endpoint is
+// deleted first, followed by every driver table entry recorded on the join
+// info, if there is any. It is a no-op for networks that are not cluster
+// eligible, and stops at the first DeleteEntry error.
+func (ep *endpoint) deleteFromCluster() error {
+	n := ep.getNetwork()
+	if !n.isClusterEligible() {
+		return nil
+	}
+
+	c := n.getController()
+	if !ep.isAnonymous() {
+		err := c.agent.networkDB.DeleteEntry("endpoint_table", n.ID(), ep.ID())
+		if err != nil {
+			return err
+		}
+	}
+
+	if info := ep.joinInfo; info != nil {
+		for _, entry := range info.driverTableEntries {
+			err := c.agent.networkDB.DeleteEntry(entry.tableName, n.ID(), entry.key)
+			if err != nil {
+				return err
+			}
+		}
+	}
+
+	return nil
+}
+
+// addDriverWatches starts a NetworkDB watch for every table the driver
+// registered through TableEventRegister and replays the entries already
+// present in those tables to the driver as Create events.
+//
+// Each watch's cancel function is stored under the network ID in the agent's
+// driverCancelFuncs (guarded by the controller lock), and its events are
+// delivered to handleDriverTableEvent from a dedicated goroutine. Nothing is
+// done when the network is not cluster eligible or has no driver tables.
+func (n *network) addDriverWatches() {
+	if !n.isClusterEligible() || len(n.driverTables) == 0 {
+		return
+	}
+
+	// Resolve the driver once, before any watch is registered, so a failure
+	// cannot leave a watch and its goroutine running without a driver.
+	d, err := n.driver(false)
+	if err != nil {
+		logrus.Errorf("Could not resolve driver %s while walking driver table: %v", n.networkType, err)
+		return
+	}
+
+	c := n.getController()
+	for _, tableName := range n.driverTables {
+		ch, cancel := c.agent.networkDB.Watch(tableName, n.ID(), "")
+		c.Lock()
+		c.agent.driverCancelFuncs[n.ID()] = append(c.agent.driverCancelFuncs[n.ID()], cancel)
+		c.Unlock()
+
+		go c.handleTableEvents(ch, n.handleDriverTableEvent)
+
+		// The walk callback receives the network ID of each entry; only
+		// entries of this network are replayed, so that entries of other
+		// networks sharing the table are not attributed to it.
+		c.agent.networkDB.WalkTable(tableName, func(nid, key string, value []byte) bool {
+			if nid == n.ID() {
+				d.EventNotify(driverapi.Create, nid, tableName, key, value)
+			}
+			return false
+		})
+	}
+}
+
+// cancelDriverWatches removes the network's entry from the agent's
+// driverCancelFuncs while holding the controller lock, then invokes the
+// removed cancel functions after the lock is released. It is a no-op for
+// networks that are not cluster eligible.
+func (n *network) cancelDriverWatches() {
+	if !n.isClusterEligible() {
+		return
+	}
+
+	nid := n.ID()
+	c := n.getController()
+
+	c.Lock()
+	stops := c.agent.driverCancelFuncs[nid]
+	delete(c.agent.driverCancelFuncs, nid)
+	c.Unlock()
+
+	for i := range stops {
+		stops[i]()
+	}
+}
+
+// handleTableEvents calls fn for every event received on ch and returns once
+// ch has been closed. Callers in this file run it in its own goroutine.
+func (c *controller) handleTableEvents(ch chan events.Event, fn func(events.Event)) {
+	for ev := range ch {
+		fn(ev)
+	}
+}
+
+// handleDriverTableEvent translates a NetworkDB table event into a driver
+// notification for this network.
+//
+// The table name, key and value are taken from the event and forwarded to
+// the network's driver through EventNotify together with the network ID.
+// Events of an unrecognized type are logged and dropped instead of being
+// forwarded with empty fields. If the driver cannot be resolved the event is
+// logged and dropped as well.
+func (n *network) handleDriverTableEvent(ev events.Event) {
+	d, err := n.driver(false)
+	if err != nil {
+		logrus.Errorf("Could not resolve driver %s while handling driver table event: %v", n.networkType, err)
+		return
+	}
+
+	var (
+		etype driverapi.EventType
+		tname string
+		key   string
+		value []byte
+	)
+
+	switch event := ev.(type) {
+	case networkdb.CreateEvent:
+		tname = event.Table
+		key = event.Key
+		value = event.Value
+		etype = driverapi.Create
+	case networkdb.DeleteEvent:
+		tname = event.Table
+		key = event.Key
+		value = event.Value
+		etype = driverapi.Delete
+	case networkdb.UpdateEvent:
+		tname = event.Table
+		key = event.Key
+		value = event.Value
+		// NOTE(review): updates are reported to the driver as Delete;
+		// confirm this is intended rather than a dedicated update type.
+		etype = driverapi.Delete
+	default:
+		// Without this guard the driver would be notified with a zero
+		// event type and empty table/key.
+		logrus.Errorf("Unexpected driver table event = %#v", ev)
+		return
+	}
+
+	d.EventNotify(etype, n.ID(), tname, key, value)
+}
+
+// handleEpTableEvent applies an "endpoint_table" event to the service records
+// of the network named in the event.
+//
+// The event value is expected to have the form "name=ip". A create event
+// adds the service record and a delete event removes it. Update events and
+// events of an unknown type are logged and dropped right away, so that no
+// network lookup with an empty ID (and a second, misleading error) follows.
+// Malformed values and unknown networks are logged and ignored.
+func (c *controller) handleEpTableEvent(ev events.Event) {
+	var (
+		id    string
+		value string
+		isAdd bool
+	)
+
+	switch event := ev.(type) {
+	case networkdb.CreateEvent:
+		id = event.NetworkID
+		value = string(event.Value)
+		isAdd = true
+	case networkdb.DeleteEvent:
+		id = event.NetworkID
+		value = string(event.Value)
+	case networkdb.UpdateEvent:
+		logrus.Errorf("Unexpected update service table event = %#v", event)
+		return
+	default:
+		logrus.Errorf("Unexpected service table event = %#v", ev)
+		return
+	}
+
+	nw, err := c.NetworkByID(id)
+	if err != nil {
+		logrus.Errorf("Could not find network %s while handling service table event: %v", id, err)
+		return
+	}
+	n := nw.(*network)
+
+	pair := strings.Split(value, "=")
+	if len(pair) < 2 {
+		logrus.Errorf("Incorrect service table value = %s", value)
+		return
+	}
+
+	name := pair[0]
+	ip := net.ParseIP(pair[1])
+
+	if name == "" || ip == nil {
+		logrus.Errorf("Invalid endpoint name/ip received while handling service table event %s", value)
+		return
+	}
+
+	if isAdd {
+		n.addSvcRecords(name, ip, nil, true)
+	} else {
+		n.deleteSvcRecords(name, ip, nil, true)
+	}
+}

+ 24 - 0
libnetwork/config/config.go

@@ -22,9 +22,12 @@ type Config struct {
 // DaemonCfg represents libnetwork core configuration
 type DaemonCfg struct {
 	Debug          bool
+	// IsAgent turns on agent mode; it is set by OptionAgent.
+	IsAgent        bool
 	DataDir        string
 	DefaultNetwork string
 	DefaultDriver  string
+	// Bind is the bind interface or address; it is set by OptionBind.
+	Bind           string
+	// Neighbors is the list of neighbors to join; it is set by OptionNeighbors.
+	Neighbors      []string
 	Labels         []string
 	DriverCfg      map[string]interface{}
 }
@@ -81,6 +84,27 @@ func ParseConfigOptions(cfgOptions ...Option) *Config {
 // to the controller
 type Option func(c *Config)
 
+// OptionBind returns an Option that stores bind, an interface name or an
+// address, in the daemon configuration's Bind field.
+func OptionBind(bind string) Option {
+	setBind := func(cfg *Config) {
+		cfg.Daemon.Bind = bind
+	}
+	return setBind
+}
+
+// OptionAgent returns an Option that switches the daemon configuration into
+// agent mode by setting IsAgent to true.
+func OptionAgent() Option {
+	enableAgent := func(cfg *Config) {
+		cfg.Daemon.IsAgent = true
+	}
+	return enableAgent
+}
+
+// OptionNeighbors returns an Option that stores neighbors, the list of
+// neighbors to join, in the daemon configuration. The slice is stored as
+// given, without copying.
+func OptionNeighbors(neighbors []string) Option {
+	setNeighbors := func(cfg *Config) {
+		cfg.Daemon.Neighbors = neighbors
+	}
+	return setNeighbors
+}
+
 // OptionDefaultNetwork function returns an option setter for a default network
 func OptionDefaultNetwork(dn string) Option {
 	return func(c *Config) {

+ 57 - 12
libnetwork/controller.go

@@ -136,6 +136,7 @@ type controller struct {
 	nmap           map[string]*netWatch
 	defOsSbox      osl.Sandbox
 	sboxOnce       sync.Once
+	agent          *agent
 	sync.Mutex
 }
 
@@ -153,6 +154,14 @@ func New(cfgOptions ...config.Option) (NetworkController, error) {
 		svcDb:     make(map[string]svcInfo),
 	}
 
+	if err := c.agentInit(c.cfg.Daemon.Bind); err != nil {
+		return nil, err
+	}
+
+	if err := c.agentJoin(c.cfg.Daemon.Neighbors); err != nil {
+		return nil, err
+	}
+
 	if err := c.initStores(); err != nil {
 		return nil, err
 	}
@@ -235,6 +244,28 @@ func (c *controller) makeDriverConfig(ntype string) map[string]interface{} {
 
 var procReloadConfig = make(chan (bool), 1)
 
+// processAgentConfig applies cfg when it changes the agent mode setting.
+//
+// It returns (false, nil) when IsAgent is the same in the current and the
+// new configuration. Otherwise cfg becomes the controller's configuration,
+// the agent is initialized on the configured bind address and joined to the
+// configured neighbors, and (true, nil) is returned. If the join fails the
+// agent is closed and detached from the controller, so that later cluster
+// operations do not run against a closed NetworkDB, and the error is
+// returned.
+func (c *controller) processAgentConfig(cfg *config.Config) (bool, error) {
+	if c.cfg.Daemon.IsAgent == cfg.Daemon.IsAgent {
+		// Agent configuration not changed
+		return false, nil
+	}
+
+	c.Lock()
+	c.cfg = cfg
+	c.Unlock()
+
+	if err := c.agentInit(c.cfg.Daemon.Bind); err != nil {
+		return false, err
+	}
+
+	if err := c.agentJoin(c.cfg.Daemon.Neighbors); err != nil {
+		c.agentClose()
+		// agentClose leaves c.agent in place; drop it so the controller
+		// no longer looks like it has a usable agent.
+		c.agent = nil
+		return false, err
+	}
+
+	return true, nil
+}
+
 func (c *controller) ReloadConfiguration(cfgOptions ...config.Option) error {
 	procReloadConfig <- true
 	defer func() { <-procReloadConfig }()
@@ -243,6 +274,16 @@ func (c *controller) ReloadConfiguration(cfgOptions ...config.Option) error {
 	// Refuse the configuration if it alters an existing datastore client configuration.
 	update := false
 	cfg := config.ParseConfigOptions(cfgOptions...)
+
+	isAgentConfig, err := c.processAgentConfig(cfg)
+	if err != nil {
+		return err
+	}
+
+	if isAgentConfig {
+		return nil
+	}
+
 	for s := range c.cfg.Scopes {
 		if _, ok := cfg.Scopes[s]; !ok {
 			return types.ForbiddenErrorf("cannot accept new configuration because it removes an existing datastore client")
@@ -265,16 +306,6 @@ func (c *controller) ReloadConfiguration(cfgOptions ...config.Option) error {
 		return nil
 	}
 
-	c.Lock()
-	c.cfg = cfg
-	c.Unlock()
-
-	if c.discovery == nil && c.cfg.Cluster.Watcher != nil {
-		if err := c.initDiscovery(c.cfg.Cluster.Watcher); err != nil {
-			log.Errorf("Failed to Initialize Discovery after configuration update: %v", err)
-		}
-	}
-
 	var dsConfig *discoverapi.DatastoreConfigData
 	for scope, sCfg := range cfg.Scopes {
 		if scope == datastore.LocalScope || !sCfg.IsValid() {
@@ -308,6 +339,12 @@ func (c *controller) ReloadConfiguration(cfgOptions ...config.Option) error {
 		return false
 	})
 
+	if c.discovery == nil && c.cfg.Cluster.Watcher != nil {
+		if err := c.initDiscovery(c.cfg.Cluster.Watcher); err != nil {
+			log.Errorf("Failed to Initialize Discovery after configuration update: %v", err)
+		}
+	}
+
 	return nil
 }
 
@@ -421,6 +458,7 @@ func (c *controller) RegisterDriver(networkType string, driver driverapi.Driver,
 		c.pushNodeDiscovery(driver, capability, hd.Fetch(), true)
 	}
 
+	c.agentDriverNotify(driver)
 	return nil
 }
 
@@ -465,7 +503,8 @@ func (c *controller) NewNetwork(networkType, name string, id string, options ...
 		}
 	}()
 
-	if err = c.addNetwork(network); err != nil {
+	err = c.addNetwork(network)
+	if err != nil {
 		return nil, err
 	}
 	defer func() {
@@ -496,6 +535,12 @@ func (c *controller) NewNetwork(networkType, name string, id string, options ...
 		return nil, err
 	}
 
+	if err = network.joinCluster(); err != nil {
+		log.Errorf("Failed to join network %s into agent cluster: %v", name, err)
+	}
+
+	network.addDriverWatches()
+
 	return network, nil
 }
 
@@ -506,7 +551,7 @@ func (c *controller) addNetwork(n *network) error {
 	}
 
 	// Create the network
-	if err := d.CreateNetwork(n.id, n.generic, nil, n.getIPData(4), n.getIPData(6)); err != nil {
+	if err := d.CreateNetwork(n.id, n.generic, n, n.getIPData(4), n.getIPData(6)); err != nil {
 		return err
 	}
 

+ 8 - 4
libnetwork/drivers/overlay/overlay.go

@@ -147,10 +147,14 @@ func (d *driver) nodeJoin(node string, self bool) {
 		d.Lock()
 		d.bindAddress = node
 		d.Unlock()
-		err := d.serfInit()
-		if err != nil {
-			logrus.Errorf("initializing serf instance failed: %v", err)
-			return
+
+		// If there is no cluster store there is no need to start serf.
+		if d.store != nil {
+			err := d.serfInit()
+			if err != nil {
+				logrus.Errorf("initializing serf instance failed: %v", err)
+				return
+			}
 		}
 	}
 

+ 15 - 1
libnetwork/drivers/overlay/overlay_test.go

@@ -5,11 +5,18 @@ import (
 	"testing"
 	"time"
 
+	"github.com/docker/libkv/store/consul"
+	"github.com/docker/libnetwork/datastore"
 	"github.com/docker/libnetwork/discoverapi"
 	"github.com/docker/libnetwork/driverapi"
+	"github.com/docker/libnetwork/netlabel"
 	_ "github.com/docker/libnetwork/testutils"
 )
 
+// init registers the consul store backend before any test in this package
+// runs.
+func init() {
+	consul.Register()
+}
+
 type driverTester struct {
 	t *testing.T
 	d *driver
@@ -19,7 +26,14 @@ const testNetworkType = "overlay"
 
 func setupDriver(t *testing.T) *driverTester {
 	dt := &driverTester{t: t}
-	if err := Init(dt, nil); err != nil {
+	config := make(map[string]interface{})
+	config[netlabel.GlobalKVClient] = discoverapi.DatastoreConfigData{
+		Scope:    datastore.GlobalScope,
+		Provider: "consul",
+		Address:  "127.0.0.01:8500",
+	}
+
+	if err := Init(dt, config); err != nil {
 		t.Fatal(err)
 	}
 

+ 8 - 0
libnetwork/endpoint.go

@@ -446,6 +446,10 @@ func (ep *endpoint) sbJoin(sb *sandbox, options ...EndpointOption) error {
 		return err
 	}
 
+	if e := ep.addToCluster(); e != nil {
+		log.Errorf("Could not update state for endpoint %s into cluster: %v", ep.Name(), e)
+	}
+
 	if sb.needDefaultGW() && sb.getEndpointInGWNetwork() == nil {
 		return sb.setupDefaultGW()
 	}
@@ -632,6 +636,10 @@ func (ep *endpoint) sbLeave(sb *sandbox, force bool, options ...EndpointOption)
 		return err
 	}
 
+	if e := ep.deleteFromCluster(); e != nil {
+		log.Errorf("Could not delete state for endpoint %s from cluster: %v", ep.Name(), e)
+	}
+
 	sb.deleteHostsEntries(n.getSvcRecords(ep))
 	if !sb.inDelete && sb.needDefaultGW() && sb.getEndpointInGWNetwork() == nil {
 		return sb.setupDefaultGW()

+ 16 - 0
libnetwork/endpoint_info.go

@@ -143,9 +143,16 @@ type endpointJoinInfo struct {
 	gw                    net.IP
 	gw6                   net.IP
 	StaticRoutes          []*types.StaticRoute
+	driverTableEntries    []*tableEntry
 	disableGatewayService bool
 }
 
+// tableEntry is one driver-supplied table record queued on an endpoint's
+// join info by AddTableEntry.
+type tableEntry struct {
+	// tableName is the table the record belongs to.
+	tableName string
+	// key identifies the record within the table.
+	key       string
+	// value is the record payload, stored as given.
+	value     []byte
+}
+
 func (ep *endpoint) Info() EndpointInfo {
 	n, err := ep.getNetworkFromStore()
 	if err != nil {
@@ -293,6 +300,15 @@ func (ep *endpoint) AddStaticRoute(destination *net.IPNet, routeType int, nextHo
 }
 
+// AddTableEntry queues a driver table entry (table name, key and value) on
+// the endpoint's join info while holding the endpoint lock. The queued
+// entries are the ones addToCluster and deleteFromCluster iterate over.
+// It always returns nil.
 func (ep *endpoint) AddTableEntry(tableName, key string, value []byte) error {
+	ep.Lock()
+	defer ep.Unlock()
+
+	ep.joinInfo.driverTableEntries = append(ep.joinInfo.driverTableEntries, &tableEntry{
+		tableName: tableName,
+		key:       key,
+		value:     value,
+	})
+
 	return nil
 }
 

+ 22 - 0
libnetwork/network.go

@@ -171,6 +171,7 @@ type network struct {
 	drvOnce      *sync.Once
 	internal     bool
 	inDelete     bool
+	driverTables []string
 	sync.Mutex
 }
 
@@ -662,8 +663,15 @@ func (n *network) driver(load bool) (driverapi.Driver, error) {
 		return nil, err
 	}
 
+	c := n.getController()
 	n.Lock()
 	n.scope = cap.DataScope
+	if c.cfg.Daemon.IsAgent {
+		// If we are running in agent mode then all networks
+		// in libnetwork are local scope regardless of the
+		// backing driver.
+		n.scope = datastore.LocalScope
+	}
 	n.Unlock()
 	return d, nil
 }
@@ -720,6 +728,12 @@ func (n *network) delete(force bool) error {
 		return fmt.Errorf("error deleting network from store: %v", err)
 	}
 
+	n.cancelDriverWatches()
+
+	if err = n.leaveCluster(); err != nil {
+		log.Errorf("Failed leaving network %s from the agent cluster: %v", n.Name(), err)
+	}
+
 	return nil
 }
 
@@ -1424,3 +1438,11 @@ func (n *network) Labels() map[string]string {
 
 	return lbls
 }
+
+// TableEventRegister records tableName as a driver table of this network,
+// appending it to driverTables under the network lock. It always returns
+// nil.
+func (n *network) TableEventRegister(tableName string) error {
+	n.Lock()
+	n.driverTables = append(n.driverTables, tableName)
+	n.Unlock()
+
+	return nil
+}

+ 2 - 0
libnetwork/networkdb/cluster.go

@@ -38,9 +38,11 @@ func (nDB *NetworkDB) clusterInit() error {
 	config := memberlist.DefaultLANConfig()
 	config.Name = nDB.config.NodeName
 	config.BindAddr = nDB.config.BindAddr
+
 	if nDB.config.BindPort != 0 {
 		config.BindPort = nDB.config.BindPort
 	}
+
 	config.ProtocolVersion = memberlist.ProtocolVersionMax
 	config.Delegate = &delegate{nDB: nDB}
 	config.Events = &eventDelegate{nDB: nDB}