Kaynağa Gözat

Merge pull request #1944 from fcrisciani/netdb-fix-reap

Fix reapTime logic in NetworkDB
Madhu Venugopal 7 yıl önce
ebeveyn
işleme
c69b749801

+ 1 - 0
libnetwork/.gitignore

@@ -38,3 +38,4 @@ cmd/dnet/dnet
 
 libnetworkbuild.created
 test/networkDb/testMain
+test/networkDb/gossipdb

+ 2 - 0
libnetwork/networkdb/broadcast.go

@@ -134,6 +134,8 @@ func (nDB *NetworkDB) sendTableEvent(event TableEvent_Type, nid string, tname st
 		TableName: tname,
 		Key:       key,
 		Value:     entry.value,
+		// The duration in second is a float that below would be truncated
+		ResidualReapTime: int32(entry.reapTime.Seconds()),
 	}
 
 	raw, err := encodeMessage(MessageTypeTableEvent, &tEvent)

+ 56 - 40
libnetwork/networkdb/cluster.go

@@ -17,11 +17,15 @@ import (
 )
 
 const (
-	reapInterval     = 30 * time.Minute
-	reapPeriod       = 5 * time.Second
-	retryInterval    = 1 * time.Second
-	nodeReapInterval = 24 * time.Hour
-	nodeReapPeriod   = 2 * time.Hour
+	// The garbage collection logic for entries leverage the presence of the network.
+	// For this reason the expiration time of the network is put slightly higher than the entry expiration so that
+	// there is at least 5 extra cycle to make sure that all the entries are properly deleted before deleting the network.
+	reapEntryInterval   = 30 * time.Minute
+	reapNetworkInterval = reapEntryInterval + 5*reapPeriod
+	reapPeriod          = 5 * time.Second
+	retryInterval       = 1 * time.Second
+	nodeReapInterval    = 24 * time.Hour
+	nodeReapPeriod      = 2 * time.Hour
 )
 
 type logWriter struct{}
