Browse Source

Merge pull request #1860 from fcrisciani/network-db-stabilization

Network db stabilization
Madhu Venugopal 8 years ago
parent
commit
b486c76cce

+ 2 - 1
libnetwork/networkdb/broadcast.go

@@ -114,7 +114,8 @@ type tableEventMessage struct {
 }
 
 func (m *tableEventMessage) Invalidates(other memberlist.Broadcast) bool {
-	return false
+	otherm := other.(*tableEventMessage)
+	return m.tname == otherm.tname && m.id == otherm.id && m.key == otherm.key
 }
 
 func (m *tableEventMessage) Message() []byte {

+ 0 - 7
libnetwork/networkdb/cluster.go

@@ -290,13 +290,6 @@ func (nDB *NetworkDB) reconnectNode() {
 		return
 	}
 
-	// Update all the local table state to a new time to
-	// force update on the node we are trying to rejoin, just in
-	// case that node has these in deleting state still. This is
-	// facilitate fast convergence after recovering from a gossip
-	// failure.
-	nDB.updateLocalTableTime()
-
 	logrus.Debugf("Initiating bulk sync with node %s after reconnect", node.Name)
 	nDB.bulkSync([]string{node.Name}, true)
 }

+ 25 - 24
libnetwork/networkdb/delegate.go

