浏览代码

NetworkDB testing infra

- Diagnose framework that exposes REST API for db interaction
- Dockerfile to build the test image
- Periodic print of stats regarding queue size
- Client and server side for integration with testkit
- Added write-delete-leave-join
- Added test write-delete-wait-leave-join
- Added write-wait-leave-join

Signed-off-by: Flavio Crisciani <flavio.crisciani@docker.com>
Flavio Crisciani 8 年之前
父节点
当前提交
585964bf32

+ 1 - 0
libnetwork/.gitignore

@@ -37,3 +37,4 @@ cmd/dnet/dnet
 .settings/
 
 libnetworkbuild.created
+test/networkDb/testMain

+ 133 - 0
libnetwork/diagnose/diagnose.go

@@ -0,0 +1,133 @@
+package diagnose
+
+import (
+	"fmt"
+	"net"
+	"net/http"
+	"sync"
+
+	"github.com/Sirupsen/logrus"
+)
+
+// HTTPHandlerFunc TODO
+type HTTPHandlerFunc func(interface{}, http.ResponseWriter, *http.Request)
+
+type httpHandlerCustom struct {
+	ctx interface{}
+	F   func(interface{}, http.ResponseWriter, *http.Request)
+}
+
+// ServeHTTP TODO
+func (h httpHandlerCustom) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+	h.F(h.ctx, w, r)
+}
+
+var diagPaths2Func = map[string]HTTPHandlerFunc{
+	"/":      notImplemented,
+	"/help":  help,
+	"/ready": ready,
+}
+
+// Server when the debug is enabled exposes a
+// This data structure is protected by the Agent mutex so does not require and additional mutex here
+type Server struct {
+	sk                net.Listener
+	port              int
+	mux               *http.ServeMux
+	registeredHanders []string
+	sync.Mutex
+}
+
+// Init TODO
+func (n *Server) Init() {
+	n.mux = http.NewServeMux()
+
+	// Register local handlers
+	n.RegisterHandler(n, diagPaths2Func)
+}
+
+// RegisterHandler TODO
+func (n *Server) RegisterHandler(ctx interface{}, hdlrs map[string]HTTPHandlerFunc) {
+	n.Lock()
+	defer n.Unlock()
+	for path, fun := range hdlrs {
+		n.mux.Handle(path, httpHandlerCustom{ctx, fun})
+		n.registeredHanders = append(n.registeredHanders, path)
+	}
+}
+
+// EnableDebug opens a TCP socket to debug the passed network DB
+func (n *Server) EnableDebug(ip string, port int) {
+	n.Lock()
+	defer n.Unlock()
+
+	n.port = port
+	logrus.SetLevel(logrus.DebugLevel)
+
+	if n.sk != nil {
+		logrus.Infof("The server is already up and running")
+		return
+	}
+
+	logrus.Infof("Starting the server listening on %d for commands", port)
+
+	// // Create the socket
+	// var err error
+	// n.sk, err = net.Listen("tcp", listeningAddr)
+	// if err != nil {
+	// 	log.Fatal(err)
+	// }
+	//
+	// go func() {
+	// 	http.Serve(n.sk, n.mux)
+	// }()
+	http.ListenAndServe(":8000", n.mux)
+}
+
+// DisableDebug stop the dubug and closes the tcp socket
+func (n *Server) DisableDebug() {
+	n.Lock()
+	defer n.Unlock()
+	n.sk.Close()
+	n.sk = nil
+}
+
+// IsDebugEnable returns true when the debug is enabled
+func (n *Server) IsDebugEnable() bool {
+	n.Lock()
+	defer n.Unlock()
+	return n.sk != nil
+}
+
+func notImplemented(ctx interface{}, w http.ResponseWriter, r *http.Request) {
+	fmt.Fprintf(w, "URL path: %s no method implemented check /help\n", r.URL.Path)
+}
+
+func help(ctx interface{}, w http.ResponseWriter, r *http.Request) {
+	n, ok := ctx.(*Server)
+	if ok {
+		for _, path := range n.registeredHanders {
+			fmt.Fprintf(w, "%s\n", path)
+		}
+	}
+}
+
+func ready(ctx interface{}, w http.ResponseWriter, r *http.Request) {
+	fmt.Fprintf(w, "OK\n")
+}
+
+// DebugHTTPForm TODO
+func DebugHTTPForm(r *http.Request) {
+	r.ParseForm()
+	for k, v := range r.Form {
+		logrus.Debugf("Form[%q] = %q\n", k, v)
+	}
+}
+
+// HTTPReplyError TODO
+func HTTPReplyError(w http.ResponseWriter, message, usage string) {
+	fmt.Fprintf(w, "%s\n", message)
+	if usage != "" {
+		fmt.Fprintf(w, "Usage: %s\n", usage)
+	}
+}

+ 3 - 0
libnetwork/networkdb/delegate.go

@@ -104,6 +104,9 @@ func (nDB *NetworkDB) handleNodeEvent(nEvent *NodeEvent) bool {
 	}
 
 	n = nDB.checkAndGetNode(nEvent)
+	if n == nil {
+		return false
+	}
 
 	nDB.purgeSameNode(n)
 	n.ltime = nEvent.LTime

+ 19 - 0
libnetwork/networkdb/networkdb.go

@@ -108,6 +108,11 @@ type PeerInfo struct {
 	IP   string
 }
 
+// PeerClusterInfo represents the peer (gossip cluster) nodes
+type PeerClusterInfo struct {
+	PeerInfo
+}
+
 type node struct {
 	memberlist.Node
 	ltime serf.LamportTime
@@ -253,6 +258,20 @@ func (nDB *NetworkDB) Close() {
 	}
 }
 
