Forráskód Böngészése

Merge pull request #1476 from sanimej/time

Use monotonic clock source to reap networkDB entries
Alessandro Boch 8 éve
szülő
commit
6b74a8d479

+ 17 - 8
libnetwork/networkdb/cluster.go

@@ -270,19 +270,27 @@ func (nDB *NetworkDB) reconnectNode() {
 	nDB.bulkSync([]string{node.Name}, true)
 	nDB.bulkSync([]string{node.Name}, true)
 }
 }
 
 
+// For timing the entry deletion in the repaer APIs that doesn't use monotonic clock
+// source (time.Now, Sub etc.) should be avoided. Hence we use reapTime in every
+// entry which is set initially to reapInterval and decremented by reapPeriod every time
+// the reaper runs. NOTE nDB.reapTableEntries updates the reapTime with a readlock. This
+// is safe as long as no other concurrent path touches the reapTime field.
 func (nDB *NetworkDB) reapState() {
 func (nDB *NetworkDB) reapState() {
 	nDB.reapNetworks()
 	nDB.reapNetworks()
 	nDB.reapTableEntries()
 	nDB.reapTableEntries()
 }
 }
 
 
 func (nDB *NetworkDB) reapNetworks() {
 func (nDB *NetworkDB) reapNetworks() {
-	now := time.Now()
 	nDB.Lock()
 	nDB.Lock()
 	for name, nn := range nDB.networks {
 	for name, nn := range nDB.networks {
 		for id, n := range nn {
 		for id, n := range nn {
-			if n.leaving && now.Sub(n.leaveTime) > reapInterval {
-				delete(nn, id)
-				nDB.deleteNetworkNode(id, name)
+			if n.leaving {
+				if n.reapTime <= 0 {
+					delete(nn, id)
+					nDB.deleteNetworkNode(id, name)
+					continue
+				}
+				n.reapTime -= reapPeriod
 			}
 			}
 		}
 		}
 	}
 	}
@@ -292,8 +300,6 @@ func (nDB *NetworkDB) reapNetworks() {
 func (nDB *NetworkDB) reapTableEntries() {
 func (nDB *NetworkDB) reapTableEntries() {
 	var paths []string
 	var paths []string
 
 
-	now := time.Now()
-
 	nDB.RLock()
 	nDB.RLock()
 	nDB.indexes[byTable].Walk(func(path string, v interface{}) bool {
 	nDB.indexes[byTable].Walk(func(path string, v interface{}) bool {
 		entry, ok := v.(*entry)
 		entry, ok := v.(*entry)
@@ -301,10 +307,13 @@ func (nDB *NetworkDB) reapTableEntries() {
 			return false
 			return false
 		}
 		}
 
 
-		if !entry.deleting || now.Sub(entry.deleteTime) <= reapInterval {
+		if !entry.deleting {
+			return false
+		}
+		if entry.reapTime > 0 {
+			entry.reapTime -= reapPeriod
 			return false
 			return false
 		}
 		}
-
 		paths = append(paths, path)
 		paths = append(paths, path)
 		return false
 		return false
 	})
 	})

+ 2 - 3
libnetwork/networkdb/delegate.go

@@ -4,7 +4,6 @@ import (
 	"fmt"
 	"fmt"
 	"net"
 	"net"
 	"strings"
 	"strings"
-	"time"
 
 
 	"github.com/Sirupsen/logrus"
 	"github.com/Sirupsen/logrus"
 	"github.com/gogo/protobuf/proto"
 	"github.com/gogo/protobuf/proto"
@@ -121,7 +120,7 @@ func (nDB *NetworkDB) handleNetworkEvent(nEvent *NetworkEvent) bool {
 		n.ltime = nEvent.LTime
 		n.ltime = nEvent.LTime
 		n.leaving = nEvent.Type == NetworkEventTypeLeave
 		n.leaving = nEvent.Type == NetworkEventTypeLeave
 		if n.leaving {
 		if n.leaving {
-			n.leaveTime = time.Now()
+			n.reapTime = reapInterval
 		}
 		}
 
 
 		nDB.addNetworkNode(nEvent.NetworkID, nEvent.NodeName)
 		nDB.addNetworkNode(nEvent.NetworkID, nEvent.NodeName)
@@ -178,7 +177,7 @@ func (nDB *NetworkDB) handleTableEvent(tEvent *TableEvent) bool {
 	}
 	}
 
 
 	if e.deleting {
 	if e.deleting {
-		e.deleteTime = time.Now()
+		e.reapTime = reapInterval
 	}
 	}
 
 
 	nDB.Lock()
 	nDB.Lock()

+ 16 - 14
libnetwork/networkdb/networkdb.go

@@ -107,8 +107,9 @@ type network struct {
 	// Node leave is in progress.
 	// Node leave is in progress.
 	leaving bool
 	leaving bool
 
 
-	// The time this node knew about the node's network leave.
-	leaveTime time.Time
+	// Number of seconds still left before a deleted network entry gets
+	// removed from networkDB
+	reapTime time.Duration
 
 
 	// The broadcast queue for table event gossip. This is only
 	// The broadcast queue for table event gossip. This is only
 	// initialized for this node's network attachment entries.
 	// initialized for this node's network attachment entries.
@@ -153,8 +154,9 @@ type entry struct {
 	// the cluster for certain amount of time after deletion.
 	// the cluster for certain amount of time after deletion.
 	deleting bool
 	deleting bool
 
 
-	// The wall clock time when this node learned about this deletion.
-	deleteTime time.Time
+	// Number of seconds still left before a deleted table entry gets
+	// removed from networkDB
+	reapTime time.Duration
 }
 }
 
 
 // New creates a new instance of NetworkDB using the Config passed by
 // New creates a new instance of NetworkDB using the Config passed by
@@ -286,11 +288,11 @@ func (nDB *NetworkDB) DeleteEntry(tname, nid, key string) error {
 	}
 	}
 
 
 	entry := &entry{
 	entry := &entry{
-		ltime:      nDB.tableClock.Increment(),
-		node:       nDB.config.NodeName,
-		value:      value,
-		deleting:   true,
-		deleteTime: time.Now(),
+		ltime:    nDB.tableClock.Increment(),
+		node:     nDB.config.NodeName,
+		value:    value,
+		deleting: true,
+		reapTime: reapInterval,
 	}
 	}
 
 
 	if err := nDB.sendTableEvent(TableEventTypeDelete, nid, tname, key, entry); err != nil {
 	if err := nDB.sendTableEvent(TableEventTypeDelete, nid, tname, key, entry); err != nil {
@@ -339,11 +341,11 @@ func (nDB *NetworkDB) deleteNodeTableEntries(node string) {
 		key := params[2]
 		key := params[2]
 
 
 		entry := &entry{
 		entry := &entry{
-			ltime:      oldEntry.ltime,
-			node:       node,
-			value:      oldEntry.value,
-			deleting:   true,
-			deleteTime: time.Now(),
+			ltime:    oldEntry.ltime,
+			node:     node,
+			value:    oldEntry.value,
+			deleting: true,
+			reapTime: reapInterval,
 		}
 		}
 
 
 		nDB.indexes[byTable].Insert(fmt.Sprintf("/%s/%s/%s", tname, nid, key), entry)
 		nDB.indexes[byTable].Insert(fmt.Sprintf("/%s/%s/%s", tname, nid, key), entry)