@@ -133,25 +133,12 @@ func (nDB *NetworkDB) handleNodeEvent(nEvent *NodeEvent) bool {
 }
 
 func (nDB *NetworkDB) handleNetworkEvent(nEvent *NetworkEvent) bool {
-	var flushEntries bool
 	// Update our local clock if the received messages has newer
 	// time.
 	nDB.networkClock.Witness(nEvent.LTime)
 
 	nDB.Lock()
-	defer func() {
-		nDB.Unlock()
-		// When a node leaves a network on the last task removal cleanup the
-		// local entries for this network & node combination. When the tasks
-		// on a network are removed we could have missed the gossip updates.
-		// Not doing this cleanup can leave stale entries because bulksyncs
-		// from the node will no longer include this network state.
-		//
-		// deleteNodeNetworkEntries takes nDB lock.
-		if flushEntries {
-			nDB.deleteNodeNetworkEntries(nEvent.NetworkID, nEvent.NodeName)
-		}
-	}()
+	defer nDB.Unlock()
 
 	if nEvent.NodeName == nDB.config.NodeName {
 		return false
@@ -179,7 +166,12 @@ func (nDB *NetworkDB) handleNetworkEvent(nEvent *NetworkEvent) bool {
 		n.leaving = nEvent.Type == NetworkEventTypeLeave
 		if n.leaving {
 			n.reapTime = reapInterval
-			flushEntries = true
+
+			// The remote node is leaving the network, but not the gossip cluster.
+			// Mark all its entries in deleted state, this will guarantee that
+			// if some node bulk sync with us, the deleted state of
+			// these entries will be propagated.
+			nDB.deleteNodeNetworkEntries(nEvent.NetworkID, nEvent.NodeName)
 		}
 
 		if nEvent.Type == NetworkEventTypeLeave {
@@ -214,17 +206,22 @@ func (nDB *NetworkDB) handleTableEvent(tEvent *TableEvent) bool {
 	nDB.RLock()
 	networks := nDB.networks[nDB.config.NodeName]
 	network, ok := networks[tEvent.NetworkID]
-	nDB.RUnlock()
-	if !ok || network.leaving {
-		return true
+	// Check if the owner of the event is still part of the network
+	nodes := nDB.networkNodes[tEvent.NetworkID]
+	var nodePresent bool
+	for _, node := range nodes {
+		if node == tEvent.NodeName {
+			nodePresent = true
+			break
+		}
 	}
-
-	e, err := nDB.getEntry(tEvent.TableName, tEvent.NetworkID, tEvent.Key)
-	if err != nil && tEvent.Type == TableEventTypeDelete {
-		// If it is a delete event and we don't have the entry here nothing to do.
+	nDB.RUnlock()
+	if !ok || network.leaving || !nodePresent {
+		// I'm out of the network OR the event owner is not anymore part of the network so do not propagate
 		return false
 	}
 
+	e, err := nDB.getEntry(tEvent.TableName, tEvent.NetworkID, tEvent.Key)
 	if err == nil {
 		// We have the latest state. Ignore the event
 		// since it is stale.
@@ -249,6 +246,11 @@ func (nDB *NetworkDB) handleTableEvent(tEvent *TableEvent) bool {
 	nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", tEvent.NetworkID, tEvent.TableName, tEvent.Key), e)
 	nDB.Unlock()
 
+	if err != nil && tEvent.Type == TableEventTypeDelete {
+		// If it is a delete event and we didn't have the entry here don't repropagate
+		return true
+	}
+
 	var op opType
 	switch tEvent.Type {
 	case TableEventTypeCreate:
@@ -289,8 +291,7 @@ func (nDB *NetworkDB) handleTableMessage(buf []byte, isBulkSync bool) {
 		return
 	}
 
-	// Do not rebroadcast a bulk sync
-	if rebroadcast := nDB.handleTableEvent(&tEvent); rebroadcast && !isBulkSync {
+	if rebroadcast := nDB.handleTableEvent(&tEvent); rebroadcast {
 		var err error
 		buf, err = encodeRawMessage(MessageTypeTableEvent, buf)
 		if err != nil {

+ 5 - 3
libnetwork/networkdb/event_delegate.go

@@ -45,9 +45,12 @@ func (e *eventDelegate) NotifyLeave(mn *memberlist.Node) {
 	var failed bool
 	logrus.Infof("Node %s/%s, left gossip cluster", mn.Name, mn.Addr)
 	e.broadcastNodeEvent(mn.Addr, opDelete)
-	e.nDB.deleteNodeTableEntries(mn.Name)
-	e.nDB.deleteNetworkEntriesForNode(mn.Name)
+	// The node left or failed, delete all the entries created by it.
+	// If the node was temporary down, deleting the entries will guarantee that the CREATE events will be accepted
+	// If the node instead left because was going down, then it makes sense to just delete all its state
 	e.nDB.Lock()
+	e.nDB.deleteNetworkEntriesForNode(mn.Name)
+	e.nDB.deleteNodeTableEntries(mn.Name)
 	if n, ok := e.nDB.nodes[mn.Name]; ok {
 		delete(e.nDB.nodes, mn.Name)
 
@@ -61,7 +64,6 @@ func (e *eventDelegate) NotifyLeave(mn *memberlist.Node) {
 	if failed {
 		logrus.Infof("Node %s/%s, added to failed nodes list", mn.Name, mn.Addr)
 	}
-
 }
 
 func (e *eventDelegate) NotifyUpdate(n *memberlist.Node) {

+ 48 - 72
libnetwork/networkdb/networkdb.go

@@ -418,7 +418,6 @@ func (nDB *NetworkDB) DeleteEntry(tname, nid, key string) error {
 }
 
 func (nDB *NetworkDB) deleteNetworkEntriesForNode(deletedNode string) {
-	nDB.Lock()
 	for nid, nodes := range nDB.networkNodes {
 		updatedNodes := make([]string, 0, len(nodes))
 		for _, node := range nodes {
@@ -433,11 +432,25 @@ func (nDB *NetworkDB) deleteNetworkEntriesForNode(deletedNode string) {
 	}
 
 	delete(nDB.networks, deletedNode)
-	nDB.Unlock()
 }
 
+// deleteNodeNetworkEntries is called in 2 conditions with 2 different outcomes:
+// 1) when a notification is coming of a node leaving the network
+//		- Walk all the network entries and mark the leaving node's entries for deletion
+//			These will be garbage collected when the reap timer will expire
+// 2) when the local node is leaving the network
+//		- Walk all the network entries:
+//			A) if the entry is owned by the local node
+//		  then we will mark it for deletion. This will ensure that if a node did not
+//		  yet received the notification that the local node is leaving, will be aware
+//		  of the entries to be deleted.
+//			B) if the entry is owned by a remote node, then we can safely delete it. This
+//			ensures that if we join back this network as we receive the CREATE event for
+//		  entries owned by remote nodes, we will accept them and we notify the application
 func (nDB *NetworkDB) deleteNodeNetworkEntries(nid, node string) {
-	nDB.Lock()
+	// Indicates if the delete is triggered for the local node
+	isNodeLocal := node == nDB.config.NodeName
+
 	nDB.indexes[byNetwork].WalkPrefix(fmt.Sprintf("/%s", nid),
 		func(path string, v interface{}) bool {
 			oldEntry := v.(*entry)
@@ -446,7 +459,15 @@ func (nDB *NetworkDB) deleteNodeNetworkEntries(nid, node string) {
 			tname := params[1]
 			key := params[2]
 
-			if oldEntry.node != node {
+			// If the entry is owned by a remote node and this node is not leaving the network
+			if oldEntry.node != node && !isNodeLocal {
+				// Don't do anything because the event is triggered for a node that does not own this entry
+				return false
+			}
+
+			// If this entry is already marked for deletion and this node is not leaving the network
+			if oldEntry.deleting && !isNodeLocal {
+				// Don't do anything this entry will be already garbage collected using the old reapTime
 				return false
 			}
 
@@ -458,17 +479,29 @@ func (nDB *NetworkDB) deleteNodeNetworkEntries(nid, node string) {
 				reapTime: reapInterval,
 			}
 
-			nDB.indexes[byTable].Insert(fmt.Sprintf("/%s/%s/%s", tname, nid, key), entry)
-			nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", nid, tname, key), entry)
+			// we arrived at this point in 2 cases:
+			// 1) this entry is owned by the node that is leaving the network
+			// 2) the local node is leaving the network
+			if oldEntry.node == node {
+				if isNodeLocal {
+					// TODO fcrisciani: this can be removed if there is no way to leave the network
+					// without doing a delete of all the objects
+					entry.ltime++
+				}
+				nDB.indexes[byTable].Insert(fmt.Sprintf("/%s/%s/%s", tname, nid, key), entry)
+				nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", nid, tname, key), entry)
+			} else {
+				// the local node is leaving the network, all the entries of remote nodes can be safely removed
+				nDB.indexes[byTable].Delete(fmt.Sprintf("/%s/%s/%s", tname, nid, key))
+				nDB.indexes[byNetwork].Delete(fmt.Sprintf("/%s/%s/%s", nid, tname, key))
+			}
 
 			nDB.broadcaster.Write(makeEvent(opDelete, tname, nid, key, entry.value))
 			return false
 		})
-	nDB.Unlock()
 }
 
 func (nDB *NetworkDB) deleteNodeTableEntries(node string) {
-	nDB.Lock()
 	nDB.indexes[byTable].Walk(func(path string, v interface{}) bool {
 		oldEntry := v.(*entry)
 		if oldEntry.node != node {
@@ -480,21 +513,12 @@ func (nDB *NetworkDB) deleteNodeTableEntries(node string) {
 		nid := params[1]
 		key := params[2]
 
-		entry := &entry{
-			ltime:    oldEntry.ltime,
-			node:     node,
-			value:    oldEntry.value,
-			deleting: true,
-			reapTime: reapInterval,
-		}
-
-		nDB.indexes[byTable].Insert(fmt.Sprintf("/%s/%s/%s", tname, nid, key), entry)
-		nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", nid, tname, key), entry)
+		nDB.indexes[byTable].Delete(fmt.Sprintf("/%s/%s/%s", tname, nid, key))
+		nDB.indexes[byNetwork].Delete(fmt.Sprintf("/%s/%s/%s", nid, tname, key))
 
-		nDB.broadcaster.Write(makeEvent(opDelete, tname, nid, key, entry.value))
+		nDB.broadcaster.Write(makeEvent(opDelete, tname, nid, key, oldEntry.value))
 		return false
 	})
-	nDB.Unlock()
 }
 
 // WalkTable walks a single table in NetworkDB and invokes the passed
@@ -573,37 +597,12 @@ func (nDB *NetworkDB) LeaveNetwork(nid string) error {
 
 	nDB.Lock()
 	defer nDB.Unlock()
-	var (
-		paths   []string
-		entries []*entry
-	)
 
+	// Remove myself from the list of the nodes participating to the network
 	nDB.deleteNetworkNode(nid, nDB.config.NodeName)
 
-	nwWalker := func(path string, v interface{}) bool {
-		entry, ok := v.(*entry)
-		if !ok {
-			return false
-		}
-		paths = append(paths, path)
-		entries = append(entries, entry)
-		return false
-	}
-
-	nDB.indexes[byNetwork].WalkPrefix(fmt.Sprintf("/%s", nid), nwWalker)
-	for _, path := range paths {
-		params := strings.Split(path[1:], "/")
-		tname := params[1]
-		key := params[2]
-
-		if _, ok := nDB.indexes[byTable].Delete(fmt.Sprintf("/%s/%s/%s", tname, nid, key)); !ok {
-			logrus.Errorf("Could not delete entry in table %s with network id %s and key %s as it does not exist", tname, nid, key)
-		}
-
-		if _, ok := nDB.indexes[byNetwork].Delete(fmt.Sprintf("/%s/%s/%s", nid, tname, key)); !ok {
-			logrus.Errorf("Could not delete entry in network %s with table name %s and key %s as it does not exist", nid, tname, key)
-		}
-	}
+	// Update all the local entries marking them for deletion and delete all the remote entries
+	nDB.deleteNodeNetworkEntries(nid, nDB.config.NodeName)
 
 	nodeNetworks, ok := nDB.networks[nDB.config.NodeName]
 	if !ok {
@@ -616,6 +615,7 @@ func (nDB *NetworkDB) LeaveNetwork(nid string) error {
 	}
 
 	n.ltime = ltime
+	n.reapTime = reapInterval
 	n.leaving = true
 	return nil
 }
@@ -679,27 +679,3 @@ func (nDB *NetworkDB) updateLocalNetworkTime() {
 		n.ltime = ltime
 	}
 }
-
-func (nDB *NetworkDB) updateLocalTableTime() {
-	nDB.Lock()
-	defer nDB.Unlock()
-
-	ltime := nDB.tableClock.Increment()
-	nDB.indexes[byTable].Walk(func(path string, v interface{}) bool {
-		entry := v.(*entry)
-		if entry.node != nDB.config.NodeName {
-			return false
-		}
-
-		params := strings.Split(path[1:], "/")
-		tname := params[0]
-		nid := params[1]
-		key := params[2]
-		entry.ltime = ltime
-
-		nDB.indexes[byTable].Insert(fmt.Sprintf("/%s/%s/%s", tname, nid, key), entry)
-		nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", nid, tname, key), entry)
-
-		return false
-	})
-}