+// ClusterPeers returns all the gossip cluster peers.
+func (nDB *NetworkDB) ClusterPeers() []PeerInfo {
+	nDB.RLock()
+	defer nDB.RUnlock()
+	peers := make([]PeerInfo, 0, len(nDB.nodes))
+	for _, node := range nDB.nodes {
+		peers = append(peers, PeerInfo{
+			Name: node.Name,
+			IP:   node.Node.Addr.String(),
+		})
+	}
+	return peers
+}
+
 // Peers returns the gossip peers for a given network.
 func (nDB *NetworkDB) Peers(nid string) []PeerInfo {
 	nDB.RLock()

+ 242 - 0
libnetwork/networkdb/networkdbdiagnose.go

@@ -0,0 +1,242 @@
+package networkdb
+
+import (
+	"fmt"
+	"net/http"
+	"strings"
+
+	"github.com/docker/libnetwork/diagnose"
+)
+
+const (
+	missingParameter = "missing parameter"
+)
+
+// NetDbPaths2Func TODO
+var NetDbPaths2Func = map[string]diagnose.HTTPHandlerFunc{
+	"/join":         dbJoin,
+	"/networkpeers": dbPeers,
+	"/clusterpeers": dbClusterPeers,
+	"/joinnetwork":  dbJoinNetwork,
+	"/leavenetwork": dbLeaveNetwork,
+	"/createentry":  dbCreateEntry,
+	"/updateentry":  dbUpdateEntry,
+	"/deleteentry":  dbDeleteEntry,
+	"/getentry":     dbGetEntry,
+	"/gettable":     dbGetTable,
+}
+
+func dbJoin(ctx interface{}, w http.ResponseWriter, r *http.Request) {
+	r.ParseForm()
+	diagnose.DebugHTTPForm(r)
+	if len(r.Form["members"]) < 1 {
+		diagnose.HTTPReplyError(w, missingParameter, fmt.Sprintf("%s?members=ip1,ip2,...", r.URL.Path))
+		return
+	}
+
+	nDB, ok := ctx.(*NetworkDB)
+	if ok {
+		err := nDB.Join(strings.Split(r.Form["members"][0], ","))
+		if err != nil {
+			fmt.Fprintf(w, "%s error in the DB join %s\n", r.URL.Path, err)
+			return
+		}
+
+		fmt.Fprintf(w, "OK\n")
+	}
+}
+
+func dbPeers(ctx interface{}, w http.ResponseWriter, r *http.Request) {
+	r.ParseForm()
+	diagnose.DebugHTTPForm(r)
+	if len(r.Form["nid"]) < 1 {
+		diagnose.HTTPReplyError(w, missingParameter, fmt.Sprintf("%s?nid=test", r.URL.Path))
+		return
+	}
+
+	nDB, ok := ctx.(*NetworkDB)
+	if ok {
+		peers := nDB.Peers(r.Form["nid"][0])
+		fmt.Fprintf(w, "Network:%s Total peers: %d\n", r.Form["nid"], len(peers))
+		for i, peerInfo := range peers {
+			fmt.Fprintf(w, "%d) %s -> %s\n", i, peerInfo.Name, peerInfo.IP)
+		}
+	}
+}
+
+func dbClusterPeers(ctx interface{}, w http.ResponseWriter, r *http.Request) {
+	nDB, ok := ctx.(*NetworkDB)
+	if ok {
+		peers := nDB.ClusterPeers()
+		fmt.Fprintf(w, "Total peers: %d\n", len(peers))
+		for i, peerInfo := range peers {
+			fmt.Fprintf(w, "%d) %s -> %s\n", i, peerInfo.Name, peerInfo.IP)
+		}
+	}
+}
+
+func dbCreateEntry(ctx interface{}, w http.ResponseWriter, r *http.Request) {
+	r.ParseForm()
+	diagnose.DebugHTTPForm(r)
+	if len(r.Form["tname"]) < 1 ||
+		len(r.Form["nid"]) < 1 ||
+		len(r.Form["key"]) < 1 ||
+		len(r.Form["value"]) < 1 {
+		diagnose.HTTPReplyError(w, missingParameter, fmt.Sprintf("%s?tname=table_name&nid=network_id&key=k&value=v", r.URL.Path))
+		return
+	}
+
+	tname := r.Form["tname"][0]
+	nid := r.Form["nid"][0]
+	key := r.Form["key"][0]
+	value := r.Form["value"][0]
+
+	nDB, ok := ctx.(*NetworkDB)
+	if ok {
+		if err := nDB.CreateEntry(tname, nid, key, []byte(value)); err != nil {
+			diagnose.HTTPReplyError(w, err.Error(), "")
+			return
+		}
+		fmt.Fprintf(w, "OK\n")
+	}
+}
+
+func dbUpdateEntry(ctx interface{}, w http.ResponseWriter, r *http.Request) {
+	r.ParseForm()
+	diagnose.DebugHTTPForm(r)
+	if len(r.Form["tname"]) < 1 ||
+		len(r.Form["nid"]) < 1 ||
+		len(r.Form["key"]) < 1 ||
+		len(r.Form["value"]) < 1 {
+		diagnose.HTTPReplyError(w, missingParameter, fmt.Sprintf("%s?tname=table_name&nid=network_id&key=k&value=v", r.URL.Path))
+		return
+	}
+
+	tname := r.Form["tname"][0]
+	nid := r.Form["nid"][0]
+	key := r.Form["key"][0]
+	value := r.Form["value"][0]
+
+	nDB, ok := ctx.(*NetworkDB)
+	if ok {
+		if err := nDB.UpdateEntry(tname, nid, key, []byte(value)); err != nil {
+			diagnose.HTTPReplyError(w, err.Error(), "")
+			return
+		}
+		fmt.Fprintf(w, "OK\n")
+	}
+}
+
+func dbDeleteEntry(ctx interface{}, w http.ResponseWriter, r *http.Request) {
+	r.ParseForm()
+	diagnose.DebugHTTPForm(r)
+	if len(r.Form["tname"]) < 1 ||
+		len(r.Form["nid"]) < 1 ||
+		len(r.Form["key"]) < 1 {
+		diagnose.HTTPReplyError(w, missingParameter, fmt.Sprintf("%s?tname=table_name&nid=network_id&key=k", r.URL.Path))
+		return
+	}
+
+	tname := r.Form["tname"][0]
+	nid := r.Form["nid"][0]
+	key := r.Form["key"][0]
+
+	nDB, ok := ctx.(*NetworkDB)
+	if ok {
+		err := nDB.DeleteEntry(tname, nid, key)
+		if err != nil {
+			diagnose.HTTPReplyError(w, err.Error(), "")
+			return
+		}
+		fmt.Fprintf(w, "OK\n")
+	}
+}
+
+func dbGetEntry(ctx interface{}, w http.ResponseWriter, r *http.Request) {
+	r.ParseForm()
+	diagnose.DebugHTTPForm(r)
+	if len(r.Form["tname"]) < 1 ||
+		len(r.Form["nid"]) < 1 ||
+		len(r.Form["key"]) < 1 {
+		diagnose.HTTPReplyError(w, missingParameter, fmt.Sprintf("%s?tname=table_name&nid=network_id&key=k", r.URL.Path))
+		return
+	}
+
+	tname := r.Form["tname"][0]
+	nid := r.Form["nid"][0]
+	key := r.Form["key"][0]
+
+	nDB, ok := ctx.(*NetworkDB)
+	if ok {
+		value, err := nDB.GetEntry(tname, nid, key)
+		if err != nil {
+			diagnose.HTTPReplyError(w, err.Error(), "")
+			return
+		}
+		fmt.Fprintf(w, "key:`%s` value:`%s`\n", key, string(value))
+	}
+}
+
+func dbJoinNetwork(ctx interface{}, w http.ResponseWriter, r *http.Request) {
+	r.ParseForm()
+	diagnose.DebugHTTPForm(r)
+	if len(r.Form["nid"]) < 1 {
+		diagnose.HTTPReplyError(w, missingParameter, fmt.Sprintf("%s?nid=network_id", r.URL.Path))
+		return
+	}
+
+	nid := r.Form["nid"][0]
+
+	nDB, ok := ctx.(*NetworkDB)
+	if ok {
+		if err := nDB.JoinNetwork(nid); err != nil {
+			diagnose.HTTPReplyError(w, err.Error(), "")
+			return
+		}
+		fmt.Fprintf(w, "OK\n")
+	}
+}
+
+func dbLeaveNetwork(ctx interface{}, w http.ResponseWriter, r *http.Request) {
+	r.ParseForm()
+	diagnose.DebugHTTPForm(r)
+	if len(r.Form["nid"]) < 1 {
+		diagnose.HTTPReplyError(w, missingParameter, fmt.Sprintf("%s?nid=network_id", r.URL.Path))
+		return
+	}
+
+	nid := r.Form["nid"][0]
+
+	nDB, ok := ctx.(*NetworkDB)
+	if ok {
+		if err := nDB.LeaveNetwork(nid); err != nil {
+			diagnose.HTTPReplyError(w, err.Error(), "")
+			return
+		}
+		fmt.Fprintf(w, "OK\n")
+	}
+}
+
+func dbGetTable(ctx interface{}, w http.ResponseWriter, r *http.Request) {
+	r.ParseForm()
+	diagnose.DebugHTTPForm(r)
+	if len(r.Form["tname"]) < 1 ||
+		len(r.Form["nid"]) < 1 {
+		diagnose.HTTPReplyError(w, missingParameter, fmt.Sprintf("%s?tname=table_name&nid=network_id", r.URL.Path))
+		return
+	}
+
+	tname := r.Form["tname"][0]
+	nid := r.Form["nid"][0]
+
+	nDB, ok := ctx.(*NetworkDB)
+	if ok {
+		table := nDB.GetTableByNetwork(tname, nid)
+		fmt.Fprintf(w, "total elements: %d\n", len(table))
+		i := 0
+		for k, v := range table {
+			fmt.Fprintf(w, "%d) k:`%s` -> v:`%s`\n", i, k, string(v.([]byte)))
+			i++
+		}
+	}
+}

+ 7 - 0
libnetwork/test/networkDb/Dockerfile

@@ -0,0 +1,7 @@
+FROM alpine
+
+COPY testMain /app/
+
+WORKDIR app
+
+ENTRYPOINT ["/app/testMain"]

+ 15 - 0
libnetwork/test/networkDb/README

@@ -0,0 +1,15 @@
+SERVER
+
+cd test/networkdb
+env GOOS=linux go build -v server/ndbTester.go && docker build -t fcrisciani/networkdb-test -f server/Dockerfile .
+(only for testkit case) docker push fcrisciani/networkdb-test
+
+Run server: docker service create --name testdb --network net1 --replicas 3 --env TASK_ID="{{.Task.ID}}" -p mode=host,target=8000 fcrisciani/networkdb-test server 8000
+
+CLIENT
+
+cd test/networkdb
+Join cluster: docker run -it --network net1 fcrisciani/networkdb-test client join testdb 8000
+Join network: docker run -it --network net1 fcrisciani/networkdb-test client join-network testdb 8000 test
+Run test: docker run -it --network net1 fcrisciani/networkdb-test client write-delete-unique-keys testdb 8000 test tableBla 3 10
+check table: curl "localhost:32768/gettable?nid=test&tname=table_name"

+ 693 - 0
libnetwork/test/networkDb/dbclient/ndbClient.go

@@ -0,0 +1,693 @@
+package dbclient
+
+import (
+	"context"
+	"io/ioutil"
+	"log"
+	"net"
+	"net/http"
+	"os"
+	"regexp"
+	"strconv"
+	"strings"
+	"time"
+
+	"github.com/Sirupsen/logrus"
+)
+
+var servicePort string
+
+const totalWrittenKeys string = "totalKeys"
+
+type resultTuple struct {
+	id     string
+	result int
+}
+
+func httpGetFatalError(ip, port, path string) {
+	// for {
+	body, err := httpGet(ip, port, path)
+	if err != nil || !strings.Contains(string(body), "OK") {
+		// if strings.Contains(err.Error(), "EOF") {
+		// 	logrus.Warnf("Got EOF path:%s err:%s", path, err)
+		// 	continue
+		// }
+		log.Fatalf("[%s] error %s %s", path, err, body)
+	}
+	// break
+	// }
+}
+
+func httpGet(ip, port, path string) ([]byte, error) {
+	resp, err := http.Get("http://" + ip + ":" + port + path)
+	if err != nil {
+		logrus.Errorf("httpGet error:%s", err)
+		return nil, err
+	}
+	defer resp.Body.Close()
+	body, err := ioutil.ReadAll(resp.Body)
+	return body, err
+}
+
+func joinCluster(ip, port string, members []string, doneCh chan resultTuple) {
+	httpGetFatalError(ip, port, "/join?members="+strings.Join(members, ","))
+
+	if doneCh != nil {
+		doneCh <- resultTuple{id: ip, result: 0}
+	}
+}
+
+func joinNetwork(ip, port, network string, doneCh chan resultTuple) {
+	httpGetFatalError(ip, port, "/joinnetwork?nid="+network)
+
+	if doneCh != nil {
+		doneCh <- resultTuple{id: ip, result: 0}
+	}
+}
+
+func leaveNetwork(ip, port, network string, doneCh chan resultTuple) {
+	httpGetFatalError(ip, port, "/leavenetwork?nid="+network)
+
+	if doneCh != nil {
+		doneCh <- resultTuple{id: ip, result: 0}
+	}
+}
+
+func writeTableKey(ip, port, networkName, tableName, key string) {
+	createPath := "/createentry?nid=" + networkName + "&tname=" + tableName + "&value=v&key="
+	httpGetFatalError(ip, port, createPath+key)
+}
+
+func deleteTableKey(ip, port, networkName, tableName, key string) {
+	deletePath := "/deleteentry?nid=" + networkName + "&tname=" + tableName + "&key="
+	httpGetFatalError(ip, port, deletePath+key)
+}
+
+func clusterPeersNumber(ip, port string, doneCh chan resultTuple) {
+	body, err := httpGet(ip, port, "/clusterpeers")
+
+	if err != nil {
+		logrus.Errorf("clusterPeers %s there was an error: %s\n", ip, err)
+		doneCh <- resultTuple{id: ip, result: -1}
+		return
+	}
+	peersRegexp := regexp.MustCompile(`Total peers: ([0-9]+)`)
+	peersNum, _ := strconv.Atoi(peersRegexp.FindStringSubmatch(string(body))[1])
+
+	doneCh <- resultTuple{id: ip, result: peersNum}
+}
+
+func networkPeersNumber(ip, port, networkName string, doneCh chan resultTuple) {
+	body, err := httpGet(ip, port, "/networkpeers?nid="+networkName)
+
+	if err != nil {
+		logrus.Errorf("networkPeersNumber %s there was an error: %s\n", ip, err)
+		doneCh <- resultTuple{id: ip, result: -1}
+		return
+	}
+	peersRegexp := regexp.MustCompile(`Total peers: ([0-9]+)`)
+	peersNum, _ := strconv.Atoi(peersRegexp.FindStringSubmatch(string(body))[1])
+
+	doneCh <- resultTuple{id: ip, result: peersNum}
+}
+
+func dbTableEntriesNumber(ip, port, networkName, tableName string, doneCh chan resultTuple) {
+	body, err := httpGet(ip, port, "/gettable?nid="+networkName+"&tname="+tableName)
+
+	if err != nil {
+		logrus.Errorf("tableEntriesNumber %s there was an error: %s\n", ip, err)
+		doneCh <- resultTuple{id: ip, result: -1}
+		return
+	}
+	elementsRegexp := regexp.MustCompile(`total elements: ([0-9]+)`)
+	entriesNum, _ := strconv.Atoi(elementsRegexp.FindStringSubmatch(string(body))[1])
+	doneCh <- resultTuple{id: ip, result: entriesNum}
+}
+
+func clientWatchTable(ip, port, networkName, tableName string, doneCh chan resultTuple) {
+	httpGetFatalError(ip, port, "/watchtable?nid="+networkName+"&tname="+tableName)
+	if doneCh != nil {
+		doneCh <- resultTuple{id: ip, result: 0}
+	}
+}
+
+func clientTableEntriesNumber(ip, port, networkName, tableName string, doneCh chan resultTuple) {
+	body, err := httpGet(ip, port, "/watchedtableentries?nid="+networkName+"&tname="+tableName)
+
+	if err != nil {
+		logrus.Errorf("clientTableEntriesNumber %s there was an error: %s\n", ip, err)
+		doneCh <- resultTuple{id: ip, result: -1}
+		return
+	}
+	elementsRegexp := regexp.MustCompile(`total elements: ([0-9]+)`)
+	entriesNum, _ := strconv.Atoi(elementsRegexp.FindStringSubmatch(string(body))[1])
+	doneCh <- resultTuple{id: ip, result: entriesNum}
+}
+
+func writeUniqueKeys(ctx context.Context, ip, port, networkName, tableName, key string, doneCh chan resultTuple) {
+	for x := 0; ; x++ {
+		select {
+		case <-ctx.Done():
+			doneCh <- resultTuple{id: ip, result: x}
+			return
+		default:
+			k := key + strconv.Itoa(x)
+			// write key
+			writeTableKey(ip, port, networkName, tableName, k)
+			// give time to send out key writes
+			time.Sleep(100 * time.Millisecond)
+		}
+	}
+}
+
+func writeDeleteUniqueKeys(ctx context.Context, ip, port, networkName, tableName, key string, doneCh chan resultTuple) {
+	for x := 0; ; x++ {
+		select {
+		case <-ctx.Done():
+			doneCh <- resultTuple{id: ip, result: x}
+			return
+		default:
+			k := key + strconv.Itoa(x)
+			// write key
+			writeTableKey(ip, port, networkName, tableName, k)
+			// give time to send out key writes
+			time.Sleep(100 * time.Millisecond)
+			// delete key
+			deleteTableKey(ip, port, networkName, tableName, k)
+		}
+	}
+}
+
+func writeDeleteLeaveJoin(ctx context.Context, ip, port, networkName, tableName, key string, doneCh chan resultTuple) {
+	for x := 0; ; x++ {
+		select {
+		case <-ctx.Done():
+			doneCh <- resultTuple{id: ip, result: x}
+			return
+		default:
+			k := key + strconv.Itoa(x)
+			// write key
+			writeTableKey(ip, port, networkName, tableName, k)
+			time.Sleep(100 * time.Millisecond)
+			// delete key
+			deleteTableKey(ip, port, networkName, tableName, k)
+			// give some time
+			time.Sleep(100 * time.Millisecond)
+			// leave network
+			leaveNetwork(ip, port, networkName, nil)
+			// join network
+			joinNetwork(ip, port, networkName, nil)
+		}
+	}
+}
+
+func ready(ip, port string, doneCh chan resultTuple) {
+	for {
+		body, err := httpGet(ip, port, "/ready")
+		if err != nil || !strings.Contains(string(body), "OK") {
+			time.Sleep(500 * time.Millisecond)
+			continue
+		}
+		// success
+		break
+	}
+	// notify the completion
+	doneCh <- resultTuple{id: ip, result: 0}
+}
+
+func checkTable(ctx context.Context, ips []string, port, networkName, tableName string, expectedEntries int, fn func(string, string, string, string, chan resultTuple)) {
+	startTime := time.Now().UnixNano()
+	var successTime int64
+
+	// Loop for 2 minutes to guartee that the result is stable
+	for {
+		select {
+		case <-ctx.Done():
+			// Validate test success, if the time is set means that all the tables are empty
+			if successTime != 0 {
+				logrus.Infof("Check table passed, the cluster converged in %d msec", time.Duration(successTime-startTime)/time.Millisecond)
+				return
+			}
+			log.Fatal("Test failed, there is still entries in the tables of the nodes")
+		default:
+			logrus.Infof("Checking table %s expected %d", tableName, expectedEntries)
+			doneCh := make(chan resultTuple, len(ips))
+			for _, ip := range ips {
+				go fn(ip, servicePort, networkName, tableName, doneCh)
+			}
+
+			nodesWithCorrectEntriesNum := 0
+			for i := len(ips); i > 0; i-- {
+				tableEntries := <-doneCh
+				logrus.Infof("Node %s has %d entries", tableEntries.id, tableEntries.result)
+				if tableEntries.result == expectedEntries {
+					nodesWithCorrectEntriesNum++
+				}
+			}
+			close(doneCh)
+			if nodesWithCorrectEntriesNum == len(ips) {
+				if successTime == 0 {
+					successTime = time.Now().UnixNano()
+					logrus.Infof("Success after %d msec", time.Duration(successTime-startTime)/time.Millisecond)
+				}
+			} else {
+				successTime = 0
+			}
+			time.Sleep(10 * time.Second)
+		}
+	}
+}
+
+func waitWriters(parallelWriters int, mustWrite bool, doneCh chan resultTuple) map[string]int {
+	var totalKeys int
+	resultTable := make(map[string]int)
+	for i := 0; i < parallelWriters; i++ {
+		logrus.Infof("Waiting for %d workers", parallelWriters-i)
+		workerReturn := <-doneCh
+		totalKeys += workerReturn.result
+		if mustWrite && workerReturn.result == 0 {
+			log.Fatalf("The worker %s did not write any key %d == 0", workerReturn.id, workerReturn.result)
+		}
+		if !mustWrite && workerReturn.result != 0 {
+			log.Fatalf("The worker %s was supposed to return 0 instead %d != 0", workerReturn.id, workerReturn.result)
+		}
+		if mustWrite {
+			resultTable[workerReturn.id] = workerReturn.result
+			logrus.Infof("The worker %s wrote %d keys", workerReturn.id, workerReturn.result)
+		}
+	}
+	resultTable[totalWrittenKeys] = totalKeys
+	return resultTable
+}
+
+// ready
+func doReady(ips []string) {
+	doneCh := make(chan resultTuple, len(ips))
+	// check all the nodes
+	for _, ip := range ips {
+		go ready(ip, servicePort, doneCh)
+	}
+	// wait for the readiness of all nodes
+	for i := len(ips); i > 0; i-- {
+		<-doneCh
+	}
+	close(doneCh)
+}
+
+// join
+func doJoin(ips []string) {
+	doneCh := make(chan resultTuple, len(ips))
+	// check all the nodes
+	for i, ip := range ips {
+		members := append([]string(nil), ips[:i]...)
+		members = append(members, ips[i+1:]...)
+		go joinCluster(ip, servicePort, members, doneCh)
+	}
+	// wait for the readiness of all nodes
+	for i := len(ips); i > 0; i-- {
+		<-doneCh
+	}
+	close(doneCh)
+}
+
+// cluster-peers expectedNumberPeers
+func doClusterPeers(ips []string, args []string) {
+	doneCh := make(chan resultTuple, len(ips))
+	expectedPeers, _ := strconv.Atoi(args[0])
+	// check all the nodes
+	for _, ip := range ips {
+		go clusterPeersNumber(ip, servicePort, doneCh)
+	}
+	// wait for the readiness of all nodes
+	for i := len(ips); i > 0; i-- {
+		node := <-doneCh
+		if node.result != expectedPeers {
+			log.Fatalf("Expected peers from %s missmatch %d != %d", node.id, expectedPeers, node.result)
+		}
+	}
+	close(doneCh)
+}
+
+// join-network networkName
+func doJoinNetwork(ips []string, args []string) {
+	doneCh := make(chan resultTuple, len(ips))
+	// check all the nodes
+	for _, ip := range ips {
+		go joinNetwork(ip, servicePort, args[0], doneCh)
+	}
+	// wait for the readiness of all nodes
+	for i := len(ips); i > 0; i-- {
+		<-doneCh
+	}
+	close(doneCh)
+}
+
+// leave-network networkName
+func doLeaveNetwork(ips []string, args []string) {
+	doneCh := make(chan resultTuple, len(ips))
+	// check all the nodes
+	for _, ip := range ips {
+		go leaveNetwork(ip, servicePort, args[0], doneCh)
+	}
+	// wait for the readiness of all nodes
+	for i := len(ips); i > 0; i-- {
+		<-doneCh
+	}
+	close(doneCh)
+}
+
+// cluster-peers networkName expectedNumberPeers maxRetry
+func doNetworkPeers(ips []string, args []string) {
+	doneCh := make(chan resultTuple, len(ips))
+	networkName := args[0]
+	expectedPeers, _ := strconv.Atoi(args[1])
+	maxRetry, _ := strconv.Atoi(args[2])
+	for retry := 0; retry < maxRetry; retry++ {
+		// check all the nodes
+		for _, ip := range ips {
+			go networkPeersNumber(ip, servicePort, networkName, doneCh)
+		}
+		// wait for the readiness of all nodes
+		for i := len(ips); i > 0; i-- {
+			node := <-doneCh
+			if node.result != expectedPeers {
+				if retry == maxRetry-1 {
+					log.Fatalf("Expected peers from %s missmatch %d != %d", node.id, expectedPeers, node.result)
+				} else {
+					logrus.Warnf("Expected peers from %s missmatch %d != %d", node.id, expectedPeers, node.result)
+				}
+				time.Sleep(1 * time.Second)
+			}
+		}
+	}
+	close(doneCh)
+}
+
+// write-delete-unique-keys networkName tableName numParallelWriters writeTimeSec
+func doWriteDeleteUniqueKeys(ips []string, args []string) {
+	networkName := args[0]
+	tableName := args[1]
+	parallelWriters, _ := strconv.Atoi(args[2])
+	writeTimeSec, _ := strconv.Atoi(args[3])
+
+	doneCh := make(chan resultTuple, parallelWriters)
+	// Enable watch of tables from clients
+	for i := 0; i < parallelWriters; i++ {
+		go clientWatchTable(ips[i], servicePort, networkName, tableName, doneCh)
+	}
+	waitWriters(parallelWriters, false, doneCh)
+
+	// Start parallel writers that will create and delete unique keys
+	ctx, cancel := context.WithTimeout(context.Background(), time.Duration(writeTimeSec)*time.Second)
+	for i := 0; i < parallelWriters; i++ {
+		key := "key-" + strconv.Itoa(i) + "-"
+		logrus.Infof("Spawn worker: %d on IP:%s", i, ips[i])
+		go writeDeleteUniqueKeys(ctx, ips[i], servicePort, networkName, tableName, key, doneCh)
+	}
+
+	// Sync with all the writers
+	keyMap := waitWriters(parallelWriters, true, doneCh)
+	cancel()
+	logrus.Infof("Written a total of %d keys on the cluster", keyMap[totalWrittenKeys])
+
+	// check table entries for 2 minutes
+	ctx, cancel = context.WithTimeout(context.Background(), 2*time.Minute)
+	checkTable(ctx, ips, servicePort, networkName, tableName, 0, dbTableEntriesNumber)
+	cancel()
+	ctx, cancel = context.WithTimeout(context.Background(), 30*time.Second)
+	checkTable(ctx, ips, servicePort, networkName, tableName, 0, clientTableEntriesNumber)
+	cancel()
+}
+
+// write-unique-keys networkName tableName numParallelWriters writeTimeSec
+func doWriteUniqueKeys(ips []string, args []string) {
+	networkName := args[0]
+	tableName := args[1]
+	parallelWriters, _ := strconv.Atoi(args[2])
+	writeTimeSec, _ := strconv.Atoi(args[3])
+
+	doneCh := make(chan resultTuple, parallelWriters)
+	// Enable watch of tables from clients
+	for i := 0; i < parallelWriters; i++ {
+		go clientWatchTable(ips[i], servicePort, networkName, tableName, doneCh)
+	}
+	waitWriters(parallelWriters, false, doneCh)
+
+	// Start parallel writers that will create and delete unique keys
+	defer close(doneCh)
+	ctx, cancel := context.WithTimeout(context.Background(), time.Duration(writeTimeSec)*time.Second)
+	for i := 0; i < parallelWriters; i++ {
+		key := "key-" + strconv.Itoa(i) + "-"
+		logrus.Infof("Spawn worker: %d on IP:%s", i, ips[i])
+		go writeUniqueKeys(ctx, ips[i], servicePort, networkName, tableName, key, doneCh)
+	}
+
+	// Sync with all the writers
+	keyMap := waitWriters(parallelWriters, true, doneCh)
+	cancel()
+	logrus.Infof("Written a total of %d keys on the cluster", keyMap[totalWrittenKeys])
+
+	// check table entries for 2 minutes
+	ctx, cancel = context.WithTimeout(context.Background(), 2*time.Minute)
+	checkTable(ctx, ips, servicePort, networkName, tableName, keyMap[totalWrittenKeys], dbTableEntriesNumber)
+	cancel()
+}
+
+// write-delete-leave-join networkName tableName numParallelWriters writeTimeSec
+func doWriteDeleteLeaveJoin(ips []string, args []string) {
+	networkName := args[0]
+	tableName := args[1]
+	parallelWriters, _ := strconv.Atoi(args[2])
+	writeTimeSec, _ := strconv.Atoi(args[3])
+
+	// Start parallel writers that will create and delete unique keys
+	doneCh := make(chan resultTuple, parallelWriters)
+	defer close(doneCh)
+	ctx, cancel := context.WithTimeout(context.Background(), time.Duration(writeTimeSec)*time.Second)
+	for i := 0; i < parallelWriters; i++ {
+		key := "key-" + strconv.Itoa(i) + "-"
+		logrus.Infof("Spawn worker: %d on IP:%s", i, ips[i])
+		go writeDeleteLeaveJoin(ctx, ips[i], servicePort, networkName, tableName, key, doneCh)
+	}
+
+	// Sync with all the writers
+	keyMap := waitWriters(parallelWriters, true, doneCh)
+	cancel()
+	logrus.Infof("Written a total of %d keys on the cluster", keyMap["totalKeys"])
+
+	// check table entries for 2 minutes
+	ctx, cancel = context.WithTimeout(context.Background(), 2*time.Minute)
+	checkTable(ctx, ips, servicePort, networkName, tableName, 0, dbTableEntriesNumber)
+	cancel()
+}
+
+// write-delete-wait-leave-join networkName tableName numParallelWriters writeTimeSec
+func doWriteDeleteWaitLeaveJoin(ips []string, args []string) {
+	networkName := args[0]
+	tableName := args[1]
+	parallelWriters, _ := strconv.Atoi(args[2])
+	writeTimeSec, _ := strconv.Atoi(args[3])
+
+	// Start parallel writers that will create and delete unique keys
+	doneCh := make(chan resultTuple, parallelWriters)
+	defer close(doneCh)
+	ctx, cancel := context.WithTimeout(context.Background(), time.Duration(writeTimeSec)*time.Second)
+	for i := 0; i < parallelWriters; i++ {
+		key := "key-" + strconv.Itoa(i) + "-"
+		logrus.Infof("Spawn worker: %d on IP:%s", i, ips[i])
+		go writeDeleteUniqueKeys(ctx, ips[i], servicePort, networkName, tableName, key, doneCh)
+	}
+
+	// Sync with all the writers
+	keyMap := waitWriters(parallelWriters, true, doneCh)
+	cancel()
+	logrus.Infof("Written a total of %d keys on the cluster", keyMap[totalWrittenKeys])
+
+	// The writers will leave the network
+	for i := 0; i < parallelWriters; i++ {
+		logrus.Infof("worker leaveNetwork: %d on IP:%s", i, ips[i])
+		go leaveNetwork(ips[i], servicePort, networkName, doneCh)
+	}
+	waitWriters(parallelWriters, false, doneCh)
+
+	// Give some time
+	time.Sleep(100 * time.Millisecond)
+
+	// The writers will join the network
+	for i := 0; i < parallelWriters; i++ {
+		logrus.Infof("worker joinNetwork: %d on IP:%s", i, ips[i])
+		go joinNetwork(ips[i], servicePort, networkName, doneCh)
+	}
+	waitWriters(parallelWriters, false, doneCh)
+
+	// check table entries for 2 minutes
+	ctx, cancel = context.WithTimeout(context.Background(), 2*time.Minute)
+	checkTable(ctx, ips, servicePort, networkName, tableName, 0, dbTableEntriesNumber)
+	cancel()
+}
+
+// write-wait-leave networkName tableName numParallelWriters writeTimeSec
+func doWriteWaitLeave(ips []string, args []string) {
+	networkName := args[0]
+	tableName := args[1]
+	parallelWriters, _ := strconv.Atoi(args[2])
+	writeTimeSec, _ := strconv.Atoi(args[3])
+
+	// Start parallel writers that will create and delete unique keys
+	doneCh := make(chan resultTuple, parallelWriters)
+	defer close(doneCh)
+	ctx, cancel := context.WithTimeout(context.Background(), time.Duration(writeTimeSec)*time.Second)
+	for i := 0; i < parallelWriters; i++ {
+		key := "key-" + strconv.Itoa(i) + "-"
+		logrus.Infof("Spawn worker: %d on IP:%s", i, ips[i])
+		go writeUniqueKeys(ctx, ips[i], servicePort, networkName, tableName, key, doneCh)
+	}
+
+	// Sync with all the writers
+	keyMap := waitWriters(parallelWriters, true, doneCh)
+	cancel()
+	logrus.Infof("Written a total of %d keys on the cluster", keyMap[totalWrittenKeys])
+
+	// The writers will leave the network
+	for i := 0; i < parallelWriters; i++ {
+		logrus.Infof("worker leaveNetwork: %d on IP:%s", i, ips[i])
+		go leaveNetwork(ips[i], servicePort, networkName, doneCh)
+	}
+	waitWriters(parallelWriters, false, doneCh)
+
+	// check table entries for 2 minutes
+	ctx, cancel = context.WithTimeout(context.Background(), 2*time.Minute)
+	checkTable(ctx, ips, servicePort, networkName, tableName, 0, dbTableEntriesNumber)
+	cancel()
+}
+
+// write-wait-leave-join networkName tableName numParallelWriters writeTimeSec numParallelLeaver
+func doWriteWaitLeaveJoin(ips []string, args []string) {
+	networkName := args[0]
+	tableName := args[1]
+	parallelWriters, _ := strconv.Atoi(args[2])
+	writeTimeSec, _ := strconv.Atoi(args[3])
+	parallerlLeaver, _ := strconv.Atoi(args[4])
+
+	// Start parallel writers that will create and delete unique keys
+	doneCh := make(chan resultTuple, parallelWriters)
+	defer close(doneCh)
+	ctx, cancel := context.WithTimeout(context.Background(), time.Duration(writeTimeSec)*time.Second)
+	for i := 0; i < parallelWriters; i++ {
+		key := "key-" + strconv.Itoa(i) + "-"
+		logrus.Infof("Spawn worker: %d on IP:%s", i, ips[i])
+		go writeUniqueKeys(ctx, ips[i], servicePort, networkName, tableName, key, doneCh)
+	}
+
+	// Sync with all the writers
+	keyMap := waitWriters(parallelWriters, true, doneCh)
+	cancel()
+	logrus.Infof("Written a total of %d keys on the cluster", keyMap[totalWrittenKeys])
+
+	keysExpected := keyMap[totalWrittenKeys]
+	// The Leavers will leave the network
+	for i := 0; i < parallerlLeaver; i++ {
+		logrus.Infof("worker leaveNetwork: %d on IP:%s", i, ips[i])
+		go leaveNetwork(ips[i], servicePort, networkName, doneCh)
+		// Once a node leave all the keys written previously will be deleted, so the expected keys will consider that as removed
+		keysExpected -= keyMap[ips[i]]
+	}
+	waitWriters(parallerlLeaver, false, doneCh)
+
+	// Give some time
+	time.Sleep(100 * time.Millisecond)
+
+	// The writers will join the network
+	for i := 0; i < parallerlLeaver; i++ {
+		logrus.Infof("worker joinNetwork: %d on IP:%s", i, ips[i])
+		go joinNetwork(ips[i], servicePort, networkName, doneCh)
+	}
+	waitWriters(parallerlLeaver, false, doneCh)
+
+	// check table entries for 2 minutes
+	ctx, cancel = context.WithTimeout(context.Background(), 2*time.Minute)
+	checkTable(ctx, ips, servicePort, networkName, tableName, keysExpected, dbTableEntriesNumber)
+	cancel()
+}
+
+var cmdArgChec = map[string]int{
+	"debug":                    0,
+	"fail":                     0,
+	"ready":                    2,
+	"join":                     2,
+	"leave":                    2,
+	"join-network":             3,
+	"leave-network":            3,
+	"cluster-peers":            3,
+	"write-delete-unique-keys": 4,
+}
+
+// Client is a client
+func Client(args []string) {
+	logrus.Infof("[CLIENT] Starting with arguments %v", args)
+	command := args[0]
+
+	if len(args) < cmdArgChec[command] {
+		log.Fatalf("Command %s requires %d arguments, aborting...", command, cmdArgChec[command])
+	}
+
+	switch command {
+	case "debug":
+		time.Sleep(1 * time.Hour)
+		os.Exit(0)
+	case "fail":
+		log.Fatalf("Test error condition with message: error error error")
+	}
+
+	serviceName := args[1]
+	ips, _ := net.LookupHost("tasks." + serviceName)
+	logrus.Infof("got the ips %v", ips)
+	if len(ips) == 0 {
+		log.Fatalf("Cannot resolve any IP for the service tasks.%s", serviceName)
+	}
+	servicePort = args[2]
+	commandArgs := args[3:]
+	logrus.Infof("Executing %s with args:%v", command, commandArgs)
+	switch command {
+	case "ready":
+		doReady(ips)
+	case "join":
+		doJoin(ips)
+	case "leave":
+
+	case "cluster-peers":
+		// cluster-peers
+		doClusterPeers(ips, commandArgs)
+
+	case "join-network":
+		// join-network networkName
+		doJoinNetwork(ips, commandArgs)
+	case "leave-network":
+		// leave-network networkName
+		doLeaveNetwork(ips, commandArgs)
+	case "network-peers":
+		// network-peers networkName maxRetry
+		doNetworkPeers(ips, commandArgs)
+
+	case "write-unique-keys":
+		// write-delete-unique-keys networkName tableName numParallelWriters writeTimeSec
+		doWriteUniqueKeys(ips, commandArgs)
+	case "write-delete-unique-keys":
+		// write-delete-unique-keys networkName tableName numParallelWriters writeTimeSec
+		doWriteDeleteUniqueKeys(ips, commandArgs)
+	case "write-delete-leave-join":
+		// write-delete-leave-join networkName tableName numParallelWriters writeTimeSec
+		doWriteDeleteLeaveJoin(ips, commandArgs)
+	case "write-delete-wait-leave-join":
+		// write-delete-wait-leave-join networkName tableName numParallelWriters writeTimeSec
+		doWriteDeleteWaitLeaveJoin(ips, commandArgs)
+	case "write-wait-leave":
+		// write-wait-leave networkName tableName numParallelWriters writeTimeSec
+		doWriteWaitLeave(ips, commandArgs)
+	case "write-wait-leave-join":
+		// write-wait-leave networkName tableName numParallelWriters writeTimeSec
+		doWriteWaitLeaveJoin(ips, commandArgs)
+	default:
+		log.Fatalf("Command %s not recognized", command)
+	}
+}

+ 109 - 0
libnetwork/test/networkDb/dbserver/ndbServer.go

@@ -0,0 +1,109 @@
+package dbserver
+
+import (
+	"errors"
+	"fmt"
+	"log"
+	"net"
+	"net/http"
+	"os"
+	"strconv"
+
+	"github.com/Sirupsen/logrus"
+	"github.com/docker/libnetwork/diagnose"
+	"github.com/docker/libnetwork/networkdb"
+	"github.com/docker/libnetwork/test/networkDb/dummyclient"
+)
+
+var nDB *networkdb.NetworkDB
+var server diagnose.Server
+var ipAddr string
+
+var testerPaths2Func = map[string]diagnose.HTTPHandlerFunc{
+	"/myip": ipaddress,
+}
+
+func ipaddress(ctx interface{}, w http.ResponseWriter, r *http.Request) {
+	fmt.Fprintf(w, "%s\n", ipAddr)
+}
+
+// Server starts the server
+func Server(args []string) {
+	logrus.Infof("[SERVER] Starting with arguments %v", args)
+	if len(args) < 1 {
+		log.Fatal("Port number is a mandatory argument, aborting...")
+	}
+	port, _ := strconv.Atoi(args[0])
+	var localNodeName string
+	var ok bool
+	if localNodeName, ok = os.LookupEnv("TASK_ID"); !ok {
+		log.Fatal("TASK_ID environment variable not set, aborting...")
+	}
+	logrus.Infof("[SERVER] Starting node %s on port %d", localNodeName, port)
+
+	ip, err := getIPInterface("eth0")
+	if err != nil {
+		logrus.Errorf("%s There was a problem with the IP %s\n", localNodeName, err)
+		return
+	}
+	ipAddr = ip
+	logrus.Infof("%s uses IP %s\n", localNodeName, ipAddr)
+
+	server = diagnose.Server{}
+	server.Init()
+	conf := networkdb.DefaultConfig()
+	conf.NodeName = localNodeName
+	conf.AdvertiseAddr = ipAddr
+	conf.BindAddr = ipAddr
+	nDB, err = networkdb.New(conf)
+	if err != nil {
+		logrus.Infof("%s error in the DB init %s\n", localNodeName, err)
+		return
+	}
+
+	// Register network db handlers
+	server.RegisterHandler(nDB, networkdb.NetDbPaths2Func)
+	server.RegisterHandler(nil, testerPaths2Func)
+	server.RegisterHandler(nDB, dummyclient.DummyClientPaths2Func)
+	server.EnableDebug("", port)
+}
+
+func getIPInterface(name string) (string, error) {
+	ifaces, err := net.Interfaces()
+	if err != nil {
+		return "", err
+	}
+	for _, iface := range ifaces {
+		if iface.Name != name {
+			continue // not the name specified
+		}
+
+		if iface.Flags&net.FlagUp == 0 {
+			return "", errors.New("Interfaces is down")
+		}
+
+		addrs, err := iface.Addrs()
+		if err != nil {
+			return "", err
+		}
+		for _, addr := range addrs {
+			var ip net.IP
+			switch v := addr.(type) {
+			case *net.IPNet:
+				ip = v.IP
+			case *net.IPAddr:
+				ip = v.IP
+			}
+			if ip == nil || ip.IsLoopback() {
+				continue
+			}
+			ip = ip.To4()
+			if ip == nil {
+				continue
+			}
+			return ip.String(), nil
+		}
+		return "", errors.New("Interfaces does not have a valid IPv4")
+	}
+	return "", errors.New("Interface not found")
+}

+ 118 - 0
libnetwork/test/networkDb/dummyclient/dummyClient.go

@@ -0,0 +1,118 @@
+package dummyclient
+
+import (
+	"fmt"
+	"log"
+	"net/http"
+
+	"github.com/Sirupsen/logrus"
+	events "github.com/docker/go-events"
+	"github.com/docker/libnetwork/diagnose"
+	"github.com/docker/libnetwork/networkdb"
+)
+
+// DummyClientPaths2Func exported paths for the client
+var DummyClientPaths2Func = map[string]diagnose.HTTPHandlerFunc{
+	"/watchtable":          watchTable,
+	"/watchedtableentries": watchTableEntries,
+}
+
+const (
+	missingParameter = "missing parameter"
+)
+
+type tableHandler struct {
+	cancelWatch func()
+	entries     map[string]string
+}
+
+var clientWatchTable = map[string]tableHandler{}
+
+func watchTable(ctx interface{}, w http.ResponseWriter, r *http.Request) {
+	r.ParseForm()
+	diagnose.DebugHTTPForm(r)
+	if len(r.Form["tname"]) < 1 {
+		diagnose.HTTPReplyError(w, missingParameter, fmt.Sprintf("%s?tname=table_name", r.URL.Path))
+		return
+	}
+
+	tableName := r.Form["tname"][0]
+	if _, ok := clientWatchTable[tableName]; ok {
+		fmt.Fprintf(w, "OK\n")
+		return
+	}
+
+	nDB, ok := ctx.(*networkdb.NetworkDB)
+	if ok {
+		ch, cancel := nDB.Watch(tableName, "", "")
+		clientWatchTable[tableName] = tableHandler{cancelWatch: cancel, entries: make(map[string]string)}
+		go handleTableEvents(tableName, ch)
+
+		fmt.Fprintf(w, "OK\n")
+	}
+}
+
+func watchTableEntries(ctx interface{}, w http.ResponseWriter, r *http.Request) {
+	r.ParseForm()
+	diagnose.DebugHTTPForm(r)
+	if len(r.Form["tname"]) < 1 {
+		diagnose.HTTPReplyError(w, missingParameter, fmt.Sprintf("%s?tname=table_name", r.URL.Path))
+		return
+	}
+
+	tableName := r.Form["tname"][0]
+	table, ok := clientWatchTable[tableName]
+	if !ok {
+		fmt.Fprintf(w, "Table %s not watched\n", tableName)
+		return
+	}
+
+	fmt.Fprintf(w, "total elements: %d\n", len(table.entries))
+	i := 0
+	for k, v := range table.entries {
+		fmt.Fprintf(w, "%d) k:`%s` -> v:`%s`\n", i, k, v)
+		i++
+	}
+}
+
+func handleTableEvents(tableName string, ch *events.Channel) {
+	var (
+		// nid   string
+		eid   string
+		value []byte
+		isAdd bool
+	)
+
+	logrus.Infof("Started watching table:%s", tableName)
+	for {
+		select {
+		case <-ch.Done():
+			logrus.Infof("End watching %s", tableName)
+			return
+
+		case evt := <-ch.C:
+			logrus.Infof("Recevied new event on:%s", tableName)
+			switch event := evt.(type) {
+			case networkdb.CreateEvent:
+				// nid = event.NetworkID
+				eid = event.Key
+				value = event.Value
+				isAdd = true
+			case networkdb.DeleteEvent:
+				// nid = event.NetworkID
+				eid = event.Key
+				value = event.Value
+				isAdd = false
+			default:
+				log.Fatalf("Unexpected table event = %#v", event)
+			}
+			if isAdd {
+				// logrus.Infof("Add %s %s", tableName, eid)
+				clientWatchTable[tableName].entries[eid] = string(value)
+			} else {
+				// logrus.Infof("Del %s %s", tableName, eid)
+				delete(clientWatchTable[tableName].entries, eid)
+			}
+		}
+	}
+}

+ 24 - 0
libnetwork/test/networkDb/testMain.go

@@ -0,0 +1,24 @@
+package main
+
+import (
+	"log"
+	"os"
+
+	"github.com/Sirupsen/logrus"
+	"github.com/docker/libnetwork/test/networkDb/dbclient"
+	"github.com/docker/libnetwork/test/networkDb/dbserver"
+)
+
+func main() {
+	logrus.Infof("Starting the image with these args: %v", os.Args)
+	if len(os.Args) < 1 {
+		log.Fatal("You need at least 1 argument [client/server]")
+	}
+
+	switch os.Args[1] {
+	case "server":
+		dbserver.Server(os.Args[2:])
+	case "client":
+		dbclient.Client(os.Args[2:])
+	}
+}