瀏覽代碼

Merge pull request #1403 from mrjana/networkdb

Ignore delete events for non-existent entries
Santhosh Manohar 9 年之前
父節點
當前提交
6f31dee9d1
共有 3 個文件被更改,包括 17 次插入16 次删除
  1. 2 9
      libnetwork/networkdb/cluster.go
  2. 13 7
      libnetwork/networkdb/delegate.go
  3. 2 0
      libnetwork/networkdb/networkdb.go

+ 2 - 9
libnetwork/networkdb/cluster.go

@@ -200,10 +200,7 @@ func (nDB *NetworkDB) reapNetworks() {
 }
 }
 
 
 func (nDB *NetworkDB) reapTableEntries() {
 func (nDB *NetworkDB) reapTableEntries() {
-	var (
-		paths   []string
-		entries []*entry
-	)
+	var paths []string
 
 
 	now := time.Now()
 	now := time.Now()
 
 
@@ -219,14 +216,12 @@ func (nDB *NetworkDB) reapTableEntries() {
 		}
 		}
 
 
 		paths = append(paths, path)
 		paths = append(paths, path)
-		entries = append(entries, entry)
 		return false
 		return false
 	})
 	})
 	nDB.RUnlock()
 	nDB.RUnlock()
 
 
 	nDB.Lock()
 	nDB.Lock()
-	for i, path := range paths {
-		entry := entries[i]
+	for _, path := range paths {
 		params := strings.Split(path[1:], "/")
 		params := strings.Split(path[1:], "/")
 		tname := params[0]
 		tname := params[0]
 		nid := params[1]
 		nid := params[1]
@@ -239,8 +234,6 @@ func (nDB *NetworkDB) reapTableEntries() {
 		if _, ok := nDB.indexes[byNetwork].Delete(fmt.Sprintf("/%s/%s/%s", nid, tname, key)); !ok {
 		if _, ok := nDB.indexes[byNetwork].Delete(fmt.Sprintf("/%s/%s/%s", nid, tname, key)); !ok {
 			logrus.Errorf("Could not delete entry in network %s with table name %s and key %s as it does not exist", nid, tname, key)
 			logrus.Errorf("Could not delete entry in network %s with table name %s and key %s as it does not exist", nid, tname, key)
 		}
 		}
-
-		nDB.broadcaster.Write(makeEvent(opDelete, tname, nid, key, entry.value))
 	}
 	}
 	nDB.Unlock()
 	nDB.Unlock()
 }
 }

+ 13 - 7
libnetwork/networkdb/delegate.go

@@ -84,28 +84,34 @@ func (nDB *NetworkDB) handleTableEvent(tEvent *TableEvent) bool {
 		return true
 		return true
 	}
 	}
 
 
-	if entry, err := nDB.getEntry(tEvent.TableName, tEvent.NetworkID, tEvent.Key); err == nil {
+	e, err := nDB.getEntry(tEvent.TableName, tEvent.NetworkID, tEvent.Key)
+	if err != nil && tEvent.Type == TableEventTypeDelete {
+		// If it is a delete event and we don't have the entry here nothing to do.
+		return false
+	}
+
+	if err == nil {
 		// We have the latest state. Ignore the event
 		// We have the latest state. Ignore the event
 		// since it is stale.
 		// since it is stale.
-		if entry.ltime >= tEvent.LTime {
+		if e.ltime >= tEvent.LTime {
 			return false
 			return false
 		}
 		}
 	}
 	}
 
 
-	entry := &entry{
+	e = &entry{
 		ltime:    tEvent.LTime,
 		ltime:    tEvent.LTime,
 		node:     tEvent.NodeName,
 		node:     tEvent.NodeName,
 		value:    tEvent.Value,
 		value:    tEvent.Value,
 		deleting: tEvent.Type == TableEventTypeDelete,
 		deleting: tEvent.Type == TableEventTypeDelete,
 	}
 	}
 
 
-	if entry.deleting {
-		entry.deleteTime = time.Now()
+	if e.deleting {
+		e.deleteTime = time.Now()
 	}
 	}
 
 
 	nDB.Lock()
 	nDB.Lock()
-	nDB.indexes[byTable].Insert(fmt.Sprintf("/%s/%s/%s", tEvent.TableName, tEvent.NetworkID, tEvent.Key), entry)
-	nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", tEvent.NetworkID, tEvent.TableName, tEvent.Key), entry)
+	nDB.indexes[byTable].Insert(fmt.Sprintf("/%s/%s/%s", tEvent.TableName, tEvent.NetworkID, tEvent.Key), e)
+	nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", tEvent.NetworkID, tEvent.TableName, tEvent.Key), e)
 	nDB.Unlock()
 	nDB.Unlock()
 
 
 	var op opType
 	var op opType

+ 2 - 0
libnetwork/networkdb/networkdb.go

@@ -326,6 +326,8 @@ func (nDB *NetworkDB) deleteNodeTableEntries(node string) {
 
 
 		nDB.indexes[byTable].Insert(fmt.Sprintf("/%s/%s/%s", tname, nid, key), entry)
 		nDB.indexes[byTable].Insert(fmt.Sprintf("/%s/%s/%s", tname, nid, key), entry)
 		nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", nid, tname, key), entry)
 		nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", nid, tname, key), entry)
+
+		nDB.broadcaster.Write(makeEvent(opDelete, tname, nid, key, entry.value))
 		return false
 		return false
 	})
 	})
 	nDB.Unlock()
 	nDB.Unlock()