Adding a recovery mechanism for a split gossip cluster
Signed-off-by: Dani Louca <dani.louca@docker.com>
parent f5aa502856
commit 96472cdaea
3 changed files with 57 additions and 26 deletions
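Before the hunks, a rough picture of the recovery path this commit adds: every rejoinInterval (60s) a new rejoinClusterBootStrap trigger checks whether any of the configured bootstrap nodes is still a known peer; if none is, the node assumes the gossip cluster has split and retries a memberlist join against the bootstrap addresses, bounded to rejoinClusterDuration (10s) by a context.WithTimeout. The sketch below condenses that flow into a self-contained program; the miniDB type, its known map, and its join callback are invented stand-ins for NetworkDB and memberlist.Join — only the constants and the overall control flow mirror the diff.

package main

import (
	"context"
	"fmt"
	"net"
	"time"
)

// Constants mirror the values introduced by this commit.
const (
	rejoinInterval        = 60 * time.Second // how often the bootstrap check runs (ticker in clusterInit)
	rejoinClusterDuration = 10 * time.Second // upper bound for one rejoin attempt
	retryInterval         = 1 * time.Second  // pause between individual join attempts
)

// miniDB is an invented stand-in for NetworkDB: it only keeps the bootstrap
// IPs, the peers it currently knows about, and a join callback.
type miniDB struct {
	bootStrapIP []net.IP
	known       map[string]bool              // peer addresses currently in the cluster
	join        func(members []string) error // stand-in for memberlist.Join
}

// rejoinBootstrap is the heart of the recovery mechanism: if none of the
// bootstrap nodes is a known peer, this half of the split cluster retries
// joining them, but only until the timeout derived from the parent context.
func (db *miniDB) rejoinBootstrap(parent context.Context) {
	var targets []string
	for _, ip := range db.bootStrapIP {
		if db.known[ip.String()] {
			return // a bootstrap node is still in our cluster: nothing to do
		}
		targets = append(targets, ip.String())
	}
	ctx, cancel := context.WithTimeout(parent, rejoinClusterDuration)
	defer cancel()
	for {
		if err := db.join(targets); err == nil {
			return
		}
		select {
		case <-time.After(retryInterval):
		case <-ctx.Done():
			return // give up; the next rejoinInterval tick will try again
		}
	}
}

func main() {
	db := &miniDB{
		bootStrapIP: []net.IP{net.ParseIP("10.0.0.1")},
		known:       map[string]bool{}, // simulate a split: no bootstrap peer is known
		join: func(members []string) error {
			fmt.Println("trying to join", members)
			return nil
		},
	}
	// In NetworkDB this is driven by a ticker created in clusterInit; run once here.
	db.rejoinBootstrap(context.Background())
}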
@@ -2,6 +2,7 @@ package networkdb

 import (
 	"bytes"
+	"context"
 	"crypto/rand"
 	"encoding/hex"
 	"fmt"
@@ -17,10 +18,12 @@ import (
 )

 const (
-	reapPeriod       = 5 * time.Second
-	retryInterval    = 1 * time.Second
-	nodeReapInterval = 24 * time.Hour
-	nodeReapPeriod   = 2 * time.Hour
+	reapPeriod            = 5 * time.Second
+	rejoinClusterDuration = 10 * time.Second
+	rejoinInterval        = 60 * time.Second
+	retryInterval         = 1 * time.Second
+	nodeReapInterval      = 24 * time.Hour
+	nodeReapPeriod        = 2 * time.Hour
 )

 type logWriter struct{}
@@ -154,7 +157,7 @@ func (nDB *NetworkDB) clusterInit() error {
 		return fmt.Errorf("failed to create memberlist: %v", err)
 	}

-	nDB.stopCh = make(chan struct{})
+	nDB.ctx, nDB.cancelCtx = context.WithCancel(context.Background())
 	nDB.memberlist = mlist

 	for _, trigger := range []struct {
@@ -166,16 +169,17 @@ func (nDB *NetworkDB) clusterInit() error {
 		{config.PushPullInterval, nDB.bulkSyncTables},
 		{retryInterval, nDB.reconnectNode},
 		{nodeReapPeriod, nDB.reapDeadNode},
+		{rejoinInterval, nDB.rejoinClusterBootStrap},
 	} {
 		t := time.NewTicker(trigger.interval)
-		go nDB.triggerFunc(trigger.interval, t.C, nDB.stopCh, trigger.fn)
+		go nDB.triggerFunc(trigger.interval, t.C, trigger.fn)
 		nDB.tickers = append(nDB.tickers, t)
 	}

 	return nil
 }

-func (nDB *NetworkDB) retryJoin(members []string, stop <-chan struct{}) {
+func (nDB *NetworkDB) retryJoin(ctx context.Context, members []string) {
 	t := time.NewTicker(retryInterval)
 	defer t.Stop()

@@ -191,7 +195,7 @@ func (nDB *NetworkDB) retryJoin(members []string, stop <-chan struct{}) {
 				continue
 			}
 			return
-		case <-stop:
+		case <-ctx.Done():
 			return
 		}
 	}
@@ -202,8 +206,8 @@ func (nDB *NetworkDB) clusterJoin(members []string) error {
 	mlist := nDB.memberlist

 	if _, err := mlist.Join(members); err != nil {
-		// In case of failure, keep retrying join until it succeeds or the cluster is shutdown.
-		go nDB.retryJoin(members, nDB.stopCh)
+		// In case of failure we no longer need to call retryJoin explicitly:
+		// rejoinClusterBootStrap, which runs every minute, will retry the join for 10 seconds.
 		return fmt.Errorf("could not join node to memberlist: %v", err)
 	}

@@ -225,7 +229,8 @@ func (nDB *NetworkDB) clusterLeave() error {
 		return err
 	}

-	close(nDB.stopCh)
+	// cancel the context
+	nDB.cancelCtx()

 	for _, t := range nDB.tickers {
 		t.Stop()
@@ -234,19 +239,19 @@ func (nDB *NetworkDB) clusterLeave() error {
 	return mlist.Shutdown()
 }

-func (nDB *NetworkDB) triggerFunc(stagger time.Duration, C <-chan time.Time, stop <-chan struct{}, f func()) {
+func (nDB *NetworkDB) triggerFunc(stagger time.Duration, C <-chan time.Time, f func()) {
 	// Use a random stagger to avoid synchronizing
 	randStagger := time.Duration(uint64(rnd.Int63()) % uint64(stagger))
 	select {
 	case <-time.After(randStagger):
-	case <-stop:
+	case <-nDB.ctx.Done():
 		return
 	}
 	for {
 		select {
 		case <-C:
 			f()
-		case <-stop:
+		case <-nDB.ctx.Done():
 			return
 		}
 	}
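The triggerFunc change above is the same stop-channel-to-context migration seen throughout the commit. As a minimal sketch of the pattern in isolation (runOnTick and the timings below are illustrative names and values, not from the commit):

package main

import (
	"context"
	"fmt"
	"time"
)

// runOnTick calls f on every tick until the context is cancelled. This is the
// generic shape of the new triggerFunc: the select now watches ctx.Done()
// instead of a dedicated stop channel.
func runOnTick(ctx context.Context, interval time.Duration, f func()) {
	t := time.NewTicker(interval)
	defer t.Stop()
	for {
		select {
		case <-t.C:
			f()
		case <-ctx.Done():
			return
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	go runOnTick(ctx, 100*time.Millisecond, func() { fmt.Println("tick") })
	time.Sleep(350 * time.Millisecond)
	cancel() // plays the role of nDB.cancelCtx(), replacing close(stopCh)
	time.Sleep(50 * time.Millisecond) // give the goroutine time to exit
}

One practical benefit of the context over a bare stop channel is that rejoinClusterBootStrap can derive a bounded sub-context from it with context.WithTimeout, so a rejoin attempt both times out on its own and stops immediately when the whole NetworkDB instance shuts down.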
@@ -270,6 +275,35 @@ func (nDB *NetworkDB) reapDeadNode() {
 	}
 }

+// rejoinClusterBootStrap is called periodically to check whether any of the bootStrap nodes
+// is still active in the cluster. If none of them is, it calls the cluster join to merge the
+// two separate clusters that form when all the managers are stopped/started at the same time.
+func (nDB *NetworkDB) rejoinClusterBootStrap() {
+	nDB.RLock()
+	if len(nDB.bootStrapIP) == 0 {
+		nDB.RUnlock()
+		return
+	}
+
+	bootStrapIPs := make([]string, 0, len(nDB.bootStrapIP))
+	for _, bootIP := range nDB.bootStrapIP {
+		for _, node := range nDB.nodes {
+			if node.Addr.Equal(bootIP) {
+				// One of the bootstrap nodes is part of the cluster, return
+				nDB.RUnlock()
+				return
+			}
+		}
+		bootStrapIPs = append(bootStrapIPs, bootIP.String())
+	}
+	nDB.RUnlock()
+	// None of the bootStrap nodes are in the cluster, call memberlist join
+	logrus.Debugf("rejoinClusterBootStrap, calling cluster join with bootStrap %v", bootStrapIPs)
+	ctx, cancel := context.WithTimeout(nDB.ctx, rejoinClusterDuration)
+	defer cancel()
+	nDB.retryJoin(ctx, bootStrapIPs)
+}
+
 func (nDB *NetworkDB) reconnectNode() {
 	nDB.RLock()
 	if len(nDB.failedNodes) == 0 {
@@ -38,16 +38,11 @@ func (nDB *NetworkDB) handleNodeEvent(nEvent *NodeEvent) bool {
 	// If we are here it means the event is fresher and the node is known. Update the Lamport time
 	n.ltime = nEvent.LTime

-	// If it is a node leave event for a manager and this is the only manager we
-	// know of we want the reconnect logic to kick in. In a single manager
-	// cluster manager's gossip can't be bootstrapped unless some other node
-	// connects to it.
-	if len(nDB.bootStrapIP) == 1 && nEvent.Type == NodeEventTypeLeave {
-		for _, ip := range nDB.bootStrapIP {
-			if ip.Equal(n.Addr) {
-				return true
-			}
-		}
-	}
+	// If the node is not known to memberlist we cannot process or save any state for it, because
+	// if it actually dies we will never receive a notification and will remain stuck with it.
+	if _, ok := nDB.nodes[nEvent.NodeName]; !ok {
+		logrus.Errorf("node: %s is unknown to memberlist", nEvent.NodeName)
+		return false
+	}

 	switch nEvent.Type {
@@ -3,6 +3,7 @@ package networkdb
 //go:generate protoc -I.:../vendor/github.com/gogo/protobuf --gogo_out=import_path=github.com/docker/libnetwork/networkdb,Mgogoproto/gogo.proto=github.com/gogo/protobuf/gogoproto:. networkdb.proto

 import (
+	"context"
 	"fmt"
 	"net"
 	"os"
@@ -77,9 +78,10 @@ type NetworkDB struct {
 	// Broadcast queue for node event gossip.
 	nodeBroadcasts *memberlist.TransmitLimitedQueue

-	// A central stop channel to stop all go routines running on
+	// A central context to stop all go routines running on
 	// behalf of the NetworkDB instance.
-	stopCh chan struct{}
+	ctx       context.Context
+	cancelCtx context.CancelFunc

 	// A central broadcaster for all local watchers watching table
 	// events.