Преглед изворни кода

:art: 改进内核任务调度机制提升稳定性 https://github.com/siyuan-note/siyuan/issues/7113

Liang Ding пре 2 година
родитељ
комит
6495574fea
8 измењених фајлова са 60 додато и 55 уклоњено
  1. 2 4
      kernel/model/history.go
  2. 2 2
      kernel/sql/aseet.go
  3. 7 6
      kernel/sql/block.go
  4. 4 4
      kernel/sql/block_ref.go
  5. 17 11
      kernel/sql/database.go
  6. 22 14
      kernel/sql/queue.go
  7. 2 3
      kernel/sql/stat.go
  8. 4 11
      kernel/sql/upsert.go

+ 2 - 4
kernel/model/history.go

@@ -524,11 +524,10 @@ func clearOutdatedHistoryDir(historyDir string) {
 		p := strings.TrimPrefix(dir, util.HistoryDir)
 		p = filepath.ToSlash(p[1:])
 		if txErr = sql.DeleteHistoriesByPathPrefix(tx, dir); nil != txErr {
-			sql.RollbackTx(tx)
 			logging.LogErrorf("delete history [%s] failed: %s", dir, txErr)
 			return
 		}
-		if txErr = sql.CommitTx(tx); nil != txErr {
+		if txErr = sql.CommitHistoryTx(tx); nil != txErr {
 			logging.LogErrorf("commit history tx failed: %s", txErr)
 			return
 		}
@@ -683,10 +682,9 @@ func indexHistoryDir(name string, luteEngine *lute.Lute) {
 	}
 	if err := sql.InsertHistories(tx, histories); nil != err {
 		logging.LogErrorf("insert histories failed: %s", err)
-		sql.RollbackTx(tx)
 		return
 	}
-	if err := sql.CommitTx(tx); nil != err {
+	if err := sql.CommitHistoryTx(tx); nil != err {
 		logging.LogErrorf("commit transaction failed: %s", err)
 		return
 	}

+ 2 - 2
kernel/sql/aseet.go

@@ -94,12 +94,12 @@ func docTitleImgAsset(root *ast.Node) *Asset {
 
 func DeleteAssetsByHashes(hashes []string) {
 	sqlStmt := "DELETE FROM assets WHERE hash IN ('" + strings.Join(hashes, "','") + "') OR hash = ''"
-	tx, err := BeginTx()
+	tx, err := beginTx()
 	if nil != err {
 		return
 	}
 	execStmtTx(tx, sqlStmt)
-	CommitTx(tx)
+	commitTx(tx)
 }
 
 func QueryAssetByHash(hash string) (ret *Asset) {

+ 7 - 6
kernel/sql/block.go

@@ -46,27 +46,28 @@ type Block struct {
 	Updated  string
 }
 
-func updateRootContent(tx *sql.Tx, content, updated, id string) {
+func updateRootContent(tx *sql.Tx, content, updated, id string) (err error) {
 	stmt := "UPDATE blocks SET content = ?, fcontent = ?, updated = ? WHERE id = ?"
-	if err := execStmtTx(tx, stmt, content, content, updated, id); nil != err {
+	if err = execStmtTx(tx, stmt, content, content, updated, id); nil != err {
 		return
 	}
 	stmt = "UPDATE blocks_fts SET content = ?, fcontent = ?, updated = ? WHERE id = ?"
-	if err := execStmtTx(tx, stmt, content, content, updated, id); nil != err {
+	if err = execStmtTx(tx, stmt, content, content, updated, id); nil != err {
 		return
 	}
 	if !caseSensitive {
 		stmt = "UPDATE blocks_fts_case_insensitive SET content = ?, fcontent = ?, updated = ? WHERE id = ?"
-		if err := execStmtTx(tx, stmt, content, content, updated, id); nil != err {
+		if err = execStmtTx(tx, stmt, content, content, updated, id); nil != err {
 			return
 		}
 	}
 	removeBlockCache(id)
 	cache.RemoveBlockIAL(id)
+	return
 }
 
 func UpdateBlockContent(block *Block) {
-	tx, err := BeginTx()
+	tx, err := beginTx()
 	if nil != err {
 		return
 	}
@@ -93,7 +94,7 @@ func UpdateBlockContent(block *Block) {
 }
 
 func DeleteTree(table, rootID string) {
-	tx, err := BeginTx()
+	tx, err := beginTx()
 	if nil != err {
 		return
 	}

+ 4 - 4
kernel/sql/block_ref.go

@@ -37,13 +37,13 @@ type Ref struct {
 	Type             string
 }
 
-func UpsertRefs(tx *sql.Tx, tree *parse.Tree) {
-	if err := deleteRefsByPath(tx, tree.Box, tree.Path); nil != err {
+func upsertRefs(tx *sql.Tx, tree *parse.Tree) (err error) {
+	if err = deleteRefsByPath(tx, tree.Box, tree.Path); nil != err {
 		return
 	}
-	if err := deleteFileAnnotationRefsByPath(tx, tree.Box, tree.Path); nil != err {
+	if err = deleteFileAnnotationRefsByPath(tx, tree.Box, tree.Path); nil != err {
 		return
 	}
-	insertRef(tx, tree)
+	err = insertRefs(tx, tree)
 	return
 }

+ 17 - 11
kernel/sql/database.go

@@ -785,7 +785,7 @@ func DeleteBlockByIDs(tx *sql.Tx, ids []string) (err error) {
 	return deleteBlocksByIDs(tx, ids)
 }
 
-func DeleteByBoxTx(tx *sql.Tx, box string) (err error) {
+func deleteByBoxTx(tx *sql.Tx, box string) (err error) {
 	if err = deleteBlocksByBoxTx(tx, box); nil != err {
 		return
 	}
@@ -798,7 +798,7 @@ func DeleteByBoxTx(tx *sql.Tx, box string) (err error) {
 	if err = deleteAttributesByBoxTx(tx, box); nil != err {
 		return
 	}
-	if err = deleteRefsByBoxTx(tx, box); nil != err {
+	if err = deleteBlockRefsByBoxTx(tx, box); nil != err {
 		return
 	}
 	if err = deleteFileAnnotationRefsByBoxTx(tx, box); nil != err {
@@ -914,14 +914,14 @@ func deleteRefsByPathTx(tx *sql.Tx, box, path string) (err error) {
 	return
 }
 
-func DeleteRefsByBoxTx(tx *sql.Tx, box string) (err error) {
+func deleteRefsByBoxTx(tx *sql.Tx, box string) (err error) {
 	if err = deleteFileAnnotationRefsByBoxTx(tx, box); nil != err {
 		return
 	}
-	return deleteRefsByBoxTx(tx, box)
+	return deleteBlockRefsByBoxTx(tx, box)
 }
 
-func deleteRefsByBoxTx(tx *sql.Tx, box string) (err error) {
+func deleteBlockRefsByBoxTx(tx *sql.Tx, box string) (err error) {
 	stmt := "DELETE FROM refs WHERE box = ?"
 	err = execStmtTx(tx, stmt, box)
 	return
@@ -945,7 +945,7 @@ func deleteFileAnnotationRefsByBoxTx(tx *sql.Tx, box string) (err error) {
 	return
 }
 
-func DeleteByRootID(tx *sql.Tx, rootID string) (err error) {
+func deleteByRootID(tx *sql.Tx, rootID string) (err error) {
 	stmt := "DELETE FROM blocks WHERE root_id = ?"
 	if err = execStmtTx(tx, stmt, rootID); nil != err {
 		return
@@ -1049,7 +1049,7 @@ func query(query string, args ...interface{}) (*sql.Rows, error) {
 	return db.Query(query, args...)
 }
 
-func BeginTx() (tx *sql.Tx, err error) {
+func beginTx() (tx *sql.Tx, err error) {
 	if tx, err = db.Begin(); nil != err {
 		logging.LogErrorf("begin tx failed: %s\n  %s", err, logging.ShortStack())
 		if strings.Contains(err.Error(), "database is locked") {
@@ -1069,7 +1069,7 @@ func BeginHistoryTx() (tx *sql.Tx, err error) {
 	return
 }
 
-func CommitTx(tx *sql.Tx) (err error) {
+func CommitHistoryTx(tx *sql.Tx) (err error) {
 	if nil == tx {
 		logging.LogErrorf("tx is nil")
 		return errors.New("tx is nil")
@@ -1081,10 +1081,16 @@ func CommitTx(tx *sql.Tx) (err error) {
 	return
 }
 
-func RollbackTx(tx *sql.Tx) {
-	if err := tx.Rollback(); nil != err {
-		logging.LogErrorf("rollback tx failed: %s\n  %s", err, logging.ShortStack())
+func commitTx(tx *sql.Tx) (err error) {
+	if nil == tx {
+		logging.LogErrorf("tx is nil")
+		return errors.New("tx is nil")
+	}
+
+	if err = tx.Commit(); nil != err {
+		logging.LogErrorf("commit tx failed: %s\n  %s", err, logging.ShortStack())
 	}
+	return
 }
 
 func prepareExecInsertTx(tx *sql.Tx, stmtSQL string, args []interface{}) (err error) {

+ 22 - 14
kernel/sql/queue.go

@@ -97,7 +97,7 @@ func FlushQueue() {
 	txLock.Lock()
 	defer txLock.Unlock()
 	start := time.Now()
-	tx, err := BeginTx()
+	tx, err := beginTx()
 	if nil != err {
 		return
 	}
@@ -110,30 +110,38 @@ func FlushQueue() {
 
 		switch op.action {
 		case "upsert":
-			tree := op.upsertTree
-			if err = upsertTree(tx, tree, context); nil != err {
-				logging.LogErrorf("upsert tree [%s] into database failed: %s", tree.Box+tree.Path, err)
-			}
+			err = upsertTree(tx, op.upsertTree, context)
 		case "delete":
-			batchDeleteByPathPrefix(tx, op.removeTreeBox, op.removeTreePath)
+			err = batchDeleteByPathPrefix(tx, op.removeTreeBox, op.removeTreePath)
 		case "delete_id":
-			DeleteByRootID(tx, op.removeTreeID)
+			err = deleteByRootID(tx, op.removeTreeID)
 		case "rename":
-			batchUpdateHPath(tx, op.renameTree.Box, op.renameTree.ID, op.renameTreeOldHPath, op.renameTree.HPath)
-			updateRootContent(tx, path.Base(op.renameTree.HPath), op.renameTree.Root.IALAttr("updated"), op.renameTree.ID)
+			err = batchUpdateHPath(tx, op.renameTree.Box, op.renameTree.ID, op.renameTreeOldHPath, op.renameTree.HPath)
+			if nil != err {
+				break
+			}
+			err = updateRootContent(tx, path.Base(op.renameTree.HPath), op.renameTree.Root.IALAttr("updated"), op.renameTree.ID)
 		case "delete_box":
-			DeleteByBoxTx(tx, op.box)
+			err = deleteByBoxTx(tx, op.box)
 		case "delete_box_refs":
-			DeleteRefsByBoxTx(tx, op.box)
+			err = deleteRefsByBoxTx(tx, op.box)
 		case "insert_refs":
-			InsertRefs(tx, op.upsertTree)
+			err = insertRefs(tx, op.upsertTree)
 		case "update_refs":
-			UpsertRefs(tx, op.upsertTree)
+			err = upsertRefs(tx, op.upsertTree)
 		default:
 			logging.LogErrorf("unknown operation [%s]", op.action)
 		}
+
+		if nil != err {
+			logging.LogErrorf("queue operation failed: %s", err)
+			break
+		}
+	}
+
+	if err = commitTx(tx); nil != err {
+		logging.LogErrorf("commit tx failed: %s", err)
 	}
-	CommitTx(tx)
 	elapsed := time.Now().Sub(start).Milliseconds()
 	if 5000 < elapsed {
 		logging.LogInfof("op tx [%dms]", elapsed)

+ 2 - 3
kernel/sql/stat.go

@@ -43,15 +43,14 @@ func getDatabaseVer() (ret string) {
 
 func setDatabaseVer() {
 	key := "siyuan_database_ver"
-	tx, err := BeginTx()
+	tx, err := beginTx()
 	if nil != err {
 		return
 	}
 	if err = putStat(tx, key, util.DatabaseVer); nil != err {
-		RollbackTx(tx)
 		return
 	}
-	CommitTx(tx)
+	commitTx(tx)
 }
 
 func putStat(tx *sql.Tx, key, value string) (err error) {

+ 4 - 11
kernel/sql/upsert.go

@@ -26,7 +26,6 @@ import (
 	"github.com/88250/lute/parse"
 	"github.com/emirpasic/gods/sets/hashset"
 	"github.com/siyuan-note/eventbus"
-	"github.com/siyuan-note/logging"
 	"github.com/siyuan-note/siyuan/kernel/util"
 )
 
@@ -36,12 +35,6 @@ func init() {
 	luteEngine.RenderOptions.KramdownBlockIAL = false // 数据库 markdown 字段为标准 md,但是要保留 span block ial
 }
 
-func InsertRefs(tx *sql.Tx, tree *parse.Tree) {
-	if err := insertRef(tx, tree); nil != err {
-		logging.LogErrorf("insert refs tree [%s] into database failed: %s", tree.Box+tree.Path, err)
-	}
-}
-
 const (
 	BlocksInsert                   = "INSERT INTO blocks (id, parent_id, root_id, hash, box, path, hpath, name, alias, memo, tag, content, fcontent, markdown, length, type, subtype, ial, sort, created, updated) VALUES %s"
 	BlocksFTSInsert                = "INSERT INTO blocks_fts (id, parent_id, root_id, hash, box, path, hpath, name, alias, memo, tag, content, fcontent, markdown, length, type, subtype, ial, sort, created, updated) VALUES %s"
@@ -285,7 +278,7 @@ func insertSpans0(tx *sql.Tx, bulk []*Span) (err error) {
 	return
 }
 
-func insertRefs(tx *sql.Tx, refs []*Ref) (err error) {
+func insertBlockRefs(tx *sql.Tx, refs []*Ref) (err error) {
 	if 1 > len(refs) {
 		return
 	}
@@ -388,9 +381,9 @@ func insertFileAnnotationRefs0(tx *sql.Tx, bulk []*FileAnnotationRef) (err error
 	return
 }
 
-func insertRef(tx *sql.Tx, tree *parse.Tree) (err error) {
+func insertRefs(tx *sql.Tx, tree *parse.Tree) (err error) {
 	refs, fileAnnotationRefs := refsFromTree(tree)
-	if err = insertRefs(tx, refs); nil != err {
+	if err = insertBlockRefs(tx, refs); nil != err {
 		return
 	}
 	if err = insertFileAnnotationRefs(tx, fileAnnotationRefs); nil != err {
@@ -450,7 +443,7 @@ func upsertTree(tx *sql.Tx, tree *parse.Tree, context map[string]interface{}) (e
 	}
 
 	refs, fileAnnotationRefs := refsFromTree(tree)
-	if err = insertRefs(tx, refs); nil != err {
+	if err = insertBlockRefs(tx, refs); nil != err {
 		return
 	}
 	if err = insertFileAnnotationRefs(tx, fileAnnotationRefs); nil != err {