Optimize the rebroadcast for the failure case
Previously, when a node failed, all the other nodes would bump the Lamport time of all their entries. This meant that a flapping node caused a storm of updates for every entry. Building on the previous logic, this commit guarantees that only the node that joins back re-advertises its own entries; the other nodes do not need to advertise theirs again.

Signed-off-by: Flavio Crisciani <flavio.crisciani@docker.com>
This commit is contained in:
parent a3ecb8902a
commit d6440c9139

3 changed files with 8 additions and 49 deletions
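To make the convergence argument concrete before reading the diff, here is a minimal, self-contained sketch (hypothetical types, not libnetwork's actual API) of the "newer Lamport time wins" acceptance rule, and why deleting a failed node's entries outright, instead of keeping tombstones and bumping every clock, lets the rejoining node's CREATE events be accepted immediately:

package main

import "fmt"

type entry struct {
	ltime    uint64 // Lamport time of the last accepted event
	deleting bool   // true when the entry is a tombstone awaiting reaping
}

// accept mimics the "newer Lamport time wins" rule of gossip tables: an
// incoming event is accepted only if it is newer than the local copy.
func accept(local *entry, incoming entry) bool {
	if local != nil && incoming.ltime <= local.ltime {
		return false // stale event: the local copy (or tombstone) shadows it
	}
	return true
}

func main() {
	// With a tombstone left behind (old behavior), a rejoining node's
	// CREATE at the same ltime is rejected until every node bumps clocks.
	tombstone := &entry{ltime: 10, deleting: true}
	fmt.Println(accept(tombstone, entry{ltime: 10})) // false

	// With the entry deleted outright (new behavior), the CREATE is
	// accepted at once, so only the rejoining node must rebroadcast.
	fmt.Println(accept(nil, entry{ltime: 10})) // true
}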
@@ -290,13 +290,6 @@ func (nDB *NetworkDB) reconnectNode() {
 		return
 	}
 
-	// Update all the local table state to a new time to
-	// force update on the node we are trying to rejoin, just in
-	// case that node has these in deleting state still. This is
-	// facilitate fast convergence after recovering from a gossip
-	// failure.
-	nDB.updateLocalTableTime()
-
 	logrus.Debugf("Initiating bulk sync with node %s after reconnect", node.Name)
 	nDB.bulkSync([]string{node.Name}, true)
 }
@@ -45,9 +45,12 @@ func (e *eventDelegate) NotifyLeave(mn *memberlist.Node) {
 	var failed bool
 	logrus.Infof("Node %s/%s, left gossip cluster", mn.Name, mn.Addr)
 	e.broadcastNodeEvent(mn.Addr, opDelete)
-	e.nDB.deleteNodeTableEntries(mn.Name)
-	e.nDB.deleteNetworkEntriesForNode(mn.Name)
+	// The node left or failed, delete all the entries created by it.
+	// If the node was temporary down, deleting the entries will guarantee that the CREATE events will be accepted
+	// If the node instead left because was going down, then it makes sense to just delete all its state
 	e.nDB.Lock()
+	e.nDB.deleteNetworkEntriesForNode(mn.Name)
+	e.nDB.deleteNodeTableEntries(mn.Name)
 	if n, ok := e.nDB.nodes[mn.Name]; ok {
 		delete(e.nDB.nodes, mn.Name)
 
@@ -61,7 +64,6 @@ func (e *eventDelegate) NotifyLeave(mn *memberlist.Node) {
 	if failed {
 		logrus.Infof("Node %s/%s, added to failed nodes list", mn.Name, mn.Addr)
 	}
-
 }
 
 func (e *eventDelegate) NotifyUpdate(n *memberlist.Node) {
@@ -418,7 +418,6 @@ func (nDB *NetworkDB) DeleteEntry(tname, nid, key string) error {
 }
 
 func (nDB *NetworkDB) deleteNetworkEntriesForNode(deletedNode string) {
-	nDB.Lock()
 	for nid, nodes := range nDB.networkNodes {
 		updatedNodes := make([]string, 0, len(nodes))
 		for _, node := range nodes {
@@ -433,7 +432,6 @@ func (nDB *NetworkDB) deleteNetworkEntriesForNode(deletedNode string) {
 	}
 
 	delete(nDB.networks, deletedNode)
-	nDB.Unlock()
 }
 
 // deleteNodeNetworkEntries is called in 2 conditions with 2 different outcomes:
@@ -504,7 +502,6 @@ func (nDB *NetworkDB) deleteNodeNetworkEntries(nid, node string) {
 }
 
 func (nDB *NetworkDB) deleteNodeTableEntries(node string) {
-	nDB.Lock()
 	nDB.indexes[byTable].Walk(func(path string, v interface{}) bool {
 		oldEntry := v.(*entry)
 		if oldEntry.node != node {
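The nDB.Lock()/nDB.Unlock() pairs disappear from these helpers because NotifyLeave now takes the lock once around the whole cleanup sequence. A minimal sketch of that caller-holds-the-lock convention (hypothetical names, not the real NetworkDB type):

package main

import "sync"

type db struct {
	sync.Mutex
	entries map[string]string
}

// deleteNodeEntries assumes the caller already holds d's mutex, mirroring
// how deleteNetworkEntriesForNode and deleteNodeTableEntries now rely on
// the lock taken in NotifyLeave.
func (d *db) deleteNodeEntries(node string) {
	delete(d.entries, node)
}

// notifyLeave takes the lock once, so all cleanup steps run atomically
// with respect to concurrent readers of the table state.
func (d *db) notifyLeave(node string) {
	d.Lock()
	defer d.Unlock()
	d.deleteNodeEntries(node)
}

func main() {
	d := &db{entries: map[string]string{"node1": "state"}}
	d.notifyLeave("node1")
}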
@@ -516,21 +513,12 @@ func (nDB *NetworkDB) deleteNodeTableEntries(node string) {
 		nid := params[1]
 		key := params[2]
 
-		entry := &entry{
-			ltime:    oldEntry.ltime,
-			node:     node,
-			value:    oldEntry.value,
-			deleting: true,
-			reapTime: reapInterval,
-		}
+		nDB.indexes[byTable].Delete(fmt.Sprintf("/%s/%s/%s", tname, nid, key))
+		nDB.indexes[byNetwork].Delete(fmt.Sprintf("/%s/%s/%s", nid, tname, key))
 
-		nDB.indexes[byTable].Insert(fmt.Sprintf("/%s/%s/%s", tname, nid, key), entry)
-		nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", nid, tname, key), entry)
-
-		nDB.broadcaster.Write(makeEvent(opDelete, tname, nid, key, entry.value))
+		nDB.broadcaster.Write(makeEvent(opDelete, tname, nid, key, oldEntry.value))
 		return false
 	})
-	nDB.Unlock()
 }
 
 // WalkTable walks a single table in NetworkDB and invokes the passed
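The hunk above also shows the two mirrored radix indexes the table state lives in: every entry is stored both per table and per network, so a delete must touch both before the opDelete event is broadcast. A small sketch of that double-index pattern (assuming a go-radix-style API, which the surrounding code appears to use; the value and key names are mine):

package main

import (
	"fmt"

	radix "github.com/armon/go-radix"
)

func main() {
	byTable := radix.New()   // keyed /<table>/<network>/<key>
	byNetwork := radix.New() // keyed /<network>/<table>/<key>

	tname, nid, key := "endpoint_table", "net1", "ep1"
	byTable.Insert(fmt.Sprintf("/%s/%s/%s", tname, nid, key), "value")
	byNetwork.Insert(fmt.Sprintf("/%s/%s/%s", nid, tname, key), "value")

	// Deleting must remove the entry from both indexes, exactly as the
	// new deleteNodeTableEntries does before broadcasting opDelete.
	byTable.Delete(fmt.Sprintf("/%s/%s/%s", tname, nid, key))
	byNetwork.Delete(fmt.Sprintf("/%s/%s/%s", nid, tname, key))
}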
@@ -691,27 +679,3 @@ func (nDB *NetworkDB) updateLocalNetworkTime() {
 		n.ltime = ltime
 	}
 }
-
-func (nDB *NetworkDB) updateLocalTableTime() {
-	nDB.Lock()
-	defer nDB.Unlock()
-
-	ltime := nDB.tableClock.Increment()
-	nDB.indexes[byTable].Walk(func(path string, v interface{}) bool {
-		entry := v.(*entry)
-		if entry.node != nDB.config.NodeName {
-			return false
-		}
-
-		params := strings.Split(path[1:], "/")
-		tname := params[0]
-		nid := params[1]
-		key := params[2]
-		entry.ltime = ltime
-
-		nDB.indexes[byTable].Insert(fmt.Sprintf("/%s/%s/%s", tname, nid, key), entry)
-		nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", nid, tname, key), entry)
-
-		return false
-	})
-}