Browse Source

Enhance testing infra

Allow to write and delete X number of entries
Allow to query the queue length

Signed-off-by: Flavio Crisciani <flavio.crisciani@docker.com>
Flavio Crisciani 7 years ago
parent
commit
b09cb39fa5

+ 183 - 23
libnetwork/cmd/networkdb-test/dbclient/ndbClient.go

@@ -2,6 +2,7 @@ package dbclient
 
 import (
 	"context"
+	"fmt"
 	"io/ioutil"
 	"log"
 	"net"
@@ -25,17 +26,10 @@ type resultTuple struct {
 }
 
 func httpGetFatalError(ip, port, path string) {
-	// for {
 	body, err := httpGet(ip, port, path)
 	if err != nil || !strings.Contains(string(body), "OK") {
-		// if strings.Contains(err.Error(), "EOF") {
-		// 	logrus.Warnf("Got EOF path:%s err:%s", path, err)
-		// 	continue
-		// }
 		log.Fatalf("[%s] error %s %s", path, err, body)
 	}
-	// break
-	// }
 }
 
 func httpGet(ip, port, path string) ([]byte, error) {
@@ -87,7 +81,7 @@ func clusterPeersNumber(ip, port string, doneCh chan resultTuple) {
 	body, err := httpGet(ip, port, "/clusterpeers")
 
 	if err != nil {
-		logrus.Errorf("clusterPeers %s there was an error: %s\n", ip, err)
+		logrus.Errorf("clusterPeers %s there was an error: %s", ip, err)
 		doneCh <- resultTuple{id: ip, result: -1}
 		return
 	}
@@ -101,7 +95,7 @@ func networkPeersNumber(ip, port, networkName string, doneCh chan resultTuple) {
 	body, err := httpGet(ip, port, "/networkpeers?nid="+networkName)
 
 	if err != nil {
-		logrus.Errorf("networkPeersNumber %s there was an error: %s\n", ip, err)
+		logrus.Errorf("networkPeersNumber %s there was an error: %s", ip, err)
 		doneCh <- resultTuple{id: ip, result: -1}
 		return
 	}
@@ -115,7 +109,7 @@ func dbTableEntriesNumber(ip, port, networkName, tableName string, doneCh chan r
 	body, err := httpGet(ip, port, "/gettable?nid="+networkName+"&tname="+tableName)
 
 	if err != nil {
-		logrus.Errorf("tableEntriesNumber %s there was an error: %s\n", ip, err)
+		logrus.Errorf("tableEntriesNumber %s there was an error: %s", ip, err)
 		doneCh <- resultTuple{id: ip, result: -1}
 		return
 	}
@@ -124,6 +118,32 @@ func dbTableEntriesNumber(ip, port, networkName, tableName string, doneCh chan r
 	doneCh <- resultTuple{id: ip, result: entriesNum}
 }
 
+func dbEntriesNumber(ip, port, networkName string, doneCh chan resultTuple) {
+	body, err := httpGet(ip, port, "/networkstats?nid="+networkName)
+
+	if err != nil {
+		logrus.Errorf("entriesNumber %s there was an error: %s", ip, err)
+		doneCh <- resultTuple{id: ip, result: -1}
+		return
+	}
+	elementsRegexp := regexp.MustCompile(`entries: ([0-9]+)`)
+	entriesNum, _ := strconv.Atoi(elementsRegexp.FindStringSubmatch(string(body))[1])
+	doneCh <- resultTuple{id: ip, result: entriesNum}
+}
+
+func dbQueueLength(ip, port, networkName string, doneCh chan resultTuple) {
+	body, err := httpGet(ip, port, "/networkstats?nid="+networkName)
+
+	if err != nil {
+		logrus.Errorf("queueLength %s there was an error: %s", ip, err)
+		doneCh <- resultTuple{id: ip, result: -1}
+		return
+	}
+	elementsRegexp := regexp.MustCompile(`qlen: ([0-9]+)`)
+	entriesNum, _ := strconv.Atoi(elementsRegexp.FindStringSubmatch(string(body))[1])
+	doneCh <- resultTuple{id: ip, result: entriesNum}
+}
+
 func clientWatchTable(ip, port, networkName, tableName string, doneCh chan resultTuple) {
 	httpGetFatalError(ip, port, "/watchtable?nid="+networkName+"&tname="+tableName)
 	if doneCh != nil {
@@ -135,7 +155,7 @@ func clientTableEntriesNumber(ip, port, networkName, tableName string, doneCh ch
 	body, err := httpGet(ip, port, "/watchedtableentries?nid="+networkName+"&tname="+tableName)
 
 	if err != nil {
-		logrus.Errorf("clientTableEntriesNumber %s there was an error: %s\n", ip, err)
+		logrus.Errorf("clientTableEntriesNumber %s there was an error: %s", ip, err)
 		doneCh <- resultTuple{id: ip, result: -1}
 		return
 	}
@@ -144,6 +164,26 @@ func clientTableEntriesNumber(ip, port, networkName, tableName string, doneCh ch
 	doneCh <- resultTuple{id: ip, result: entriesNum}
 }
 
+func writeKeysNumber(ip, port, networkName, tableName, key string, number int, doneCh chan resultTuple) {
+	x := 0
+	for ; x < number; x++ {
+		k := key + strconv.Itoa(x)
+		// write key
+		writeTableKey(ip, port, networkName, tableName, k)
+	}
+	doneCh <- resultTuple{id: ip, result: x}
+}
+
+func deleteKeysNumber(ip, port, networkName, tableName, key string, number int, doneCh chan resultTuple) {
+	x := 0
+	for ; x < number; x++ {
+		k := key + strconv.Itoa(x)
+		// write key
+		deleteTableKey(ip, port, networkName, tableName, k)
+	}
+	doneCh <- resultTuple{id: ip, result: x}
+}
+
 func writeUniqueKeys(ctx context.Context, ip, port, networkName, tableName, key string, doneCh chan resultTuple) {
 	for x := 0; ; x++ {
 		select {
@@ -215,17 +255,18 @@ func ready(ip, port string, doneCh chan resultTuple) {
 	doneCh <- resultTuple{id: ip, result: 0}
 }
 
-func checkTable(ctx context.Context, ips []string, port, networkName, tableName string, expectedEntries int, fn func(string, string, string, string, chan resultTuple)) {
+func checkTable(ctx context.Context, ips []string, port, networkName, tableName string, expectedEntries int, fn func(string, string, string, string, chan resultTuple)) (opTime time.Duration) {
 	startTime := time.Now().UnixNano()
 	var successTime int64
 
-	// Loop for 2 minutes to guartee that the result is stable
+	// Loop for 2 minutes to guarantee that the result is stable
 	for {
 		select {
 		case <-ctx.Done():
 			// Validate test success, if the time is set means that all the tables are empty
 			if successTime != 0 {
-				logrus.Infof("Check table passed, the cluster converged in %d msec", time.Duration(successTime-startTime)/time.Millisecond)
+				opTime = time.Duration(successTime-startTime) / time.Millisecond
+				logrus.Infof("Check table passed, the cluster converged in %d msec", opTime)
 				return
 			}
 			log.Fatal("Test failed, there is still entries in the tables of the nodes")
@@ -403,6 +444,107 @@ func doNetworkPeers(ips []string, args []string) {
 	close(doneCh)
 }
 
+// network-stats-queue networkName <gt/lt> queueSize
+func doNetworkStatsQueue(ips []string, args []string) {
+	doneCh := make(chan resultTuple, len(ips))
+	networkName := args[0]
+	comparison := args[1]
+	size, _ := strconv.Atoi(args[2])
+
+	// check all the nodes
+	for _, ip := range ips {
+		go dbQueueLength(ip, servicePort, networkName, doneCh)
+	}
+
+	var avgQueueSize int
+	// wait for the readiness of all nodes
+	for i := len(ips); i > 0; i-- {
+		node := <-doneCh
+		switch comparison {
+		case "lt":
+			if node.result > size {
+				log.Fatalf("Expected queue size from %s to be %d < %d", node.id, node.result, size)
+			}
+		case "gt":
+			if node.result < size {
+				log.Fatalf("Expected queue size from %s to be %d > %d", node.id, node.result, size)
+			}
+		default:
+			log.Fatal("unknown comparison operator")
+		}
+		avgQueueSize += node.result
+	}
+	close(doneCh)
+	avgQueueSize /= len(ips)
+	fmt.Fprintf(os.Stderr, "doNetworkStatsQueue succeeded with avg queue:%d", avgQueueSize)
+}
+
+// write-keys networkName tableName parallelWriters numberOfKeysEach
+func doWriteKeys(ips []string, args []string) {
+	networkName := args[0]
+	tableName := args[1]
+	parallelWriters, _ := strconv.Atoi(args[2])
+	numberOfKeys, _ := strconv.Atoi(args[3])
+
+	doneCh := make(chan resultTuple, parallelWriters)
+	// Enable watch of tables from clients
+	for i := 0; i < parallelWriters; i++ {
+		go clientWatchTable(ips[i], servicePort, networkName, tableName, doneCh)
+	}
+	waitWriters(parallelWriters, false, doneCh)
+
+	// Start parallel writers that will create and delete unique keys
+	defer close(doneCh)
+	for i := 0; i < parallelWriters; i++ {
+		key := "key-" + strconv.Itoa(i) + "-"
+		logrus.Infof("Spawn worker: %d on IP:%s", i, ips[i])
+		go writeKeysNumber(ips[i], servicePort, networkName, tableName, key, numberOfKeys, doneCh)
+	}
+
+	// Sync with all the writers
+	keyMap := waitWriters(parallelWriters, true, doneCh)
+	logrus.Infof("Written a total of %d keys on the cluster", keyMap[totalWrittenKeys])
+
+	// check table entries for 2 minutes
+	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute)
+	opTime := checkTable(ctx, ips, servicePort, networkName, tableName, keyMap[totalWrittenKeys], dbTableEntriesNumber)
+	cancel()
+	fmt.Fprintf(os.Stderr, "doWriteKeys succeeded in %d msec", opTime)
+}
+
+// delete-keys networkName tableName parallelWriters numberOfKeysEach
+func doDeleteKeys(ips []string, args []string) {
+	networkName := args[0]
+	tableName := args[1]
+	parallelWriters, _ := strconv.Atoi(args[2])
+	numberOfKeys, _ := strconv.Atoi(args[3])
+
+	doneCh := make(chan resultTuple, parallelWriters)
+	// Enable watch of tables from clients
+	for i := 0; i < parallelWriters; i++ {
+		go clientWatchTable(ips[i], servicePort, networkName, tableName, doneCh)
+	}
+	waitWriters(parallelWriters, false, doneCh)
+
+	// Start parallel writers that will create and delete unique keys
+	defer close(doneCh)
+	for i := 0; i < parallelWriters; i++ {
+		key := "key-" + strconv.Itoa(i) + "-"
+		logrus.Infof("Spawn worker: %d on IP:%s", i, ips[i])
+		go deleteKeysNumber(ips[i], servicePort, networkName, tableName, key, numberOfKeys, doneCh)
+	}
+
+	// Sync with all the writers
+	keyMap := waitWriters(parallelWriters, true, doneCh)
+	logrus.Infof("Written a total of %d keys on the cluster", keyMap[totalWrittenKeys])
+
+	// check table entries for 2 minutes
+	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute)
+	opTime := checkTable(ctx, ips, servicePort, networkName, tableName, 0, dbTableEntriesNumber)
+	cancel()
+	fmt.Fprintf(os.Stderr, "doDeletekeys succeeded in %d msec", opTime)
+}
+
 // write-delete-unique-keys networkName tableName numParallelWriters writeTimeSec
 func doWriteDeleteUniqueKeys(ips []string, args []string) {
 	networkName := args[0]
@@ -432,11 +574,12 @@ func doWriteDeleteUniqueKeys(ips []string, args []string) {
 
 	// check table entries for 2 minutes
 	ctx, cancel = context.WithTimeout(context.Background(), 2*time.Minute)
-	checkTable(ctx, ips, servicePort, networkName, tableName, 0, dbTableEntriesNumber)
+	opDBTime := checkTable(ctx, ips, servicePort, networkName, tableName, 0, dbTableEntriesNumber)
 	cancel()
 	ctx, cancel = context.WithTimeout(context.Background(), 30*time.Second)
-	checkTable(ctx, ips, servicePort, networkName, tableName, 0, clientTableEntriesNumber)
+	opClientTime := checkTable(ctx, ips, servicePort, networkName, tableName, 0, clientTableEntriesNumber)
 	cancel()
+	fmt.Fprintf(os.Stderr, "doWriteDeleteUniqueKeys succeeded in %d msec and client %d msec", opDBTime, opClientTime)
 }
 
 // write-unique-keys networkName tableName numParallelWriters writeTimeSec
@@ -469,8 +612,9 @@ func doWriteUniqueKeys(ips []string, args []string) {
 
 	// check table entries for 2 minutes
 	ctx, cancel = context.WithTimeout(context.Background(), 2*time.Minute)
-	checkTable(ctx, ips, servicePort, networkName, tableName, keyMap[totalWrittenKeys], dbTableEntriesNumber)
+	opTime := checkTable(ctx, ips, servicePort, networkName, tableName, keyMap[totalWrittenKeys], dbTableEntriesNumber)
 	cancel()
+	fmt.Fprintf(os.Stderr, "doWriteUniqueKeys succeeded in %d msec", opTime)
 }
 
 // write-delete-leave-join networkName tableName numParallelWriters writeTimeSec
@@ -497,8 +641,9 @@ func doWriteDeleteLeaveJoin(ips []string, args []string) {
 
 	// check table entries for 2 minutes
 	ctx, cancel = context.WithTimeout(context.Background(), 2*time.Minute)
-	checkTable(ctx, ips, servicePort, networkName, tableName, 0, dbTableEntriesNumber)
+	opTime := checkTable(ctx, ips, servicePort, networkName, tableName, 0, dbTableEntriesNumber)
 	cancel()
+	fmt.Fprintf(os.Stderr, "doWriteDeleteLeaveJoin succeeded in %d msec", opTime)
 }
 
 // write-delete-wait-leave-join networkName tableName numParallelWriters writeTimeSec
@@ -542,8 +687,9 @@ func doWriteDeleteWaitLeaveJoin(ips []string, args []string) {
 
 	// check table entries for 2 minutes
 	ctx, cancel = context.WithTimeout(context.Background(), 2*time.Minute)
-	checkTable(ctx, ips, servicePort, networkName, tableName, 0, dbTableEntriesNumber)
+	opTime := checkTable(ctx, ips, servicePort, networkName, tableName, 0, dbTableEntriesNumber)
 	cancel()
+	fmt.Fprintf(os.Stderr, "doWriteDeleteWaitLeaveJoin succeeded in %d msec", opTime)
 }
 
 // write-wait-leave networkName tableName numParallelWriters writeTimeSec
@@ -577,8 +723,9 @@ func doWriteWaitLeave(ips []string, args []string) {
 
 	// check table entries for 2 minutes
 	ctx, cancel = context.WithTimeout(context.Background(), 2*time.Minute)
-	checkTable(ctx, ips, servicePort, networkName, tableName, 0, dbTableEntriesNumber)
+	opTime := checkTable(ctx, ips, servicePort, networkName, tableName, 0, dbTableEntriesNumber)
 	cancel()
+	fmt.Fprintf(os.Stderr, "doWriteLeaveJoin succeeded in %d msec", opTime)
 }
 
 // write-wait-leave-join networkName tableName numParallelWriters writeTimeSec numParallelLeaver
@@ -626,8 +773,9 @@ func doWriteWaitLeaveJoin(ips []string, args []string) {
 
 	// check table entries for 2 minutes
 	ctx, cancel = context.WithTimeout(context.Background(), 2*time.Minute)
-	checkTable(ctx, ips, servicePort, networkName, tableName, keysExpected, dbTableEntriesNumber)
+	opTime := checkTable(ctx, ips, servicePort, networkName, tableName, keysExpected, dbTableEntriesNumber)
 	cancel()
+	fmt.Fprintf(os.Stderr, "doWriteWaitLeaveJoin succeeded in %d msec", opTime)
 }
 
 var cmdArgChec = map[string]int{
@@ -687,9 +835,21 @@ func Client(args []string) {
 		// leave-network networkName
 		doLeaveNetwork(ips, commandArgs)
 	case "network-peers":
-		// network-peers networkName maxRetry
+		// network-peers networkName expectedNumberPeers maxRetry
 		doNetworkPeers(ips, commandArgs)
-
+		//	case "network-stats-entries":
+		//		// network-stats-entries networkName maxRetry
+		//		doNetworkPeers(ips, commandArgs)
+	case "network-stats-queue":
+		// network-stats-queue networkName <lt/gt> queueSize
+		doNetworkStatsQueue(ips, commandArgs)
+
+	case "write-keys":
+		// write-keys networkName tableName parallelWriters numberOfKeysEach
+		doWriteKeys(ips, commandArgs)
+	case "delete-keys":
+		// delete-keys networkName tableName parallelWriters numberOfKeysEach
+		doDeleteKeys(ips, commandArgs)
 	case "write-unique-keys":
 		// write-delete-unique-keys networkName tableName numParallelWriters writeTimeSec
 		doWriteUniqueKeys(ips, commandArgs)

+ 4 - 0
libnetwork/cmd/networkdb-test/testMain.go

@@ -10,6 +10,10 @@ import (
 )
 
 func main() {
+	formatter := &logrus.TextFormatter{
+		FullTimestamp: true,
+	}
+	logrus.SetFormatter(formatter)
 	logrus.Infof("Starting the image with these args: %v", os.Args)
 	if len(os.Args) < 1 {
 		log.Fatal("You need at least 1 argument [client/server]")

+ 10 - 0
libnetwork/diagnostic/types.go

@@ -120,3 +120,13 @@ type TablePeersResult struct {
 	TableObj
 	Elements []PeerEntryObj `json:"entries"`
 }
+
+// NetworkStatsResult network db stats related to entries and queue len for a network
+type NetworkStatsResult struct {
+	Entries  int `json:"entries"`
+	QueueLen int `jsoin:"qlen"`
+}
+
+func (n *NetworkStatsResult) String() string {
+	return fmt.Sprintf("entries: %d, qlen: %d\n", n.Entries, n.QueueLen)
+}

+ 39 - 0
libnetwork/networkdb/networkdbdiagnostic.go

@@ -28,6 +28,7 @@ var NetDbPaths2Func = map[string]diagnostic.HTTPHandlerFunc{
 	"/deleteentry":  dbDeleteEntry,
 	"/getentry":     dbGetEntry,
 	"/gettable":     dbGetTable,
+	"/networkstats": dbNetworkStats,
 }
 
 func dbJoin(ctx interface{}, w http.ResponseWriter, r *http.Request) {
@@ -411,3 +412,41 @@ func dbGetTable(ctx interface{}, w http.ResponseWriter, r *http.Request) {
 	}
 	diagnostic.HTTPReply(w, diagnostic.FailCommand(fmt.Errorf("%s", dbNotAvailable)), json)
 }
+
+func dbNetworkStats(ctx interface{}, w http.ResponseWriter, r *http.Request) {
+	r.ParseForm()
+	diagnostic.DebugHTTPForm(r)
+	_, json := diagnostic.ParseHTTPFormOptions(r)
+
+	// audit logs
+	log := logrus.WithFields(logrus.Fields{"component": "diagnostic", "remoteIP": r.RemoteAddr, "method": common.CallerName(0), "url": r.URL.String()})
+	log.Info("network stats")
+
+	if len(r.Form["nid"]) < 1 {
+		rsp := diagnostic.WrongCommand(missingParameter, fmt.Sprintf("%s?nid=test", r.URL.Path))
+		log.Error("network stats failed, wrong input")
+		diagnostic.HTTPReply(w, rsp, json)
+		return
+	}
+
+	nDB, ok := ctx.(*NetworkDB)
+	if ok {
+		nDB.RLock()
+		networks := nDB.networks[nDB.config.NodeID]
+		network, ok := networks[r.Form["nid"][0]]
+
+		entries := -1
+		qLen := -1
+		if ok {
+			entries = network.entriesNumber
+			qLen = network.tableBroadcasts.NumQueued()
+		}
+		nDB.RUnlock()
+
+		rsp := diagnostic.CommandSucceed(&diagnostic.NetworkStatsResult{Entries: entries, QueueLen: qLen})
+		log.WithField("response", fmt.Sprintf("%+v", rsp)).Info("network stats done")
+		diagnostic.HTTPReply(w, rsp, json)
+		return
+	}
+	diagnostic.HTTPReply(w, diagnostic.FailCommand(fmt.Errorf("%s", dbNotAvailable)), json)
+}