瀏覽代碼

Merge pull request #1626 from mavenugo/nd

Generating node discovery events to the drivers from networkdb
Santhosh Manohar 8 年之前
父節點
當前提交
4e4787fa31
共有 4 個文件被更改,包括 65 次插入2 次删除
  1. 28 0
      libnetwork/agent.go
  2. 6 0
      libnetwork/controller.go
  3. 18 1
      libnetwork/networkdb/event_delegate.go
  4. 13 1
      libnetwork/networkdb/watch.go

+ 28 - 0
libnetwork/agent.go

@@ -3,6 +3,7 @@ package libnetwork
 //go:generate protoc -I.:Godeps/_workspace/src/github.com/gogo/protobuf  --gogo_out=import_path=github.com/docker/libnetwork,Mgogoproto/gogo.proto=github.com/gogo/protobuf/gogoproto:. agent.proto
 
 import (
+	"encoding/json"
 	"fmt"
 	"net"
 	"os"
@@ -285,6 +286,7 @@ func (c *controller) agentInit(listenAddr, bindAddrOrInterface, advertiseAddr st
 	}
 
 	ch, cancel := nDB.Watch("endpoint_table", "", "")
+	nodeCh, cancel := nDB.Watch(networkdb.NodeTable, "", "")
 
 	c.Lock()
 	c.agent = &agent{
@@ -297,6 +299,7 @@ func (c *controller) agentInit(listenAddr, bindAddrOrInterface, advertiseAddr st
 	c.Unlock()
 
 	go c.handleTableEvents(ch, c.handleEpTableEvent)
+	go c.handleTableEvents(nodeCh, c.handleNodeTableEvent)
 
 	drvEnc := discoverapi.DriverEncryptionConfig{}
 	keys, tags = c.getKeys(subsysIPSec)
@@ -634,6 +637,31 @@ func (n *network) handleDriverTableEvent(ev events.Event) {
 	d.EventNotify(etype, n.ID(), tname, key, value)
 }
 
+func (c *controller) handleNodeTableEvent(ev events.Event) {
+	var (
+		value    []byte
+		isAdd    bool
+		nodeAddr networkdb.NodeAddr
+	)
+	switch event := ev.(type) {
+	case networkdb.CreateEvent:
+		value = event.Value
+		isAdd = true
+	case networkdb.DeleteEvent:
+		value = event.Value
+	case networkdb.UpdateEvent:
+		logrus.Errorf("Unexpected update node table event = %#v", event)
+	}
+
+	err := json.Unmarshal(value, &nodeAddr)
+	if err != nil {
+		logrus.Errorf("Error unmarshalling node table event %v", err)
+		return
+	}
+	c.processNodeDiscovery([]net.IP{nodeAddr.Addr}, isAdd)
+
+}
+
 func (c *controller) handleEpTableEvent(ev events.Event) {
 	var (
 		nid   string

+ 6 - 0
libnetwork/controller.go

@@ -567,6 +567,12 @@ func (c *controller) pushNodeDiscovery(d driverapi.Driver, cap driverapi.Capabil
 	if c.cfg != nil {
 		addr := strings.Split(c.cfg.Cluster.Address, ":")
 		self = net.ParseIP(addr[0])
+		// if external kvstore is not configured, try swarm-mode config
+		if self == nil {
+			if agent := c.getAgent(); agent != nil {
+				self = net.ParseIP(agent.advertiseAddr)
+			}
+		}
 	}
 
 	if d == nil || cap.DataScope != datastore.GlobalScope || nodes == nil {

+ 18 - 1
libnetwork/networkdb/event_delegate.go

@@ -1,12 +1,28 @@
 package networkdb
 
-import "github.com/hashicorp/memberlist"
+import (
+	"encoding/json"
+	"net"
+
+	"github.com/Sirupsen/logrus"
+	"github.com/hashicorp/memberlist"
+)
 
 type eventDelegate struct {
 	nDB *NetworkDB
 }
 
+func (e *eventDelegate) broadcastNodeEvent(addr net.IP, op opType) {
+	value, err := json.Marshal(&NodeAddr{addr})
+	if err == nil {
+		e.nDB.broadcaster.Write(makeEvent(op, NodeTable, "", "", value))
+	} else {
+		logrus.Errorf("Error marshalling node broadcast event %s", addr.String())
+	}
+}
+
 func (e *eventDelegate) NotifyJoin(mn *memberlist.Node) {
+	e.broadcastNodeEvent(mn.Addr, opCreate)
 	e.nDB.Lock()
 	// In case the node is rejoining after a failure or leave,
 	// wait until an explicit join message arrives before adding
@@ -24,6 +40,7 @@ func (e *eventDelegate) NotifyJoin(mn *memberlist.Node) {
 }
 
 func (e *eventDelegate) NotifyLeave(mn *memberlist.Node) {
+	e.broadcastNodeEvent(mn.Addr, opDelete)
 	e.nDB.deleteNodeTableEntries(mn.Name)
 	e.nDB.deleteNetworkEntriesForNode(mn.Name)
 	e.nDB.Lock()

+ 13 - 1
libnetwork/networkdb/watch.go

@@ -1,6 +1,10 @@
 package networkdb
 
-import "github.com/docker/go-events"
+import (
+	"net"
+
+	"github.com/docker/go-events"
+)
 
 type opType uint8
 
@@ -17,6 +21,14 @@ type event struct {
 	Value     []byte
 }
 
+// NodeTable represents table event for node join and leave
+const NodeTable = "NodeTable"
+
+// NodeAddr represents the value carried for node event in NodeTable
+type NodeAddr struct {
+	Addr net.IP
+}
+
 // CreateEvent generates a table entry create event to the watchers
 type CreateEvent event