Procházet zdrojové kódy

:art: 改进内核任务调度机制提升稳定性 https://github.com/siyuan-note/siyuan/issues/7113

Liang Ding před 2 roky
rodič
revize
9b28cdfdf1
3 změnil soubory, kde provedl 29 přidání a 19 odebrání
  1. 1 1
      kernel/sql/queue.go
  2. 15 8
      kernel/task/queue.go
  3. 13 10
      kernel/treenode/blocktree.go

+ 1 - 1
kernel/sql/queue.go

@@ -54,7 +54,7 @@ type dbQueueOperation struct {
 func AutoFlushTx() {
 	for {
 		time.Sleep(util.SQLFlushInterval)
-		task.PrependTask(task.DatabaseIndexCommit, FlushQueue)
+		task.AppendTask(task.DatabaseIndexCommit, FlushQueue)
 	}
 }
 

+ 15 - 8
kernel/task/queue.go

@@ -52,6 +52,7 @@ func PrependTask(action string, handler interface{}, args ...interface{}) {
 		return
 	}
 
+	cancelTask(action, args...)
 	taskQueue = append([]*Task{newTask(action, handler, args...)}, taskQueue...)
 }
 
@@ -64,20 +65,26 @@ func AppendTask(action string, handler interface{}, args ...interface{}) {
 		return
 	}
 
+	cancelTask(action, args...)
 	taskQueue = append(taskQueue, newTask(action, handler, args...))
 }
 
-func CancelTask(actions ...string) {
-	queueLock.Lock()
-	defer queueLock.Unlock()
-
+func cancelTask(action string, args ...interface{}) {
 	for i := len(taskQueue) - 1; i >= 0; i-- {
 		task := taskQueue[i]
-		for _, action := range actions {
-			if action == task.Action {
-				taskQueue = append(taskQueue[:i], taskQueue[i+1:]...)
-				break
+		if action == task.Action {
+			if len(task.Args) != len(args) {
+				continue
 			}
+
+			for j, arg := range args {
+				if arg != task.Args[j] {
+					continue
+				}
+			}
+
+			taskQueue = append(taskQueue[:i], taskQueue[i+1:]...)
+			break
 		}
 	}
 }

+ 13 - 10
kernel/treenode/blocktree.go

@@ -35,7 +35,7 @@ import (
 
 var blockTrees = map[string]*BlockTree{}
 var blockTreesLock = sync.Mutex{}
-var blockTreesChanged = false
+var blockTreesChanged = time.Time{}
 
 type BlockTree struct {
 	ID       string // 块 ID
@@ -209,7 +209,7 @@ func SetBlockTreePath(tree *parse.Tree) {
 			b.BoxID, b.Path, b.HPath, b.Updated = tree.Box, tree.Path, tree.HPath, tree.Root.IALAttr("updated")
 		}
 	}
-	blockTreesChanged = true
+	blockTreesChanged = time.Now()
 }
 
 func RemoveBlockTreesByRootID(rootID string) {
@@ -226,7 +226,7 @@ func RemoveBlockTreesByRootID(rootID string) {
 	for _, id := range ids {
 		delete(blockTrees, id)
 	}
-	blockTreesChanged = true
+	blockTreesChanged = time.Now()
 }
 
 func RemoveBlockTreesByPath(path string) {
@@ -243,7 +243,7 @@ func RemoveBlockTreesByPath(path string) {
 	for _, id := range ids {
 		delete(blockTrees, id)
 	}
-	blockTreesChanged = true
+	blockTreesChanged = time.Now()
 }
 
 func RemoveBlockTreesByPathPrefix(pathPrefix string) {
@@ -260,7 +260,7 @@ func RemoveBlockTreesByPathPrefix(pathPrefix string) {
 	for _, id := range ids {
 		delete(blockTrees, id)
 	}
-	blockTreesChanged = true
+	blockTreesChanged = time.Now()
 }
 
 func RemoveBlockTreesByBoxID(boxID string) (ids []string) {
@@ -276,7 +276,7 @@ func RemoveBlockTreesByBoxID(boxID string) (ids []string) {
 	for _, id := range ids {
 		delete(blockTrees, id)
 	}
-	blockTreesChanged = true
+	blockTreesChanged = time.Now()
 	return
 }
 
@@ -285,7 +285,7 @@ func RemoveBlockTree(id string) {
 	defer blockTreesLock.Unlock()
 
 	delete(blockTrees, id)
-	blockTreesChanged = true
+	blockTreesChanged = time.Now()
 }
 
 func ReindexBlockTree(tree *parse.Tree) {
@@ -306,7 +306,7 @@ func ReindexBlockTree(tree *parse.Tree) {
 		blockTrees[n.ID] = &BlockTree{ID: n.ID, ParentID: parentID, RootID: tree.ID, BoxID: tree.Box, Path: tree.Path, HPath: tree.HPath, Updated: tree.Root.IALAttr("updated")}
 		return ast.WalkContinue
 	})
-	blockTreesChanged = true
+	blockTreesChanged = time.Now()
 }
 
 func AutoFlushBlockTree() {
@@ -361,11 +361,14 @@ func InitBlockTree(force bool) {
 }
 
 func SaveBlockTree(force bool) {
-	if !force && !blockTreesChanged {
+	if !force && blockTreesChanged.IsZero() {
 		return
 	}
 
 	start := time.Now()
+	if blockTreesChanged.Before(start.Add(7 * time.Second)) {
+		return
+	}
 
 	blockTreesLock.Lock()
 	data, err := msgpack.Marshal(blockTrees)
@@ -387,5 +390,5 @@ func SaveBlockTree(force bool) {
 		logging.LogWarnf("save block tree [size=%s] to [%s], elapsed [%.2fs]", humanize.Bytes(uint64(len(data))), util.BlockTreePath, elapsed)
 	}
 
-	blockTreesChanged = false
+	blockTreesChanged = time.Time{}
 }