
Merge pull request #45020 from corhere/backport-23.0/libnetwork-test-race

[23.0 backport] libnetwork/networkdb: make go test -race ./libnetwork/networkdb pass
Bjorn Neergaard 2 years ago
parent
commit
a4e9b25461
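
For context, `go test -race` was failing because fields like `qMessagesSent` and `entriesNumber` were plain ints written on the gossip path and read from other goroutines. A minimal, self-contained sketch of that kind of race (hypothetical names, not networkdb code) that `go run -race` reports:

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	var sent int // plain int shared between two goroutines
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		for i := 0; i < 1000; i++ {
			sent++ // unsynchronized write ...
		}
	}()
	fmt.Println(sent) // ... racing with this unsynchronized read
	wg.Wait()
}
```

The fix throughout this PR is to replace such counters with `sync/atomic` types and to tighten lock scopes in the tests.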

+ 4 - 4
libnetwork/networkdb/cluster.go

@@ -473,13 +473,13 @@ func (nDB *NetworkDB) gossip() {
 
 		msgs := broadcastQ.GetBroadcasts(compoundOverhead, bytesAvail)
 		// Collect stats and print the queue info, note this code is here also to have a view of the queues empty
-		network.qMessagesSent += len(msgs)
+		network.qMessagesSent.Add(int64(len(msgs)))
 		if printStats {
+			msent := network.qMessagesSent.Swap(0)
 			logrus.Infof("NetworkDB stats %v(%v) - netID:%s leaving:%t netPeers:%d entries:%d Queue qLen:%d netMsg/s:%d",
 				nDB.config.Hostname, nDB.config.NodeID,
-				nid, network.leaving, broadcastQ.NumNodes(), network.entriesNumber, broadcastQ.NumQueued(),
-				network.qMessagesSent/int((nDB.config.StatsPrintPeriod/time.Second)))
-			network.qMessagesSent = 0
+				nid, network.leaving, broadcastQ.NumNodes(), network.entriesNumber.Load(), broadcastQ.NumQueued(),
+				msent/int64((nDB.config.StatsPrintPeriod/time.Second)))
 		}
 
 		if len(msgs) == 0 {
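
The `Swap(0)` above is the read-and-reset idiom: the stats reader drains the counter in one atomic step, so increments from concurrent gossip writers cannot be lost between the read and the reset. A minimal sketch of the idiom (the `statsWindow` type and its methods are hypothetical stand-ins for the per-network state):

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// statsWindow stands in for the per-network state above.
type statsWindow struct {
	qMessagesSent atomic.Int64
}

// record runs on the gossip path; Add is safe from any goroutine.
func (s *statsWindow) record(n int) {
	s.qMessagesSent.Add(int64(n))
}

// report drains the counter with Swap(0): the read and the reset happen in
// one atomic step, so no concurrent increment can fall between them.
func (s *statsWindow) report(period time.Duration) {
	sent := s.qMessagesSent.Swap(0)
	fmt.Printf("netMsg/s: %d\n", sent/int64(period/time.Second))
}

func main() {
	var s statsWindow
	s.record(30)
	s.report(10 * time.Second) // netMsg/s: 3
}
```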

+ 9 - 7
libnetwork/networkdb/networkdb.go

@@ -8,6 +8,7 @@ import (
 	"os"
 	"strings"
 	"sync"
+	"sync/atomic"
 	"time"
 
 	"github.com/armon/go-radix"
@@ -145,12 +146,12 @@ type network struct {
 	tableBroadcasts *memberlist.TransmitLimitedQueue
 
 	// Number of gossip messages sent related to this network during the last stats collection period
-	qMessagesSent int
+	qMessagesSent atomic.Int64
 
 	// Number of entries on the network. This value is the sum of all the entries of all the tables of a specific network.
 	// Its use is for statistics purposes. It keep tracks of database size and is printed per network every StatsPrintPeriod
 	// interval
-	entriesNumber int
+	entriesNumber atomic.Int64
 }
 
 // Config represents the configuration of the networkdb instance and
@@ -612,11 +613,12 @@ func (nDB *NetworkDB) JoinNetwork(nid string) error {
 		nDB.networks[nDB.config.NodeID] = nodeNetworks
 	}
 	n, ok := nodeNetworks[nid]
-	var entries int
+	var entries int64
 	if ok {
-		entries = n.entriesNumber
+		entries = n.entriesNumber.Load()
 	}
-	nodeNetworks[nid] = &network{id: nid, ltime: ltime, entriesNumber: entries}
+	nodeNetworks[nid] = &network{id: nid, ltime: ltime}
+	nodeNetworks[nid].entriesNumber.Store(entries)
 	nodeNetworks[nid].tableBroadcasts = &memberlist.TransmitLimitedQueue{
 		NumNodes: func() int {
 			//TODO fcrisciani this can be optimized maybe avoiding the lock?
@@ -758,7 +760,7 @@ func (nDB *NetworkDB) createOrUpdateEntry(nid, tname, key string, entry interfac
 		// Add only if it is an insert not an update
 		n, ok := nDB.networks[nDB.config.NodeID][nid]
 		if ok {
-			n.entriesNumber++
+			n.entriesNumber.Add(1)
 		}
 	}
 	return okTable, okNetwork
@@ -773,7 +775,7 @@ func (nDB *NetworkDB) deleteEntry(nid, tname, key string) (bool, bool) {
 		// Remove only if the delete is successful
 		n, ok := nDB.networks[nDB.config.NodeID][nid]
 		if ok {
-			n.entriesNumber--
+			n.entriesNumber.Add(-1)
 		}
 	}
 	return okTable, okNetwork
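
`atomic.Int64` is one of the typed atomics added in Go 1.19. Two consequences show up in the `JoinNetwork` hunk above: the field can no longer be initialized in the composite literal, and a struct containing an atomic must not be copied after first use (`go vet`'s copylocks check flags it), so the old count is carried across with `Load` and `Store`. A sketch of that pattern, with hypothetical names:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// network mirrors the real struct in miniature.
type network struct {
	id            string
	entriesNumber atomic.Int64
}

// replaceNetwork carries the entry count into a fresh struct. The atomic
// cannot be set in the composite literal and must not be copied, so the
// value moves via Load and Store, as in JoinNetwork above.
func replaceNetwork(old *network, id string) *network {
	var entries int64
	if old != nil {
		entries = old.entriesNumber.Load()
	}
	n := &network{id: id}
	n.entriesNumber.Store(entries)
	return n
}

func main() {
	a := &network{id: "n1"}
	a.entriesNumber.Store(5)
	fmt.Println(replaceNetwork(a, "n1").entriesNumber.Load()) // 5
}
```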

+ 25 - 18
libnetwork/networkdb/networkdb_test.go

@@ -107,18 +107,21 @@ func (db *NetworkDB) verifyNetworkExistence(t *testing.T, node string, id string
 	for i := int64(0); i < maxRetries; i++ {
 		db.RLock()
 		nn, nnok := db.networks[node]
-		db.RUnlock()
 		if nnok {
 			n, ok := nn[id]
+			var leaving bool
+			if ok {
+				leaving = n.leaving
+			}
+			db.RUnlock()
 			if present && ok {
 				return
 			}
 
 			if !present &&
-				((ok && n.leaving) ||
+				((ok && leaving) ||
 					!ok) {
 				return
 			}
+		} else {
+			db.RUnlock()
 		}
 
 		time.Sleep(sleepInterval)
@@ -131,18 +134,11 @@ func (db *NetworkDB) verifyEntryExistence(t *testing.T, tname, nid, key, value s
 	t.Helper()
 	n := 80
 	for i := 0; i < n; i++ {
-		entry, err := db.getEntry(tname, nid, key)
-		if present && err == nil && string(entry.value) == value {
+		v, err := db.GetEntry(tname, nid, key)
+		if present && err == nil && string(v) == value {
 			return
 		}
-
-		if !present &&
-			((err == nil && entry.deleting) ||
-				(err != nil)) {
-			return
-		}
-
-		if i == n-1 && !present && err != nil {
+		if err != nil && !present {
 			return
 		}
 
@@ -586,7 +582,9 @@ func TestNetworkDBGarbageCollection(t *testing.T) {
 		assert.NilError(t, err)
 	}
 	for i := 0; i < 2; i++ {
-		assert.Check(t, is.Equal(keysWriteDelete, dbs[i].networks[dbs[i].config.NodeID]["network1"].entriesNumber), "entries number should match")
+		dbs[i].Lock()
+		assert.Check(t, is.Equal(int64(keysWriteDelete), dbs[i].networks[dbs[i].config.NodeID]["network1"].entriesNumber.Load()), "entries number should match")
+		dbs[i].Unlock()
 	}
 
 	// from this point the timer for the garbage collection started, wait 5 seconds and then join a new node
@@ -595,18 +593,24 @@ func TestNetworkDBGarbageCollection(t *testing.T) {
 	err = dbs[2].JoinNetwork("network1")
 	assert.NilError(t, err)
 	for i := 0; i < 3; i++ {
-		assert.Check(t, is.Equal(keysWriteDelete, dbs[i].networks[dbs[i].config.NodeID]["network1"].entriesNumber), "entries number should match")
+		dbs[i].Lock()
+		assert.Check(t, is.Equal(int64(keysWriteDelete), dbs[i].networks[dbs[i].config.NodeID]["network1"].entriesNumber.Load()), "entries number should match")
+		dbs[i].Unlock()
 	}
 	// at this point the entries should had been all deleted
 	time.Sleep(30 * time.Second)
 	for i := 0; i < 3; i++ {
-		assert.Check(t, is.Equal(0, dbs[i].networks[dbs[i].config.NodeID]["network1"].entriesNumber), "entries should had been garbage collected")
+		dbs[i].Lock()
+		assert.Check(t, is.Equal(int64(0), dbs[i].networks[dbs[i].config.NodeID]["network1"].entriesNumber.Load()), "entries should had been garbage collected")
+		dbs[i].Unlock()
 	}
 
 	// make sure that entries are not coming back
 	time.Sleep(15 * time.Second)
 	for i := 0; i < 3; i++ {
-		assert.Check(t, is.Equal(0, dbs[i].networks[dbs[i].config.NodeID]["network1"].entriesNumber), "entries should had been garbage collected")
+		dbs[i].Lock()
+		assert.Check(t, is.Equal(int64(0), dbs[i].networks[dbs[i].config.NodeID]["network1"].entriesNumber.Load()), "entries should had been garbage collected")
+		dbs[i].Unlock()
 	}
 
 	closeNetworkDBInstances(t, dbs)
@@ -742,6 +746,7 @@ func TestNodeReincarnation(t *testing.T) {
 	assert.Check(t, is.Len(dbs[0].failedNodes, 1))
 	assert.Check(t, is.Len(dbs[0].leftNodes, 1))
 
+	dbs[0].Lock()
 	b := dbs[0].purgeReincarnation(&memberlist.Node{Name: "node4", Addr: net.ParseIP("192.168.1.1")})
 	assert.Check(t, b)
 	dbs[0].nodes["node4"] = &node{Node: memberlist.Node{Name: "node4", Addr: net.ParseIP("192.168.1.1")}}
@@ -762,6 +767,7 @@ func TestNodeReincarnation(t *testing.T) {
 	assert.Check(t, is.Len(dbs[0].failedNodes, 0))
 	assert.Check(t, is.Len(dbs[0].leftNodes, 3))
 
+	dbs[0].Unlock()
 	closeNetworkDBInstances(t, dbs)
 }
 
@@ -897,8 +903,9 @@ func TestNetworkDBIslands(t *testing.T) {
 	// Spawn again the first 3 nodes with different names but same IP:port
 	for i := 0; i < 3; i++ {
 		logrus.Infof("node %d coming back", i)
-		dbs[i].config.NodeID = stringid.TruncateID(stringid.GenerateRandomID())
-		dbs[i] = launchNode(t, *dbs[i].config)
+		conf := *dbs[i].config
+		conf.NodeID = stringid.TruncateID(stringid.GenerateRandomID())
+		dbs[i] = launchNode(t, conf)
 	}
 
 	// Give some time for the reconnect routine to run, it runs every 6s.
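
The test fixes follow two patterns: copy shared fields into locals while the lock is held (as in `verifyNetworkExistence`), and copy a config struct by value before mutating it, so goroutines of the old node that still hold the original pointer never observe a concurrent write (the `TestNetworkDBIslands` hunk above). A sketch of the second pattern, with hypothetical names:

```go
package main

import "fmt"

// Config and node are hypothetical stand-ins for the test's types.
type Config struct {
	NodeID string
}

type node struct {
	config *Config // shared with the node's background goroutines
}

// relaunch copies the config by value before changing NodeID, so goroutines
// still holding the old *Config never observe a concurrent write.
func relaunch(old *node, newID string) *node {
	conf := *old.config
	conf.NodeID = newID
	return &node{config: &conf}
}

func main() {
	n1 := &node{config: &Config{NodeID: "a1"}}
	n2 := relaunch(n1, "b2")
	fmt.Println(n1.config.NodeID, n2.config.NodeID) // a1 b2
}
```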

+ 1 - 1
libnetwork/networkdb/networkdbdiagnostic.go

@@ -438,7 +438,7 @@ func dbNetworkStats(ctx interface{}, w http.ResponseWriter, r *http.Request) {
 		entries := -1
 		qLen := -1
 		if ok {
-			entries = network.entriesNumber
+			entries = int(network.entriesNumber.Load())
 			qLen = network.tableBroadcasts.NumQueued()
 		}
 		nDB.RUnlock()
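
Note what the atomic buys here: the `RLock` is still needed for the `networks` map lookup, but once the `network` pointer is in hand, the counter itself can be read with `Load` even after the lock is released. A sketch under that assumption (hypothetical names):

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

type network struct {
	entriesNumber atomic.Int64
}

type db struct {
	mu       sync.RWMutex
	networks map[string]*network
}

// entryCount takes the read lock only for the map lookup; the counter
// itself is read with Load after the lock is released.
func (d *db) entryCount(nid string) int {
	d.mu.RLock()
	n, ok := d.networks[nid]
	d.mu.RUnlock()
	if !ok {
		return -1
	}
	return int(n.entriesNumber.Load())
}

func main() {
	d := &db{networks: map[string]*network{"n1": {}}}
	d.networks["n1"].entriesNumber.Store(7)
	fmt.Println(d.entryCount("n1")) // 7
}
```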