From d6440c91394a0151a4db0325c728b9f894050a62 Mon Sep 17 00:00:00 2001
From: Flavio Crisciani
Date: Mon, 17 Jul 2017 08:36:43 -0700
Subject: [PATCH] optimize the rebroadcast for the failure case

Previously, when a node failed, all the nodes would bump the lamport
time of all their entries. This meant that if a node flapped, there
would be a storm of updates for all the entries. Building on the
previous logic, this commit guarantees that only the node that rejoins
will re-advertise its own entries; the other nodes won't need to
advertise again.

Signed-off-by: Flavio Crisciani
---
 libnetwork/networkdb/cluster.go        |  7 -----
 libnetwork/networkdb/event_delegate.go |  8 +++--
 libnetwork/networkdb/networkdb.go      | 42 ++------------------------
 3 files changed, 8 insertions(+), 49 deletions(-)

diff --git a/libnetwork/networkdb/cluster.go b/libnetwork/networkdb/cluster.go
index e011a5336e..ba9d91826f 100644
--- a/libnetwork/networkdb/cluster.go
+++ b/libnetwork/networkdb/cluster.go
@@ -290,13 +290,6 @@ func (nDB *NetworkDB) reconnectNode() {
 		return
 	}
 
-	// Update all the local table state to a new time to
-	// force update on the node we are trying to rejoin, just in
-	// case that node has these in deleting state still. This is
-	// facilitate fast convergence after recovering from a gossip
-	// failure.
-	nDB.updateLocalTableTime()
-
 	logrus.Debugf("Initiating bulk sync with node %s after reconnect", node.Name)
 	nDB.bulkSync([]string{node.Name}, true)
 }
diff --git a/libnetwork/networkdb/event_delegate.go b/libnetwork/networkdb/event_delegate.go
index 23e16832e7..0f32194cf3 100644
--- a/libnetwork/networkdb/event_delegate.go
+++ b/libnetwork/networkdb/event_delegate.go
@@ -45,9 +45,12 @@ func (e *eventDelegate) NotifyLeave(mn *memberlist.Node) {
 	var failed bool
 	logrus.Infof("Node %s/%s, left gossip cluster", mn.Name, mn.Addr)
 	e.broadcastNodeEvent(mn.Addr, opDelete)
-	e.nDB.deleteNodeTableEntries(mn.Name)
-	e.nDB.deleteNetworkEntriesForNode(mn.Name)
+	// The node left or failed; delete all the entries created by it.
+	// If the node was temporarily down, deleting the entries guarantees that the CREATE events will be accepted.
+	// If the node instead left because it was going down, then it makes sense to just delete all its state.
 	e.nDB.Lock()
+	e.nDB.deleteNetworkEntriesForNode(mn.Name)
+	e.nDB.deleteNodeTableEntries(mn.Name)
 	if n, ok := e.nDB.nodes[mn.Name]; ok {
 		delete(e.nDB.nodes, mn.Name)
@@ -61,7 +64,6 @@ func (e *eventDelegate) NotifyLeave(mn *memberlist.Node) {
 	if failed {
 		logrus.Infof("Node %s/%s, added to failed nodes list", mn.Name, mn.Addr)
 	}
-
 }
 
 func (e *eventDelegate) NotifyUpdate(n *memberlist.Node) {
diff --git a/libnetwork/networkdb/networkdb.go b/libnetwork/networkdb/networkdb.go
index a7a12eda89..6447a16357 100644
--- a/libnetwork/networkdb/networkdb.go
+++ b/libnetwork/networkdb/networkdb.go
@@ -418,7 +418,6 @@ func (nDB *NetworkDB) DeleteEntry(tname, nid, key string) error {
 }
 
 func (nDB *NetworkDB) deleteNetworkEntriesForNode(deletedNode string) {
-	nDB.Lock()
 	for nid, nodes := range nDB.networkNodes {
 		updatedNodes := make([]string, 0, len(nodes))
 		for _, node := range nodes {
@@ -433,7 +432,6 @@ func (nDB *NetworkDB) deleteNetworkEntriesForNode(deletedNode string) {
 	}
 
 	delete(nDB.networks, deletedNode)
-	nDB.Unlock()
 }
 
 // deleteNodeNetworkEntries is called in 2 conditions with 2 different outcomes:
@@ -504,7 +502,6 @@ func (nDB *NetworkDB) deleteNodeNetworkEntries(nid, node string) {
 }
 
 func (nDB *NetworkDB) deleteNodeTableEntries(node string) {
-	nDB.Lock()
 	nDB.indexes[byTable].Walk(func(path string, v interface{}) bool {
 		oldEntry := v.(*entry)
 		if oldEntry.node != node {
@@ -516,21 +513,12 @@ func (nDB *NetworkDB) deleteNodeTableEntries(node string) {
 		nid := params[1]
 		key := params[2]
 
-		entry := &entry{
-			ltime:    oldEntry.ltime,
-			node:     node,
-			value:    oldEntry.value,
-			deleting: true,
-			reapTime: reapInterval,
-		}
+		nDB.indexes[byTable].Delete(fmt.Sprintf("/%s/%s/%s", tname, nid, key))
+		nDB.indexes[byNetwork].Delete(fmt.Sprintf("/%s/%s/%s", nid, tname, key))
 
-		nDB.indexes[byTable].Insert(fmt.Sprintf("/%s/%s/%s", tname, nid, key), entry)
-		nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", nid, tname, key), entry)
-
-		nDB.broadcaster.Write(makeEvent(opDelete, tname, nid, key, entry.value))
+		nDB.broadcaster.Write(makeEvent(opDelete, tname, nid, key, oldEntry.value))
 		return false
 	})
-	nDB.Unlock()
 }
 
 // WalkTable walks a single table in NetworkDB and invokes the passed
@@ -691,27 +679,3 @@ func (nDB *NetworkDB) updateLocalNetworkTime() {
 		n.ltime = ltime
 	}
 }
-
-func (nDB *NetworkDB) updateLocalTableTime() {
-	nDB.Lock()
-	defer nDB.Unlock()
-
-	ltime := nDB.tableClock.Increment()
-	nDB.indexes[byTable].Walk(func(path string, v interface{}) bool {
-		entry := v.(*entry)
-		if entry.node != nDB.config.NodeName {
-			return false
-		}
-
-		params := strings.Split(path[1:], "/")
-		tname := params[0]
-		nid := params[1]
-		key := params[2]
-		entry.ltime = ltime
-
-		nDB.indexes[byTable].Insert(fmt.Sprintf("/%s/%s/%s", tname, nid, key), entry)
-		nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", nid, tname, key), entry)
-
-		return false
-	})
-}
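
A minimal, self-contained sketch of the new NotifyLeave behavior, for readers
following the change: a plain map stands in for libnetwork's byTable/byNetwork
radix trees, miniDB and its fields are illustrative names rather than the real
NetworkDB types, and the broadcast is mocked with a print.

package main

import (
	"fmt"
	"sync"
)

// entry keeps only the networkdb fields that matter for this sketch.
type entry struct {
	ltime uint64 // lamport time of the entry's last update
	node  string // node that owns the entry
	value []byte
}

// miniDB stands in for NetworkDB: one flat table index instead of the
// byTable/byNetwork radix trees.
type miniDB struct {
	sync.Mutex
	table map[string]*entry // key layout: "/<tname>/<nid>/<key>"
}

// deleteNodeTableEntries hard-deletes every entry owned by the departed
// node, mirroring the patched behavior: no tombstone is re-inserted and
// no lamport time is bumped. The caller holds the lock, as NotifyLeave
// now does.
func (db *miniDB) deleteNodeTableEntries(node string) {
	for key, e := range db.table {
		if e.node != node {
			continue
		}
		delete(db.table, key)
		// Stand-in for nDB.broadcaster.Write(makeEvent(opDelete, ...)).
		fmt.Printf("broadcast opDelete %s value=%q\n", key, e.value)
	}
}

func main() {
	db := &miniDB{table: map[string]*entry{
		"/endpoint_table/net1/ep1": {ltime: 10, node: "node-b", value: []byte("10.0.0.2")},
		"/endpoint_table/net1/ep2": {ltime: 12, node: "node-c", value: []byte("10.0.0.3")},
	}}

	// NotifyLeave path for a failed node-b: take the lock once, purge its
	// state. When node-b rejoins and re-advertises, its CREATE events are
	// accepted because no tombstone with a competing lamport time remains.
	db.Lock()
	db.deleteNodeTableEntries("node-b")
	db.Unlock()

	fmt.Println("entries remaining:", len(db.table)) // only node-c's entry
}

Compared to the old path, nothing is re-inserted with deleting=true and
updateLocalTableTime is gone entirely, so a flapping node triggers
re-advertisement only of its own entries instead of a cluster-wide bump of
every table entry.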