فهرست منبع

Generating node discovery events to the drivers from networkdb

With the introduction of networkdb, the node discovery events were not
sent to the drivers. This commit generates the node discovery events and
sents it to the drivers interested in it.

Signed-off-by: Madhu Venugopal <madhu@docker.com>
Madhu Venugopal 8 سال پیش
والد
کامیت
bb560a1f44
4فایلهای تغییر یافته به همراه65 افزوده شده و 2 حذف شده
  1. 28 0
      libnetwork/agent.go
  2. 6 0
      libnetwork/controller.go
  3. 18 1
      libnetwork/networkdb/event_delegate.go
  4. 13 1
      libnetwork/networkdb/watch.go

+ 28 - 0
libnetwork/agent.go

@@ -3,6 +3,7 @@ package libnetwork
 //go:generate protoc -I.:Godeps/_workspace/src/github.com/gogo/protobuf  --gogo_out=import_path=github.com/docker/libnetwork,Mgogoproto/gogo.proto=github.com/gogo/protobuf/gogoproto:. agent.proto
 //go:generate protoc -I.:Godeps/_workspace/src/github.com/gogo/protobuf  --gogo_out=import_path=github.com/docker/libnetwork,Mgogoproto/gogo.proto=github.com/gogo/protobuf/gogoproto:. agent.proto
 
 
 import (
 import (
+	"encoding/json"
 	"fmt"
 	"fmt"
 	"net"
 	"net"
 	"os"
 	"os"
@@ -285,6 +286,7 @@ func (c *controller) agentInit(listenAddr, bindAddrOrInterface, advertiseAddr st
 	}
 	}
 
 
 	ch, cancel := nDB.Watch("endpoint_table", "", "")
 	ch, cancel := nDB.Watch("endpoint_table", "", "")
+	nodeCh, cancel := nDB.Watch(networkdb.NodeTable, "", "")
 
 
 	c.Lock()
 	c.Lock()
 	c.agent = &agent{
 	c.agent = &agent{
@@ -297,6 +299,7 @@ func (c *controller) agentInit(listenAddr, bindAddrOrInterface, advertiseAddr st
 	c.Unlock()
 	c.Unlock()
 
 
 	go c.handleTableEvents(ch, c.handleEpTableEvent)
 	go c.handleTableEvents(ch, c.handleEpTableEvent)
+	go c.handleTableEvents(nodeCh, c.handleNodeTableEvent)
 
 
 	drvEnc := discoverapi.DriverEncryptionConfig{}
 	drvEnc := discoverapi.DriverEncryptionConfig{}
 	keys, tags = c.getKeys(subsysIPSec)
 	keys, tags = c.getKeys(subsysIPSec)
@@ -634,6 +637,31 @@ func (n *network) handleDriverTableEvent(ev events.Event) {
 	d.EventNotify(etype, n.ID(), tname, key, value)
 	d.EventNotify(etype, n.ID(), tname, key, value)
 }
 }
 
 
+func (c *controller) handleNodeTableEvent(ev events.Event) {
+	var (
+		value    []byte
+		isAdd    bool
+		nodeAddr networkdb.NodeAddr
+	)
+	switch event := ev.(type) {
+	case networkdb.CreateEvent:
+		value = event.Value
+		isAdd = true
+	case networkdb.DeleteEvent:
+		value = event.Value
+	case networkdb.UpdateEvent:
+		logrus.Errorf("Unexpected update node table event = %#v", event)
+	}
+
+	err := json.Unmarshal(value, &nodeAddr)
+	if err != nil {
+		logrus.Errorf("Error unmarshalling node table event %v", err)
+		return
+	}
+	c.processNodeDiscovery([]net.IP{nodeAddr.Addr}, isAdd)
+
+}
+
 func (c *controller) handleEpTableEvent(ev events.Event) {
 func (c *controller) handleEpTableEvent(ev events.Event) {
 	var (
 	var (
 		nid   string
 		nid   string

+ 6 - 0
libnetwork/controller.go

@@ -567,6 +567,12 @@ func (c *controller) pushNodeDiscovery(d driverapi.Driver, cap driverapi.Capabil
 	if c.cfg != nil {
 	if c.cfg != nil {
 		addr := strings.Split(c.cfg.Cluster.Address, ":")
 		addr := strings.Split(c.cfg.Cluster.Address, ":")
 		self = net.ParseIP(addr[0])
 		self = net.ParseIP(addr[0])
+		// if external kvstore is not configured, try swarm-mode config
+		if self == nil {
+			if agent := c.getAgent(); agent != nil {
+				self = net.ParseIP(agent.advertiseAddr)
+			}
+		}
 	}
 	}
 
 
 	if d == nil || cap.DataScope != datastore.GlobalScope || nodes == nil {
 	if d == nil || cap.DataScope != datastore.GlobalScope || nodes == nil {

+ 18 - 1
libnetwork/networkdb/event_delegate.go

@@ -1,12 +1,28 @@
 package networkdb
 package networkdb
 
 
-import "github.com/hashicorp/memberlist"
+import (
+	"encoding/json"
+	"net"
+
+	"github.com/Sirupsen/logrus"
+	"github.com/hashicorp/memberlist"
+)
 
 
 type eventDelegate struct {
 type eventDelegate struct {
 	nDB *NetworkDB
 	nDB *NetworkDB
 }
 }
 
 
+func (e *eventDelegate) broadcastNodeEvent(addr net.IP, op opType) {
+	value, err := json.Marshal(&NodeAddr{addr})
+	if err == nil {
+		e.nDB.broadcaster.Write(makeEvent(op, NodeTable, "", "", value))
+	} else {
+		logrus.Errorf("Error marshalling node broadcast event %s", addr.String())
+	}
+}
+
 func (e *eventDelegate) NotifyJoin(mn *memberlist.Node) {
 func (e *eventDelegate) NotifyJoin(mn *memberlist.Node) {
+	e.broadcastNodeEvent(mn.Addr, opCreate)
 	e.nDB.Lock()
 	e.nDB.Lock()
 	// In case the node is rejoining after a failure or leave,
 	// In case the node is rejoining after a failure or leave,
 	// wait until an explicit join message arrives before adding
 	// wait until an explicit join message arrives before adding
@@ -24,6 +40,7 @@ func (e *eventDelegate) NotifyJoin(mn *memberlist.Node) {
 }
 }
 
 
 func (e *eventDelegate) NotifyLeave(mn *memberlist.Node) {
 func (e *eventDelegate) NotifyLeave(mn *memberlist.Node) {
+	e.broadcastNodeEvent(mn.Addr, opDelete)
 	e.nDB.deleteNodeTableEntries(mn.Name)
 	e.nDB.deleteNodeTableEntries(mn.Name)
 	e.nDB.deleteNetworkEntriesForNode(mn.Name)
 	e.nDB.deleteNetworkEntriesForNode(mn.Name)
 	e.nDB.Lock()
 	e.nDB.Lock()

+ 13 - 1
libnetwork/networkdb/watch.go

@@ -1,6 +1,10 @@
 package networkdb
 package networkdb
 
 
-import "github.com/docker/go-events"
+import (
+	"net"
+
+	"github.com/docker/go-events"
+)
 
 
 type opType uint8
 type opType uint8
 
 
@@ -17,6 +21,14 @@ type event struct {
 	Value     []byte
 	Value     []byte
 }
 }
 
 
+// NodeTable represents table event for node join and leave
+const NodeTable = "NodeTable"
+
+// NodeAddr represents the value carried for node event in NodeTable
+type NodeAddr struct {
+	Addr net.IP
+}
+
 // CreateEvent generates a table entry create event to the watchers
 // CreateEvent generates a table entry create event to the watchers
 type CreateEvent event
 type CreateEvent event