
Merge pull request #2013 from fcrisciani/netdb-node-race

NetworkDB node management race
Flavio Crisciani, 7 years ago
parent commit b4973cd6b0

+ 1 - 1
libnetwork/diagnose/diagnose.go

@@ -81,7 +81,7 @@ func (n *Server) EnableDebug(ip string, port int) {
 	// go func() {
 	// 	http.Serve(n.sk, n.mux)
 	// }()
-	http.ListenAndServe(":8000", n.mux)
+	http.ListenAndServe(fmt.Sprintf(":%d", port), n.mux)
 }
 
 // DisableDebug stops the debug server and closes the tcp socket
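
The practical effect of this one-liner is that the diagnostic server now honors its port argument instead of always binding to :8000. A minimal standalone sketch of the fixed behavior (simplified types, not the PR's Server):

```go
package main

import (
	"fmt"
	"net/http"
)

// enableDebug starts the diagnostic HTTP server. Building the address
// with fmt.Sprintf(":%d", port) is the fix: before it, the port
// parameter was silently ignored in favor of the hardcoded ":8000".
func enableDebug(mux *http.ServeMux, port int) error {
	return http.ListenAndServe(fmt.Sprintf(":%d", port), mux)
}

func main() {
	mux := http.NewServeMux()
	mux.HandleFunc("/help", func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprintln(w, "diagnostic server is up")
	})
	// With the fix, the requested port (2000 here is arbitrary) takes effect.
	if err := enableDebug(mux, 2000); err != nil {
		fmt.Println(err)
	}
}
```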

+ 11 - 7
libnetwork/networkdb/cluster.go

@@ -255,13 +255,18 @@ func (nDB *NetworkDB) triggerFunc(stagger time.Duration, C <-chan time.Time, sto
 func (nDB *NetworkDB) reapDeadNode() {
 	nDB.Lock()
 	defer nDB.Unlock()
-	for id, n := range nDB.failedNodes {
-		if n.reapTime > 0 {
-			n.reapTime -= nodeReapPeriod
-			continue
+	for _, nodeMap := range []map[string]*node{
+		nDB.failedNodes,
+		nDB.leftNodes,
+	} {
+		for id, n := range nodeMap {
+			if n.reapTime > nodeReapPeriod {
+				n.reapTime -= nodeReapPeriod
+				continue
+			}
+			logrus.Debugf("Garbage collect node %v", n.Name)
+			delete(nodeMap, id)
 		}
-		logrus.Debugf("Removing failed node %v from gossip cluster", n.Name)
-		delete(nDB.failedNodes, id)
 	}
 }
 
@@ -374,7 +379,6 @@ func (nDB *NetworkDB) gossip() {
 	thisNodeNetworks := nDB.networks[nDB.config.NodeID]
 	for nid := range thisNodeNetworks {
 		networkNodes[nid] = nDB.networkNodes[nid]
-
 	}
 	printStats := time.Since(nDB.lastStatsTimestamp) >= nDB.config.StatsPrintPeriod
 	printHealth := time.Since(nDB.lastHealthTimestamp) >= nDB.config.HealthPrintPeriod
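
Two things changed in reapDeadNode above: the loop now garbage-collects the leftNodes table as well as failedNodes, and the comparison moved from `> 0` to `> nodeReapPeriod`, so an entry is deleted on the tick where its remaining time would otherwise underflow. A self-contained sketch of the new loop (trimmed node type; the constant value is an arbitrary stand-in):

```go
package main

import (
	"fmt"
	"time"
)

const nodeReapPeriod = 2 * time.Second // stand-in for the real networkdb constant

type node struct {
	Name     string
	reapTime time.Duration
}

// reapDeadNodes mirrors the new loop: one pass garbage-collects both the
// failed nodes and the nodes that left explicitly. Comparing against
// nodeReapPeriod (instead of 0) deletes an entry on the tick where its
// remaining time would otherwise drop to zero or below.
func reapDeadNodes(tables ...map[string]*node) {
	for _, nodeMap := range tables {
		for id, n := range nodeMap {
			if n.reapTime > nodeReapPeriod {
				n.reapTime -= nodeReapPeriod
				continue
			}
			fmt.Printf("Garbage collect node %v\n", n.Name)
			delete(nodeMap, id)
		}
	}
}

func main() {
	failed := map[string]*node{"a": {Name: "a", reapTime: 3 * time.Second}}
	left := map[string]*node{"b": {Name: "b", reapTime: time.Second}}
	reapDeadNodes(failed, left) // "b" is collected, "a" survives one more period
	fmt.Println(len(failed), len(left))
}
```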

+ 47 - 41
libnetwork/networkdb/delegate.go

@@ -16,9 +16,12 @@ func (d *delegate) NodeMeta(limit int) []byte {
 	return []byte{}
 }
 
-func (nDB *NetworkDB) getNode(nEvent *NodeEvent) *node {
-	nDB.Lock()
-	defer nDB.Unlock()
+// getNode searches for the node in the internal tables.
+// It returns three booleans reporting whether the node was found in, respectively, the active list, the explicit-leave list or the failed list, plus the node itself.
+func (nDB *NetworkDB) getNode(nEvent *NodeEvent, extract bool) (bool, bool, bool, *node) {
+	var active bool
+	var left bool
+	var failed bool
 
 	for _, nodes := range []map[string]*node{
 		nDB.failedNodes,
@@ -26,35 +29,19 @@ func (nDB *NetworkDB) getNode(nEvent *NodeEvent) *node {
 		nDB.nodes,
 	} {
 		if n, ok := nodes[nEvent.NodeName]; ok {
+			active = &nodes == &nDB.nodes
+			left = &nodes == &nDB.leftNodes
+			failed = &nodes == &nDB.failedNodes
 			if n.ltime >= nEvent.LTime {
-				return nil
+				return active, left, failed, nil
 			}
-			return n
-		}
-	}
-	return nil
-}
-
-func (nDB *NetworkDB) checkAndGetNode(nEvent *NodeEvent) *node {
-	nDB.Lock()
-	defer nDB.Unlock()
-
-	for _, nodes := range []map[string]*node{
-		nDB.failedNodes,
-		nDB.leftNodes,
-		nDB.nodes,
-	} {
-		if n, ok := nodes[nEvent.NodeName]; ok {
-			if n.ltime >= nEvent.LTime {
-				return nil
+			if extract {
+				delete(nodes, n.Name)
 			}
-
-			delete(nodes, n.Name)
-			return n
+			return active, left, failed, n
 		}
 	}
-
-	return nil
+	return active, left, failed, nil
 }
 
 func (nDB *NetworkDB) handleNodeEvent(nEvent *NodeEvent) bool {
@@ -62,11 +49,14 @@ func (nDB *NetworkDB) handleNodeEvent(nEvent *NodeEvent) bool {
 	// time.
 	nDB.networkClock.Witness(nEvent.LTime)
 
-	n := nDB.getNode(nEvent)
+	nDB.RLock()
+	active, left, _, n := nDB.getNode(nEvent, false)
 	if n == nil {
+		nDB.RUnlock()
 		return false
 	}
-	// If its a node leave event for a manager and this is the only manager we
+	nDB.RUnlock()
+	// If it is a node leave event for a manager and this is the only manager we
 	// know of we want the reconnect logic to kick in. In a single manager
 	// cluster manager's gossip can't be bootstrapped unless some other node
 	// connects to it.
@@ -79,28 +69,38 @@ func (nDB *NetworkDB) handleNodeEvent(nEvent *NodeEvent) bool {
 		}
 	}
 
-	n = nDB.checkAndGetNode(nEvent)
-	if n == nil {
-		return false
-	}
-
 	n.ltime = nEvent.LTime
 
 	switch nEvent.Type {
 	case NodeEventTypeJoin:
+		if active {
+			// the node is already marked as active nothing to do
+			return false
+		}
 		nDB.Lock()
-		_, found := nDB.nodes[n.Name]
-		nDB.nodes[n.Name] = n
-		nDB.Unlock()
-		if !found {
+		// Because the lock was released after the previous check we have to look the node up again and re-verify its status.
+		// All of this is to avoid holding one big lock for the whole function.
+		if active, _, _, n = nDB.getNode(nEvent, true); !active && n != nil {
+			n.reapTime = 0
+			nDB.nodes[n.Name] = n
 			logrus.Infof("%v(%v): Node join event for %s/%s", nDB.config.Hostname, nDB.config.NodeID, n.Name, n.Addr)
 		}
+		nDB.Unlock()
 		return true
 	case NodeEventTypeLeave:
+		if left {
+			// the node is already marked as left nothing to do.
+			return false
+		}
 		nDB.Lock()
-		nDB.leftNodes[n.Name] = n
+		// Because the lock was released after the previous check we have to look the node up again and re-verify its status.
+		// All of this is to avoid holding one big lock for the whole function.
+		if _, left, _, n = nDB.getNode(nEvent, true); !left && n != nil {
+			n.reapTime = nodeReapInterval
+			nDB.leftNodes[n.Name] = n
+			logrus.Infof("%v(%v): Node leave event for %s/%s", nDB.config.Hostname, nDB.config.NodeID, n.Name, n.Addr)
+		}
 		nDB.Unlock()
-		logrus.Infof("%v(%v): Node leave event for %s/%s", nDB.config.Hostname, nDB.config.NodeID, n.Name, n.Addr)
 		return true
 	}
 
@@ -162,6 +162,12 @@ func (nDB *NetworkDB) handleNetworkEvent(nEvent *NetworkEvent) bool {
 		return false
 	}
 
+	// If the node is not known to memberlist we cannot save any state for it, because if it actually
+	// dies we won't receive any notification and we would remain stuck with it
+	if _, ok := nDB.nodes[nEvent.NodeName]; !ok {
+		return false
+	}
+
 	// This remote network join is being seen the first time.
 	nodeNetworks[nEvent.NetworkID] = &network{
 		id:    nEvent.NetworkID,
@@ -466,7 +472,7 @@ func (d *delegate) MergeRemoteState(buf []byte, isJoin bool) {
 	var gMsg GossipMessage
 	err := proto.Unmarshal(buf, &gMsg)
 	if err != nil {
-		logrus.Errorf("Error unmarshalling push pull messsage: %v", err)
+		logrus.Errorf("Error unmarshalling push pull message: %v", err)
 		return
 	}
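
The heart of the node-event fix is the check/re-check pattern visible in handleNodeEvent above: a cheap lookup under the read lock filters out events that need no work, and the node's status is verified a second time under the write lock, because another goroutine may have changed it in the window between the two lock acquisitions. A generic sketch of the pattern (hypothetical store type, not NetworkDB's API):

```go
package main

import (
	"fmt"
	"sync"
)

type store struct {
	mu      sync.RWMutex
	active  map[string]int
	pending map[string]int
}

// promote moves a pending entry to active, tolerating concurrent callers.
func (s *store) promote(name string) bool {
	// Fast path under the read lock: bail out if there is nothing to do.
	s.mu.RLock()
	_, already := s.active[name]
	s.mu.RUnlock()
	if already {
		return false
	}

	// The read lock was released, so the state must be re-verified under
	// the write lock; another goroutine may have promoted the entry in
	// the meantime. This is the same reasoning as the comments in
	// handleNodeEvent above.
	s.mu.Lock()
	defer s.mu.Unlock()
	if _, already := s.active[name]; already {
		return false
	}
	if v, ok := s.pending[name]; ok {
		delete(s.pending, name)
		s.active[name] = v
		return true
	}
	return false
}

func main() {
	s := &store{active: map[string]int{}, pending: map[string]int{"n1": 1}}
	fmt.Println(s.promote("n1"), s.promote("n1")) // true false
}
```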
 

+ 35 - 3
libnetwork/networkdb/event_delegate.go

@@ -21,10 +21,29 @@ func (e *eventDelegate) broadcastNodeEvent(addr net.IP, op opType) {
 	}
 }
 
+func (e *eventDelegate) purgeReincarnation(mn *memberlist.Node) {
+	for name, node := range e.nDB.failedNodes {
+		if node.Addr.Equal(mn.Addr) {
+			logrus.Infof("Node %s/%s, is the new incarnation of the failed node %s/%s", mn.Name, mn.Addr, name, node.Addr)
+			delete(e.nDB.failedNodes, name)
+			return
+		}
+	}
+
+	for name, node := range e.nDB.leftNodes {
+		if node.Addr.Equal(mn.Addr) {
+			logrus.Infof("Node %s/%s, is the new incarnation of the shutdown node %s/%s", mn.Name, mn.Addr, name, node.Addr)
+			delete(e.nDB.leftNodes, name)
+			return
+		}
+	}
+}
+
 func (e *eventDelegate) NotifyJoin(mn *memberlist.Node) {
 	logrus.Infof("Node %s/%s, joined gossip cluster", mn.Name, mn.Addr)
 	e.broadcastNodeEvent(mn.Addr, opCreate)
 	e.nDB.Lock()
+	defer e.nDB.Unlock()
 	// In case the node is rejoining after a failure or leave,
 	// wait until an explicit join message arrives before adding
 	// it to the nodes just to make sure this is not a stale
@@ -32,12 +51,15 @@ func (e *eventDelegate) NotifyJoin(mn *memberlist.Node) {
 	_, fOk := e.nDB.failedNodes[mn.Name]
 	_, lOk := e.nDB.leftNodes[mn.Name]
 	if fOk || lOk {
-		e.nDB.Unlock()
 		return
 	}
 
+	// Every node has a unique ID.
+	// Based on the IP address, check whether the node that just joined is actually a new incarnation
+	// of a previously failed or shut-down one
+	e.purgeReincarnation(mn)
+
 	e.nDB.nodes[mn.Name] = &node{Node: *mn}
-	e.nDB.Unlock()
 	logrus.Infof("Node %s/%s, added to nodes list", mn.Name, mn.Addr)
 }
 
@@ -49,18 +71,28 @@ func (e *eventDelegate) NotifyLeave(mn *memberlist.Node) {
 	// If the node was temporarily down, deleting the entries will guarantee that the CREATE events will be accepted
 	// If the node instead left because it was going down, then it makes sense to just delete all its state
 	e.nDB.Lock()
+	defer e.nDB.Unlock()
 	e.nDB.deleteNetworkEntriesForNode(mn.Name)
 	e.nDB.deleteNodeTableEntries(mn.Name)
 	if n, ok := e.nDB.nodes[mn.Name]; ok {
 		delete(e.nDB.nodes, mn.Name)
 
+		// Check if a new incarnation of the same node already joined
+		// In that case this node can simply be removed and no further action is needed
+		for name, node := range e.nDB.nodes {
+			if node.Addr.Equal(mn.Addr) {
+				logrus.Infof("Node %s/%s, is the new incarnation of the failed node %s/%s", name, node.Addr, mn.Name, mn.Addr)
+				return
+			}
+		}
+
 		// In case of node failure, keep retrying to reconnect every retryInterval (1sec) for nodeReapInterval (24h)
 		// Explicit leave will have already removed the node from the list of nodes (nDB.nodes) and put it into the leftNodes map
 		n.reapTime = nodeReapInterval
 		e.nDB.failedNodes[mn.Name] = n
 		failed = true
 	}
-	e.nDB.Unlock()
+
 	if failed {
 		logrus.Infof("Node %s/%s, added to failed nodes list", mn.Name, mn.Addr)
 	}
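
The reincarnation logic keys off the IP address: node names are unique per incarnation, so a host that comes back after a failure or shutdown reappears under a new name but at the same address. A simplified sketch of the lookup that purgeReincarnation and the NotifyLeave check above perform (trimmed node type, not memberlist's):

```go
package main

import (
	"fmt"
	"net"
)

type node struct {
	Name string
	Addr net.IP
}

// purgeReincarnation deletes from table any entry whose address matches
// the newly joined member, since that old entry is a previous incarnation
// of the same host.
func purgeReincarnation(table map[string]*node, joined *node) (string, bool) {
	for name, n := range table {
		if n.Addr.Equal(joined.Addr) {
			delete(table, name)
			return name, true
		}
	}
	return "", false
}

func main() {
	failed := map[string]*node{
		"old-id": {Name: "old-id", Addr: net.ParseIP("10.0.0.7")},
	}
	fresh := &node{Name: "new-id", Addr: net.ParseIP("10.0.0.7")}
	if old, ok := purgeReincarnation(failed, fresh); ok {
		fmt.Printf("%s is the new incarnation of %s\n", fresh.Name, old)
	}
}
```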

+ 7 - 0
libnetwork/networkdb/networkdb.go

@@ -310,6 +310,10 @@ func (nDB *NetworkDB) Peers(nid string) []PeerInfo {
 				Name: node.Name,
 				IP:   node.Addr.String(),
 			})
+		} else {
+			// Added for testing purposes; this condition should never happen, otherwise it means that the
+			// network list is out of sync with the node list
+			peers = append(peers, PeerInfo{})
 		}
 	}
 	return peers
@@ -593,6 +597,9 @@ func (nDB *NetworkDB) JoinNetwork(nid string) error {
 	nodeNetworks[nid] = &network{id: nid, ltime: ltime, entriesNumber: entries}
 	nodeNetworks[nid].tableBroadcasts = &memberlist.TransmitLimitedQueue{
 		NumNodes: func() int {
+			// TODO fcrisciani: this can be optimized, maybe by avoiding the lock?
+			// This call is made on every GetBroadcasts call to evaluate the number of
+			// replicas for the message
 			nDB.RLock()
 			defer nDB.RUnlock()
 			return len(nDB.networkNodes[nid])
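
The zero-value PeerInfo appended in Peers gives tests a cheap way to detect the inconsistency the comment describes: instead of silently shrinking the result, an out-of-sync network list now surfaces as an empty entry. A hypothetical test-side helper built on that behavior (checkPeers is not part of the PR):

```go
package networkdb

import "fmt"

// checkPeers flags the zero-value entries that Peers appends when the
// per-network node list references a node missing from the node list.
func checkPeers(peers []PeerInfo) error {
	for i, p := range peers {
		if p.Name == "" && p.IP == "" {
			return fmt.Errorf("peer %d is empty: network list out of sync with node list", i)
		}
	}
	return nil
}
```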

+ 12 - 0
libnetwork/networkdb/networkdbdiagnose.go

@@ -5,7 +5,9 @@ import (
 	"net/http"
 	"strings"
 
+	stackdump "github.com/docker/docker/pkg/signal"
 	"github.com/docker/libnetwork/diagnose"
+	"github.com/sirupsen/logrus"
 )
 
 const (
@@ -24,6 +26,7 @@ var NetDbPaths2Func = map[string]diagnose.HTTPHandlerFunc{
 	"/deleteentry":  dbDeleteEntry,
 	"/getentry":     dbGetEntry,
 	"/gettable":     dbGetTable,
+	"/dump":         dbStackTrace,
 }
 
 func dbJoin(ctx interface{}, w http.ResponseWriter, r *http.Request) {
@@ -240,3 +243,12 @@ func dbGetTable(ctx interface{}, w http.ResponseWriter, r *http.Request) {
 		}
 	}
 }
+
+func dbStackTrace(ctx interface{}, w http.ResponseWriter, r *http.Request) {
+	path, err := stackdump.DumpStacks("/tmp/")
+	if err != nil {
+		logrus.WithError(err).Error("failed to write goroutines dump")
+	} else {
+		fmt.Fprintf(w, "goroutine stacks written to %s", path)
+	}
+}
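
With the diagnostic server enabled, the new /dump endpoint can be probed like any other handler; the goroutine dump is written to a file under /tmp/ and the HTTP response reports its path. A hypothetical client-side probe (the port, 2000, is an assumption, as is the exact response text):

```go
package main

import (
	"fmt"
	"io"
	"net/http"
)

func main() {
	// Assumes EnableDebug was called with port 2000 on this host.
	resp, err := http.Get("http://127.0.0.1:2000/dump")
	if err != nil {
		fmt.Println("diagnostic server not reachable:", err)
		return
	}
	defer resp.Body.Close()
	body, _ := io.ReadAll(resp.Body)
	fmt.Println(string(body)) // e.g. "goroutine stacks written to /tmp/..."
}
```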

+ 2 - 0
libnetwork/test/networkDb/Dockerfile

@@ -1,5 +1,7 @@
 FROM alpine
 
+RUN apk --no-cache add curl
+
 COPY testMain /app/
 
 WORKDIR app