diff --git a/libnetwork/networkdb/delegate.go b/libnetwork/networkdb/delegate.go index fe94fee939..c48c779558 100644 --- a/libnetwork/networkdb/delegate.go +++ b/libnetwork/networkdb/delegate.go @@ -133,25 +133,12 @@ func (nDB *NetworkDB) handleNodeEvent(nEvent *NodeEvent) bool { } func (nDB *NetworkDB) handleNetworkEvent(nEvent *NetworkEvent) bool { - var flushEntries bool // Update our local clock if the received messages has newer // time. nDB.networkClock.Witness(nEvent.LTime) nDB.Lock() - defer func() { - nDB.Unlock() - // When a node leaves a network on the last task removal cleanup the - // local entries for this network & node combination. When the tasks - // on a network are removed we could have missed the gossip updates. - // Not doing this cleanup can leave stale entries because bulksyncs - // from the node will no longer include this network state. - // - // deleteNodeNetworkEntries takes nDB lock. - if flushEntries { - nDB.deleteNodeNetworkEntries(nEvent.NetworkID, nEvent.NodeName) - } - }() + defer nDB.Unlock() if nEvent.NodeName == nDB.config.NodeName { return false @@ -179,7 +166,12 @@ func (nDB *NetworkDB) handleNetworkEvent(nEvent *NetworkEvent) bool { n.leaving = nEvent.Type == NetworkEventTypeLeave if n.leaving { n.reapTime = reapInterval - flushEntries = true + + // The remote node is leaving the network, but not the gossip cluster. + // Mark all its entries in deleted state, this will guarantee that + // if some node bulk sync with us, the deleted state of + // these entries will be propagated. 
+ nDB.deleteNodeNetworkEntries(nEvent.NetworkID, nEvent.NodeName) } if nEvent.Type == NetworkEventTypeLeave { @@ -214,9 +206,18 @@ func (nDB *NetworkDB) handleTableEvent(tEvent *TableEvent) bool { nDB.RLock() networks := nDB.networks[nDB.config.NodeName] network, ok := networks[tEvent.NetworkID] + // Check if the owner of the event is still part of the network + nodes := nDB.networkNodes[tEvent.NetworkID] + var nodePresent bool + for _, node := range nodes { + if node == tEvent.NodeName { + nodePresent = true + break + } + } nDB.RUnlock() - if !ok || network.leaving { - // I'm out of the network so do not propagate + if !ok || network.leaving || !nodePresent { + // I'm out of the network OR the event owner is no longer part of the network so do not propagate return false } diff --git a/libnetwork/networkdb/networkdb.go b/libnetwork/networkdb/networkdb.go index ec9a8db51c..a7a12eda89 100644 --- a/libnetwork/networkdb/networkdb.go +++ b/libnetwork/networkdb/networkdb.go @@ -436,8 +436,23 @@ func (nDB *NetworkDB) deleteNetworkEntriesForNode(deletedNode string) { nDB.Unlock() } +// deleteNodeNetworkEntries is called in 2 conditions with 2 different outcomes: +// 1) when a notification arrives that a node is leaving the network +// - Walk all the network entries and mark the leaving node's entries for deletion +// These will be garbage collected when the reap timer expires +// 2) when the local node is leaving the network +// - Walk all the network entries: +// A) if the entry is owned by the local node +// then we will mark it for deletion. This will ensure that if a node has not +// yet received the notification that the local node is leaving, it will be aware +// of the entries to be deleted. +// B) if the entry is owned by a remote node, then we can safely delete it. 
This +// ensures that if we join back this network as we receive the CREATE event for +// entries owned by remote nodes, we will accept them and we notify the application func (nDB *NetworkDB) deleteNodeNetworkEntries(nid, node string) { - nDB.Lock() + // Indicates if the delete is triggered for the local node + isNodeLocal := node == nDB.config.NodeName + nDB.indexes[byNetwork].WalkPrefix(fmt.Sprintf("/%s", nid), func(path string, v interface{}) bool { oldEntry := v.(*entry) @@ -446,7 +461,15 @@ func (nDB *NetworkDB) deleteNodeNetworkEntries(nid, node string) { tname := params[1] key := params[2] - if oldEntry.node != node { + // If the entry is owned by a remote node and this node is not leaving the network + if oldEntry.node != node && !isNodeLocal { + // Don't do anything because the event is triggered for a node that does not own this entry + return false + } + + // If this entry is already marked for deletion and this node is not leaving the network + if oldEntry.deleting && !isNodeLocal { + // Don't do anything this entry will be already garbage collected using the old reapTime return false } @@ -458,13 +481,26 @@ func (nDB *NetworkDB) deleteNodeNetworkEntries(nid, node string) { reapTime: reapInterval, } - nDB.indexes[byTable].Insert(fmt.Sprintf("/%s/%s/%s", tname, nid, key), entry) - nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", nid, tname, key), entry) + // we arrived at this point in 2 cases: + // 1) this entry is owned by the node that is leaving the network + // 2) the local node is leaving the network + if oldEntry.node == node { + if isNodeLocal { + // TODO fcrisciani: this can be removed if there is no way to leave the network + // without doing a delete of all the objects + entry.ltime++ + } + nDB.indexes[byTable].Insert(fmt.Sprintf("/%s/%s/%s", tname, nid, key), entry) + nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", nid, tname, key), entry) + } else { + // the local node is leaving the network, all the entries of remote nodes can 
be safely removed + nDB.indexes[byTable].Delete(fmt.Sprintf("/%s/%s/%s", tname, nid, key)) + nDB.indexes[byNetwork].Delete(fmt.Sprintf("/%s/%s/%s", nid, tname, key)) + } nDB.broadcaster.Write(makeEvent(opDelete, tname, nid, key, entry.value)) return false }) - nDB.Unlock() } func (nDB *NetworkDB) deleteNodeTableEntries(node string) { @@ -573,37 +609,12 @@ func (nDB *NetworkDB) LeaveNetwork(nid string) error { nDB.Lock() defer nDB.Unlock() - var ( - paths []string - entries []*entry - ) + // Remove myself from the list of the nodes participating in the network nDB.deleteNetworkNode(nid, nDB.config.NodeName) - nwWalker := func(path string, v interface{}) bool { - entry, ok := v.(*entry) - if !ok { - return false - } - paths = append(paths, path) - entries = append(entries, entry) - return false - } - - nDB.indexes[byNetwork].WalkPrefix(fmt.Sprintf("/%s", nid), nwWalker) - for _, path := range paths { - params := strings.Split(path[1:], "/") - tname := params[1] - key := params[2] - - if _, ok := nDB.indexes[byTable].Delete(fmt.Sprintf("/%s/%s/%s", tname, nid, key)); !ok { - logrus.Errorf("Could not delete entry in table %s with network id %s and key %s as it does not exist", tname, nid, key) - } - - if _, ok := nDB.indexes[byNetwork].Delete(fmt.Sprintf("/%s/%s/%s", nid, tname, key)); !ok { - logrus.Errorf("Could not delete entry in network %s with table name %s and key %s as it does not exist", nid, tname, key) - } - } + // Update all the local entries, marking them for deletion, and delete all the remote entries nDB.deleteNodeNetworkEntries(nid, nDB.config.NodeName) nodeNetworks, ok := nDB.networks[nDB.config.NodeName] if !ok { @@ -616,6 +627,7 @@ } n.ltime = ltime + n.reapTime = reapInterval n.leaving = true return nil }