Pārlūkot izejas kodu

Purge stale nodes with same prefix and IP

Since the node name randomization fix, we need to make sure that we
purge the old node with the same prefix and same IP from the nodes
database if it still present. This causes unnecessary reconnect
attempts.

Also added a change to avoid unnecessary update of local lamport time
and only do it of we are ready to do a push pull on a join. Join should
happen only when the node is bootstrapped or when trying to reconnect
with a failed node.

Signed-off-by: Jana Radhakrishnan <mrjana@docker.com>
Jana Radhakrishnan 8 gadi atpakaļ
vecāks
revīzija
7b905d3c63

+ 7 - 7
libnetwork/networkdb/cluster.go

@@ -237,13 +237,6 @@ func (nDB *NetworkDB) reconnectNode() {
 	}
 	}
 	nDB.RUnlock()
 	nDB.RUnlock()
 
 
-	// Update all the local state to a new time to force update on
-	// the node we are trying to rejoin, just in case that node
-	// has these in leaving/deleting state still. This is
-	// facilitate fast convergence after recovering from a gossip
-	// failure.
-	nDB.updateLocalStateTime()
-
 	node := nodes[randomOffset(len(nodes))]
 	node := nodes[randomOffset(len(nodes))]
 	addr := net.UDPAddr{IP: node.Addr, Port: int(node.Port)}
 	addr := net.UDPAddr{IP: node.Addr, Port: int(node.Port)}
 
 
@@ -256,6 +249,13 @@ func (nDB *NetworkDB) reconnectNode() {
 		return
 		return
 	}
 	}
 
 
+	// Update all the local table state to a new time to
+	// force update on the node we are trying to rejoin, just in
+	// case that node has these in deleting state still. This is
+	// facilitate fast convergence after recovering from a gossip
+	// failure.
+	nDB.updateLocalTableTime()
+
 	logrus.Debugf("Initiating bulk sync with node %s after reconnect", node.Name)
 	logrus.Debugf("Initiating bulk sync with node %s after reconnect", node.Name)
 	nDB.bulkSync([]string{node.Name}, true)
 	nDB.bulkSync([]string{node.Name}, true)
 }
 }

+ 35 - 9
libnetwork/networkdb/delegate.go

@@ -3,6 +3,7 @@ package networkdb
 import (
 import (
 	"fmt"
 	"fmt"
 	"net"
 	"net"
+	"strings"
 	"time"
 	"time"
 
 
 	"github.com/Sirupsen/logrus"
 	"github.com/Sirupsen/logrus"
@@ -31,7 +32,7 @@ func (nDB *NetworkDB) checkAndGetNode(nEvent *NodeEvent) *node {
 				return nil
 				return nil
 			}
 			}
 
 
-			delete(nDB.failedNodes, n.Name)
+			delete(nodes, n.Name)
 			return n
 			return n
 		}
 		}
 	}
 	}
@@ -39,16 +40,36 @@ func (nDB *NetworkDB) checkAndGetNode(nEvent *NodeEvent) *node {
 	return nil
 	return nil
 }
 }
 
 
-func (nDB *NetworkDB) handleNodeEvent(nEvent *NodeEvent) bool {
-	// Update our local clock if the received messages has newer
-	// time.
-	nDB.networkClock.Witness(nEvent.LTime)
+func (nDB *NetworkDB) purgeSameNode(n *node) {
+	nDB.Lock()
+	defer nDB.Unlock()
 
 
+	prefix := strings.Split(n.Name, "-")[0]
+	for _, nodes := range []map[string]*node{
+		nDB.failedNodes,
+		nDB.leftNodes,
+		nDB.nodes,
+	} {
+		var nodeNames []string
+		for name, node := range nodes {
+			if strings.HasPrefix(name, prefix) && n.Addr.Equal(node.Addr) {
+				nodeNames = append(nodeNames, name)
+			}
+		}
+
+		for _, name := range nodeNames {
+			delete(nodes, name)
+		}
+	}
+}
+
+func (nDB *NetworkDB) handleNodeEvent(nEvent *NodeEvent) bool {
 	n := nDB.checkAndGetNode(nEvent)
 	n := nDB.checkAndGetNode(nEvent)
 	if n == nil {
 	if n == nil {
 		return false
 		return false
 	}
 	}
 
 
+	nDB.purgeSameNode(n)
 	n.ltime = nEvent.LTime
 	n.ltime = nEvent.LTime
 
 
 	switch nEvent.Type {
 	switch nEvent.Type {
@@ -357,6 +378,15 @@ func (d *delegate) GetBroadcasts(overhead, limit int) [][]byte {
 }
 }
 
 
 func (d *delegate) LocalState(join bool) []byte {
 func (d *delegate) LocalState(join bool) []byte {
+	if join {
+		// Update all the local node/network state to a new time to
+		// force update on the node we are trying to rejoin, just in
+		// case that node has these in leaving state still. This is
+		// facilitate fast convergence after recovering from a gossip
+		// failure.
+		d.nDB.updateLocalNetworkTime()
+	}
+
 	d.nDB.RLock()
 	d.nDB.RLock()
 	defer d.nDB.RUnlock()
 	defer d.nDB.RUnlock()
 
 
@@ -408,10 +438,6 @@ func (d *delegate) MergeRemoteState(buf []byte, isJoin bool) {
 		return
 		return
 	}
 	}
 
 
-	if pp.LTime > 0 {
-		d.nDB.networkClock.Witness(pp.LTime)
-	}
-
 	nodeEvent := &NodeEvent{
 	nodeEvent := &NodeEvent{
 		LTime:    pp.LTime,
 		LTime:    pp.LTime,
 		NodeName: pp.NodeName,
 		NodeName: pp.NodeName,

+ 7 - 2
libnetwork/networkdb/networkdb.go

@@ -524,7 +524,7 @@ func (nDB *NetworkDB) findCommonNetworks(nodeName string) []string {
 	return networks
 	return networks
 }
 }
 
 
-func (nDB *NetworkDB) updateLocalStateTime() {
+func (nDB *NetworkDB) updateLocalNetworkTime() {
 	nDB.Lock()
 	nDB.Lock()
 	defer nDB.Unlock()
 	defer nDB.Unlock()
 
 
@@ -532,8 +532,13 @@ func (nDB *NetworkDB) updateLocalStateTime() {
 	for _, n := range nDB.networks[nDB.config.NodeName] {
 	for _, n := range nDB.networks[nDB.config.NodeName] {
 		n.ltime = ltime
 		n.ltime = ltime
 	}
 	}
+}
+
+func (nDB *NetworkDB) updateLocalTableTime() {
+	nDB.Lock()
+	defer nDB.Unlock()
 
 
-	ltime = nDB.tableClock.Increment()
+	ltime := nDB.tableClock.Increment()
 	nDB.indexes[byTable].Walk(func(path string, v interface{}) bool {
 	nDB.indexes[byTable].Walk(func(path string, v interface{}) bool {
 		entry := v.(*entry)
 		entry := v.(*entry)
 		if entry.node != nDB.config.NodeName {
 		if entry.node != nDB.config.NodeName {