Merge pull request #44500 from tiborvass/libnetwork_test_race
libnetwork/networkdb: make go test -race ./libnetwork/networkdb pass
This commit is contained in:
commit
243a55d603
4 changed files with 39 additions and 30 deletions
|
@ -473,13 +473,13 @@ func (nDB *NetworkDB) gossip() {
|
|||
|
||||
msgs := broadcastQ.GetBroadcasts(compoundOverhead, bytesAvail)
|
||||
// Collect stats and print the queue info, note this code is here also to have a view of the queues empty
|
||||
network.qMessagesSent += len(msgs)
|
||||
network.qMessagesSent.Add(int64(len(msgs)))
|
||||
if printStats {
|
||||
msent := network.qMessagesSent.Swap(0)
|
||||
logrus.Infof("NetworkDB stats %v(%v) - netID:%s leaving:%t netPeers:%d entries:%d Queue qLen:%d netMsg/s:%d",
|
||||
nDB.config.Hostname, nDB.config.NodeID,
|
||||
nid, network.leaving, broadcastQ.NumNodes(), network.entriesNumber, broadcastQ.NumQueued(),
|
||||
network.qMessagesSent/int((nDB.config.StatsPrintPeriod/time.Second)))
|
||||
network.qMessagesSent = 0
|
||||
nid, network.leaving, broadcastQ.NumNodes(), network.entriesNumber.Load(), broadcastQ.NumQueued(),
|
||||
msent/int64((nDB.config.StatsPrintPeriod/time.Second)))
|
||||
}
|
||||
|
||||
if len(msgs) == 0 {
|
||||
|
|
|
@ -8,6 +8,7 @@ import (
|
|||
"os"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/docker/docker/libnetwork/types"
|
||||
|
@ -145,12 +146,12 @@ type network struct {
|
|||
tableBroadcasts *memberlist.TransmitLimitedQueue
|
||||
|
||||
// Number of gossip messages sent related to this network during the last stats collection period
|
||||
qMessagesSent int
|
||||
qMessagesSent atomic.Int64
|
||||
|
||||
// Number of entries on the network. This value is the sum of all the entries of all the tables of a specific network.
|
||||
// Its use is for statistics purposes. It keep tracks of database size and is printed per network every StatsPrintPeriod
|
||||
// interval
|
||||
entriesNumber int
|
||||
entriesNumber atomic.Int64
|
||||
}
|
||||
|
||||
// Config represents the configuration of the networkdb instance and
|
||||
|
@ -613,11 +614,12 @@ func (nDB *NetworkDB) JoinNetwork(nid string) error {
|
|||
nDB.networks[nDB.config.NodeID] = nodeNetworks
|
||||
}
|
||||
n, ok := nodeNetworks[nid]
|
||||
var entries int
|
||||
var entries int64
|
||||
if ok {
|
||||
entries = n.entriesNumber
|
||||
entries = n.entriesNumber.Load()
|
||||
}
|
||||
nodeNetworks[nid] = &network{id: nid, ltime: ltime, entriesNumber: entries}
|
||||
nodeNetworks[nid] = &network{id: nid, ltime: ltime}
|
||||
nodeNetworks[nid].entriesNumber.Store(entries)
|
||||
nodeNetworks[nid].tableBroadcasts = &memberlist.TransmitLimitedQueue{
|
||||
NumNodes: func() int {
|
||||
//TODO fcrisciani this can be optimized maybe avoiding the lock?
|
||||
|
@ -759,7 +761,7 @@ func (nDB *NetworkDB) createOrUpdateEntry(nid, tname, key string, entry interfac
|
|||
// Add only if it is an insert not an update
|
||||
n, ok := nDB.networks[nDB.config.NodeID][nid]
|
||||
if ok {
|
||||
n.entriesNumber++
|
||||
n.entriesNumber.Add(1)
|
||||
}
|
||||
}
|
||||
return okTable, okNetwork
|
||||
|
@ -774,7 +776,7 @@ func (nDB *NetworkDB) deleteEntry(nid, tname, key string) (okTable bool, okNetwo
|
|||
// Remove only if the delete is successful
|
||||
n, ok := nDB.networks[nDB.config.NodeID][nid]
|
||||
if ok {
|
||||
n.entriesNumber--
|
||||
n.entriesNumber.Add(-1)
|
||||
}
|
||||
}
|
||||
return okTable, okNetwork
|
||||
|
|
|
@ -106,18 +106,21 @@ func (db *NetworkDB) verifyNetworkExistence(t *testing.T, node string, id string
|
|||
for i := int64(0); i < maxRetries; i++ {
|
||||
db.RLock()
|
||||
nn, nnok := db.networks[node]
|
||||
db.RUnlock()
|
||||
if nnok {
|
||||
n, ok := nn[id]
|
||||
leaving := n.leaving
|
||||
db.RUnlock()
|
||||
if present && ok {
|
||||
return
|
||||
}
|
||||
|
||||
if !present &&
|
||||
((ok && n.leaving) ||
|
||||
((ok && leaving) ||
|
||||
!ok) {
|
||||
return
|
||||
}
|
||||
} else {
|
||||
db.RUnlock()
|
||||
}
|
||||
|
||||
time.Sleep(sleepInterval)
|
||||
|
@ -130,18 +133,11 @@ func (db *NetworkDB) verifyEntryExistence(t *testing.T, tname, nid, key, value s
|
|||
t.Helper()
|
||||
n := 80
|
||||
for i := 0; i < n; i++ {
|
||||
entry, err := db.getEntry(tname, nid, key)
|
||||
if present && err == nil && string(entry.value) == value {
|
||||
v, err := db.GetEntry(tname, nid, key)
|
||||
if present && err == nil && string(v) == value {
|
||||
return
|
||||
}
|
||||
|
||||
if !present &&
|
||||
((err == nil && entry.deleting) ||
|
||||
(err != nil)) {
|
||||
return
|
||||
}
|
||||
|
||||
if i == n-1 && !present && err != nil {
|
||||
if err != nil && !present {
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -577,7 +573,9 @@ func TestNetworkDBGarbageCollection(t *testing.T) {
|
|||
assert.NilError(t, err)
|
||||
}
|
||||
for i := 0; i < 2; i++ {
|
||||
assert.Check(t, is.Equal(keysWriteDelete, dbs[i].networks[dbs[i].config.NodeID]["network1"].entriesNumber), "entries number should match")
|
||||
dbs[i].Lock()
|
||||
assert.Check(t, is.Equal(int64(keysWriteDelete), dbs[i].networks[dbs[i].config.NodeID]["network1"].entriesNumber.Load()), "entries number should match")
|
||||
dbs[i].Unlock()
|
||||
}
|
||||
|
||||
// from this point the timer for the garbage collection started, wait 5 seconds and then join a new node
|
||||
|
@ -586,18 +584,24 @@ func TestNetworkDBGarbageCollection(t *testing.T) {
|
|||
err = dbs[2].JoinNetwork("network1")
|
||||
assert.NilError(t, err)
|
||||
for i := 0; i < 3; i++ {
|
||||
assert.Check(t, is.Equal(keysWriteDelete, dbs[i].networks[dbs[i].config.NodeID]["network1"].entriesNumber), "entries number should match")
|
||||
dbs[i].Lock()
|
||||
assert.Check(t, is.Equal(int64(keysWriteDelete), dbs[i].networks[dbs[i].config.NodeID]["network1"].entriesNumber.Load()), "entries number should match")
|
||||
dbs[i].Unlock()
|
||||
}
|
||||
// at this point the entries should had been all deleted
|
||||
time.Sleep(30 * time.Second)
|
||||
for i := 0; i < 3; i++ {
|
||||
assert.Check(t, is.Equal(0, dbs[i].networks[dbs[i].config.NodeID]["network1"].entriesNumber), "entries should had been garbage collected")
|
||||
dbs[i].Lock()
|
||||
assert.Check(t, is.Equal(int64(0), dbs[i].networks[dbs[i].config.NodeID]["network1"].entriesNumber.Load()), "entries should had been garbage collected")
|
||||
dbs[i].Unlock()
|
||||
}
|
||||
|
||||
// make sure that entries are not coming back
|
||||
time.Sleep(15 * time.Second)
|
||||
for i := 0; i < 3; i++ {
|
||||
assert.Check(t, is.Equal(0, dbs[i].networks[dbs[i].config.NodeID]["network1"].entriesNumber), "entries should had been garbage collected")
|
||||
dbs[i].Lock()
|
||||
assert.Check(t, is.Equal(int64(0), dbs[i].networks[dbs[i].config.NodeID]["network1"].entriesNumber.Load()), "entries should had been garbage collected")
|
||||
dbs[i].Unlock()
|
||||
}
|
||||
|
||||
closeNetworkDBInstances(t, dbs)
|
||||
|
@ -733,6 +737,7 @@ func TestNodeReincarnation(t *testing.T) {
|
|||
assert.Check(t, is.Len(dbs[0].failedNodes, 1))
|
||||
assert.Check(t, is.Len(dbs[0].leftNodes, 1))
|
||||
|
||||
dbs[0].Lock()
|
||||
b := dbs[0].purgeReincarnation(&memberlist.Node{Name: "node4", Addr: net.ParseIP("192.168.1.1")})
|
||||
assert.Check(t, b)
|
||||
dbs[0].nodes["node4"] = &node{Node: memberlist.Node{Name: "node4", Addr: net.ParseIP("192.168.1.1")}}
|
||||
|
@ -753,6 +758,7 @@ func TestNodeReincarnation(t *testing.T) {
|
|||
assert.Check(t, is.Len(dbs[0].failedNodes, 0))
|
||||
assert.Check(t, is.Len(dbs[0].leftNodes, 3))
|
||||
|
||||
dbs[0].Unlock()
|
||||
closeNetworkDBInstances(t, dbs)
|
||||
}
|
||||
|
||||
|
@ -888,8 +894,9 @@ func TestNetworkDBIslands(t *testing.T) {
|
|||
// Spawn again the first 3 nodes with different names but same IP:port
|
||||
for i := 0; i < 3; i++ {
|
||||
logrus.Infof("node %d coming back", i)
|
||||
dbs[i].config.NodeID = stringid.TruncateID(stringid.GenerateRandomID())
|
||||
dbs[i] = launchNode(t, *dbs[i].config)
|
||||
conf := *dbs[i].config
|
||||
conf.NodeID = stringid.TruncateID(stringid.GenerateRandomID())
|
||||
dbs[i] = launchNode(t, conf)
|
||||
}
|
||||
|
||||
// Give some time for the reconnect routine to run, it runs every 6s.
|
||||
|
|
|
@ -438,7 +438,7 @@ func dbNetworkStats(ctx interface{}, w http.ResponseWriter, r *http.Request) {
|
|||
entries := -1
|
||||
qLen := -1
|
||||
if ok {
|
||||
entries = network.entriesNumber
|
||||
entries = int(network.entriesNumber.Load())
|
||||
qLen = network.tableBroadcasts.NumQueued()
|
||||
}
|
||||
nDB.RUnlock()
|
||||
|
|
Loading…
Add table
Reference in a new issue