Merge pull request #1775 from sanimej/gossip
Handle single manager reload by having workers reconnect
This commit is contained in:
commit
59994bbb15
4 changed files with 59 additions and 2 deletions
|
@ -284,7 +284,6 @@ func (nDB *NetworkDB) reconnectNode() {
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := nDB.sendNodeEvent(NodeEventTypeJoin); err != nil {
|
if err := nDB.sendNodeEvent(NodeEventTypeJoin); err != nil {
|
||||||
logrus.Errorf("failed to send node join during reconnect: %v", err)
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -17,6 +17,25 @@ func (d *delegate) NodeMeta(limit int) []byte {
|
||||||
return []byte{}
|
return []byte{}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (nDB *NetworkDB) getNode(nEvent *NodeEvent) *node {
|
||||||
|
nDB.Lock()
|
||||||
|
defer nDB.Unlock()
|
||||||
|
|
||||||
|
for _, nodes := range []map[string]*node{
|
||||||
|
nDB.failedNodes,
|
||||||
|
nDB.leftNodes,
|
||||||
|
nDB.nodes,
|
||||||
|
} {
|
||||||
|
if n, ok := nodes[nEvent.NodeName]; ok {
|
||||||
|
if n.ltime >= nEvent.LTime {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return n
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (nDB *NetworkDB) checkAndGetNode(nEvent *NodeEvent) *node {
|
func (nDB *NetworkDB) checkAndGetNode(nEvent *NodeEvent) *node {
|
||||||
nDB.Lock()
|
nDB.Lock()
|
||||||
defer nDB.Unlock()
|
defer nDB.Unlock()
|
||||||
|
@ -63,10 +82,28 @@ func (nDB *NetworkDB) purgeSameNode(n *node) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (nDB *NetworkDB) handleNodeEvent(nEvent *NodeEvent) bool {
|
func (nDB *NetworkDB) handleNodeEvent(nEvent *NodeEvent) bool {
|
||||||
n := nDB.checkAndGetNode(nEvent)
|
// Update our local clock if the received messages has newer
|
||||||
|
// time.
|
||||||
|
nDB.networkClock.Witness(nEvent.LTime)
|
||||||
|
|
||||||
|
n := nDB.getNode(nEvent)
|
||||||
if n == nil {
|
if n == nil {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
// If its a node leave event for a manager and this is the only manager we
|
||||||
|
// know of we want the reconnect logic to kick in. In a single manager
|
||||||
|
// cluster manager's gossip can't be bootstrapped unless some other node
|
||||||
|
// connects to it.
|
||||||
|
if len(nDB.bootStrapIP) == 1 && nEvent.Type == NodeEventTypeLeave {
|
||||||
|
for _, ip := range nDB.bootStrapIP {
|
||||||
|
if ip.Equal(n.Addr) {
|
||||||
|
n.ltime = nEvent.LTime
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
n = nDB.checkAndGetNode(nEvent)
|
||||||
|
|
||||||
nDB.purgeSameNode(n)
|
nDB.purgeSameNode(n)
|
||||||
n.ltime = nEvent.LTime
|
n.ltime = nEvent.LTime
|
||||||
|
@ -76,11 +113,13 @@ func (nDB *NetworkDB) handleNodeEvent(nEvent *NodeEvent) bool {
|
||||||
nDB.Lock()
|
nDB.Lock()
|
||||||
nDB.nodes[n.Name] = n
|
nDB.nodes[n.Name] = n
|
||||||
nDB.Unlock()
|
nDB.Unlock()
|
||||||
|
logrus.Infof("Node join event for %s/%s", n.Name, n.Addr)
|
||||||
return true
|
return true
|
||||||
case NodeEventTypeLeave:
|
case NodeEventTypeLeave:
|
||||||
nDB.Lock()
|
nDB.Lock()
|
||||||
nDB.leftNodes[n.Name] = n
|
nDB.leftNodes[n.Name] = n
|
||||||
nDB.Unlock()
|
nDB.Unlock()
|
||||||
|
logrus.Infof("Node leave event for %s/%s", n.Name, n.Addr)
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -22,6 +22,7 @@ func (e *eventDelegate) broadcastNodeEvent(addr net.IP, op opType) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *eventDelegate) NotifyJoin(mn *memberlist.Node) {
|
func (e *eventDelegate) NotifyJoin(mn *memberlist.Node) {
|
||||||
|
logrus.Infof("Node %s/%s, joined gossip cluster", mn.Name, mn.Addr)
|
||||||
e.broadcastNodeEvent(mn.Addr, opCreate)
|
e.broadcastNodeEvent(mn.Addr, opCreate)
|
||||||
e.nDB.Lock()
|
e.nDB.Lock()
|
||||||
// In case the node is rejoining after a failure or leave,
|
// In case the node is rejoining after a failure or leave,
|
||||||
|
@ -37,9 +38,12 @@ func (e *eventDelegate) NotifyJoin(mn *memberlist.Node) {
|
||||||
|
|
||||||
e.nDB.nodes[mn.Name] = &node{Node: *mn}
|
e.nDB.nodes[mn.Name] = &node{Node: *mn}
|
||||||
e.nDB.Unlock()
|
e.nDB.Unlock()
|
||||||
|
logrus.Infof("Node %s/%s, added to nodes list", mn.Name, mn.Addr)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *eventDelegate) NotifyLeave(mn *memberlist.Node) {
|
func (e *eventDelegate) NotifyLeave(mn *memberlist.Node) {
|
||||||
|
var failed bool
|
||||||
|
logrus.Infof("Node %s/%s, left gossip cluster", mn.Name, mn.Addr)
|
||||||
e.broadcastNodeEvent(mn.Addr, opDelete)
|
e.broadcastNodeEvent(mn.Addr, opDelete)
|
||||||
e.nDB.deleteNodeTableEntries(mn.Name)
|
e.nDB.deleteNodeTableEntries(mn.Name)
|
||||||
e.nDB.deleteNetworkEntriesForNode(mn.Name)
|
e.nDB.deleteNetworkEntriesForNode(mn.Name)
|
||||||
|
@ -51,8 +55,13 @@ func (e *eventDelegate) NotifyLeave(mn *memberlist.Node) {
|
||||||
// Explicit leave will have already removed the node from the list of nodes (nDB.nodes) and put it into the leftNodes map
|
// Explicit leave will have already removed the node from the list of nodes (nDB.nodes) and put it into the leftNodes map
|
||||||
n.reapTime = nodeReapInterval
|
n.reapTime = nodeReapInterval
|
||||||
e.nDB.failedNodes[mn.Name] = n
|
e.nDB.failedNodes[mn.Name] = n
|
||||||
|
failed = true
|
||||||
}
|
}
|
||||||
e.nDB.Unlock()
|
e.nDB.Unlock()
|
||||||
|
if failed {
|
||||||
|
logrus.Infof("Node %s/%s, added to failed nodes list", mn.Name, mn.Addr)
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *eventDelegate) NotifyUpdate(n *memberlist.Node) {
|
func (e *eventDelegate) NotifyUpdate(n *memberlist.Node) {
|
||||||
|
|
|
@ -4,6 +4,7 @@ package networkdb
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"net"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
@ -88,6 +89,10 @@ type NetworkDB struct {
|
||||||
|
|
||||||
// Reference to the memberlist's keyring to add & remove keys
|
// Reference to the memberlist's keyring to add & remove keys
|
||||||
keyring *memberlist.Keyring
|
keyring *memberlist.Keyring
|
||||||
|
|
||||||
|
// bootStrapIP is the list of IPs that can be used to bootstrap
|
||||||
|
// the gossip.
|
||||||
|
bootStrapIP []net.IP
|
||||||
}
|
}
|
||||||
|
|
||||||
// PeerInfo represents the peer (gossip cluster) nodes of a network
|
// PeerInfo represents the peer (gossip cluster) nodes of a network
|
||||||
|
@ -194,6 +199,11 @@ func New(c *Config) (*NetworkDB, error) {
|
||||||
// Join joins this NetworkDB instance with a list of peer NetworkDB
|
// Join joins this NetworkDB instance with a list of peer NetworkDB
|
||||||
// instances passed by the caller in the form of addr:port
|
// instances passed by the caller in the form of addr:port
|
||||||
func (nDB *NetworkDB) Join(members []string) error {
|
func (nDB *NetworkDB) Join(members []string) error {
|
||||||
|
nDB.Lock()
|
||||||
|
for _, m := range members {
|
||||||
|
nDB.bootStrapIP = append(nDB.bootStrapIP, net.ParseIP(m))
|
||||||
|
}
|
||||||
|
nDB.Unlock()
|
||||||
return nDB.clusterJoin(members)
|
return nDB.clusterJoin(members)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue