Quellcode durchsuchen

:art: 改进内核任务调度机制提升稳定性 https://github.com/siyuan-note/siyuan/issues/7113

Liang Ding vor 2 Jahren
Ursprung
Commit
0a2d095d00
3 geänderte Dateien mit 60 neuen und 6 gelöschten Zeilen
  1. 11 5
      kernel/model/index_fix.go
  2. 37 0
      kernel/sql/database.go
  3. 12 1
      kernel/sql/queue.go

+ 11 - 5
kernel/model/index_fix.go

@@ -69,20 +69,23 @@ func autoFixIndex() {
 	for _, root := range roots {
 	for _, root := range roots {
 		rootMap[root.ID] = root
 		rootMap[root.ID] = root
 	}
 	}
+
+	var toRemoveRootIDs []string
 	var deletes int
 	var deletes int
 	for _, rootID := range duplicatedRootIDs {
 	for _, rootID := range duplicatedRootIDs {
 		root := rootMap[rootID]
 		root := rootMap[rootID]
 		if nil == root {
 		if nil == root {
 			continue
 			continue
 		}
 		}
-
-		//logging.LogWarnf("exist more than one tree [%s], reindex it", rootID)
-		sql.RemoveTreeQueue(root.Box, rootID)
 		deletes++
 		deletes++
+		toRemoveRootIDs = append(toRemoveRootIDs, rootID)
 		if util.IsExiting {
 		if util.IsExiting {
 			break
 			break
 		}
 		}
 	}
 	}
+	toRemoveRootIDs = gulu.Str.RemoveDuplicatedElem(toRemoveRootIDs)
+	sql.BatchRemoveTreeQueue(toRemoveRootIDs)
+
 	if 0 < deletes {
 	if 0 < deletes {
 		logging.LogWarnf("exist more than one tree duplicated [%d], reindex it", deletes)
 		logging.LogWarnf("exist more than one tree duplicated [%d], reindex it", deletes)
 	}
 	}
@@ -203,17 +206,20 @@ func reindexTreeByUpdated(rootUpdatedMap, dbRootUpdatedMap map[string]string) {
 	for _, block := range blocks {
 	for _, block := range blocks {
 		roots[block.RootID] = block
 		roots[block.RootID] = block
 	}
 	}
+	var toRemoveRootIDs []string
 	for id, root := range roots {
 	for id, root := range roots {
 		if nil == root {
 		if nil == root {
 			continue
 			continue
 		}
 		}
 
 
-		logging.LogWarnf("tree [%s] is not in block tree, remove it from [%s]", id, root.Box)
-		sql.RemoveTreeQueue(root.Box, root.ID)
+		toRemoveRootIDs = append(toRemoveRootIDs, id)
 		if util.IsExiting {
 		if util.IsExiting {
 			break
 			break
 		}
 		}
 	}
 	}
+	toRemoveRootIDs = gulu.Str.RemoveDuplicatedElem(toRemoveRootIDs)
+	//logging.LogWarnf("tree [%s] is not in block tree, remove it from [%s]", id, root.Box)
+	sql.BatchRemoveTreeQueue(toRemoveRootIDs)
 }
 }
 
 
 func reindexTreeByPath(box, p string, i, size int) {
 func reindexTreeByPath(box, p string, i, size int) {

+ 37 - 0
kernel/sql/database.go

@@ -20,6 +20,7 @@ import (
 	"bytes"
 	"bytes"
 	"database/sql"
 	"database/sql"
 	"errors"
 	"errors"
+	"fmt"
 	"os"
 	"os"
 	"path/filepath"
 	"path/filepath"
 	"regexp"
 	"regexp"
@@ -982,6 +983,42 @@ func deleteByRootID(tx *sql.Tx, rootID string, context map[string]interface{}) (
 	return
 	return
 }
 }
 
 
+func batchDeleteByRootIDs(tx *sql.Tx, rootIDs []string, context map[string]interface{}) (err error) {
+	ids := strings.Join(rootIDs, "','")
+	ids = "('" + ids + "')"
+	stmt := "DELETE FROM blocks WHERE root_id IN " + ids
+	if err = execStmtTx(tx, stmt); nil != err {
+		return
+	}
+	stmt = "DELETE FROM blocks_fts WHERE root_id IN " + ids
+	if err = execStmtTx(tx, stmt); nil != err {
+		return
+	}
+	stmt = "DELETE FROM blocks_fts_case_insensitive WHERE root_id IN " + ids
+	if err = execStmtTx(tx, stmt); nil != err {
+		return
+	}
+	stmt = "DELETE FROM spans WHERE root_id IN " + ids
+	if err = execStmtTx(tx, stmt); nil != err {
+		return
+	}
+	stmt = "DELETE FROM assets WHERE root_id IN " + ids
+	if err = execStmtTx(tx, stmt); nil != err {
+		return
+	}
+	stmt = "DELETE FROM refs WHERE root_id IN " + ids
+	if err = execStmtTx(tx, stmt); nil != err {
+		return
+	}
+	stmt = "DELETE FROM file_annotation_refs WHERE root_id IN " + ids
+	if err = execStmtTx(tx, stmt); nil != err {
+		return
+	}
+	ClearBlockCache()
+	eventbus.Publish(eventbus.EvtSQLDeleteBlocks, context, fmt.Sprintf("%d", len(rootIDs)))
+	return
+}
+
 func batchDeleteByPathPrefix(tx *sql.Tx, boxID, pathPrefix string) (err error) {
 func batchDeleteByPathPrefix(tx *sql.Tx, boxID, pathPrefix string) (err error) {
 	stmt := "DELETE FROM blocks WHERE box = ? AND path LIKE ?"
 	stmt := "DELETE FROM blocks WHERE box = ? AND path LIKE ?"
 	if err = execStmtTx(tx, stmt, boxID, pathPrefix+"%"); nil != err {
 	if err = execStmtTx(tx, stmt, boxID, pathPrefix+"%"); nil != err {

+ 12 - 1
kernel/sql/queue.go

@@ -41,12 +41,13 @@ var (
 
 
 type dbQueueOperation struct {
 type dbQueueOperation struct {
 	inQueueTime time.Time
 	inQueueTime time.Time
-	action      string // upsert/delete/delete_id/rename/delete_box/delete_box_refs/insert_refs/index
+	action      string // upsert/delete/delete_id/rename/delete_box/delete_box_refs/insert_refs/index/delete_ids
 
 
 	indexPath                     string      // index
 	indexPath                     string      // index
 	upsertTree                    *parse.Tree // upsert/insert_refs
 	upsertTree                    *parse.Tree // upsert/insert_refs
 	removeTreeBox, removeTreePath string      // delete
 	removeTreeBox, removeTreePath string      // delete
 	removeTreeIDBox, removeTreeID string      // delete_id
 	removeTreeIDBox, removeTreeID string      // delete_id
+	removeTreeIDs                 []string    // delete_ids
 	box                           string      // delete_box/delete_box_refs/index
 	box                           string      // delete_box/delete_box_refs/index
 	renameTree                    *parse.Tree // rename
 	renameTree                    *parse.Tree // rename
 	renameTreeOldHPath            string      // rename
 	renameTreeOldHPath            string      // rename
@@ -148,6 +149,8 @@ func execOp(op *dbQueueOperation, tx *sql.Tx, context map[string]interface{}) (e
 		err = batchDeleteByPathPrefix(tx, op.removeTreeBox, op.removeTreePath)
 		err = batchDeleteByPathPrefix(tx, op.removeTreeBox, op.removeTreePath)
 	case "delete_id":
 	case "delete_id":
 		err = deleteByRootID(tx, op.removeTreeID, context)
 		err = deleteByRootID(tx, op.removeTreeID, context)
+	case "delete_ids":
+		err = batchDeleteByRootIDs(tx, op.removeTreeIDs, context)
 	case "rename":
 	case "rename":
 		err = batchUpdateHPath(tx, op.renameTree.Box, op.renameTree.ID, op.renameTreeOldHPath, op.renameTree.HPath)
 		err = batchUpdateHPath(tx, op.renameTree.Box, op.renameTree.ID, op.renameTreeOldHPath, op.renameTree.HPath)
 		if nil != err {
 		if nil != err {
@@ -295,6 +298,14 @@ func RemoveTreeQueue(box, rootID string) {
 	operationQueue = append(operationQueue, newOp)
 	operationQueue = append(operationQueue, newOp)
 }
 }
 
 
+func BatchRemoveTreeQueue(rootIDs []string) {
+	dbQueueLock.Lock()
+	defer dbQueueLock.Unlock()
+
+	newOp := &dbQueueOperation{removeTreeIDs: rootIDs, inQueueTime: time.Now(), action: "delete_ids"}
+	operationQueue = append(operationQueue, newOp)
+}
+
 func RemoveTreePathQueue(treeBox, treePathPrefix string) {
 func RemoveTreePathQueue(treeBox, treePathPrefix string) {
 	dbQueueLock.Lock()
 	dbQueueLock.Lock()
 	defer dbQueueLock.Unlock()
 	defer dbQueueLock.Unlock()