|
@@ -0,0 +1,343 @@
|
|
|
+package libnetwork
|
|
|
+
|
|
|
+import (
|
|
|
+ "fmt"
|
|
|
+ "net"
|
|
|
+ "os"
|
|
|
+ "strings"
|
|
|
+
|
|
|
+ "github.com/Sirupsen/logrus"
|
|
|
+ "github.com/docker/go-events"
|
|
|
+ "github.com/docker/libnetwork/datastore"
|
|
|
+ "github.com/docker/libnetwork/discoverapi"
|
|
|
+ "github.com/docker/libnetwork/driverapi"
|
|
|
+ "github.com/docker/libnetwork/networkdb"
|
|
|
+)
|
|
|
+
|
|
|
+type agent struct {
|
|
|
+ networkDB *networkdb.NetworkDB
|
|
|
+ bindAddr string
|
|
|
+ epTblCancel func()
|
|
|
+ driverCancelFuncs map[string][]func()
|
|
|
+}
|
|
|
+
|
|
|
+func getBindAddr(ifaceName string) (string, error) {
|
|
|
+ iface, err := net.InterfaceByName(ifaceName)
|
|
|
+ if err != nil {
|
|
|
+ return "", fmt.Errorf("failed to find interface %s: %v", ifaceName, err)
|
|
|
+ }
|
|
|
+
|
|
|
+ addrs, err := iface.Addrs()
|
|
|
+ if err != nil {
|
|
|
+ return "", fmt.Errorf("failed to get interface addresses: %v", err)
|
|
|
+ }
|
|
|
+
|
|
|
+ for _, a := range addrs {
|
|
|
+ addr, ok := a.(*net.IPNet)
|
|
|
+ if !ok {
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ addrIP := addr.IP
|
|
|
+
|
|
|
+ if addrIP.IsLinkLocalUnicast() {
|
|
|
+ continue
|
|
|
+ }
|
|
|
+
|
|
|
+ return addrIP.String(), nil
|
|
|
+ }
|
|
|
+
|
|
|
+ return "", fmt.Errorf("failed to get bind address")
|
|
|
+}
|
|
|
+
|
|
|
+func resolveAddr(addrOrInterface string) (string, error) {
|
|
|
+ // Try and see if this is a valid IP address
|
|
|
+ if net.ParseIP(addrOrInterface) != nil {
|
|
|
+ return addrOrInterface, nil
|
|
|
+ }
|
|
|
+
|
|
|
+ // If not a valid IP address, it should be a valid interface
|
|
|
+ return getBindAddr(addrOrInterface)
|
|
|
+}
|
|
|
+
|
|
|
+func (c *controller) agentInit(bindAddrOrInterface string) error {
|
|
|
+ if !c.cfg.Daemon.IsAgent {
|
|
|
+ return nil
|
|
|
+ }
|
|
|
+
|
|
|
+ bindAddr, err := resolveAddr(bindAddrOrInterface)
|
|
|
+ if err != nil {
|
|
|
+ return err
|
|
|
+ }
|
|
|
+
|
|
|
+ hostname, _ := os.Hostname()
|
|
|
+ nDB, err := networkdb.New(&networkdb.Config{
|
|
|
+ BindAddr: bindAddr,
|
|
|
+ NodeName: hostname,
|
|
|
+ })
|
|
|
+
|
|
|
+ if err != nil {
|
|
|
+ return err
|
|
|
+ }
|
|
|
+
|
|
|
+ ch, cancel := nDB.Watch("endpoint_table", "", "")
|
|
|
+
|
|
|
+ c.agent = &agent{
|
|
|
+ networkDB: nDB,
|
|
|
+ bindAddr: bindAddr,
|
|
|
+ epTblCancel: cancel,
|
|
|
+ driverCancelFuncs: make(map[string][]func()),
|
|
|
+ }
|
|
|
+
|
|
|
+ go c.handleTableEvents(ch, c.handleEpTableEvent)
|
|
|
+ return nil
|
|
|
+}
|
|
|
+
|
|
|
+func (c *controller) agentJoin(remotes []string) error {
|
|
|
+ if c.agent == nil {
|
|
|
+ return nil
|
|
|
+ }
|
|
|
+
|
|
|
+ return c.agent.networkDB.Join(remotes)
|
|
|
+}
|
|
|
+
|
|
|
+func (c *controller) agentDriverNotify(d driverapi.Driver) {
|
|
|
+ if c.agent == nil {
|
|
|
+ return
|
|
|
+ }
|
|
|
+
|
|
|
+ d.DiscoverNew(discoverapi.NodeDiscovery, discoverapi.NodeDiscoveryData{
|
|
|
+ Address: c.agent.bindAddr,
|
|
|
+ Self: true,
|
|
|
+ })
|
|
|
+}
|
|
|
+
|
|
|
+func (c *controller) agentClose() {
|
|
|
+ if c.agent == nil {
|
|
|
+ return
|
|
|
+ }
|
|
|
+
|
|
|
+ for _, cancelFuncs := range c.agent.driverCancelFuncs {
|
|
|
+ for _, cancel := range cancelFuncs {
|
|
|
+ cancel()
|
|
|
+ }
|
|
|
+ }
|
|
|
+ c.agent.epTblCancel()
|
|
|
+
|
|
|
+ c.agent.networkDB.Close()
|
|
|
+}
|
|
|
+
|
|
|
+func (n *network) isClusterEligible() bool {
|
|
|
+ if n.driverScope() != datastore.GlobalScope {
|
|
|
+ return false
|
|
|
+ }
|
|
|
+
|
|
|
+ c := n.getController()
|
|
|
+ if c.agent == nil {
|
|
|
+ return false
|
|
|
+ }
|
|
|
+
|
|
|
+ return true
|
|
|
+}
|
|
|
+
|
|
|
+func (n *network) joinCluster() error {
|
|
|
+ if !n.isClusterEligible() {
|
|
|
+ return nil
|
|
|
+ }
|
|
|
+
|
|
|
+ c := n.getController()
|
|
|
+ return c.agent.networkDB.JoinNetwork(n.ID())
|
|
|
+}
|
|
|
+
|
|
|
+func (n *network) leaveCluster() error {
|
|
|
+ if !n.isClusterEligible() {
|
|
|
+ return nil
|
|
|
+ }
|
|
|
+
|
|
|
+ c := n.getController()
|
|
|
+ return c.agent.networkDB.LeaveNetwork(n.ID())
|
|
|
+}
|
|
|
+
|
|
|
+func (ep *endpoint) addToCluster() error {
|
|
|
+ n := ep.getNetwork()
|
|
|
+ if !n.isClusterEligible() {
|
|
|
+ return nil
|
|
|
+ }
|
|
|
+
|
|
|
+ c := n.getController()
|
|
|
+ if !ep.isAnonymous() && ep.Iface().Address() != nil {
|
|
|
+ if err := c.agent.networkDB.CreateEntry("endpoint_table", n.ID(), ep.ID(), []byte(fmt.Sprintf("%s=%s", ep.Name(), ep.Iface().Address().IP))); err != nil {
|
|
|
+ return err
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ for _, te := range ep.joinInfo.driverTableEntries {
|
|
|
+ if err := c.agent.networkDB.CreateEntry(te.tableName, n.ID(), te.key, te.value); err != nil {
|
|
|
+ return err
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ return nil
|
|
|
+}
|
|
|
+
|
|
|
+func (ep *endpoint) deleteFromCluster() error {
|
|
|
+ n := ep.getNetwork()
|
|
|
+ if !n.isClusterEligible() {
|
|
|
+ return nil
|
|
|
+ }
|
|
|
+
|
|
|
+ c := n.getController()
|
|
|
+ if !ep.isAnonymous() {
|
|
|
+ if err := c.agent.networkDB.DeleteEntry("endpoint_table", n.ID(), ep.ID()); err != nil {
|
|
|
+ return err
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ if ep.joinInfo == nil {
|
|
|
+ return nil
|
|
|
+ }
|
|
|
+
|
|
|
+ for _, te := range ep.joinInfo.driverTableEntries {
|
|
|
+ if err := c.agent.networkDB.DeleteEntry(te.tableName, n.ID(), te.key); err != nil {
|
|
|
+ return err
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ return nil
|
|
|
+}
|
|
|
+
|
|
|
+func (n *network) addDriverWatches() {
|
|
|
+ if !n.isClusterEligible() {
|
|
|
+ return
|
|
|
+ }
|
|
|
+
|
|
|
+ c := n.getController()
|
|
|
+ for _, tableName := range n.driverTables {
|
|
|
+ ch, cancel := c.agent.networkDB.Watch(tableName, n.ID(), "")
|
|
|
+ c.Lock()
|
|
|
+ c.agent.driverCancelFuncs[n.ID()] = append(c.agent.driverCancelFuncs[n.ID()], cancel)
|
|
|
+ c.Unlock()
|
|
|
+
|
|
|
+ go c.handleTableEvents(ch, n.handleDriverTableEvent)
|
|
|
+ d, err := n.driver(false)
|
|
|
+ if err != nil {
|
|
|
+ logrus.Errorf("Could not resolve driver %s while walking driver tabl: %v", n.networkType, err)
|
|
|
+ return
|
|
|
+ }
|
|
|
+
|
|
|
+ c.agent.networkDB.WalkTable(tableName, func(nid, key string, value []byte) bool {
|
|
|
+ d.EventNotify(driverapi.Create, n.ID(), tableName, key, value)
|
|
|
+ return false
|
|
|
+ })
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+func (n *network) cancelDriverWatches() {
|
|
|
+ if !n.isClusterEligible() {
|
|
|
+ return
|
|
|
+ }
|
|
|
+
|
|
|
+ c := n.getController()
|
|
|
+ c.Lock()
|
|
|
+ cancelFuncs := c.agent.driverCancelFuncs[n.ID()]
|
|
|
+ delete(c.agent.driverCancelFuncs, n.ID())
|
|
|
+ c.Unlock()
|
|
|
+
|
|
|
+ for _, cancel := range cancelFuncs {
|
|
|
+ cancel()
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+func (c *controller) handleTableEvents(ch chan events.Event, fn func(events.Event)) {
|
|
|
+ for {
|
|
|
+ select {
|
|
|
+ case ev, ok := <-ch:
|
|
|
+ if !ok {
|
|
|
+ return
|
|
|
+ }
|
|
|
+
|
|
|
+ fn(ev)
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+func (n *network) handleDriverTableEvent(ev events.Event) {
|
|
|
+ d, err := n.driver(false)
|
|
|
+ if err != nil {
|
|
|
+ logrus.Errorf("Could not resolve driver %s while handling driver table event: %v", n.networkType, err)
|
|
|
+ return
|
|
|
+ }
|
|
|
+
|
|
|
+ var (
|
|
|
+ etype driverapi.EventType
|
|
|
+ tname string
|
|
|
+ key string
|
|
|
+ value []byte
|
|
|
+ )
|
|
|
+
|
|
|
+ switch event := ev.(type) {
|
|
|
+ case networkdb.CreateEvent:
|
|
|
+ tname = event.Table
|
|
|
+ key = event.Key
|
|
|
+ value = event.Value
|
|
|
+ etype = driverapi.Create
|
|
|
+ case networkdb.DeleteEvent:
|
|
|
+ tname = event.Table
|
|
|
+ key = event.Key
|
|
|
+ value = event.Value
|
|
|
+ etype = driverapi.Delete
|
|
|
+ case networkdb.UpdateEvent:
|
|
|
+ tname = event.Table
|
|
|
+ key = event.Key
|
|
|
+ value = event.Value
|
|
|
+ etype = driverapi.Delete
|
|
|
+ }
|
|
|
+
|
|
|
+ d.EventNotify(etype, n.ID(), tname, key, value)
|
|
|
+}
|
|
|
+
|
|
|
+func (c *controller) handleEpTableEvent(ev events.Event) {
|
|
|
+ var (
|
|
|
+ id string
|
|
|
+ value string
|
|
|
+ isAdd bool
|
|
|
+ )
|
|
|
+
|
|
|
+ switch event := ev.(type) {
|
|
|
+ case networkdb.CreateEvent:
|
|
|
+ id = event.NetworkID
|
|
|
+ value = string(event.Value)
|
|
|
+ isAdd = true
|
|
|
+ case networkdb.DeleteEvent:
|
|
|
+ id = event.NetworkID
|
|
|
+ value = string(event.Value)
|
|
|
+ case networkdb.UpdateEvent:
|
|
|
+ logrus.Errorf("Unexpected update service table event = %#v", event)
|
|
|
+ }
|
|
|
+
|
|
|
+ nw, err := c.NetworkByID(id)
|
|
|
+ if err != nil {
|
|
|
+ logrus.Errorf("Could not find network %s while handling service table event: %v", id, err)
|
|
|
+ return
|
|
|
+ }
|
|
|
+ n := nw.(*network)
|
|
|
+
|
|
|
+ pair := strings.Split(value, "=")
|
|
|
+ if len(pair) < 2 {
|
|
|
+ logrus.Errorf("Incorrect service table value = %s", value)
|
|
|
+ return
|
|
|
+ }
|
|
|
+
|
|
|
+ name := pair[0]
|
|
|
+ ip := net.ParseIP(pair[1])
|
|
|
+
|
|
|
+ if name == "" || ip == nil {
|
|
|
+ logrus.Errorf("Invalid endpoint name/ip received while handling service table event %s", value)
|
|
|
+ return
|
|
|
+ }
|
|
|
+
|
|
|
+ if isAdd {
|
|
|
+ n.addSvcRecords(name, ip, nil, true)
|
|
|
+ } else {
|
|
|
+ n.deleteSvcRecords(name, ip, nil, true)
|
|
|
+ }
|
|
|
+}
|