|
@@ -108,6 +108,11 @@ type PeerInfo struct {
|
|
|
IP string
|
|
|
}
|
|
|
|
|
|
+// PeerClusterInfo represents the peer (gossip cluster) nodes
|
|
|
+type PeerClusterInfo struct {
|
|
|
+ PeerInfo
|
|
|
+}
|
|
|
+
|
|
|
type node struct {
|
|
|
memberlist.Node
|
|
|
ltime serf.LamportTime
|
|
@@ -253,6 +258,20 @@ func (nDB *NetworkDB) Close() {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+// ClusterPeers returns all the gossip cluster peers.
|
|
|
+func (nDB *NetworkDB) ClusterPeers() []PeerInfo {
|
|
|
+ nDB.RLock()
|
|
|
+ defer nDB.RUnlock()
|
|
|
+ peers := make([]PeerInfo, 0, len(nDB.nodes))
|
|
|
+ for _, node := range nDB.nodes {
|
|
|
+ peers = append(peers, PeerInfo{
|
|
|
+ Name: node.Name,
|
|
|
+ IP: node.Node.Addr.String(),
|
|
|
+ })
|
|
|
+ }
|
|
|
+ return peers
|
|
|
+}
|
|
|
+
|
|
|
// Peers returns the gossip peers for a given network.
|
|
|
func (nDB *NetworkDB) Peers(nid string) []PeerInfo {
|
|
|
nDB.RLock()
|
|
@@ -399,7 +418,6 @@ func (nDB *NetworkDB) DeleteEntry(tname, nid, key string) error {
|
|
|
}
|
|
|
|
|
|
func (nDB *NetworkDB) deleteNetworkEntriesForNode(deletedNode string) {
|
|
|
- nDB.Lock()
|
|
|
for nid, nodes := range nDB.networkNodes {
|
|
|
updatedNodes := make([]string, 0, len(nodes))
|
|
|
for _, node := range nodes {
|
|
@@ -414,11 +432,25 @@ func (nDB *NetworkDB) deleteNetworkEntriesForNode(deletedNode string) {
|
|
|
}
|
|
|
|
|
|
delete(nDB.networks, deletedNode)
|
|
|
- nDB.Unlock()
|
|
|
}
|
|
|
|
|
|
+// deleteNodeNetworkEntries is called in 2 conditions with 2 different outcomes:
|
|
|
+// 1) when a notification is coming of a node leaving the network
|
|
|
+// - Walk all the network entries and mark the leaving node's entries for deletion
|
|
|
+// These will be garbage collected when the reap timer expires
|
|
|
+// 2) when the local node is leaving the network
|
|
|
+// - Walk all the network entries:
|
|
|
+// A) if the entry is owned by the local node
|
|
|
+// then we will mark it for deletion. This will ensure that if a node did not
|
|
|
+// yet receive the notification that the local node is leaving, it will be aware
|
|
|
+// of the entries to be deleted.
|
|
|
+// B) if the entry is owned by a remote node, then we can safely delete it. This
|
|
|
+// ensures that if we rejoin this network, then as we receive the CREATE events for
|
|
|
+// entries owned by remote nodes, we will accept them and notify the application.
|
|
|
func (nDB *NetworkDB) deleteNodeNetworkEntries(nid, node string) {
|
|
|
- nDB.Lock()
|
|
|
+ // Indicates if the delete is triggered for the local node
|
|
|
+ isNodeLocal := node == nDB.config.NodeName
|
|
|
+
|
|
|
nDB.indexes[byNetwork].WalkPrefix(fmt.Sprintf("/%s", nid),
|
|
|
func(path string, v interface{}) bool {
|
|
|
oldEntry := v.(*entry)
|
|
@@ -427,7 +459,15 @@ func (nDB *NetworkDB) deleteNodeNetworkEntries(nid, node string) {
|
|
|
tname := params[1]
|
|
|
key := params[2]
|
|
|
|
|
|
- if oldEntry.node != node {
|
|
|
+ // If the entry is owned by a remote node and this node is not leaving the network
|
|
|
+ if oldEntry.node != node && !isNodeLocal {
|
|
|
+ // Don't do anything because the event is triggered for a node that does not own this entry
|
|
|
+ return false
|
|
|
+ }
|
|
|
+
|
|
|
+ // If this entry is already marked for deletion and this node is not leaving the network
|
|
|
+ if oldEntry.deleting && !isNodeLocal {
|
|
|
+ // Don't do anything: this entry will already be garbage collected using the old reapTime
|
|
|
return false
|
|
|
}
|
|
|
|
|
@@ -439,17 +479,29 @@ func (nDB *NetworkDB) deleteNodeNetworkEntries(nid, node string) {
|
|
|
reapTime: reapInterval,
|
|
|
}
|
|
|
|
|
|
- nDB.indexes[byTable].Insert(fmt.Sprintf("/%s/%s/%s", tname, nid, key), entry)
|
|
|
- nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", nid, tname, key), entry)
|
|
|
+ // we arrived at this point in 2 cases:
|
|
|
+ // 1) this entry is owned by the node that is leaving the network
|
|
|
+ // 2) the local node is leaving the network
|
|
|
+ if oldEntry.node == node {
|
|
|
+ if isNodeLocal {
|
|
|
+ // TODO fcrisciani: this can be removed if there is no way to leave the network
|
|
|
+ // without doing a delete of all the objects
|
|
|
+ entry.ltime++
|
|
|
+ }
|
|
|
+ nDB.indexes[byTable].Insert(fmt.Sprintf("/%s/%s/%s", tname, nid, key), entry)
|
|
|
+ nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", nid, tname, key), entry)
|
|
|
+ } else {
|
|
|
+ // the local node is leaving the network, all the entries of remote nodes can be safely removed
|
|
|
+ nDB.indexes[byTable].Delete(fmt.Sprintf("/%s/%s/%s", tname, nid, key))
|
|
|
+ nDB.indexes[byNetwork].Delete(fmt.Sprintf("/%s/%s/%s", nid, tname, key))
|
|
|
+ }
|
|
|
|
|
|
nDB.broadcaster.Write(makeEvent(opDelete, tname, nid, key, entry.value))
|
|
|
return false
|
|
|
})
|
|
|
- nDB.Unlock()
|
|
|
}
|
|
|
|
|
|
func (nDB *NetworkDB) deleteNodeTableEntries(node string) {
|
|
|
- nDB.Lock()
|
|
|
nDB.indexes[byTable].Walk(func(path string, v interface{}) bool {
|
|
|
oldEntry := v.(*entry)
|
|
|
if oldEntry.node != node {
|
|
@@ -461,27 +513,18 @@ func (nDB *NetworkDB) deleteNodeTableEntries(node string) {
|
|
|
nid := params[1]
|
|
|
key := params[2]
|
|
|
|
|
|
- entry := &entry{
|
|
|
- ltime: oldEntry.ltime,
|
|
|
- node: node,
|
|
|
- value: oldEntry.value,
|
|
|
- deleting: true,
|
|
|
- reapTime: reapInterval,
|
|
|
- }
|
|
|
+ nDB.indexes[byTable].Delete(fmt.Sprintf("/%s/%s/%s", tname, nid, key))
|
|
|
+ nDB.indexes[byNetwork].Delete(fmt.Sprintf("/%s/%s/%s", nid, tname, key))
|
|
|
|
|
|
- nDB.indexes[byTable].Insert(fmt.Sprintf("/%s/%s/%s", tname, nid, key), entry)
|
|
|
- nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", nid, tname, key), entry)
|
|
|
-
|
|
|
- nDB.broadcaster.Write(makeEvent(opDelete, tname, nid, key, entry.value))
|
|
|
+ nDB.broadcaster.Write(makeEvent(opDelete, tname, nid, key, oldEntry.value))
|
|
|
return false
|
|
|
})
|
|
|
- nDB.Unlock()
|
|
|
}
|
|
|
|
|
|
// WalkTable walks a single table in NetworkDB and invokes the passed
|
|
|
// function for each entry in the table passing the network, key,
|
|
|
// value and deleting flag. The walk stops if the passed function returns true.
|
|
|
-func (nDB *NetworkDB) WalkTable(tname string, fn func(string, string, []byte) bool) error {
|
|
|
+func (nDB *NetworkDB) WalkTable(tname string, fn func(string, string, []byte, bool) bool) error {
|
|
|
nDB.RLock()
|
|
|
values := make(map[string]interface{})
|
|
|
nDB.indexes[byTable].WalkPrefix(fmt.Sprintf("/%s", tname), func(path string, v interface{}) bool {
|
|
@@ -494,7 +537,7 @@ func (nDB *NetworkDB) WalkTable(tname string, fn func(string, string, []byte) bo
|
|
|
params := strings.Split(k[1:], "/")
|
|
|
nid := params[1]
|
|
|
key := params[2]
|
|
|
- if fn(nid, key, v.(*entry).value) {
|
|
|
+ if fn(nid, key, v.(*entry).value, v.(*entry).deleting) {
|
|
|
return nil
|
|
|
}
|
|
|
}
|
|
@@ -554,37 +597,12 @@ func (nDB *NetworkDB) LeaveNetwork(nid string) error {
|
|
|
|
|
|
nDB.Lock()
|
|
|
defer nDB.Unlock()
|
|
|
- var (
|
|
|
- paths []string
|
|
|
- entries []*entry
|
|
|
- )
|
|
|
|
|
|
+ // Remove myself from the list of the nodes participating in the network
|
|
|
nDB.deleteNetworkNode(nid, nDB.config.NodeName)
|
|
|
|
|
|
- nwWalker := func(path string, v interface{}) bool {
|
|
|
- entry, ok := v.(*entry)
|
|
|
- if !ok {
|
|
|
- return false
|
|
|
- }
|
|
|
- paths = append(paths, path)
|
|
|
- entries = append(entries, entry)
|
|
|
- return false
|
|
|
- }
|
|
|
-
|
|
|
- nDB.indexes[byNetwork].WalkPrefix(fmt.Sprintf("/%s", nid), nwWalker)
|
|
|
- for _, path := range paths {
|
|
|
- params := strings.Split(path[1:], "/")
|
|
|
- tname := params[1]
|
|
|
- key := params[2]
|
|
|
-
|
|
|
- if _, ok := nDB.indexes[byTable].Delete(fmt.Sprintf("/%s/%s/%s", tname, nid, key)); !ok {
|
|
|
- logrus.Errorf("Could not delete entry in table %s with network id %s and key %s as it does not exist", tname, nid, key)
|
|
|
- }
|
|
|
-
|
|
|
- if _, ok := nDB.indexes[byNetwork].Delete(fmt.Sprintf("/%s/%s/%s", nid, tname, key)); !ok {
|
|
|
- logrus.Errorf("Could not delete entry in network %s with table name %s and key %s as it does not exist", nid, tname, key)
|
|
|
- }
|
|
|
- }
|
|
|
+ // Update all the local entries marking them for deletion and delete all the remote entries
|
|
|
+ nDB.deleteNodeNetworkEntries(nid, nDB.config.NodeName)
|
|
|
|
|
|
nodeNetworks, ok := nDB.networks[nDB.config.NodeName]
|
|
|
if !ok {
|
|
@@ -597,6 +615,7 @@ func (nDB *NetworkDB) LeaveNetwork(nid string) error {
|
|
|
}
|
|
|
|
|
|
n.ltime = ltime
|
|
|
+ n.reapTime = reapInterval
|
|
|
n.leaving = true
|
|
|
return nil
|
|
|
}
|
|
@@ -660,27 +679,3 @@ func (nDB *NetworkDB) updateLocalNetworkTime() {
|
|
|
n.ltime = ltime
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
-func (nDB *NetworkDB) updateLocalTableTime() {
|
|
|
- nDB.Lock()
|
|
|
- defer nDB.Unlock()
|
|
|
-
|
|
|
- ltime := nDB.tableClock.Increment()
|
|
|
- nDB.indexes[byTable].Walk(func(path string, v interface{}) bool {
|
|
|
- entry := v.(*entry)
|
|
|
- if entry.node != nDB.config.NodeName {
|
|
|
- return false
|
|
|
- }
|
|
|
-
|
|
|
- params := strings.Split(path[1:], "/")
|
|
|
- tname := params[0]
|
|
|
- nid := params[1]
|
|
|
- key := params[2]
|
|
|
- entry.ltime = ltime
|
|
|
-
|
|
|
- nDB.indexes[byTable].Insert(fmt.Sprintf("/%s/%s/%s", tname, nid, key), entry)
|
|
|
- nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", nid, tname, key), entry)
|
|
|
-
|
|
|
- return false
|
|
|
- })
|
|
|
-}
|