
Adjust corner case for reconnect logic

The previous logic did not account for the fact that each node is
itself in the node list, so the bootstrap nodes would never retry
to reconnect because they would always find themselves in the
node map.
Added a test that validates the gossip island condition.

Signed-off-by: Flavio Crisciani <flavio.crisciani@docker.com>
Flavio Crisciani committed 7 years ago
commit 500d9f4515
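
For context, here is a minimal standalone sketch of the corner case being fixed; the types and names (`node`, `shouldRejoin`, `selfID`) are illustrative only and are not the actual NetworkDB fields. It shows why the membership check must exclude the local node: a bootstrap node scanning the member map always finds its own entry, so without the exclusion it never attempts to re-join.

```go
package main

import "fmt"

// Illustrative stand-in for a cluster member entry.
type node struct{ addr string }

// shouldRejoin reports whether the node should try to re-join the bootstrap
// peers: only a bootstrap address that belongs to *another* live member
// makes the re-join unnecessary.
func shouldRejoin(nodes map[string]node, selfID string, bootstrapAddrs []string) bool {
	for _, addr := range bootstrapAddrs {
		for id, n := range nodes {
			if n.addr == addr && id != selfID {
				// Some other bootstrap node is already part of the cluster.
				return false
			}
		}
	}
	return true
}

func main() {
	// A bootstrap node that only sees itself must still re-join,
	// otherwise two such nodes can form permanent gossip islands.
	nodes := map[string]node{"boot1": {addr: "10.0.0.1"}}
	fmt.Println(shouldRejoin(nodes, "boot1", []string{"10.0.0.1"})) // true
}
```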

+ 23 - 6
libnetwork/networkdb/cluster.go

@@ -285,18 +285,35 @@ func (nDB *NetworkDB) rejoinClusterBootStrap() {
 		return
 	}
 
+	myself, _ := nDB.nodes[nDB.config.NodeID]
 	bootStrapIPs := make([]string, 0, len(nDB.bootStrapIP))
 	for _, bootIP := range nDB.bootStrapIP {
-		for _, node := range nDB.nodes {
-			if node.Addr.Equal(bootIP) {
-				// One of the bootstrap nodes is part of the cluster, return
-				nDB.RUnlock()
-				return
+		// bootstrap IPs are usually IP:port from the Join
+		var bootstrapIP net.IP
+		ipStr, _, err := net.SplitHostPort(bootIP)
+		if err != nil {
+			// the address has no port; try to parse it as a plain IP
+			// Note this seems to be the case for swarm, which does not specify any port
+			ipStr = bootIP
+		}
+		bootstrapIP = net.ParseIP(ipStr)
+		if bootstrapIP != nil {
+			for _, node := range nDB.nodes {
+				if node.Addr.Equal(bootstrapIP) && !node.Addr.Equal(myself.Addr) {
+					// One of the bootstrap nodes (and not myself) is part of the cluster, return
+					nDB.RUnlock()
+					return
+				}
 			}
+			bootStrapIPs = append(bootStrapIPs, bootIP)
 		}
-		bootStrapIPs = append(bootStrapIPs, bootIP.String())
 	}
 	nDB.RUnlock()
+	if len(bootStrapIPs) == 0 {
+		// this also avoids calling Join with an empty list, which would erase the current bootstrap IP list
+		logrus.Debug("rejoinClusterBootStrap did not find any valid IP")
+		return
+	}
 	// None of the bootStrap nodes are in the cluster, call memberlist join
 	logrus.Debugf("rejoinClusterBootStrap, calling cluster join with bootStrap %v", bootStrapIPs)
 	ctx, cancel := context.WithTimeout(nDB.ctx, rejoinClusterDuration)
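
The bootstrap entries are now stored as the raw strings passed to Join, so they are usually `IP:port` but may be a bare IP (the swarm case mentioned in the comment). Below is a small standalone sketch of that parsing fallback using only the standard library; `parseBootstrapIP` is an illustrative helper, not part of NetworkDB.

```go
package main

import (
	"fmt"
	"net"
)

// parseBootstrapIP mirrors the fallback above: first try host:port,
// then fall back to treating the whole string as a bare IP.
func parseBootstrapIP(bootIP string) net.IP {
	ipStr, _, err := net.SplitHostPort(bootIP)
	if err != nil {
		// No port present (e.g. an entry coming from swarm); use the string as-is.
		ipStr = bootIP
	}
	return net.ParseIP(ipStr)
}

func main() {
	fmt.Println(parseBootstrapIP("192.168.1.10:7946")) // 192.168.1.10
	fmt.Println(parseBootstrapIP("192.168.1.10"))      // 192.168.1.10
	fmt.Println(parseBootstrapIP("not-an-ip"))         // <nil> -> entry is skipped
}
```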

+ 3 - 6
libnetwork/networkdb/networkdb.go

@@ -5,7 +5,6 @@ package networkdb
 import (
 	"context"
 	"fmt"
-	"net"
 	"os"
 	"strings"
 	"sync"
@@ -96,7 +95,7 @@ type NetworkDB struct {
 
 
 	// bootStrapIP is the list of IPs that can be used to bootstrap
 	// the gossip.
-	bootStrapIP []net.IP
+	bootStrapIP []string
 
 	// lastStatsTimestamp is the last timestamp when the stats got printed
 	lastStatsTimestamp time.Time
@@ -268,10 +267,8 @@ func New(c *Config) (*NetworkDB, error) {
 // instances passed by the caller in the form of addr:port
 func (nDB *NetworkDB) Join(members []string) error {
 	nDB.Lock()
-	nDB.bootStrapIP = make([]net.IP, 0, len(members))
-	for _, m := range members {
-		nDB.bootStrapIP = append(nDB.bootStrapIP, net.ParseIP(m))
-	}
+	nDB.bootStrapIP = append([]string(nil), members...)
+	logrus.Infof("The new bootstrap node list is:%v", nDB.bootStrapIP)
 	nDB.Unlock()
 	return nDB.clusterJoin(members)
 }
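
Since the bootstrap list is now kept as the raw `addr:port` strings handed to `Join`, a caller keeps using it exactly as before. A hypothetical usage sketch follows; the import path and the addresses are assumptions for illustration only.

```go
package main

import (
	"log"

	"github.com/docker/libnetwork/networkdb" // assumed import path for illustration
)

func main() {
	// Hypothetical caller; the addresses below are illustrative.
	db, err := networkdb.New(networkdb.DefaultConfig())
	if err != nil {
		log.Fatal(err)
	}
	// Members are stored verbatim as the bootstrap list (addr:port strings)
	// and also used for the initial memberlist join.
	if err := db.Join([]string{"10.0.0.1:7946", "10.0.0.2:7946"}); err != nil {
		log.Fatal(err)
	}
}
```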

+ 65 - 5
libnetwork/networkdb/networkdb_test.go

@@ -31,6 +31,12 @@ func TestMain(m *testing.M) {
 	os.Exit(m.Run())
 }
 
+func launchNode(t *testing.T, conf Config) *NetworkDB {
+	db, err := New(&conf)
+	require.NoError(t, err)
+	return db
+}
+
 func createNetworkDBInstances(t *testing.T, num int, namePrefix string, conf *Config) []*NetworkDB {
 	var dbs []*NetworkDB
 	for i := 0; i < num; i++ {
@@ -38,12 +44,9 @@ func createNetworkDBInstances(t *testing.T, num int, namePrefix string, conf *Co
 		localConfig.Hostname = fmt.Sprintf("%s%d", namePrefix, i+1)
 		localConfig.NodeID = stringid.TruncateID(stringid.GenerateRandomID())
 		localConfig.BindPort = int(atomic.AddInt32(&dbPort, 1))
-		db, err := New(&localConfig)
-		require.NoError(t, err)
-
+		db := launchNode(t, localConfig)
 		if i != 0 {
-			err = db.Join([]string{fmt.Sprintf("localhost:%d", db.config.BindPort-1)})
-			assert.NoError(t, err)
+			assert.NoError(t, db.Join([]string{fmt.Sprintf("localhost:%d", db.config.BindPort-1)}))
 		}
 
 		dbs = append(dbs, db)
@@ -803,3 +806,60 @@ func TestParallelDelete(t *testing.T) {
 
 
 	closeNetworkDBInstances(dbs)
 }
+
+func TestNetworkDBIslands(t *testing.T) {
+	logrus.SetLevel(logrus.DebugLevel)
+	dbs := createNetworkDBInstances(t, 5, "node", DefaultConfig())
+
+	// Get the IP currently used by the nodes
+	node, _ := dbs[0].nodes[dbs[0].config.NodeID]
+	baseIPStr := node.Addr.String()
+	// Node 0,1,2 are going to be the 3 bootstrap nodes
+	members := []string{fmt.Sprintf("%s:%d", baseIPStr, dbs[0].config.BindPort),
+		fmt.Sprintf("%s:%d", baseIPStr, dbs[1].config.BindPort),
+		fmt.Sprintf("%s:%d", baseIPStr, dbs[2].config.BindPort)}
+	// Rejoining will update the list of the bootstrap members
+	for i := 3; i < 5; i++ {
+		assert.NoError(t, dbs[i].Join(members))
+	}
+
+	// Now the 3 bootstrap nodes will cleanly leave, and will be properly removed from the other 2 nodes
+	for i := 0; i < 3; i++ {
+		logrus.Infof("node %d leaving", i)
+		dbs[i].Close()
+		time.Sleep(2 * time.Second)
+	}
+
+	// Give some time to let the system propagate the messages and free up the ports
+	time.Sleep(10 * time.Second)
+
+	// Verify that the nodes are actually all gone and marked appropriately
+	for i := 3; i < 5; i++ {
+		assert.Len(t, dbs[i].leftNodes, 3)
+		assert.Len(t, dbs[i].failedNodes, 0)
+	}
+
+	// Spawn the first 3 nodes again with different node IDs but the same IP:port
+	for i := 0; i < 3; i++ {
+		logrus.Infof("node %d coming back", i)
+		dbs[i].config.NodeID = stringid.TruncateID(stringid.GenerateRandomID())
+		dbs[i] = launchNode(t, *dbs[i].config)
+		time.Sleep(2 * time.Second)
+	}
+
+	// Give some time for the reconnect routine to run; it runs every 60s
+	time.Sleep(50 * time.Second)
+
+	// Verify that the cluster is fully connected again. Note that the 3 restarted nodes did not do any join
+	for i := 0; i < 5; i++ {
+		assert.Len(t, dbs[i].nodes, 5)
+		assert.Len(t, dbs[i].failedNodes, 0)
+		if i < 3 {
+			// nodes 0 to 2 have no left nodes
+			assert.Len(t, dbs[i].leftNodes, 0)
+		} else {
+			// nodes 3 and 4 have the 3 nodes that left
+			assert.Len(t, dbs[i].leftNodes, 3)
+		}
+	}
+}