Bläddra i källkod

libnet/networkdb: use atomics for stats counters

The per-network statistics counters are loaded and incremented without
any concurrency control. Use atomic integers to prevent data races
without having to add any explicit locking.

Signed-off-by: Cory Snider <csnider@mirantis.com>
Cory Snider 2 år sedan
förälder
incheckning
d31fa84c7c

+ 4 - 4
libnetwork/networkdb/cluster.go

@@ -473,13 +473,13 @@ func (nDB *NetworkDB) gossip() {
 
 
 		msgs := broadcastQ.GetBroadcasts(compoundOverhead, bytesAvail)
 		msgs := broadcastQ.GetBroadcasts(compoundOverhead, bytesAvail)
 		// Collect stats and print the queue info. Note that this code is here so that we also get a view of the queues when they are empty
 		// Collect stats and print the queue info. Note that this code is here so that we also get a view of the queues when they are empty
-		network.qMessagesSent += len(msgs)
+		network.qMessagesSent.Add(int64(len(msgs)))
 		if printStats {
 		if printStats {
+			msent := network.qMessagesSent.Swap(0)
 			logrus.Infof("NetworkDB stats %v(%v) - netID:%s leaving:%t netPeers:%d entries:%d Queue qLen:%d netMsg/s:%d",
 			logrus.Infof("NetworkDB stats %v(%v) - netID:%s leaving:%t netPeers:%d entries:%d Queue qLen:%d netMsg/s:%d",
 				nDB.config.Hostname, nDB.config.NodeID,
 				nDB.config.Hostname, nDB.config.NodeID,
-				nid, network.leaving, broadcastQ.NumNodes(), network.entriesNumber, broadcastQ.NumQueued(),
-				network.qMessagesSent/int((nDB.config.StatsPrintPeriod/time.Second)))
-			network.qMessagesSent = 0
+				nid, network.leaving, broadcastQ.NumNodes(), network.entriesNumber.Load(), broadcastQ.NumQueued(),
+				msent/int64((nDB.config.StatsPrintPeriod/time.Second)))
 		}
 		}
 
 
 		if len(msgs) == 0 {
 		if len(msgs) == 0 {

+ 9 - 7
libnetwork/networkdb/networkdb.go

@@ -8,6 +8,7 @@ import (
 	"os"
 	"os"
 	"strings"
 	"strings"
 	"sync"
 	"sync"
+	"sync/atomic"
 	"time"
 	"time"
 
 
 	"github.com/docker/docker/libnetwork/types"
 	"github.com/docker/docker/libnetwork/types"
@@ -145,12 +146,12 @@ type network struct {
 	tableBroadcasts *memberlist.TransmitLimitedQueue
 	tableBroadcasts *memberlist.TransmitLimitedQueue
 
 
 	// Number of gossip messages sent related to this network during the last stats collection period
 	// Number of gossip messages sent related to this network during the last stats collection period
-	qMessagesSent int
+	qMessagesSent atomic.Int64
 
 
 	// Number of entries on the network. This value is the sum of all the entries of all the tables of a specific network.
 	// Number of entries on the network. This value is the sum of all the entries of all the tables of a specific network.
 	// Its use is for statistics purposes. It keeps track of the database size and is printed per network every StatsPrintPeriod
 	// Its use is for statistics purposes. It keeps track of the database size and is printed per network every StatsPrintPeriod
 	// interval
 	// interval
-	entriesNumber int
+	entriesNumber atomic.Int64
 }
 }
 
 
 // Config represents the configuration of the networkdb instance and
 // Config represents the configuration of the networkdb instance and
@@ -613,11 +614,12 @@ func (nDB *NetworkDB) JoinNetwork(nid string) error {
 		nDB.networks[nDB.config.NodeID] = nodeNetworks
 		nDB.networks[nDB.config.NodeID] = nodeNetworks
 	}
 	}
 	n, ok := nodeNetworks[nid]
 	n, ok := nodeNetworks[nid]
-	var entries int
+	var entries int64
 	if ok {
 	if ok {
-		entries = n.entriesNumber
+		entries = n.entriesNumber.Load()
 	}
 	}
-	nodeNetworks[nid] = &network{id: nid, ltime: ltime, entriesNumber: entries}
+	nodeNetworks[nid] = &network{id: nid, ltime: ltime}
+	nodeNetworks[nid].entriesNumber.Store(entries)
 	nodeNetworks[nid].tableBroadcasts = &memberlist.TransmitLimitedQueue{
 	nodeNetworks[nid].tableBroadcasts = &memberlist.TransmitLimitedQueue{
 		NumNodes: func() int {
 		NumNodes: func() int {
 			//TODO fcrisciani this can be optimized maybe avoiding the lock?
 			//TODO fcrisciani this can be optimized maybe avoiding the lock?
@@ -759,7 +761,7 @@ func (nDB *NetworkDB) createOrUpdateEntry(nid, tname, key string, entry interfac
 		// Add only if it is an insert not an update
 		// Add only if it is an insert not an update
 		n, ok := nDB.networks[nDB.config.NodeID][nid]
 		n, ok := nDB.networks[nDB.config.NodeID][nid]
 		if ok {
 		if ok {
-			n.entriesNumber++
+			n.entriesNumber.Add(1)
 		}
 		}
 	}
 	}
 	return okTable, okNetwork
 	return okTable, okNetwork
@@ -774,7 +776,7 @@ func (nDB *NetworkDB) deleteEntry(nid, tname, key string) (okTable bool, okNetwo
 		// Remove only if the delete is successful
 		// Remove only if the delete is successful
 		n, ok := nDB.networks[nDB.config.NodeID][nid]
 		n, ok := nDB.networks[nDB.config.NodeID][nid]
 		if ok {
 		if ok {
-			n.entriesNumber--
+			n.entriesNumber.Add(-1)
 		}
 		}
 	}
 	}
 	return okTable, okNetwork
 	return okTable, okNetwork

+ 4 - 4
libnetwork/networkdb/networkdb_test.go

@@ -574,7 +574,7 @@ func TestNetworkDBGarbageCollection(t *testing.T) {
 	}
 	}
 	for i := 0; i < 2; i++ {
 	for i := 0; i < 2; i++ {
 		dbs[i].Lock()
 		dbs[i].Lock()
-		assert.Check(t, is.Equal(keysWriteDelete, dbs[i].networks[dbs[i].config.NodeID]["network1"].entriesNumber), "entries number should match")
+		assert.Check(t, is.Equal(int64(keysWriteDelete), dbs[i].networks[dbs[i].config.NodeID]["network1"].entriesNumber.Load()), "entries number should match")
 		dbs[i].Unlock()
 		dbs[i].Unlock()
 	}
 	}
 
 
@@ -585,14 +585,14 @@ func TestNetworkDBGarbageCollection(t *testing.T) {
 	assert.NilError(t, err)
 	assert.NilError(t, err)
 	for i := 0; i < 3; i++ {
 	for i := 0; i < 3; i++ {
 		dbs[i].Lock()
 		dbs[i].Lock()
-		assert.Check(t, is.Equal(keysWriteDelete, dbs[i].networks[dbs[i].config.NodeID]["network1"].entriesNumber), "entries number should match")
+		assert.Check(t, is.Equal(int64(keysWriteDelete), dbs[i].networks[dbs[i].config.NodeID]["network1"].entriesNumber.Load()), "entries number should match")
 		dbs[i].Unlock()
 		dbs[i].Unlock()
 	}
 	}
 	// at this point the entries should all have been deleted
 	// at this point the entries should all have been deleted
 	time.Sleep(30 * time.Second)
 	time.Sleep(30 * time.Second)
 	for i := 0; i < 3; i++ {
 	for i := 0; i < 3; i++ {
 		dbs[i].Lock()
 		dbs[i].Lock()
-		assert.Check(t, is.Equal(0, dbs[i].networks[dbs[i].config.NodeID]["network1"].entriesNumber), "entries should had been garbage collected")
+		assert.Check(t, is.Equal(int64(0), dbs[i].networks[dbs[i].config.NodeID]["network1"].entriesNumber.Load()), "entries should had been garbage collected")
 		dbs[i].Unlock()
 		dbs[i].Unlock()
 	}
 	}
 
 
@@ -600,7 +600,7 @@ func TestNetworkDBGarbageCollection(t *testing.T) {
 	time.Sleep(15 * time.Second)
 	time.Sleep(15 * time.Second)
 	for i := 0; i < 3; i++ {
 	for i := 0; i < 3; i++ {
 		dbs[i].Lock()
 		dbs[i].Lock()
-		assert.Check(t, is.Equal(0, dbs[i].networks[dbs[i].config.NodeID]["network1"].entriesNumber), "entries should had been garbage collected")
+		assert.Check(t, is.Equal(int64(0), dbs[i].networks[dbs[i].config.NodeID]["network1"].entriesNumber.Load()), "entries should had been garbage collected")
 		dbs[i].Unlock()
 		dbs[i].Unlock()
 	}
 	}
 
 

+ 1 - 1
libnetwork/networkdb/networkdbdiagnostic.go

@@ -438,7 +438,7 @@ func dbNetworkStats(ctx interface{}, w http.ResponseWriter, r *http.Request) {
 		entries := -1
 		entries := -1
 		qLen := -1
 		qLen := -1
 		if ok {
 		if ok {
-			entries = network.entriesNumber
+			entries = int(network.entriesNumber.Load())
 			qLen = network.tableBroadcasts.NumQueued()
 			qLen = network.tableBroadcasts.NumQueued()
 		}
 		}
 		nDB.RUnlock()
 		nDB.RUnlock()