@@ -300,8 +304,9 @@ func (nDB *NetworkDB) reconnectNode() {
 // the reaper runs. NOTE nDB.reapTableEntries updates the reapTime with a readlock. This
 // is safe as long as no other concurrent path touches the reapTime field.
 func (nDB *NetworkDB) reapState() {
-	nDB.reapNetworks()
+	// The reapTableEntries leverage the presence of the network so garbage collect entries first
 	nDB.reapTableEntries()
+	nDB.reapNetworks()
 }
 
 func (nDB *NetworkDB) reapNetworks() {
@@ -321,43 +326,51 @@ func (nDB *NetworkDB) reapNetworks() {
 }
 
 func (nDB *NetworkDB) reapTableEntries() {
-	var paths []string
-
+	var nodeNetworks []string
+	// This is best effort, if the list of network changes will be picked up in the next cycle
 	nDB.RLock()
-	nDB.indexes[byTable].Walk(func(path string, v interface{}) bool {
-		entry, ok := v.(*entry)
-		if !ok {
-			return false
-		}
-
-		if !entry.deleting {
-			return false
-		}
-		if entry.reapTime > 0 {
-			entry.reapTime -= reapPeriod
-			return false
-		}
-		paths = append(paths, path)
-		return false
-	})
+	for nid := range nDB.networks[nDB.config.NodeName] {
+		nodeNetworks = append(nodeNetworks, nid)
+	}
 	nDB.RUnlock()
 
-	nDB.Lock()
-	for _, path := range paths {
-		params := strings.Split(path[1:], "/")
-		tname := params[0]
-		nid := params[1]
-		key := params[2]
-
-		if _, ok := nDB.indexes[byTable].Delete(fmt.Sprintf("/%s/%s/%s", tname, nid, key)); !ok {
-			logrus.Errorf("Could not delete entry in table %s with network id %s and key %s as it does not exist", tname, nid, key)
-		}
+	cycleStart := time.Now()
+	// In order to avoid blocking the database for a long time, apply the garbage collection logic by network
+	// The lock is taken at the beginning of the cycle and the deletion is inline
+	for _, nid := range nodeNetworks {
+		nDB.Lock()
+		nDB.indexes[byNetwork].WalkPrefix(fmt.Sprintf("/%s", nid), func(path string, v interface{}) bool {
+			// timeCompensation compensate in case the lock took some time to be released
+			timeCompensation := time.Since(cycleStart)
+			entry, ok := v.(*entry)
+			if !ok || !entry.deleting {
+				return false
+			}
 
-		if _, ok := nDB.indexes[byNetwork].Delete(fmt.Sprintf("/%s/%s/%s", nid, tname, key)); !ok {
-			logrus.Errorf("Could not delete entry in network %s with table name %s and key %s as it does not exist", nid, tname, key)
-		}
+			// In this check we are adding an extra 1 second to guarantee that when the number is truncated to int32 to fit the packet
+			// for the tableEvent the number is always strictly > 1 and never 0
+			if entry.reapTime > reapPeriod+timeCompensation+time.Second {
+				entry.reapTime -= reapPeriod + timeCompensation
+				return false
+			}
+
+			params := strings.Split(path[1:], "/")
+			nid := params[0]
+			tname := params[1]
+			key := params[2]
+
+			okTable, okNetwork := nDB.deleteEntry(nid, tname, key)
+			if !okTable {
+				logrus.Errorf("Table tree delete failed, entry with key:%s does not exists in the table:%s network:%s", key, tname, nid)
+			}
+			if !okNetwork {
+				logrus.Errorf("Network tree delete failed, entry with key:%s does not exists in the network:%s table:%s", key, nid, tname)
+			}
+
+			return false
+		})
+		nDB.Unlock()
 	}
-	nDB.Unlock()
 }
 
 func (nDB *NetworkDB) gossip() {
@@ -406,8 +419,9 @@ func (nDB *NetworkDB) gossip() {
 		// Collect stats and print the queue info, note this code is here also to have a view of the queues empty
 		network.qMessagesSent += len(msgs)
 		if printStats {
-			logrus.Infof("NetworkDB stats - Queue net:%s qLen:%d netPeers:%d netMsg/s:%d",
-				nid, broadcastQ.NumQueued(), broadcastQ.NumNodes(), network.qMessagesSent/int((nDB.config.StatsPrintPeriod/time.Second)))
+			logrus.Infof("NetworkDB stats - netID:%s leaving:%t netPeers:%d entries:%d Queue qLen:%d netMsg/s:%d",
+				nid, network.leaving, broadcastQ.NumNodes(), network.entriesNumber, broadcastQ.NumQueued(),
+				network.qMessagesSent/int((nDB.config.StatsPrintPeriod/time.Second)))
 			network.qMessagesSent = 0
 		}
 
@@ -572,6 +586,8 @@ func (nDB *NetworkDB) bulkSyncNode(networks []string, node string, unsolicited b
 				TableName: params[1],
 				Key:       params[2],
 				Value:     entry.value,
+				// The duration in second is a float that below would be truncated
+				ResidualReapTime: int32(entry.reapTime.Seconds()),
 			}
 
 			msg, err := encodeMessage(MessageTypeTableEvent, &tEvent)

+ 19 - 19
libnetwork/networkdb/delegate.go

@@ -1,9 +1,9 @@
 package networkdb
 
 import (
-	"fmt"
 	"net"
 	"strings"
+	"time"
 
 	"github.com/gogo/protobuf/proto"
 	"github.com/sirupsen/logrus"
@@ -165,7 +165,7 @@ func (nDB *NetworkDB) handleNetworkEvent(nEvent *NetworkEvent) bool {
 		n.ltime = nEvent.LTime
 		n.leaving = nEvent.Type == NetworkEventTypeLeave
 		if n.leaving {
-			n.reapTime = reapInterval
+			n.reapTime = reapNetworkInterval
 
 			// The remote node is leaving the network, but not the gossip cluster.
 			// Mark all its entries in deleted state, this will guarantee that
@@ -198,8 +198,7 @@ func (nDB *NetworkDB) handleNetworkEvent(nEvent *NetworkEvent) bool {
 }
 
 func (nDB *NetworkDB) handleTableEvent(tEvent *TableEvent) bool {
-	// Update our local clock if the received messages has newer
-	// time.
+	// Update our local clock if the received messages has newer time.
 	nDB.tableClock.Witness(tEvent.LTime)
 
 	// Ignore the table events for networks that are in the process of going away
@@ -235,20 +234,26 @@ func (nDB *NetworkDB) handleTableEvent(tEvent *TableEvent) bool {
 		node:     tEvent.NodeName,
 		value:    tEvent.Value,
 		deleting: tEvent.Type == TableEventTypeDelete,
+		reapTime: time.Duration(tEvent.ResidualReapTime) * time.Second,
 	}
 
-	if e.deleting {
-		e.reapTime = reapInterval
+	// All the entries marked for deletion should have a reapTime set greater than 0
+	// This case can happen if the cluster is running different versions of the engine where the old version does not have the
+	// field. If that is not the case, this can be a BUG
+	if e.deleting && e.reapTime == 0 {
+		logrus.Warnf("handleTableEvent object %+v has a 0 reapTime, is the cluster running the same docker engine version?", tEvent)
+		e.reapTime = reapEntryInterval
 	}
 
 	nDB.Lock()
-	nDB.indexes[byTable].Insert(fmt.Sprintf("/%s/%s/%s", tEvent.TableName, tEvent.NetworkID, tEvent.Key), e)
-	nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", tEvent.NetworkID, tEvent.TableName, tEvent.Key), e)
+	nDB.createOrUpdateEntry(tEvent.NetworkID, tEvent.TableName, tEvent.Key, e)
 	nDB.Unlock()
 
 	if err != nil && tEvent.Type == TableEventTypeDelete {
-		// If it is a delete event and we didn't have the entry here don't repropagate
-		return true
+		// If it is a delete event and we did not have a state for it, don't propagate to the application
+		// If the residual reapTime is lower or equal to 1/6 of the total reapTime don't bother broadcasting it around
+		// most likely the cluster is already aware of it, if not who will sync with this node will catch the state too.
+		return e.reapTime > reapPeriod/6
 	}
 
 	var op opType
@@ -303,22 +308,17 @@ func (nDB *NetworkDB) handleTableMessage(buf []byte, isBulkSync bool) {
 		n, ok := nDB.networks[nDB.config.NodeName][tEvent.NetworkID]
 		nDB.RUnlock()
 
-		if !ok {
-			return
-		}
-
-		broadcastQ := n.tableBroadcasts
-
-		if broadcastQ == nil {
+		// if the network is not there anymore, OR we are leaving the network OR the broadcast queue is not present
+		if !ok || n.leaving || n.tableBroadcasts == nil {
 			return
 		}
 
-		broadcastQ.QueueBroadcast(&tableEventMessage{
+		n.tableBroadcasts.QueueBroadcast(&tableEventMessage{
 			msg:   buf,
 			id:    tEvent.NetworkID,
 			tname: tEvent.TableName,
 			key:   tEvent.Key,
-			node:  nDB.config.NodeName,
+			node:  tEvent.NodeName,
 		})
 	}
 }

+ 53 - 17
libnetwork/networkdb/networkdb.go

@@ -141,6 +141,11 @@ type network struct {
 
 	// Number of gossip messages sent related to this network during the last stats collection period
 	qMessagesSent int
+
+	// Number of entries on the network. This value is the sum of all the entries of all the tables of a specific network.
+	// Its use is for statistics purposes. It keep tracks of database size and is printed per network every StatsPrintPeriod
+	// interval
+	entriesNumber int
 }
 
 // Config represents the configuration of the networdb instance and
@@ -338,8 +343,7 @@ func (nDB *NetworkDB) CreateEntry(tname, nid, key string, value []byte) error {
 	}
 
 	nDB.Lock()
-	nDB.indexes[byTable].Insert(fmt.Sprintf("/%s/%s/%s", tname, nid, key), entry)
-	nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", nid, tname, key), entry)
+	nDB.createOrUpdateEntry(nid, tname, key, entry)
 	nDB.Unlock()
 
 	return nil
@@ -365,8 +369,7 @@ func (nDB *NetworkDB) UpdateEntry(tname, nid, key string, value []byte) error {
 	}
 
 	nDB.Lock()
-	nDB.indexes[byTable].Insert(fmt.Sprintf("/%s/%s/%s", tname, nid, key), entry)
-	nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", nid, tname, key), entry)
+	nDB.createOrUpdateEntry(nid, tname, key, entry)
 	nDB.Unlock()
 
 	return nil
@@ -402,7 +405,7 @@ func (nDB *NetworkDB) DeleteEntry(tname, nid, key string) error {
 		node:     nDB.config.NodeName,
 		value:    value,
 		deleting: true,
-		reapTime: reapInterval,
+		reapTime: reapEntryInterval,
 	}
 
 	if err := nDB.sendTableEvent(TableEventTypeDelete, nid, tname, key, entry); err != nil {
@@ -410,8 +413,7 @@ func (nDB *NetworkDB) DeleteEntry(tname, nid, key string) error {
 	}
 
 	nDB.Lock()
-	nDB.indexes[byTable].Insert(fmt.Sprintf("/%s/%s/%s", tname, nid, key), entry)
-	nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", nid, tname, key), entry)
+	nDB.createOrUpdateEntry(nid, tname, key, entry)
 	nDB.Unlock()
 
 	return nil
@@ -473,10 +475,10 @@ func (nDB *NetworkDB) deleteNodeNetworkEntries(nid, node string) {
 
 			entry := &entry{
 				ltime:    oldEntry.ltime,
-				node:     node,
+				node:     oldEntry.node,
 				value:    oldEntry.value,
 				deleting: true,
-				reapTime: reapInterval,
+				reapTime: reapEntryInterval,
 			}
 
 			// we arrived at this point in 2 cases:
@@ -488,12 +490,10 @@ func (nDB *NetworkDB) deleteNodeNetworkEntries(nid, node string) {
 					// without doing a delete of all the objects
 					entry.ltime++
 				}
-				nDB.indexes[byTable].Insert(fmt.Sprintf("/%s/%s/%s", tname, nid, key), entry)
-				nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", nid, tname, key), entry)
+				nDB.createOrUpdateEntry(nid, tname, key, entry)
 			} else {
 				// the local node is leaving the network, all the entries of remote nodes can be safely removed
-				nDB.indexes[byTable].Delete(fmt.Sprintf("/%s/%s/%s", tname, nid, key))
-				nDB.indexes[byNetwork].Delete(fmt.Sprintf("/%s/%s/%s", nid, tname, key))
+				nDB.deleteEntry(nid, tname, key)
 			}
 
 			nDB.broadcaster.Write(makeEvent(opDelete, tname, nid, key, entry.value))
@@ -513,8 +513,7 @@ func (nDB *NetworkDB) deleteNodeTableEntries(node string) {
 		nid := params[1]
 		key := params[2]
 
-		nDB.indexes[byTable].Delete(fmt.Sprintf("/%s/%s/%s", tname, nid, key))
-		nDB.indexes[byNetwork].Delete(fmt.Sprintf("/%s/%s/%s", nid, tname, key))
+		nDB.deleteEntry(nid, tname, key)
 
 		nDB.broadcaster.Write(makeEvent(opDelete, tname, nid, key, oldEntry.value))
 		return false
@@ -558,7 +557,12 @@ func (nDB *NetworkDB) JoinNetwork(nid string) error {
 		nodeNetworks = make(map[string]*network)
 		nDB.networks[nDB.config.NodeName] = nodeNetworks
 	}
-	nodeNetworks[nid] = &network{id: nid, ltime: ltime}
+	n, ok := nodeNetworks[nid]
+	var entries int
+	if ok {
+		entries = n.entriesNumber
+	}
+	nodeNetworks[nid] = &network{id: nid, ltime: ltime, entriesNumber: entries}
 	nodeNetworks[nid].tableBroadcasts = &memberlist.TransmitLimitedQueue{
 		NumNodes: func() int {
 			nDB.RLock()
@@ -567,6 +571,7 @@ func (nDB *NetworkDB) JoinNetwork(nid string) error {
 		},
 		RetransmitMult: 4,
 	}
+
 	nDB.addNetworkNode(nid, nDB.config.NodeName)
 	networkNodes := nDB.networkNodes[nid]
 	nDB.Unlock()
@@ -614,8 +619,9 @@ func (nDB *NetworkDB) LeaveNetwork(nid string) error {
 		return fmt.Errorf("could not find network %s while trying to leave", nid)
 	}
 
+	logrus.Debugf("%s: leaving network %s", nDB.config.NodeName, nid)
 	n.ltime = ltime
-	n.reapTime = reapInterval
+	n.reapTime = reapNetworkInterval
 	n.leaving = true
 	return nil
 }
@@ -679,3 +685,33 @@ func (nDB *NetworkDB) updateLocalNetworkTime() {
 		n.ltime = ltime
 	}
 }
+
+// createOrUpdateEntry this function handles the creation or update of entries into the local
+// tree store. It is also used to keep in sync the entries number of the network (all tables are aggregated)
+func (nDB *NetworkDB) createOrUpdateEntry(nid, tname, key string, entry interface{}) (bool, bool) {
+	_, okTable := nDB.indexes[byTable].Insert(fmt.Sprintf("/%s/%s/%s", tname, nid, key), entry)
+	_, okNetwork := nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", nid, tname, key), entry)
+	if !okNetwork {
+		// Add only if it is an insert not an update
+		n, ok := nDB.networks[nDB.config.NodeName][nid]
+		if ok {
+			n.entriesNumber++
+		}
+	}
+	return okTable, okNetwork
+}
+
+// deleteEntry this function handles the deletion of entries into the local tree store.
+// It is also used to keep in sync the entries number of the network (all tables are aggregated)
+func (nDB *NetworkDB) deleteEntry(nid, tname, key string) (bool, bool) {
+	_, okTable := nDB.indexes[byTable].Delete(fmt.Sprintf("/%s/%s/%s", tname, nid, key))
+	_, okNetwork := nDB.indexes[byNetwork].Delete(fmt.Sprintf("/%s/%s/%s", nid, tname, key))
+	if okNetwork {
+		// Remove only if the delete is successful
+		n, ok := nDB.networks[nDB.config.NodeName][nid]
+		if ok {
+			n.entriesNumber--
+		}
+	}
+	return okTable, okNetwork
+}

Dosya farkı çok büyük olduğundan ihmal edildi
+ 342 - 188
libnetwork/networkdb/networkdb.pb.go


+ 6 - 2
libnetwork/networkdb/networkdb.proto

@@ -109,7 +109,7 @@ message NetworkEntry {
 	// network event was recorded.
 	uint64 l_time = 2 [(gogoproto.customtype) = "github.com/hashicorp/serf/serf.LamportTime", (gogoproto.nullable) = false];
 	// Source node name where this network attachment happened.
-	string node_name = 3;
+	string node_name = 3 [(gogoproto.customname) = "NodeName"];
 	// Indicates if a leave from this network is in progress.
 	bool leaving = 4;
 }
@@ -119,6 +119,8 @@ message NetworkPushPull {
 	// Lamport time when this push pull was initiated.
 	uint64 l_time = 1 [(gogoproto.customtype) = "github.com/hashicorp/serf/serf.LamportTime", (gogoproto.nullable) = false];
 	repeated NetworkEntry networks = 2;
+	// Name of the node sending this push pull payload.
+	string node_name = 3 [(gogoproto.customname) = "NodeName"];
 }
 
 // TableEvent message payload definition.
@@ -152,6 +154,8 @@ message TableEvent {
 	string key = 6;
 	// Entry value.
 	bytes value = 7;
+	// Residual reap time for the entry before getting deleted in seconds
+	int32 residual_reap_time = 8 [(gogoproto.customname) = "ResidualReapTime"];;
 }
 
 // BulkSync message payload definition.
@@ -180,4 +184,4 @@ message CompoundMessage {
 
 	// A list of simple messages.
 	repeated SimpleMessage messages = 1;
-}
+}

+ 6 - 0
libnetwork/networkdb/networkdb_test.go

@@ -473,6 +473,9 @@ func TestNetworkDBNodeJoinLeaveIteration(t *testing.T) {
 	if len(dbs[0].networkNodes["network1"]) != 2 {
 		t.Fatalf("The networkNodes list has to have be 2 instead of %d - %v", len(dbs[0].networkNodes["network1"]), dbs[0].networkNodes["network1"])
 	}
+	if n, ok := dbs[0].networks[dbs[0].config.NodeName]["network1"]; !ok || n.leaving {
+		t.Fatalf("The network should not be marked as leaving:%t", n.leaving)
+	}
 
 	// Wait for the propagation on db[1]
 	for i := 0; i < maxRetry; i++ {
@@ -484,6 +487,9 @@ func TestNetworkDBNodeJoinLeaveIteration(t *testing.T) {
 	if len(dbs[1].networkNodes["network1"]) != 2 {
 		t.Fatalf("The networkNodes list has to have be 2 instead of %d - %v", len(dbs[1].networkNodes["network1"]), dbs[1].networkNodes["network1"])
 	}
+	if n, ok := dbs[1].networks[dbs[1].config.NodeName]["network1"]; !ok || n.leaving {
+		t.Fatalf("The network should not be marked as leaving:%t", n.leaving)
+	}
 
 	// Try a quick leave/join
 	err = dbs[0].LeaveNetwork("network1")

+ 1 - 1
libnetwork/test/networkDb/README

@@ -1,7 +1,7 @@
 SERVER
 
 cd test/networkdb
-env GOOS=linux go build -v server/ndbTester.go && docker build -t fcrisciani/networkdb-test -f server/Dockerfile .
+env GOOS=linux go build -v testMain.go && docker build -t fcrisciani/networkdb-test .
 (only for testkit case) docker push fcrisciani/networkdb-test
 
 Run server: docker service create --name testdb --network net1 --replicas 3 --env TASK_ID="{{.Task.ID}}" -p mode=host,target=8000 fcrisciani/networkdb-test server 8000

+ 5 - 5
libnetwork/test/networkDb/dbclient/ndbClient.go

@@ -567,7 +567,7 @@ func doWriteWaitLeaveJoin(ips []string, args []string) {
 	tableName := args[1]
 	parallelWriters, _ := strconv.Atoi(args[2])
 	writeTimeSec, _ := strconv.Atoi(args[3])
-	parallerlLeaver, _ := strconv.Atoi(args[4])
+	parallelLeaver, _ := strconv.Atoi(args[4])
 
 	// Start parallel writers that will create and delete unique keys
 	doneCh := make(chan resultTuple, parallelWriters)
@@ -586,23 +586,23 @@ func doWriteWaitLeaveJoin(ips []string, args []string) {
 
 	keysExpected := keyMap[totalWrittenKeys]
 	// The Leavers will leave the network
-	for i := 0; i < parallerlLeaver; i++ {
+	for i := 0; i < parallelLeaver; i++ {
 		logrus.Infof("worker leaveNetwork: %d on IP:%s", i, ips[i])
 		go leaveNetwork(ips[i], servicePort, networkName, doneCh)
 		// Once a node leave all the keys written previously will be deleted, so the expected keys will consider that as removed
 		keysExpected -= keyMap[ips[i]]
 	}
-	waitWriters(parallerlLeaver, false, doneCh)
+	waitWriters(parallelLeaver, false, doneCh)
 
 	// Give some time
 	time.Sleep(100 * time.Millisecond)
 
 	// The writers will join the network
-	for i := 0; i < parallerlLeaver; i++ {
+	for i := 0; i < parallelLeaver; i++ {
 		logrus.Infof("worker joinNetwork: %d on IP:%s", i, ips[i])
 		go joinNetwork(ips[i], servicePort, networkName, doneCh)
 	}
-	waitWriters(parallerlLeaver, false, doneCh)
+	waitWriters(parallelLeaver, false, doneCh)
 
 	// check table entries for 2 minutes
 	ctx, cancel = context.WithTimeout(context.Background(), 2*time.Minute)

Bu fark içinde çok fazla dosya değişikliği olduğu için bazı dosyalar gösterilmiyor