diff --git a/libnetwork/networkdb/cluster.go b/libnetwork/networkdb/cluster.go index 6cea89b917..2d98784905 100644 --- a/libnetwork/networkdb/cluster.go +++ b/libnetwork/networkdb/cluster.go @@ -473,13 +473,13 @@ func (nDB *NetworkDB) gossip() { msgs := broadcastQ.GetBroadcasts(compoundOverhead, bytesAvail) // Collect stats and print the queue info, note this code is here also to have a view of the queues empty - network.qMessagesSent += len(msgs) + network.qMessagesSent.Add(int64(len(msgs))) if printStats { + msent := network.qMessagesSent.Swap(0) logrus.Infof("NetworkDB stats %v(%v) - netID:%s leaving:%t netPeers:%d entries:%d Queue qLen:%d netMsg/s:%d", nDB.config.Hostname, nDB.config.NodeID, - nid, network.leaving, broadcastQ.NumNodes(), network.entriesNumber, broadcastQ.NumQueued(), - network.qMessagesSent/int((nDB.config.StatsPrintPeriod/time.Second))) - network.qMessagesSent = 0 + nid, network.leaving, broadcastQ.NumNodes(), network.entriesNumber.Load(), broadcastQ.NumQueued(), + msent/int64((nDB.config.StatsPrintPeriod/time.Second))) } if len(msgs) == 0 { diff --git a/libnetwork/networkdb/networkdb.go b/libnetwork/networkdb/networkdb.go index e43711e676..9127eed8db 100644 --- a/libnetwork/networkdb/networkdb.go +++ b/libnetwork/networkdb/networkdb.go @@ -8,6 +8,7 @@ import ( "os" "strings" "sync" + "sync/atomic" "time" "github.com/docker/docker/libnetwork/types" @@ -145,12 +146,12 @@ type network struct { tableBroadcasts *memberlist.TransmitLimitedQueue // Number of gossip messages sent related to this network during the last stats collection period - qMessagesSent int + qMessagesSent atomic.Int64 // Number of entries on the network. This value is the sum of all the entries of all the tables of a specific network. // Its use is for statistics purposes. 
It keep tracks of database size and is printed per network every StatsPrintPeriod // interval - entriesNumber int + entriesNumber atomic.Int64 } // Config represents the configuration of the networkdb instance and @@ -613,11 +614,12 @@ func (nDB *NetworkDB) JoinNetwork(nid string) error { nDB.networks[nDB.config.NodeID] = nodeNetworks } n, ok := nodeNetworks[nid] - var entries int + var entries int64 if ok { - entries = n.entriesNumber + entries = n.entriesNumber.Load() } - nodeNetworks[nid] = &network{id: nid, ltime: ltime, entriesNumber: entries} + nodeNetworks[nid] = &network{id: nid, ltime: ltime} + nodeNetworks[nid].entriesNumber.Store(entries) nodeNetworks[nid].tableBroadcasts = &memberlist.TransmitLimitedQueue{ NumNodes: func() int { //TODO fcrisciani this can be optimized maybe avoiding the lock? @@ -759,7 +761,7 @@ func (nDB *NetworkDB) createOrUpdateEntry(nid, tname, key string, entry interfac // Add only if it is an insert not an update n, ok := nDB.networks[nDB.config.NodeID][nid] if ok { - n.entriesNumber++ + n.entriesNumber.Add(1) } } return okTable, okNetwork @@ -774,7 +776,7 @@ func (nDB *NetworkDB) deleteEntry(nid, tname, key string) (okTable bool, okNetwo // Remove only if the delete is successful n, ok := nDB.networks[nDB.config.NodeID][nid] if ok { - n.entriesNumber-- + n.entriesNumber.Add(-1) } } return okTable, okNetwork diff --git a/libnetwork/networkdb/networkdb_test.go b/libnetwork/networkdb/networkdb_test.go index 16bb293785..a27398b407 100644 --- a/libnetwork/networkdb/networkdb_test.go +++ b/libnetwork/networkdb/networkdb_test.go @@ -106,18 +106,21 @@ func (db *NetworkDB) verifyNetworkExistence(t *testing.T, node string, id string for i := int64(0); i < maxRetries; i++ { db.RLock() nn, nnok := db.networks[node] - db.RUnlock() if nnok { n, ok := nn[id] + leaving := ok && n.leaving + db.RUnlock() if present && ok { return } if !present && - ((ok && n.leaving) || + ((ok && leaving) || !ok) { return } + } else { + db.RUnlock() }
time.Sleep(sleepInterval) @@ -130,18 +133,11 @@ func (db *NetworkDB) verifyEntryExistence(t *testing.T, tname, nid, key, value s t.Helper() n := 80 for i := 0; i < n; i++ { - entry, err := db.getEntry(tname, nid, key) - if present && err == nil && string(entry.value) == value { + v, err := db.GetEntry(tname, nid, key) + if present && err == nil && string(v) == value { return } - - if !present && - ((err == nil && entry.deleting) || - (err != nil)) { - return - } - - if i == n-1 && !present && err != nil { + if err != nil && !present { return } @@ -577,7 +573,9 @@ func TestNetworkDBGarbageCollection(t *testing.T) { assert.NilError(t, err) } for i := 0; i < 2; i++ { - assert.Check(t, is.Equal(keysWriteDelete, dbs[i].networks[dbs[i].config.NodeID]["network1"].entriesNumber), "entries number should match") + dbs[i].Lock() + assert.Check(t, is.Equal(int64(keysWriteDelete), dbs[i].networks[dbs[i].config.NodeID]["network1"].entriesNumber.Load()), "entries number should match") + dbs[i].Unlock() } // from this point the timer for the garbage collection started, wait 5 seconds and then join a new node @@ -586,18 +584,24 @@ func TestNetworkDBGarbageCollection(t *testing.T) { err = dbs[2].JoinNetwork("network1") assert.NilError(t, err) for i := 0; i < 3; i++ { - assert.Check(t, is.Equal(keysWriteDelete, dbs[i].networks[dbs[i].config.NodeID]["network1"].entriesNumber), "entries number should match") + dbs[i].Lock() + assert.Check(t, is.Equal(int64(keysWriteDelete), dbs[i].networks[dbs[i].config.NodeID]["network1"].entriesNumber.Load()), "entries number should match") + dbs[i].Unlock() } // at this point the entries should had been all deleted time.Sleep(30 * time.Second) for i := 0; i < 3; i++ { - assert.Check(t, is.Equal(0, dbs[i].networks[dbs[i].config.NodeID]["network1"].entriesNumber), "entries should had been garbage collected") + dbs[i].Lock() + assert.Check(t, is.Equal(int64(0), dbs[i].networks[dbs[i].config.NodeID]["network1"].entriesNumber.Load()), "entries should had 
been garbage collected") + dbs[i].Unlock() } // make sure that entries are not coming back time.Sleep(15 * time.Second) for i := 0; i < 3; i++ { - assert.Check(t, is.Equal(0, dbs[i].networks[dbs[i].config.NodeID]["network1"].entriesNumber), "entries should had been garbage collected") + dbs[i].Lock() + assert.Check(t, is.Equal(int64(0), dbs[i].networks[dbs[i].config.NodeID]["network1"].entriesNumber.Load()), "entries should had been garbage collected") + dbs[i].Unlock() } closeNetworkDBInstances(t, dbs) @@ -733,6 +737,7 @@ func TestNodeReincarnation(t *testing.T) { assert.Check(t, is.Len(dbs[0].failedNodes, 1)) assert.Check(t, is.Len(dbs[0].leftNodes, 1)) + dbs[0].Lock() b := dbs[0].purgeReincarnation(&memberlist.Node{Name: "node4", Addr: net.ParseIP("192.168.1.1")}) assert.Check(t, b) dbs[0].nodes["node4"] = &node{Node: memberlist.Node{Name: "node4", Addr: net.ParseIP("192.168.1.1")}} @@ -753,6 +758,7 @@ func TestNodeReincarnation(t *testing.T) { assert.Check(t, is.Len(dbs[0].failedNodes, 0)) assert.Check(t, is.Len(dbs[0].leftNodes, 3)) + dbs[0].Unlock() closeNetworkDBInstances(t, dbs) } @@ -888,8 +894,9 @@ func TestNetworkDBIslands(t *testing.T) { // Spawn again the first 3 nodes with different names but same IP:port for i := 0; i < 3; i++ { logrus.Infof("node %d coming back", i) - dbs[i].config.NodeID = stringid.TruncateID(stringid.GenerateRandomID()) - dbs[i] = launchNode(t, *dbs[i].config) + conf := *dbs[i].config + conf.NodeID = stringid.TruncateID(stringid.GenerateRandomID()) + dbs[i] = launchNode(t, conf) } // Give some time for the reconnect routine to run, it runs every 6s. 
diff --git a/libnetwork/networkdb/networkdbdiagnostic.go b/libnetwork/networkdb/networkdbdiagnostic.go index c1cbe130ac..62b5a291f0 100644 --- a/libnetwork/networkdb/networkdbdiagnostic.go +++ b/libnetwork/networkdb/networkdbdiagnostic.go @@ -438,7 +438,7 @@ func dbNetworkStats(ctx interface{}, w http.ResponseWriter, r *http.Request) { entries := -1 qLen := -1 if ok { - entries = network.entriesNumber + entries = int(network.entriesNumber.Load()) qLen = network.tableBroadcasts.NumQueued() } nDB.RUnlock()