Browse Source

Merge pull request #1467 from mrjana/networkdb

Purge stale nodes with same prefix and IP
Santhosh Manohar 8 năm trước cách đây
mục cha
commit
f9e11527ec

+ 7 - 7
libnetwork/networkdb/cluster.go

@@ -237,13 +237,6 @@ func (nDB *NetworkDB) reconnectNode() {
 	}
 	}
 	nDB.RUnlock()
 	nDB.RUnlock()
 
 
-	// Update all the local state to a new time to force update on
-	// the node we are trying to rejoin, just in case that node
-	// has these in leaving/deleting state still. This is
-	// facilitate fast convergence after recovering from a gossip
-	// failure.
-	nDB.updateLocalStateTime()
-
 	node := nodes[randomOffset(len(nodes))]
 	node := nodes[randomOffset(len(nodes))]
 	addr := net.UDPAddr{IP: node.Addr, Port: int(node.Port)}
 	addr := net.UDPAddr{IP: node.Addr, Port: int(node.Port)}
 
 
@@ -256,6 +249,13 @@ func (nDB *NetworkDB) reconnectNode() {
 		return
 		return
 	}
 	}
 
 
+	// Update all the local table state to a new time to
+	// force an update on the node we are trying to rejoin, just in
+	// case that node still has these in the deleting state. This is
+	// to facilitate fast convergence after recovering from a gossip
+	// failure.
+	nDB.updateLocalTableTime()
+
 	logrus.Debugf("Initiating bulk sync with node %s after reconnect", node.Name)
 	logrus.Debugf("Initiating bulk sync with node %s after reconnect", node.Name)
 	nDB.bulkSync([]string{node.Name}, true)
 	nDB.bulkSync([]string{node.Name}, true)
 }
 }

+ 35 - 9
libnetwork/networkdb/delegate.go

@@ -3,6 +3,7 @@ package networkdb
 import (
 import (
 	"fmt"
 	"fmt"
 	"net"
 	"net"
+	"strings"
 	"time"
 	"time"
 
 
 	"github.com/Sirupsen/logrus"
 	"github.com/Sirupsen/logrus"
@@ -31,7 +32,7 @@ func (nDB *NetworkDB) checkAndGetNode(nEvent *NodeEvent) *node {
 				return nil
 				return nil
 			}
 			}
 
 
-			delete(nDB.failedNodes, n.Name)
+			delete(nodes, n.Name)
 			return n
 			return n
 		}
 		}
 	}
 	}
@@ -39,16 +40,36 @@ func (nDB *NetworkDB) checkAndGetNode(nEvent *NodeEvent) *node {
 	return nil
 	return nil
 }
 }
 
 
-func (nDB *NetworkDB) handleNodeEvent(nEvent *NodeEvent) bool {
-	// Update our local clock if the received messages has newer
-	// time.
-	nDB.networkClock.Witness(nEvent.LTime)
+func (nDB *NetworkDB) purgeSameNode(n *node) {
+	nDB.Lock()
+	defer nDB.Unlock()
 
 
+	prefix := strings.Split(n.Name, "-")[0]
+	for _, nodes := range []map[string]*node{
+		nDB.failedNodes,
+		nDB.leftNodes,
+		nDB.nodes,
+	} {
+		var nodeNames []string
+		for name, node := range nodes {
+			if strings.HasPrefix(name, prefix) && n.Addr.Equal(node.Addr) {
+				nodeNames = append(nodeNames, name)
+			}
+		}
+
+		for _, name := range nodeNames {
+			delete(nodes, name)
+		}
+	}
+}
+
+func (nDB *NetworkDB) handleNodeEvent(nEvent *NodeEvent) bool {
 	n := nDB.checkAndGetNode(nEvent)
 	n := nDB.checkAndGetNode(nEvent)
 	if n == nil {
 	if n == nil {
 		return false
 		return false
 	}
 	}
 
 
+	nDB.purgeSameNode(n)
 	n.ltime = nEvent.LTime
 	n.ltime = nEvent.LTime
 
 
 	switch nEvent.Type {
 	switch nEvent.Type {
@@ -357,6 +378,15 @@ func (d *delegate) GetBroadcasts(overhead, limit int) [][]byte {
 }
 }
 
 
 func (d *delegate) LocalState(join bool) []byte {
 func (d *delegate) LocalState(join bool) []byte {
+	if join {
+		// Update all the local node/network state to a new time to
+		// force an update on the node we are trying to rejoin, just in
+		// case that node still has these in the leaving state. This is
+		// to facilitate fast convergence after recovering from a gossip
+		// failure.
+		d.nDB.updateLocalNetworkTime()
+	}
+
 	d.nDB.RLock()
 	d.nDB.RLock()
 	defer d.nDB.RUnlock()
 	defer d.nDB.RUnlock()
 
 
@@ -408,10 +438,6 @@ func (d *delegate) MergeRemoteState(buf []byte, isJoin bool) {
 		return
 		return
 	}
 	}
 
 
-	if pp.LTime > 0 {
-		d.nDB.networkClock.Witness(pp.LTime)
-	}
-
 	nodeEvent := &NodeEvent{
 	nodeEvent := &NodeEvent{
 		LTime:    pp.LTime,
 		LTime:    pp.LTime,
 		NodeName: pp.NodeName,
 		NodeName: pp.NodeName,

+ 7 - 2
libnetwork/networkdb/networkdb.go

@@ -524,7 +524,7 @@ func (nDB *NetworkDB) findCommonNetworks(nodeName string) []string {
 	return networks
 	return networks
 }
 }
 
 
-func (nDB *NetworkDB) updateLocalStateTime() {
+func (nDB *NetworkDB) updateLocalNetworkTime() {
 	nDB.Lock()
 	nDB.Lock()
 	defer nDB.Unlock()
 	defer nDB.Unlock()
 
 
@@ -532,8 +532,13 @@ func (nDB *NetworkDB) updateLocalStateTime() {
 	for _, n := range nDB.networks[nDB.config.NodeName] {
 	for _, n := range nDB.networks[nDB.config.NodeName] {
 		n.ltime = ltime
 		n.ltime = ltime
 	}
 	}
+}
+
+func (nDB *NetworkDB) updateLocalTableTime() {
+	nDB.Lock()
+	defer nDB.Unlock()
 
 
-	ltime = nDB.tableClock.Increment()
+	ltime := nDB.tableClock.Increment()
 	nDB.indexes[byTable].Walk(func(path string, v interface{}) bool {
 	nDB.indexes[byTable].Walk(func(path string, v interface{}) bool {
 		entry := v.(*entry)
 		entry := v.(*entry)
 		if entry.node != nDB.config.NodeName {
 		if entry.node != nDB.config.NodeName {