Merge pull request #2200 from fcrisciani/networkdb-retry
Adjust corner case for reconnect logic
Commit ef457321a9
3 changed files with 92 additions and 18 deletions
networkdb/cluster.go

@@ -285,18 +285,35 @@ func (nDB *NetworkDB) rejoinClusterBootStrap() {
 		return
 	}
 
+	myself, _ := nDB.nodes[nDB.config.NodeID]
 	bootStrapIPs := make([]string, 0, len(nDB.bootStrapIP))
 	for _, bootIP := range nDB.bootStrapIP {
-		for _, node := range nDB.nodes {
-			if node.Addr.Equal(bootIP) {
-				// One of the bootstrap nodes is part of the cluster, return
-				nDB.RUnlock()
-				return
+		// bootstrap IPs are usually IP:port strings from the Join
+		var bootstrapIP net.IP
+		ipStr, _, err := net.SplitHostPort(bootIP)
+		if err != nil {
+			// no port was specified; try to parse the entry as a plain IP.
+			// Note this seems to be the case for swarm, which does not specify any port
+			ipStr = bootIP
+		}
+		bootstrapIP = net.ParseIP(ipStr)
+		if bootstrapIP != nil {
+			for _, node := range nDB.nodes {
+				if node.Addr.Equal(bootstrapIP) && !node.Addr.Equal(myself.Addr) {
+					// One of the bootstrap nodes (and not myself) is part of the cluster, return
+					nDB.RUnlock()
+					return
+				}
 			}
+			bootStrapIPs = append(bootStrapIPs, bootIP)
 		}
-		bootStrapIPs = append(bootStrapIPs, bootIP.String())
 	}
 	nDB.RUnlock()
+	if len(bootStrapIPs) == 0 {
+		// this also avoids calling Join with an empty list, which would erase the current bootstrap IP list
+		logrus.Debug("rejoinClusterBootStrap did not find any valid IP")
+		return
+	}
 	// None of the bootStrap nodes are in the cluster, call memberlist join
 	logrus.Debugf("rejoinClusterBootStrap, calling cluster join with bootStrap %v", bootStrapIPs)
 	ctx, cancel := context.WithTimeout(nDB.ctx, rejoinClusterDuration)
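The fallback around net.SplitHostPort is the heart of the corner case: bootstrap entries normally arrive as addr:port strings from Join, but swarm can hand over a bare IP, and an entry that parses to no IP at all must be skipped rather than matched against cluster nodes. A minimal standalone sketch of that parsing logic (the helper name resolveBootstrapIP is ours, not part of the patch):

package main

import (
	"fmt"
	"net"
)

// resolveBootstrapIP mirrors the fallback in rejoinClusterBootStrap:
// if the entry is not in host:port form, parse the whole string as an IP.
func resolveBootstrapIP(bootIP string) net.IP {
	ipStr, _, err := net.SplitHostPort(bootIP)
	if err != nil {
		// no port present; treat the whole entry as a plain IP
		ipStr = bootIP
	}
	return net.ParseIP(ipStr)
}

func main() {
	fmt.Println(resolveBootstrapIP("192.0.2.10:7946")) // 192.0.2.10
	fmt.Println(resolveBootstrapIP("192.0.2.10"))      // 192.0.2.10
	fmt.Println(resolveBootstrapIP("bogus"))           // <nil>, entry is skipped
}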
networkdb/networkdb.go

@@ -5,7 +5,6 @@ package networkdb
 import (
 	"context"
 	"fmt"
-	"net"
 	"os"
 	"strings"
 	"sync"
@@ -96,7 +95,7 @@ type NetworkDB struct {
 
 	// bootStrapIP is the list of IPs that can be used to bootstrap
 	// the gossip.
-	bootStrapIP []net.IP
+	bootStrapIP []string
 
 	// lastStatsTimestamp is the last timestamp when the stats got printed
 	lastStatsTimestamp time.Time
@@ -268,10 +267,8 @@ func New(c *Config) (*NetworkDB, error) {
 // instances passed by the caller in the form of addr:port
 func (nDB *NetworkDB) Join(members []string) error {
 	nDB.Lock()
-	nDB.bootStrapIP = make([]net.IP, 0, len(members))
-	for _, m := range members {
-		nDB.bootStrapIP = append(nDB.bootStrapIP, net.ParseIP(m))
-	}
+	nDB.bootStrapIP = append([]string(nil), members...)
+	logrus.Infof("The new bootstrap node list is: %v", nDB.bootStrapIP)
 	nDB.Unlock()
 	return nDB.clusterJoin(members)
 }
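The type change from []net.IP to []string is what makes the retry usable at all: Join receives members in addr:port form (as the comment above it says), and net.ParseIP returns nil for any string carrying a port, so the old field only ever held nil entries and the port was lost besides. A quick standalone demonstration (not taken from the patch):

package main

import (
	"fmt"
	"net"
)

func main() {
	member := "192.0.2.10:7946" // the addr:port form Join documents

	// Old behaviour: ParseIP rejects host:port strings, so every
	// bootstrap entry stored via net.ParseIP(m) came back nil.
	fmt.Println(net.ParseIP(member)) // <nil>

	// New behaviour: keep the raw strings, so both the IP and the
	// port survive for the later memberlist join.
	bootStrapIP := append([]string(nil), member)
	fmt.Println(bootStrapIP) // [192.0.2.10:7946]
}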
networkdb/networkdb_test.go

@@ -31,6 +31,12 @@ func TestMain(m *testing.M) {
 	os.Exit(m.Run())
 }
 
+func launchNode(t *testing.T, conf Config) *NetworkDB {
+	db, err := New(&conf)
+	require.NoError(t, err)
+	return db
+}
+
 func createNetworkDBInstances(t *testing.T, num int, namePrefix string, conf *Config) []*NetworkDB {
 	var dbs []*NetworkDB
 	for i := 0; i < num; i++ {
@@ -38,12 +44,9 @@ func createNetworkDBInstances(t *testing.T, num int, namePrefix string, conf *Config) []*NetworkDB {
 		localConfig.Hostname = fmt.Sprintf("%s%d", namePrefix, i+1)
 		localConfig.NodeID = stringid.TruncateID(stringid.GenerateRandomID())
 		localConfig.BindPort = int(atomic.AddInt32(&dbPort, 1))
-		db, err := New(&localConfig)
-		require.NoError(t, err)
-
+		db := launchNode(t, localConfig)
 		if i != 0 {
-			err = db.Join([]string{fmt.Sprintf("localhost:%d", db.config.BindPort-1)})
-			assert.NoError(t, err)
+			assert.NoError(t, db.Join([]string{fmt.Sprintf("localhost:%d", db.config.BindPort-1)}))
 		}
 
 		dbs = append(dbs, db)
@@ -803,3 +806,60 @@ func TestParallelDelete(t *testing.T) {
 
 	closeNetworkDBInstances(dbs)
 }
+
+func TestNetworkDBIslands(t *testing.T) {
+	logrus.SetLevel(logrus.DebugLevel)
+	dbs := createNetworkDBInstances(t, 5, "node", DefaultConfig())
+
+	// Get the IP the nodes are currently using
+	node, _ := dbs[0].nodes[dbs[0].config.NodeID]
+	baseIPStr := node.Addr.String()
+	// Nodes 0, 1 and 2 are going to be the 3 bootstrap nodes
+	members := []string{fmt.Sprintf("%s:%d", baseIPStr, dbs[0].config.BindPort),
+		fmt.Sprintf("%s:%d", baseIPStr, dbs[1].config.BindPort),
+		fmt.Sprintf("%s:%d", baseIPStr, dbs[2].config.BindPort)}
+	// Rejoining will update the list of the bootstrap members
+	for i := 3; i < 5; i++ {
+		assert.NoError(t, dbs[i].Join(members))
+	}
+
+	// Now the 3 bootstrap nodes will cleanly leave, and will be properly removed from the other 2 nodes
+	for i := 0; i < 3; i++ {
+		logrus.Infof("node %d leaving", i)
+		dbs[i].Close()
+		time.Sleep(2 * time.Second)
+	}
+
+	// Give some time to let the system propagate the messages and free up the ports
+	time.Sleep(10 * time.Second)
+
+	// Verify that the nodes are actually all gone and marked appropriately
+	for i := 3; i < 5; i++ {
+		assert.Len(t, dbs[i].leftNodes, 3)
+		assert.Len(t, dbs[i].failedNodes, 0)
+	}
+
+	// Spawn the first 3 nodes again with different names but the same IP:port
+	for i := 0; i < 3; i++ {
+		logrus.Infof("node %d coming back", i)
+		dbs[i].config.NodeID = stringid.TruncateID(stringid.GenerateRandomID())
+		dbs[i] = launchNode(t, *dbs[i].config)
+		time.Sleep(2 * time.Second)
+	}
+
+	// Give some time for the reconnect routine to run; it runs every 60s
+	time.Sleep(50 * time.Second)
+
+	// Verify that the cluster is again fully connected. Note that the 3 restarted nodes did not do any explicit join
+	for i := 0; i < 5; i++ {
+		assert.Len(t, dbs[i].nodes, 5)
+		assert.Len(t, dbs[i].failedNodes, 0)
+		if i < 3 {
+			// nodes 0 to 2 have no left nodes
+			assert.Len(t, dbs[i].leftNodes, 0)
+		} else {
+			// nodes 3 and 4 still list the 3 nodes that left
+			assert.Len(t, dbs[i].leftNodes, 3)
+		}
+	}
+}