|
@@ -2,6 +2,7 @@ package networkdb
|
|
|
|
|
|
import (
|
|
|
"bytes"
|
|
|
+ "context"
|
|
|
"crypto/rand"
|
|
|
"encoding/hex"
|
|
|
"fmt"
|
|
@@ -17,10 +18,12 @@ import (
|
|
|
)
|
|
|
|
|
|
const (
|
|
|
- reapPeriod = 5 * time.Second
|
|
|
- retryInterval = 1 * time.Second
|
|
|
- nodeReapInterval = 24 * time.Hour
|
|
|
- nodeReapPeriod = 2 * time.Hour
|
|
|
+ reapPeriod = 5 * time.Second
|
|
|
+ rejoinClusterDuration = 10 * time.Second // timeout applied to each rejoinClusterBootStrap join attempt
|
|
|
+ rejoinInterval = 60 * time.Second // ticker period for the rejoinClusterBootStrap trigger
|
|
|
+ retryInterval = 1 * time.Second // ticker period used by retryJoin and by the reconnectNode trigger
|
|
|
+ nodeReapInterval = 24 * time.Hour
|
|
|
+ nodeReapPeriod = 2 * time.Hour // ticker period for the reapDeadNode trigger
|
|
|
)
|
|
|
|
|
|
type logWriter struct{}
|
|
@@ -154,7 +157,7 @@ func (nDB *NetworkDB) clusterInit() error {
|
|
|
return fmt.Errorf("failed to create memberlist: %v", err)
|
|
|
}
|
|
|
|
|
|
- nDB.stopCh = make(chan struct{})
|
|
|
+ nDB.ctx, nDB.cancelCtx = context.WithCancel(context.Background())
|
|
|
nDB.memberlist = mlist
|
|
|
|
|
|
for _, trigger := range []struct {
|
|
@@ -166,16 +169,17 @@ func (nDB *NetworkDB) clusterInit() error {
|
|
|
{config.PushPullInterval, nDB.bulkSyncTables},
|
|
|
{retryInterval, nDB.reconnectNode},
|
|
|
{nodeReapPeriod, nDB.reapDeadNode},
|
|
|
+ {rejoinInterval, nDB.rejoinClusterBootStrap},
|
|
|
} {
|
|
|
t := time.NewTicker(trigger.interval)
|
|
|
- go nDB.triggerFunc(trigger.interval, t.C, nDB.stopCh, trigger.fn)
|
|
|
+ go nDB.triggerFunc(trigger.interval, t.C, trigger.fn)
|
|
|
nDB.tickers = append(nDB.tickers, t)
|
|
|
}
|
|
|
|
|
|
return nil
|
|
|
}
|
|
|
|
|
|
-func (nDB *NetworkDB) retryJoin(members []string, stop <-chan struct{}) {
|
|
|
+func (nDB *NetworkDB) retryJoin(ctx context.Context, members []string) {
|
|
|
t := time.NewTicker(retryInterval)
|
|
|
defer t.Stop()
|
|
|
|
|
@@ -191,7 +195,7 @@ func (nDB *NetworkDB) retryJoin(members []string, stop <-chan struct{}) {
|
|
|
continue
|
|
|
}
|
|
|
return
|
|
|
- case <-stop:
|
|
|
+ case <-ctx.Done():
|
|
|
return
|
|
|
}
|
|
|
}
|
|
@@ -202,8 +206,8 @@ func (nDB *NetworkDB) clusterJoin(members []string) error {
|
|
|
mlist := nDB.memberlist
|
|
|
|
|
|
if _, err := mlist.Join(members); err != nil {
|
|
|
- // In case of failure, keep retrying join until it succeeds or the cluster is shutdown.
|
|
|
- go nDB.retryJoin(members, nDB.stopCh)
|
|
|
+ // In case of failure, there is no need to call retryJoin explicitly here:
|
|
|
+ // rejoinClusterBootStrap, which runs every rejoinInterval (60s), calls retryJoin for up to rejoinClusterDuration (10s).
|
|
|
return fmt.Errorf("could not join node to memberlist: %v", err)
|
|
|
}
|
|
|
|
|
@@ -225,7 +229,8 @@ func (nDB *NetworkDB) clusterLeave() error {
|
|
|
return err
|
|
|
}
|
|
|
|
|
|
- close(nDB.stopCh)
|
|
|
+ // cancel the context
|
|
|
+ nDB.cancelCtx()
|
|
|
|
|
|
for _, t := range nDB.tickers {
|
|
|
t.Stop()
|
|
@@ -234,19 +239,19 @@ func (nDB *NetworkDB) clusterLeave() error {
|
|
|
return mlist.Shutdown()
|
|
|
}
|
|
|
|
|
|
-func (nDB *NetworkDB) triggerFunc(stagger time.Duration, C <-chan time.Time, stop <-chan struct{}, f func()) {
|
|
|
+func (nDB *NetworkDB) triggerFunc(stagger time.Duration, C <-chan time.Time, f func()) {
|
|
|
// Use a random stagger to avoid synchronizing
|
|
|
randStagger := time.Duration(uint64(rnd.Int63()) % uint64(stagger))
|
|
|
select {
|
|
|
case <-time.After(randStagger):
|
|
|
- case <-stop:
|
|
|
+ case <-nDB.ctx.Done():
|
|
|
return
|
|
|
}
|
|
|
for {
|
|
|
select {
|
|
|
case <-C:
|
|
|
f()
|
|
|
- case <-stop:
|
|
|
+ case <-nDB.ctx.Done():
|
|
|
return
|
|
|
}
|
|
|
}
|
|
@@ -270,6 +275,35 @@ func (nDB *NetworkDB) reapDeadNode() {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+// rejoinClusterBootStrap is called periodically to check whether any of the bootStrap nodes is present in nDB.nodes;
|
|
|
+// if none is, it retries the cluster join against the bootStrap IPs for at most rejoinClusterDuration, to merge the
|
|
|
+// 2 separate clusters that are formed when all managers stopped/started at the same time.
|
|
|
+func (nDB *NetworkDB) rejoinClusterBootStrap() {
|
|
|
+ nDB.RLock()
|
|
|
+ if len(nDB.bootStrapIP) == 0 {
|
|
|
+ nDB.RUnlock()
|
|
|
+ return
|
|
|
+ }
|
|
|
+
|
|
|
+ bootStrapIPs := make([]string, 0, len(nDB.bootStrapIP))
|
|
|
+ for _, bootIP := range nDB.bootStrapIP {
|
|
|
+ for _, node := range nDB.nodes {
|
|
|
+ if node.Addr.Equal(bootIP) {
|
|
|
+ // At least one bootstrap node is already in nDB.nodes, so there is nothing to rejoin: release the read lock and return
|
|
|
+ nDB.RUnlock()
|
|
|
+ return
|
|
|
+ }
|
|
|
+ }
|
|
|
+ bootStrapIPs = append(bootStrapIPs, bootIP.String())
|
|
|
+ }
|
|
|
+ nDB.RUnlock()
|
|
|
+ // None of the bootStrap nodes is in nDB.nodes: retry the join against all of them, bounded by rejoinClusterDuration (or earlier if nDB.ctx is cancelled)
|
|
|
+ logrus.Debugf("rejoinClusterBootStrap, calling cluster join with bootStrap %v", bootStrapIPs)
|
|
|
+ ctx, cancel := context.WithTimeout(nDB.ctx, rejoinClusterDuration)
|
|
|
+ defer cancel()
|
|
|
+ nDB.retryJoin(ctx, bootStrapIPs)
|
|
|
+}
|
|
|
+
|
|
|
func (nDB *NetworkDB) reconnectNode() {
|
|
|
nDB.RLock()
|
|
|
if len(nDB.failedNodes) == 0 {
|