diff --git a/libnetwork/agent.go b/libnetwork/agent.go
new file mode 100644
index 0000000000..825971077f
--- /dev/null
+++ b/libnetwork/agent.go
@@ -0,0 +1,373 @@
+package libnetwork
+
+import (
+	"fmt"
+	"net"
+	"os"
+	"strings"
+
+	"github.com/Sirupsen/logrus"
+	"github.com/docker/go-events"
+	"github.com/docker/libnetwork/datastore"
+	"github.com/docker/libnetwork/discoverapi"
+	"github.com/docker/libnetwork/driverapi"
+	"github.com/docker/libnetwork/networkdb"
+)
+
+// agent holds the per-controller cluster state: the networkdb handle, the
+// address it is bound to and the cancel functions of every table watch.
+type agent struct {
+	networkDB         *networkdb.NetworkDB
+	bindAddr          string
+	epTblCancel       func()
+	driverCancelFuncs map[string][]func()
+}
+
+// getBindAddr returns the first address of the named interface that is not
+// link-local unicast, or an error if the interface has no such address.
+func getBindAddr(ifaceName string) (string, error) {
+	iface, err := net.InterfaceByName(ifaceName)
+	if err != nil {
+		return "", fmt.Errorf("failed to find interface %s: %v", ifaceName, err)
+	}
+
+	addrs, err := iface.Addrs()
+	if err != nil {
+		return "", fmt.Errorf("failed to get interface addresses: %v", err)
+	}
+
+	for _, a := range addrs {
+		addr, ok := a.(*net.IPNet)
+		if !ok {
+			continue
+		}
+		addrIP := addr.IP
+
+		if addrIP.IsLinkLocalUnicast() {
+			continue
+		}
+
+		return addrIP.String(), nil
+	}
+
+	return "", fmt.Errorf("failed to get bind address")
+}
+
+// resolveAddr returns addrOrInterface unchanged when it parses as an IP
+// address; otherwise it treats it as an interface name and resolves it.
+func resolveAddr(addrOrInterface string) (string, error) {
+	// Try and see if this is a valid IP address
+	if net.ParseIP(addrOrInterface) != nil {
+		return addrOrInterface, nil
+	}
+
+	// If not a valid IP address, it should be a valid interface
+	return getBindAddr(addrOrInterface)
+}
+
+// agentInit creates the networkdb instance and starts watching the endpoint
+// table. It is a no-op unless the daemon is configured as an agent.
+func (c *controller) agentInit(bindAddrOrInterface string) error {
+	if !c.cfg.Daemon.IsAgent {
+		return nil
+	}
+
+	bindAddr, err := resolveAddr(bindAddrOrInterface)
+	if err != nil {
+		return err
+	}
+
+	// NOTE(review): the os.Hostname error is dropped, so NodeName may be
+	// empty; confirm networkdb tolerates that.
+	hostname, _ := os.Hostname()
+	nDB, err := networkdb.New(&networkdb.Config{
+		BindAddr: bindAddr,
+		NodeName: hostname,
+	})
+
+	if err != nil {
+		return err
+	}
+
+	ch, cancel := nDB.Watch("endpoint_table", "", "")
+
+	c.agent = &agent{
+		networkDB:         nDB,
+		bindAddr:          bindAddr,
+		epTblCancel:       cancel,
+		driverCancelFuncs: make(map[string][]func()),
+	}
+
+	go c.handleTableEvents(ch, c.handleEpTableEvent)
+	return nil
+}
+
+// agentJoin joins the networkdb cluster through the given remote nodes.
+// It is a no-op when the agent is not initialized.
+func (c *controller) agentJoin(remotes []string) error {
+	if c.agent == nil {
+		return nil
+	}
+
+	return c.agent.networkDB.Join(remotes)
+}
+
+// agentDriverNotify tells the driver about the local node's bind address
+// through a node discovery event marked as self.
+func (c *controller) agentDriverNotify(d driverapi.Driver) {
+	if c.agent == nil {
+		return
+	}
+
+	d.DiscoverNew(discoverapi.NodeDiscovery, discoverapi.NodeDiscoveryData{
+		Address: c.agent.bindAddr,
+		Self:    true,
+	})
+}
+
+// agentClose cancels every table watch, closes networkdb and clears
+// c.agent so that later calls no longer use the closed instance.
+func (c *controller) agentClose() {
+	if c.agent == nil {
+		return
+	}
+
+	for _, cancelFuncs := range c.agent.driverCancelFuncs {
+		for _, cancel := range cancelFuncs {
+			cancel()
+		}
+	}
+	c.agent.epTblCancel()
+
+	c.agent.networkDB.Close()
+	c.agent = nil
+}
+
+// isClusterEligible reports whether the network's driver is global scope
+// and the controller has an initialized agent.
+func (n *network) isClusterEligible() bool {
+	if n.driverScope() != datastore.GlobalScope {
+		return false
+	}
+
+	return n.getController().agent != nil
+}
+
+// joinCluster joins the network in networkdb if it is cluster eligible.
+func (n *network) joinCluster() error {
+	if !n.isClusterEligible() {
+		return nil
+	}
+
+	c := n.getController()
+	return c.agent.networkDB.JoinNetwork(n.ID())
+}
+
+// leaveCluster leaves the network in networkdb if it is cluster eligible.
+func (n *network) leaveCluster() error {
+	if !n.isClusterEligible() {
+		return nil
+	}
+
+	c := n.getController()
+	return c.agent.networkDB.LeaveNetwork(n.ID())
+}
+
+// addToCluster publishes the endpoint's "name=ip" record to the endpoint
+// table (unless anonymous or address-less) and all driver table entries.
+func (ep *endpoint) addToCluster() error {
+	n := ep.getNetwork()
+	if !n.isClusterEligible() {
+		return nil
+	}
+
+	c := n.getController()
+	if !ep.isAnonymous() && ep.Iface().Address() != nil {
+		if err := c.agent.networkDB.CreateEntry("endpoint_table", n.ID(), ep.ID(), []byte(fmt.Sprintf("%s=%s", ep.Name(), ep.Iface().Address().IP))); err != nil {
+			return err
+		}
+	}
+
+	// Same guard as deleteFromCluster: without join info there are no driver table entries to publish.
+	if ep.joinInfo == nil {
+		return nil
+	}
+
+	for _, te := range ep.joinInfo.driverTableEntries {
+		if err := c.agent.networkDB.CreateEntry(te.tableName, n.ID(), te.key, te.value); err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+// deleteFromCluster removes the endpoint's record from the endpoint table
+// (unless anonymous) and deletes all of its driver table entries.
+func (ep *endpoint) deleteFromCluster() error {
+	n := ep.getNetwork()
+	if !n.isClusterEligible() {
+		return nil
+	}
+
+	c := n.getController()
+	if !ep.isAnonymous() {
+		if err := c.agent.networkDB.DeleteEntry("endpoint_table", n.ID(), ep.ID()); err != nil {
+			return err
+		}
+	}
+
+	if ep.joinInfo == nil {
+		return nil
+	}
+
+	for _, te := range ep.joinInfo.driverTableEntries {
+		if err := c.agent.networkDB.DeleteEntry(te.tableName, n.ID(), te.key); err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+// addDriverWatches starts a watch on every table registered for the network
+// and walks each table, notifying the driver of every entry as a create event.
+func (n *network) addDriverWatches() {
+	if !n.isClusterEligible() {
+		return
+	}
+
+	c := n.getController()
+	for _, tableName := range n.driverTables {
+		ch, cancel := c.agent.networkDB.Watch(tableName, n.ID(), "")
+		c.Lock()
+		c.agent.driverCancelFuncs[n.ID()] = append(c.agent.driverCancelFuncs[n.ID()], cancel)
+		c.Unlock()
+
+		go c.handleTableEvents(ch, n.handleDriverTableEvent)
+		d, err := n.driver(false)
+		if err != nil {
+			logrus.Errorf("Could not resolve driver %s while walking driver table: %v", n.networkType, err)
+			return
+		}
+
+		c.agent.networkDB.WalkTable(tableName, func(nid, key string, value []byte) bool {
+			d.EventNotify(driverapi.Create, n.ID(), tableName, key, value)
+			return false
+		})
+	}
+}
+
+// cancelDriverWatches cancels and forgets the network's driver table watches.
+func (n *network) cancelDriverWatches() {
+	if !n.isClusterEligible() {
+		return
+	}
+
+	c := n.getController()
+	c.Lock()
+	cancelFuncs := c.agent.driverCancelFuncs[n.ID()]
+	delete(c.agent.driverCancelFuncs, n.ID())
+	c.Unlock()
+
+	for _, cancel := range cancelFuncs {
+		cancel()
+	}
+}
+
+// handleTableEvents feeds every event received on ch to fn and returns
+// once ch is closed.
+func (c *controller) handleTableEvents(ch chan events.Event, fn func(events.Event)) {
+	for ev := range ch {
+		fn(ev)
+	}
+}
+
+// handleDriverTableEvent forwards a networkdb table event to the network's
+// driver as the matching create, update or delete notification.
+func (n *network) handleDriverTableEvent(ev events.Event) {
+	d, err := n.driver(false)
+	if err != nil {
+		logrus.Errorf("Could not resolve driver %s while handling driver table event: %v", n.networkType, err)
+		return
+	}
+
+	var (
+		etype driverapi.EventType
+		tname string
+		key   string
+		value []byte
+	)
+
+	switch event := ev.(type) {
+	case networkdb.CreateEvent:
+		tname = event.Table
+		key = event.Key
+		value = event.Value
+		etype = driverapi.Create
+	case networkdb.DeleteEvent:
+		tname = event.Table
+		key = event.Key
+		value = event.Value
+		etype = driverapi.Delete
+	case networkdb.UpdateEvent:
+		tname = event.Table
+		key = event.Key
+		value = event.Value
+		// An update must not be reported to the driver as a delete.
+		etype = driverapi.Update
+	}
+
+	d.EventNotify(etype, n.ID(), tname, key, value)
+}
+
+// handleEpTableEvent adds or removes the service record described by a
+// "name=ip" endpoint table entry on the network named in the event.
+func (c *controller) handleEpTableEvent(ev events.Event) {
+	var (
+		id    string
+		value string
+		isAdd bool
+	)
+
+	switch event := ev.(type) {
+	case networkdb.CreateEvent:
+		id = event.NetworkID
+		value = string(event.Value)
+		isAdd = true
+	case networkdb.DeleteEvent:
+		id = event.NetworkID
+		value = string(event.Value)
+	case networkdb.UpdateEvent:
+		logrus.Errorf("Unexpected update service table event = %#v", event)
+		// Nothing to apply; stop before the network lookup runs with an empty ID.
+		return
+	}
+
+	nw, err := c.NetworkByID(id)
+	if err != nil {
+		logrus.Errorf("Could not find network %s while handling service table event: %v", id, err)
+		return
+	}
+	n := nw.(*network)
+
+	pair := strings.Split(value, "=")
+	if len(pair) < 2 {
+		logrus.Errorf("Incorrect service table value = %s", value)
+		return
+	}
+
+	name := pair[0]
+	ip := net.ParseIP(pair[1])
+
+	if name == "" || ip == nil {
+		logrus.Errorf("Invalid endpoint name/ip received while handling service table event %s", value)
+		return
+	}
+
+	if isAdd {
+		n.addSvcRecords(name, ip, nil, true)
+	} else {
+		n.deleteSvcRecords(name, ip, nil, true)
+	}
+}
diff --git a/libnetwork/config/config.go b/libnetwork/config/config.go
index 8da92f7a0e..62d9993a90 100644
--- a/libnetwork/config/config.go
+++ b/libnetwork/config/config.go
@@ -22,9 +22,12 @@ type Config struct {
 // DaemonCfg represents libnetwork core configuration
 type DaemonCfg struct {
 	Debug          bool
+	IsAgent        bool
 	DataDir        string
 	DefaultNetwork string
 	DefaultDriver  string
+	Bind           string
+	Neighbors      []string
 	Labels         []string
 	DriverCfg      map[string]interface{}
 }
@@ -81,6 +84,27 @@ func ParseConfigOptions(cfgOptions ...Option) *Config {
 // to the controller
 type Option func(c *Config)
 
+// OptionBind function returns an option setter for setting a bind interface or address
+func OptionBind(bind string) Option {
+	return func(c *Config) {
+		c.Daemon.Bind = bind
+	}
+}
+
+// OptionAgent function returns an option setter for setting agent mode
+func OptionAgent() Option {
+	return func(c *Config) {
+		c.Daemon.IsAgent = true
+	}
+}
+
+// OptionNeighbors function returns an option setter for setting a list of neighbors to join.
+func OptionNeighbors(neighbors []string) Option {
+	return func(c *Config) {
+		c.Daemon.Neighbors = neighbors
+	}
+}
+
 // OptionDefaultNetwork function returns an option setter for a default network
 func OptionDefaultNetwork(dn string) Option {
 	return func(c *Config) {
diff --git a/libnetwork/controller.go b/libnetwork/controller.go
index fbf9daed3d..07cb024f9d 100644
--- a/libnetwork/controller.go
+++ b/libnetwork/controller.go
@@ -136,6 +136,7 @@ type controller struct {
 	nmap           map[string]*netWatch
 	defOsSbox      osl.Sandbox
 	sboxOnce       sync.Once
+	agent          *agent
 	sync.Mutex
 }
 
@@ -153,6 +154,14 @@ func New(cfgOptions ...config.Option) (NetworkController, error) {
 		svcDb:       make(map[string]svcInfo),
 	}
 
+	if err := c.agentInit(c.cfg.Daemon.Bind); err != nil {
+		return nil, err
+	}
+
+	if err := c.agentJoin(c.cfg.Daemon.Neighbors); err != nil {
+		return nil, err
+	}
+
 	if err := c.initStores(); err != nil {
 		return nil, err
 	}
@@ -235,6 +244,28 @@ func (c *controller) makeDriverConfig(ntype string) map[string]interface{} {
 
 var procReloadConfig = make(chan (bool), 1)
 
+func (c *controller) processAgentConfig(cfg *config.Config) (bool, error) {
+	if c.cfg.Daemon.IsAgent == cfg.Daemon.IsAgent {
+		// Agent configuration not changed
+		return false, nil
+	}
+
+	c.Lock()
+	c.cfg = cfg
+	c.Unlock()
+
+	if err := c.agentInit(c.cfg.Daemon.Bind); err != nil {
+		return false, err
+	}
+
+	if err := c.agentJoin(c.cfg.Daemon.Neighbors); err != nil {
+		c.agentClose()
+		return false, err
+	}
+
+	return true, nil
+}
+
 func (c *controller) ReloadConfiguration(cfgOptions ...config.Option) error {
 	procReloadConfig <- true
 	defer func() { <-procReloadConfig }()
@@ -243,6 +274,16 @@ func (c *controller) ReloadConfiguration(cfgOptions ...config.Option) error {
 	// Refuse the configuration if it alters an existing datastore client configuration.
 	update := false
 	cfg := config.ParseConfigOptions(cfgOptions...)
+
+	isAgentConfig, err := c.processAgentConfig(cfg)
+	if err != nil {
+		return err
+	}
+
+	if isAgentConfig {
+		return nil
+	}
+
 	for s := range c.cfg.Scopes {
 		if _, ok := cfg.Scopes[s]; !ok {
 			return types.ForbiddenErrorf("cannot accept new configuration because it removes an existing datastore client")
@@ -265,16 +306,6 @@ func (c *controller) ReloadConfiguration(cfgOptions ...config.Option) error {
 		return nil
 	}
 
-	c.Lock()
-	c.cfg = cfg
-	c.Unlock()
-
-	if c.discovery == nil && c.cfg.Cluster.Watcher != nil {
-		if err := c.initDiscovery(c.cfg.Cluster.Watcher); err != nil {
-			log.Errorf("Failed to Initialize Discovery after configuration update: %v", err)
-		}
-	}
-
 	var dsConfig *discoverapi.DatastoreConfigData
 	for scope, sCfg := range cfg.Scopes {
 		if scope == datastore.LocalScope || !sCfg.IsValid() {
@@ -308,6 +339,12 @@ func (c *controller) ReloadConfiguration(cfgOptions ...config.Option) error {
 		return false
 	})
 
+	if c.discovery == nil && c.cfg.Cluster.Watcher != nil {
+		if err := c.initDiscovery(c.cfg.Cluster.Watcher); err != nil {
+			log.Errorf("Failed to Initialize Discovery after configuration update: %v", err)
+		}
+	}
+
 	return nil
 }
 
@@ -421,6 +458,7 @@ func (c *controller) RegisterDriver(networkType string, driver driverapi.Driver,
 		c.pushNodeDiscovery(driver, capability, hd.Fetch(), true)
 	}
 
+	c.agentDriverNotify(driver)
 	return nil
 }
 
@@ -465,7 +503,8 @@ func (c *controller) NewNetwork(networkType, name string, id string, options ...
 		}
 	}()
 
-	if err = c.addNetwork(network); err != nil {
+	err = c.addNetwork(network)
+	if err != nil {
 		return nil, err
 	}
 	defer func() {
@@ -496,6 +535,12 @@ func (c *controller) NewNetwork(networkType, name string, id string, options ...
 		return nil, err
 	}
 
+	if err = network.joinCluster(); err != nil {
+		log.Errorf("Failed to join network %s into agent cluster: %v", name, err)
+	}
+
+	network.addDriverWatches()
+
 	return network, nil
 }
 
@@ -506,7 +551,7 @@ func (c *controller) addNetwork(n *network) error {
 	}
 
 	// Create the network
-	if err := d.CreateNetwork(n.id, n.generic, nil, n.getIPData(4), n.getIPData(6)); err != nil {
+	if err := d.CreateNetwork(n.id, n.generic, n, n.getIPData(4), n.getIPData(6)); err != nil {
 		return err
 	}
 
diff --git a/libnetwork/drivers/overlay/overlay.go b/libnetwork/drivers/overlay/overlay.go
index f6666bf2d4..e4d487adaa 100644
--- a/libnetwork/drivers/overlay/overlay.go
+++ b/libnetwork/drivers/overlay/overlay.go
@@ -147,10 +147,14 @@ func (d *driver) nodeJoin(node string, self bool) {
 		d.Lock()
 		d.bindAddress = node
 		d.Unlock()
-		err := d.serfInit()
-		if err != nil {
-			logrus.Errorf("initializing serf instance failed: %v", err)
-			return
+
+		// If there is no cluster store there is no need to start serf.
+		if d.store != nil {
+			err := d.serfInit()
+			if err != nil {
+				logrus.Errorf("initializing serf instance failed: %v", err)
+				return
+			}
 		}
 	}
 }
diff --git a/libnetwork/drivers/overlay/overlay_test.go b/libnetwork/drivers/overlay/overlay_test.go
index e29530ba42..1d6012904f 100644
--- a/libnetwork/drivers/overlay/overlay_test.go
+++ b/libnetwork/drivers/overlay/overlay_test.go
@@ -5,11 +5,18 @@ import (
 	"testing"
 	"time"
 
+	"github.com/docker/libkv/store/consul"
+	"github.com/docker/libnetwork/datastore"
 	"github.com/docker/libnetwork/discoverapi"
 	"github.com/docker/libnetwork/driverapi"
+	"github.com/docker/libnetwork/netlabel"
 	_ "github.com/docker/libnetwork/testutils"
 )
 
+func init() {
+	consul.Register()
+}
+
 type driverTester struct {
 	t *testing.T
 	d *driver
@@ -19,7 +26,14 @@ const testNetworkType = "overlay"
 
 func setupDriver(t *testing.T) *driverTester {
 	dt := &driverTester{t: t}
-	if err := Init(dt, nil); err != nil {
+	config := make(map[string]interface{})
+	config[netlabel.GlobalKVClient] = discoverapi.DatastoreConfigData{
+		Scope:    datastore.GlobalScope,
+		Provider: "consul",
+		Address:  "127.0.0.1:8500",
+	}
+
+	if err := Init(dt, config); err != nil {
 		t.Fatal(err)
 	}
 
diff --git a/libnetwork/endpoint.go b/libnetwork/endpoint.go
index 4c126984b8..45516db8f7 100644
--- a/libnetwork/endpoint.go
+++ b/libnetwork/endpoint.go
@@ -446,6 +446,10 @@ func (ep *endpoint) sbJoin(sb *sandbox, options ...EndpointOption) error {
 		return err
 	}
 
+	if e := ep.addToCluster(); e != nil {
+		log.Errorf("Could not update state for endpoint %s into cluster: %v", ep.Name(), e)
+	}
+
 	if sb.needDefaultGW() && sb.getEndpointInGWNetwork() == nil {
 		return sb.setupDefaultGW()
 	}
@@ -632,6 +636,10 @@ func (ep *endpoint) sbLeave(sb *sandbox, force bool, options ...EndpointOption)
 		return err
 	}
 
+	if e := ep.deleteFromCluster(); e != nil {
+		log.Errorf("Could not delete state for endpoint %s from cluster: %v", ep.Name(), e)
+	}
+
 	sb.deleteHostsEntries(n.getSvcRecords(ep))
 	if !sb.inDelete && sb.needDefaultGW() && sb.getEndpointInGWNetwork() == nil {
 		return sb.setupDefaultGW()
diff --git a/libnetwork/endpoint_info.go b/libnetwork/endpoint_info.go
index 187dffc565..cc7aa17a66 100644
--- a/libnetwork/endpoint_info.go
+++ b/libnetwork/endpoint_info.go
@@ -143,9 +143,17 @@ type endpointJoinInfo struct {
 	gw                    net.IP
 	gw6                   net.IP
 	StaticRoutes          []*types.StaticRoute
+	driverTableEntries    []*tableEntry
 	disableGatewayService bool
 }
 
+// tableEntry is a table entry queued by AddTableEntry for publication in networkdb.
+type tableEntry struct {
+	tableName string
+	key       string
+	value     []byte
+}
+
 func (ep *endpoint) Info() EndpointInfo {
 	n, err := ep.getNetworkFromStore()
 	if err != nil {
@@ -293,6 +301,15 @@ func (ep *endpoint) AddStaticRoute(destination *net.IPNet, routeType int, nextHo
 }
 
 func (ep *endpoint) AddTableEntry(tableName, key string, value []byte) error {
+	ep.Lock()
+	defer ep.Unlock()
+
+	ep.joinInfo.driverTableEntries = append(ep.joinInfo.driverTableEntries, &tableEntry{
+		tableName: tableName,
+		key:       key,
+		value:     value,
+	})
+
 	return nil
 }
 
diff --git a/libnetwork/network.go b/libnetwork/network.go
index 05b8370c30..07c0965859 100644
--- a/libnetwork/network.go
+++ b/libnetwork/network.go
@@ -171,6 +171,7 @@ type network struct {
 	drvOnce      *sync.Once
 	internal     bool
 	inDelete     bool
+	driverTables []string
 	sync.Mutex
 }
 
@@ -662,8 +663,15 @@ func (n *network) driver(load bool) (driverapi.Driver, error) {
 		return nil, err
 	}
 
+	c := n.getController()
 	n.Lock()
 	n.scope = cap.DataScope
+	if c.cfg.Daemon.IsAgent {
+		// If we are running in agent mode then all networks
+		// in libnetwork are local scope regardless of the
+		// backing driver.
+		n.scope = datastore.LocalScope
+	}
 	n.Unlock()
 	return d, nil
 }
@@ -720,6 +728,12 @@ func (n *network) delete(force bool) error {
 		return fmt.Errorf("error deleting network from store: %v", err)
 	}
 
+	n.cancelDriverWatches()
+
+	if err = n.leaveCluster(); err != nil {
+		log.Errorf("Failed leaving network %s from the agent cluster: %v", n.Name(), err)
+	}
+
 	return nil
 }
 
@@ -1424,3 +1438,13 @@ func (n *network) Labels() map[string]string {
 
 	return lbls
 }
+
+// TableEventRegister records tableName in the network's list of driver
+// tables. It always returns nil.
+func (n *network) TableEventRegister(tableName string) error {
+	n.Lock()
+	defer n.Unlock()
+
+	n.driverTables = append(n.driverTables, tableName)
+	return nil
+}
diff --git a/libnetwork/networkdb/cluster.go b/libnetwork/networkdb/cluster.go
index bfba59f698..317f1e5974 100644
--- a/libnetwork/networkdb/cluster.go
+++ b/libnetwork/networkdb/cluster.go
@@ -38,9 +38,11 @@ func (nDB *NetworkDB) clusterInit() error {
 	config := memberlist.DefaultLANConfig()
 	config.Name = nDB.config.NodeName
 	config.BindAddr = nDB.config.BindAddr
+
 	if nDB.config.BindPort != 0 {
 		config.BindPort = nDB.config.BindPort
 	}
+
 	config.ProtocolVersion = memberlist.ProtocolVersionMax
 	config.Delegate = &delegate{nDB: nDB}
 	config.Events = &eventDelegate{nDB: nDB}