From a3ecb8902ab29fb99b10d5633ec12a035a8251bb Mon Sep 17 00:00:00 2001 From: Flavio Crisciani Date: Wed, 12 Jul 2017 07:47:36 -0700 Subject: [PATCH] fix join/leave join/leave fixes: - when a node leaves a network it deletes all the other nodes' entries but keeps its own entries, marked for deletion, so that other nodes that are still TCP syncing will learn that they were deleted. (a node that has not yet received the network leave will potentially tcp/sync) - add the network reapTime, which was not being set locally Signed-off-by: Flavio Crisciani --- libnetwork/networkdb/delegate.go | 35 +++++++------- libnetwork/networkdb/networkdb.go | 78 ++++++++++++++++++------------- 2 files changed, 63 insertions(+), 50 deletions(-) diff --git a/libnetwork/networkdb/delegate.go b/libnetwork/networkdb/delegate.go index fe94fee939..c48c779558 100644 --- a/libnetwork/networkdb/delegate.go +++ b/libnetwork/networkdb/delegate.go @@ -133,25 +133,12 @@ func (nDB *NetworkDB) handleNodeEvent(nEvent *NodeEvent) bool { } func (nDB *NetworkDB) handleNetworkEvent(nEvent *NetworkEvent) bool { - var flushEntries bool // Update our local clock if the received messages has newer // time. nDB.networkClock.Witness(nEvent.LTime) nDB.Lock() - defer func() { - nDB.Unlock() - // When a node leaves a network on the last task removal cleanup the - // local entries for this network & node combination. When the tasks - // on a network are removed we could have missed the gossip updates. - // Not doing this cleanup can leave stale entries because bulksyncs - // from the node will no longer include this network state. - // - // deleteNodeNetworkEntries takes nDB lock. 
- if flushEntries { - nDB.deleteNodeNetworkEntries(nEvent.NetworkID, nEvent.NodeName) - } - }() + defer nDB.Unlock() if nEvent.NodeName == nDB.config.NodeName { return false @@ -179,7 +166,12 @@ func (nDB *NetworkDB) handleNetworkEvent(nEvent *NetworkEvent) bool { n.leaving = nEvent.Type == NetworkEventTypeLeave if n.leaving { n.reapTime = reapInterval - flushEntries = true + + // The remote node is leaving the network, but not the gossip cluster. + // Mark all its entries in deleted state, this will guarantee that + // if some node bulk sync with us, the deleted state of + // these entries will be propagated. + nDB.deleteNodeNetworkEntries(nEvent.NetworkID, nEvent.NodeName) } if nEvent.Type == NetworkEventTypeLeave { @@ -214,9 +206,18 @@ func (nDB *NetworkDB) handleTableEvent(tEvent *TableEvent) bool { nDB.RLock() networks := nDB.networks[nDB.config.NodeName] network, ok := networks[tEvent.NetworkID] + // Check if the owner of the event is still part of the network + nodes := nDB.networkNodes[tEvent.NetworkID] + var nodePresent bool + for _, node := range nodes { + if node == tEvent.NodeName { + nodePresent = true + break + } + } nDB.RUnlock() - if !ok || network.leaving { - // I'm out of the network so do not propagate + if !ok || network.leaving || !nodePresent { + // I'm out of the network OR the event owner is not anymore part of the network so do not propagate return false } diff --git a/libnetwork/networkdb/networkdb.go b/libnetwork/networkdb/networkdb.go index ec9a8db51c..a7a12eda89 100644 --- a/libnetwork/networkdb/networkdb.go +++ b/libnetwork/networkdb/networkdb.go @@ -436,8 +436,23 @@ func (nDB *NetworkDB) deleteNetworkEntriesForNode(deletedNode string) { nDB.Unlock() } +// deleteNodeNetworkEntries is called in 2 conditions with 2 different outcomes: +// 1) when a notification is coming of a node leaving the network +// - Walk all the network entries and mark the leaving node's entries for deletion +// These will be garbage collected when the reap timer 
expires +// 2) when the local node is leaving the network +// - Walk all the network entries: +// A) if the entry is owned by the local node +// then we will mark it for deletion. This ensures that a node that has not +// yet received the notification that the local node is leaving will be aware +// of the entries to be deleted. +// B) if the entry is owned by a remote node, then we can safely delete it. This +// ensures that if we rejoin this network, when we receive the CREATE events for +// entries owned by remote nodes, we will accept them and notify the application func (nDB *NetworkDB) deleteNodeNetworkEntries(nid, node string) { - nDB.Lock() + // Indicates if the delete is triggered for the local node + isNodeLocal := node == nDB.config.NodeName + nDB.indexes[byNetwork].WalkPrefix(fmt.Sprintf("/%s", nid), func(path string, v interface{}) bool { oldEntry := v.(*entry) @@ -446,7 +461,15 @@ func (nDB *NetworkDB) deleteNodeNetworkEntries(nid, node string) { tname := params[1] key := params[2] - if oldEntry.node != node { + // If the entry is owned by a remote node and this node is not leaving the network + if oldEntry.node != node && !isNodeLocal { + // Don't do anything because the event is triggered for a node that does not own this entry + return false + } + + // If this entry is already marked for deletion and this node is not leaving the network + if oldEntry.deleting && !isNodeLocal { + // Don't do anything: this entry will already be garbage collected using the old reapTime return false } @@ -458,13 +481,26 @@ func (nDB *NetworkDB) deleteNodeNetworkEntries(nid, node string) { reapTime: reapInterval, } - nDB.indexes[byTable].Insert(fmt.Sprintf("/%s/%s/%s", tname, nid, key), entry) - nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", nid, tname, key), entry) + // we arrived at this point in 2 cases: + // 1) this entry is owned by the node that is leaving the network + // 2) the local node is leaving the network + if oldEntry.node == node { + if 
isNodeLocal { + // TODO fcrisciani: this can be removed if there is no way to leave the network + // without doing a delete of all the objects + entry.ltime++ + } + nDB.indexes[byTable].Insert(fmt.Sprintf("/%s/%s/%s", tname, nid, key), entry) + nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", nid, tname, key), entry) + } else { + // the local node is leaving the network, all the entries of remote nodes can be safely removed + nDB.indexes[byTable].Delete(fmt.Sprintf("/%s/%s/%s", tname, nid, key)) + nDB.indexes[byNetwork].Delete(fmt.Sprintf("/%s/%s/%s", nid, tname, key)) + } nDB.broadcaster.Write(makeEvent(opDelete, tname, nid, key, entry.value)) return false }) - nDB.Unlock() } func (nDB *NetworkDB) deleteNodeTableEntries(node string) { @@ -573,37 +609,12 @@ func (nDB *NetworkDB) LeaveNetwork(nid string) error { nDB.Lock() defer nDB.Unlock() - var ( - paths []string - entries []*entry - ) + // Remove myself from the list of the nodes participating to the network nDB.deleteNetworkNode(nid, nDB.config.NodeName) - nwWalker := func(path string, v interface{}) bool { - entry, ok := v.(*entry) - if !ok { - return false - } - paths = append(paths, path) - entries = append(entries, entry) - return false - } - - nDB.indexes[byNetwork].WalkPrefix(fmt.Sprintf("/%s", nid), nwWalker) - for _, path := range paths { - params := strings.Split(path[1:], "/") - tname := params[1] - key := params[2] - - if _, ok := nDB.indexes[byTable].Delete(fmt.Sprintf("/%s/%s/%s", tname, nid, key)); !ok { - logrus.Errorf("Could not delete entry in table %s with network id %s and key %s as it does not exist", tname, nid, key) - } - - if _, ok := nDB.indexes[byNetwork].Delete(fmt.Sprintf("/%s/%s/%s", nid, tname, key)); !ok { - logrus.Errorf("Could not delete entry in network %s with table name %s and key %s as it does not exist", nid, tname, key) - } - } + // Update all the local entries marking them for deletion and delete all the remote entries + nDB.deleteNodeNetworkEntries(nid, 
nDB.config.NodeName) nodeNetworks, ok := nDB.networks[nDB.config.NodeName] if !ok { @@ -616,6 +627,7 @@ func (nDB *NetworkDB) LeaveNetwork(nid string) error { } n.ltime = ltime + n.reapTime = reapInterval n.leaving = true return nil }