libnet/networkdb: use atomics for stats counters

The per-network statistics counters are loaded and incremented without
any concurrency control. Use atomic integers to prevent data races
without having to add any locking.

Signed-off-by: Cory Snider <csnider@mirantis.com>
(cherry picked from commit d31fa84c7c)
Signed-off-by: Cory Snider <csnider@mirantis.com>
commit 962c238c17 (parent 20d05e235e)
Author: Cory Snider
Date:   2023-02-10 15:21:58 -05:00

4 changed files with 18 additions and 16 deletions
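
For context, the change boils down to the following pattern, shown as a minimal standalone sketch. It is illustrative only, not code from this commit: the statsCounter type and its field are made-up stand-ins for the networkdb network struct. A plain int counter touched by several goroutines becomes an atomic.Int64 (Go 1.19+), so concurrent Add and Load calls are race-free without taking a lock.

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// statsCounter is a hypothetical stand-in for the per-network stats fields.
type statsCounter struct {
	qMessagesSent atomic.Int64 // was: qMessagesSent int
}

func main() {
	var c statsCounter
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			c.qMessagesSent.Add(1) // safe concurrent increment, no mutex held
		}()
	}
	wg.Wait()
	fmt.Println(c.qMessagesSent.Load()) // always prints 100, even under -race
}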

@@ -473,13 +473,13 @@ func (nDB *NetworkDB) gossip() {
 		msgs := broadcastQ.GetBroadcasts(compoundOverhead, bytesAvail)
 		// Collect stats and print the queue info, note this code is here also to have a view of the queues empty
-		network.qMessagesSent += len(msgs)
+		network.qMessagesSent.Add(int64(len(msgs)))
 		if printStats {
+			msent := network.qMessagesSent.Swap(0)
 			logrus.Infof("NetworkDB stats %v(%v) - netID:%s leaving:%t netPeers:%d entries:%d Queue qLen:%d netMsg/s:%d",
 				nDB.config.Hostname, nDB.config.NodeID,
-				nid, network.leaving, broadcastQ.NumNodes(), network.entriesNumber, broadcastQ.NumQueued(),
-				network.qMessagesSent/int((nDB.config.StatsPrintPeriod/time.Second)))
-			network.qMessagesSent = 0
+				nid, network.leaving, broadcastQ.NumNodes(), network.entriesNumber.Load(), broadcastQ.NumQueued(),
+				msent/int64((nDB.config.StatsPrintPeriod/time.Second)))
 		}

 		if len(msgs) == 0 {
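
The Swap(0) added above is worth noting: a Load followed by a separate Store(0) could lose an Add that lands in between, whereas Swap returns the old value and zeroes the counter in a single atomic step. A minimal sketch of that read-and-reset idiom (made-up names, not code from this commit):

package main

import (
	"fmt"
	"sync/atomic"
)

func main() {
	var sent atomic.Int64
	sent.Add(42)

	// Swap returns the previous value and stores the new one atomically,
	// so a concurrent Add cannot fall between the read and the reset.
	perPeriod := sent.Swap(0)

	fmt.Println(perPeriod, sent.Load()) // 42 0
}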

@@ -8,6 +8,7 @@ import (
 	"os"
 	"strings"
 	"sync"
+	"sync/atomic"
 	"time"

 	"github.com/armon/go-radix"
@@ -145,12 +146,12 @@ type network struct {
 	tableBroadcasts *memberlist.TransmitLimitedQueue

 	// Number of gossip messages sent related to this network during the last stats collection period
-	qMessagesSent int
+	qMessagesSent atomic.Int64

 	// Number of entries on the network. This value is the sum of all the entries of all the tables of a specific network.
 	// Its use is for statistics purposes. It keep tracks of database size and is printed per network every StatsPrintPeriod
 	// interval
-	entriesNumber int
+	entriesNumber atomic.Int64
 }

 // Config represents the configuration of the networkdb instance and
@@ -612,11 +613,12 @@ func (nDB *NetworkDB) JoinNetwork(nid string) error {
 		nDB.networks[nDB.config.NodeID] = nodeNetworks
 	}
 	n, ok := nodeNetworks[nid]
-	var entries int
+	var entries int64
 	if ok {
-		entries = n.entriesNumber
+		entries = n.entriesNumber.Load()
 	}
-	nodeNetworks[nid] = &network{id: nid, ltime: ltime, entriesNumber: entries}
+	nodeNetworks[nid] = &network{id: nid, ltime: ltime}
+	nodeNetworks[nid].entriesNumber.Store(entries)
 	nodeNetworks[nid].tableBroadcasts = &memberlist.TransmitLimitedQueue{
 		NumNodes: func() int {
 			//TODO fcrisciani this can be optimized maybe avoiding the lock?
@@ -758,7 +760,7 @@ func (nDB *NetworkDB) createOrUpdateEntry(nid, tname, key string, entry interface{}) (bool, bool) {
 		// Add only if it is an insert not an update
 		n, ok := nDB.networks[nDB.config.NodeID][nid]
 		if ok {
-			n.entriesNumber++
+			n.entriesNumber.Add(1)
 		}
 	}
 	return okTable, okNetwork
@@ -773,7 +775,7 @@ func (nDB *NetworkDB) deleteEntry(nid, tname, key string) (bool, bool) {
 		// Remove only if the delete is successful
 		n, ok := nDB.networks[nDB.config.NodeID][nid]
 		if ok {
-			n.entriesNumber--
+			n.entriesNumber.Add(-1)
 		}
 	}
 	return okTable, okNetwork

@@ -583,7 +583,7 @@ func TestNetworkDBGarbageCollection(t *testing.T) {
 	}
 	for i := 0; i < 2; i++ {
 		dbs[i].Lock()
-		assert.Check(t, is.Equal(keysWriteDelete, dbs[i].networks[dbs[i].config.NodeID]["network1"].entriesNumber), "entries number should match")
+		assert.Check(t, is.Equal(int64(keysWriteDelete), dbs[i].networks[dbs[i].config.NodeID]["network1"].entriesNumber.Load()), "entries number should match")
 		dbs[i].Unlock()
 	}
@@ -594,14 +594,14 @@ func TestNetworkDBGarbageCollection(t *testing.T) {
 	assert.NilError(t, err)
 	for i := 0; i < 3; i++ {
 		dbs[i].Lock()
-		assert.Check(t, is.Equal(keysWriteDelete, dbs[i].networks[dbs[i].config.NodeID]["network1"].entriesNumber), "entries number should match")
+		assert.Check(t, is.Equal(int64(keysWriteDelete), dbs[i].networks[dbs[i].config.NodeID]["network1"].entriesNumber.Load()), "entries number should match")
 		dbs[i].Unlock()
 	}

 	// at this point the entries should had been all deleted
 	time.Sleep(30 * time.Second)
 	for i := 0; i < 3; i++ {
 		dbs[i].Lock()
-		assert.Check(t, is.Equal(0, dbs[i].networks[dbs[i].config.NodeID]["network1"].entriesNumber), "entries should had been garbage collected")
+		assert.Check(t, is.Equal(int64(0), dbs[i].networks[dbs[i].config.NodeID]["network1"].entriesNumber.Load()), "entries should had been garbage collected")
 		dbs[i].Unlock()
 	}
@@ -609,7 +609,7 @@ func TestNetworkDBGarbageCollection(t *testing.T) {
 	time.Sleep(15 * time.Second)
 	for i := 0; i < 3; i++ {
 		dbs[i].Lock()
-		assert.Check(t, is.Equal(0, dbs[i].networks[dbs[i].config.NodeID]["network1"].entriesNumber), "entries should had been garbage collected")
+		assert.Check(t, is.Equal(int64(0), dbs[i].networks[dbs[i].config.NodeID]["network1"].entriesNumber.Load()), "entries should had been garbage collected")
 		dbs[i].Unlock()
 	}

@@ -438,7 +438,7 @@ func dbNetworkStats(ctx interface{}, w http.ResponseWriter, r *http.Request) {
 	entries := -1
 	qLen := -1
 	if ok {
-		entries = network.entriesNumber
+		entries = int(network.entriesNumber.Load())
 		qLen = network.tableBroadcasts.NumQueued()
 	}
 	nDB.RUnlock()