Kaynağa Gözat

Changed ReapTable logic

- Changed the loop per network. Previous implementation was taking a
  ReadLock to update the reapTime but now with the residualReapTime
  also the bulkSync is using the same ReadLock creating possible
  issues in concurrent read and update of the value.
  The new logic fetches the list of networks and proceed to the
  cleanup network by network locking the database and releasing it
  after each network. This should ensure a fair locking avoiding
  to keep the database blocked for too much time.

  Note: The ticker does not guarantee that the reap logic runs
  precisely every reapTimePeriod, actually documentation says that
  if the routine is too long will skip ticks. In case of slowdown
  of the process itself it is possible that the lifetime of the
  deleted entries increases, it still should not be a huge problem
  because now the residual reaptime is propagated among all the nodes
  a slower node will let the deleted entry being repropagate multiple
  times but the state will still remain consistent.

Signed-off-by: Flavio Crisciani <flavio.crisciani@docker.com>
Flavio Crisciani 7 yıl önce
ebeveyn
işleme
053a534ab1

+ 42 - 34
libnetwork/networkdb/cluster.go

@@ -321,43 +321,51 @@ func (nDB *NetworkDB) reapNetworks() {
 }
 
 func (nDB *NetworkDB) reapTableEntries() {
-	var paths []string
-
+	var nodeNetworks []string
+	// This is best effort, if the list of network changes will be picked up in the next cycle
 	nDB.RLock()
-	nDB.indexes[byTable].Walk(func(path string, v interface{}) bool {
-		entry, ok := v.(*entry)
-		if !ok {
-			return false
-		}
-
-		if !entry.deleting {
-			return false
-		}
-		if entry.reapTime > 0 {
-			entry.reapTime -= reapPeriod
-			return false
-		}
-		paths = append(paths, path)
-		return false
-	})
+	for nid := range nDB.networks[nDB.config.NodeName] {
+		nodeNetworks = append(nodeNetworks, nid)
+	}
 	nDB.RUnlock()
 
-	nDB.Lock()
-	for _, path := range paths {
-		params := strings.Split(path[1:], "/")
-		tname := params[0]
-		nid := params[1]
-		key := params[2]
-
-		okTable, okNetwork := nDB.deleteEntry(nid, tname, key)
-		if !okTable {
-			logrus.Errorf("Could not delete entry in table %s with network id %s and key %s as it does not exist", tname, nid, key)
-		}
-		if !okNetwork {
-			logrus.Errorf("Could not delete entry in network %s with table name %s and key %s as it does not exist", nid, tname, key)
-		}
+	cycleStart := time.Now()
+	// In order to avoid blocking the database for a long time, apply the garbage collection logic by network
+	// The lock is taken at the beginning of the cycle and the deletion is inline
+	for _, nid := range nodeNetworks {
+		nDB.Lock()
+		nDB.indexes[byNetwork].WalkPrefix(fmt.Sprintf("/%s", nid), func(path string, v interface{}) bool {
+			// timeCompensation compensate in case the lock took some time to be released
+			timeCompensation := time.Since(cycleStart)
+			entry, ok := v.(*entry)
+			if !ok || !entry.deleting {
+				return false
+			}
+
+			// In this check we are adding an extra 1 second to guarantee that when the number is truncated to int32 to fit the packet
+			// for the tableEvent the number is always strictly > 1 and never 0
+			if entry.reapTime > reapPeriod+timeCompensation+time.Second {
+				entry.reapTime -= reapPeriod + timeCompensation
+				return false
+			}
+
+			params := strings.Split(path[1:], "/")
+			nid := params[0]
+			tname := params[1]
+			key := params[2]
+
+			okTable, okNetwork := nDB.deleteEntry(nid, tname, key)
+			if !okTable {
+				logrus.Errorf("Table tree delete failed, entry with key:%s does not exists in the table:%s network:%s", key, tname, nid)
+			}
+			if !okNetwork {
+				logrus.Errorf("Network tree delete failed, entry with key:%s does not exists in the network:%s table:%s", key, nid, tname)
+			}
+
+			return false
+		})
+		nDB.Unlock()
 	}
-	nDB.Unlock()
 }
 
 func (nDB *NetworkDB) gossip() {
@@ -406,7 +414,7 @@ func (nDB *NetworkDB) gossip() {
 		// Collect stats and print the queue info, note this code is here also to have a view of the queues empty
 		network.qMessagesSent += len(msgs)
 		if printStats {
-			logrus.Infof("NetworkDB stats - net:%s Entries:%d Queue  qLen:%d netPeers:%d netMsg/s:%d",
+			logrus.Infof("NetworkDB stats - net:%s Entries:%d Queue qLen:%d netPeers:%d netMsg/s:%d",
 				nid, network.entriesNumber, broadcastQ.NumQueued(), broadcastQ.NumNodes(),
 				network.qMessagesSent/int((nDB.config.StatsPrintPeriod/time.Second)))
 			network.qMessagesSent = 0

+ 4 - 4
libnetwork/networkdb/delegate.go

@@ -238,8 +238,8 @@ func (nDB *NetworkDB) handleTableEvent(tEvent *TableEvent) bool {
 	}
 
 	// All the entries marked for deletion should have a reapTime set greater than 0
-	// This case can happens if the cluster is running different versions of the engine where the old version does not have the
-	// field. In both cases we should raise a warning message
+	// This case can happen if the cluster is running different versions of the engine where the old version does not have the
+	// field. If that is not the case, this can be a BUG
 	if e.deleting && e.reapTime == 0 {
 		logrus.Warnf("handleTableEvent object %+v has a 0 reapTime, is the cluster running the same docker engine version?", tEvent)
 		e.reapTime = reapInterval
@@ -251,9 +251,9 @@ func (nDB *NetworkDB) handleTableEvent(tEvent *TableEvent) bool {
 
 	if err != nil && tEvent.Type == TableEventTypeDelete {
 		// If it is a delete event and we did not have a state for it, don't propagate to the application
-		// If the residual reapTime is lower than 1/6 of the total reapTime don't bother broadcasting it around
+		// If the residual reapTime is lower or equal to 1/6 of the total reapTime don't bother broadcasting it around
 		// most likely the cluster is already aware of it, if not who will sync with this node will catch the state too.
-		return e.reapTime >= reapPeriod/6
+		return e.reapTime > reapPeriod/6
 	}
 
 	var op opType

+ 9 - 5
libnetwork/networkdb/networkdb.go

@@ -475,7 +475,7 @@ func (nDB *NetworkDB) deleteNodeNetworkEntries(nid, node string) {
 
 			entry := &entry{
 				ltime:    oldEntry.ltime,
-				node:     node,
+				node:     oldEntry.node,
 				value:    oldEntry.value,
 				deleting: true,
 				reapTime: reapInterval,
@@ -692,8 +692,10 @@ func (nDB *NetworkDB) createOrUpdateEntry(nid, tname, key string, entry interfac
 	_, okNetwork := nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", nid, tname, key), entry)
 	if !okNetwork {
 		// Add only if it is an insert not an update
-		n := nDB.networks[nDB.config.NodeName][nid]
-		n.entriesNumber++
+		n, ok := nDB.networks[nDB.config.NodeName][nid]
+		if ok {
+			n.entriesNumber++
+		}
 	}
 	return okTable, okNetwork
 }
@@ -705,8 +707,10 @@ func (nDB *NetworkDB) deleteEntry(nid, tname, key string) (bool, bool) {
 	_, okNetwork := nDB.indexes[byNetwork].Delete(fmt.Sprintf("/%s/%s/%s", nid, tname, key))
 	if okNetwork {
 		// Remove only if the delete is successful
-		n := nDB.networks[nDB.config.NodeName][nid]
-		n.entriesNumber--
+		n, ok := nDB.networks[nDB.config.NodeName][nid]
+		if ok {
+			n.entriesNumber--
+		}
 	}
 	return okTable, okNetwork
 }

+ 6 - 0
libnetwork/networkdb/networkdb_test.go

@@ -473,6 +473,9 @@ func TestNetworkDBNodeJoinLeaveIteration(t *testing.T) {
 	if len(dbs[0].networkNodes["network1"]) != 2 {
 		t.Fatalf("The networkNodes list has to have be 2 instead of %d - %v", len(dbs[0].networkNodes["network1"]), dbs[0].networkNodes["network1"])
 	}
+	if n, ok := dbs[0].networks[dbs[0].config.NodeName]["network1"]; !ok || n.leaving {
+		t.Fatalf("The network should not be marked as leaving:%t", n.leaving)
+	}
 
 	// Wait for the propagation on db[1]
 	for i := 0; i < maxRetry; i++ {
@@ -484,6 +487,9 @@ func TestNetworkDBNodeJoinLeaveIteration(t *testing.T) {
 	if len(dbs[1].networkNodes["network1"]) != 2 {
 		t.Fatalf("The networkNodes list has to have be 2 instead of %d - %v", len(dbs[1].networkNodes["network1"]), dbs[1].networkNodes["network1"])
 	}
+	if n, ok := dbs[1].networks[dbs[1].config.NodeName]["network1"]; !ok || n.leaving {
+		t.Fatalf("The network should not be marked as leaving:%t", n.leaving)
+	}
 
 	// Try a quick leave/join
 	err = dbs[0].LeaveNetwork("network1")

+ 1 - 1
libnetwork/test/networkDb/README

@@ -1,7 +1,7 @@
 SERVER
 
 cd test/networkdb
-env GOOS=linux go build -v server/testMain.go && docker build -t fcrisciani/networkdb-test .
+env GOOS=linux go build -v testMain.go && docker build -t fcrisciani/networkdb-test .
 (only for testkit case) docker push fcrisciani/networkdb-test
 
 Run server: docker service create --name testdb --network net1 --replicas 3 --env TASK_ID="{{.Task.ID}}" -p mode=host,target=8000 fcrisciani/networkdb-test server 8000

+ 5 - 5
libnetwork/test/networkDb/dbclient/ndbClient.go

@@ -567,7 +567,7 @@ func doWriteWaitLeaveJoin(ips []string, args []string) {
 	tableName := args[1]
 	parallelWriters, _ := strconv.Atoi(args[2])
 	writeTimeSec, _ := strconv.Atoi(args[3])
-	parallerlLeaver, _ := strconv.Atoi(args[4])
+	parallelLeaver, _ := strconv.Atoi(args[4])
 
 	// Start parallel writers that will create and delete unique keys
 	doneCh := make(chan resultTuple, parallelWriters)
@@ -586,23 +586,23 @@ func doWriteWaitLeaveJoin(ips []string, args []string) {
 
 	keysExpected := keyMap[totalWrittenKeys]
 	// The Leavers will leave the network
-	for i := 0; i < parallerlLeaver; i++ {
+	for i := 0; i < parallelLeaver; i++ {
 		logrus.Infof("worker leaveNetwork: %d on IP:%s", i, ips[i])
 		go leaveNetwork(ips[i], servicePort, networkName, doneCh)
 		// Once a node leave all the keys written previously will be deleted, so the expected keys will consider that as removed
 		keysExpected -= keyMap[ips[i]]
 	}
-	waitWriters(parallerlLeaver, false, doneCh)
+	waitWriters(parallelLeaver, false, doneCh)
 
 	// Give some time
 	time.Sleep(100 * time.Millisecond)
 
 	// The writers will join the network
-	for i := 0; i < parallerlLeaver; i++ {
+	for i := 0; i < parallelLeaver; i++ {
 		logrus.Infof("worker joinNetwork: %d on IP:%s", i, ips[i])
 		go joinNetwork(ips[i], servicePort, networkName, doneCh)
 	}
-	waitWriters(parallerlLeaver, false, doneCh)
+	waitWriters(parallelLeaver, false, doneCh)
 
 	// check table entries for 2 minutes
 	ctx, cancel = context.WithTimeout(context.Background(), 2*time.Minute)