♻️ Improve transaction flush

This commit is contained in:
Daniel 2024-10-22 19:20:44 +08:00
parent 72d1a059ac
commit 8dd0629b9c
No known key found for this signature in database
GPG key ID: 86211BA83DF03017
23 changed files with 69 additions and 82 deletions

View file

@ -376,7 +376,7 @@ func getRefText(c *gin.Context) {
}
id := arg["id"].(string)
model.WaitForWritingFiles()
model.FlushTxQueue()
refText := model.GetBlockRefText(id)
if "" == refText {
// 空块返回 id https://github.com/siyuan-note/siyuan/issues/10259

View file

@ -74,7 +74,7 @@ func moveOutlineHeading(c *gin.Context) {
}
model.PerformTransactions(&transactions)
model.WaitForWritingFiles()
model.FlushTxQueue()
ret.Data = transactions
broadcastTransactions(transactions)
@ -127,7 +127,7 @@ func appendDailyNoteBlock(c *gin.Context) {
}
model.PerformTransactions(&transactions)
model.WaitForWritingFiles()
model.FlushTxQueue()
ret.Data = transactions
broadcastTransactions(transactions)
@ -180,7 +180,7 @@ func prependDailyNoteBlock(c *gin.Context) {
}
model.PerformTransactions(&transactions)
model.WaitForWritingFiles()
model.FlushTxQueue()
ret.Data = transactions
broadcastTransactions(transactions)
@ -241,7 +241,7 @@ func unfoldBlock(c *gin.Context) {
}
model.PerformTransactions(&transactions)
model.WaitForWritingFiles()
model.FlushTxQueue()
broadcastTransactions(transactions)
}
@ -301,7 +301,7 @@ func foldBlock(c *gin.Context) {
}
model.PerformTransactions(&transactions)
model.WaitForWritingFiles()
model.FlushTxQueue()
broadcastTransactions(transactions)
}
@ -355,7 +355,7 @@ func moveBlock(c *gin.Context) {
}
model.PerformTransactions(&transactions)
model.WaitForWritingFiles()
model.FlushTxQueue()
ret.Data = transactions
broadcastTransactions(transactions)
@ -400,7 +400,7 @@ func appendBlock(c *gin.Context) {
}
model.PerformTransactions(&transactions)
model.WaitForWritingFiles()
model.FlushTxQueue()
ret.Data = transactions
broadcastTransactions(transactions)
@ -445,7 +445,7 @@ func prependBlock(c *gin.Context) {
}
model.PerformTransactions(&transactions)
model.WaitForWritingFiles()
model.FlushTxQueue()
ret.Data = transactions
broadcastTransactions(transactions)
@ -508,7 +508,7 @@ func insertBlock(c *gin.Context) {
}
model.PerformTransactions(&transactions)
model.WaitForWritingFiles()
model.FlushTxQueue()
ret.Data = transactions
broadcastTransactions(transactions)
@ -599,7 +599,7 @@ func updateBlock(c *gin.Context) {
}
model.PerformTransactions(&transactions)
model.WaitForWritingFiles()
model.FlushTxQueue()
ret.Data = transactions
broadcastTransactions(transactions)

View file

@ -223,7 +223,7 @@ func heading2Doc(c *gin.Context) {
return
}
model.WaitForWritingFiles()
model.FlushTxQueue()
luteEngine := util.NewLute()
tree, err := filesys.LoadTree(targetNotebook, targetPath, luteEngine)
if err != nil {
@ -268,7 +268,7 @@ func li2Doc(c *gin.Context) {
return
}
model.WaitForWritingFiles()
model.FlushTxQueue()
luteEngine := util.NewLute()
tree, err := filesys.LoadTree(targetNotebook, targetPath, luteEngine)
if err != nil {
@ -591,7 +591,7 @@ func createDoc(c *gin.Context) {
return
}
model.WaitForWritingFiles()
model.FlushTxQueue()
box := model.Conf.Box(notebook)
pushCreate(box, p, tree.Root.ID, arg)
@ -621,7 +621,7 @@ func createDailyNote(c *gin.Context) {
return
}
model.WaitForWritingFiles()
model.FlushTxQueue()
box := model.Conf.Box(notebook)
luteEngine := util.NewLute()
tree, err := filesys.LoadTree(box.ID, p, luteEngine)
@ -720,7 +720,7 @@ func createDocWithMd(c *gin.Context) {
}
ret.Data = id
model.WaitForWritingFiles()
model.FlushTxQueue()
box := model.Conf.Box(notebook)
b, _ := model.GetBlock(id, nil)
p := b.Path

View file

@ -325,7 +325,7 @@ func removeRiffCards(c *gin.Context) {
}
model.PerformTransactions(&transactions)
model.WaitForWritingFiles()
model.FlushTxQueue()
if "" != deckID {
deck := model.Decks[deckID]
@ -363,7 +363,7 @@ func addRiffCards(c *gin.Context) {
}
model.PerformTransactions(&transactions)
model.WaitForWritingFiles()
model.FlushTxQueue()
deck := model.Decks[deckID]
ret.Data = deckData(deck)

View file

@ -85,7 +85,7 @@ func performTransactions(c *gin.Context) {
func pushTransactions(app, session string, transactions []*model.Transaction) {
pushMode := util.PushModeBroadcastExcludeSelf
if 0 < len(transactions) && 0 < len(transactions[0].DoOperations) {
model.WaitForWritingFiles() // 等待文件写入完成,后续渲染才能读取到最新的数据
model.FlushTxQueue() // 等待文件写入完成,后续渲染才能读取到最新的数据
action := transactions[0].DoOperations[0].Action
isAttrViewTx := strings.Contains(strings.ToLower(action), "attrview")

View file

@ -38,7 +38,7 @@ import (
)
func RefreshBacklink(id string) {
WaitForWritingFiles()
FlushTxQueue()
refreshRefsByDefID(id)
}

View file

@ -463,7 +463,7 @@ func SwapBlockRef(refID, defID string, includeChildren bool) (err error) {
return
}
}
WaitForWritingFiles()
FlushTxQueue()
util.ReloadUI()
return
}

View file

@ -51,7 +51,7 @@ func SetBlockReminder(id string, timed string) (err error) {
timedMills = t.UnixMilli()
}
WaitForWritingFiles()
FlushTxQueue()
attrs := sql.GetBlockAttrs(id)
tree, err := LoadTreeByBlockID(id)
@ -101,7 +101,7 @@ func BatchSetBlockAttrs(blockAttrs []map[string]interface{}) (err error) {
return
}
WaitForWritingFiles()
FlushTxQueue()
var blockIDs []string
for _, blockAttr := range blockAttrs {
@ -149,7 +149,7 @@ func SetBlockAttrs(id string, nameValues map[string]string) (err error) {
return
}
WaitForWritingFiles()
FlushTxQueue()
tree, err := LoadTreeByBlockID(id)
if err != nil {

View file

@ -53,7 +53,7 @@ type AttrView struct {
}
func GetDocInfo(blockID string) (ret *BlockInfo) {
WaitForWritingFiles()
FlushTxQueue()
tree, err := LoadTreeByBlockID(blockID)
if err != nil {
@ -125,7 +125,7 @@ func GetDocInfo(blockID string) (ret *BlockInfo) {
}
func GetDocsInfo(blockIDs []string, queryRefCount bool, queryAv bool) (rets []*BlockInfo) {
WaitForWritingFiles()
FlushTxQueue()
trees := filesys.LoadTrees(blockIDs)
for _, blockID := range blockIDs {

View file

@ -158,7 +158,7 @@ func BookmarkLabels() (ret []string) {
}
func BuildBookmark() (ret *Bookmarks) {
WaitForWritingFiles()
FlushTxQueue()
sql.FlushQueue()
ret = &Bookmarks{}

View file

@ -619,7 +619,7 @@ func fullReindex() {
util.PushEndlessProgress(Conf.language(35))
defer util.PushClearProgress()
WaitForWritingFiles()
FlushTxQueue()
if err := sql.InitDatabase(true); err != nil {
os.Exit(logging.ExitCodeReadOnlyDatabase)

View file

@ -597,7 +597,7 @@ func Close(force, setCurrentWorkspace bool, execInstallPkg int) (exitCode int) {
logging.LogInfof("exiting kernel [force=%v, setCurrentWorkspace=%v, execInstallPkg=%d]", force, setCurrentWorkspace, execInstallPkg)
util.PushMsg(Conf.Language(95), 10000*60)
WaitForWritingFiles()
FlushTxQueue()
if !force {
if Conf.Sync.Enabled && 3 != Conf.Sync.Mode &&

View file

@ -478,7 +478,7 @@ func ExportData() (zipPath string, err error) {
}
func exportData(exportFolder string) (zipPath string, err error) {
WaitForWritingFiles()
FlushTxQueue()
logging.LogInfof("exporting data...")
@ -521,7 +521,7 @@ func exportData(exportFolder string) (zipPath string, err error) {
}
func ExportResources(resourcePaths []string, mainName string) (exportFilePath string, err error) {
WaitForWritingFiles()
FlushTxQueue()
// 用于导出的临时文件夹完整路径
exportFolderPath := filepath.Join(util.TempDir, "export", mainName)

View file

@ -504,7 +504,7 @@ func BlocksWordCount(ids []string) (ret *util.BlockStatResult) {
}
func StatTree(id string) (ret *util.BlockStatResult) {
WaitForWritingFiles()
FlushTxQueue()
tree, _ := LoadTreeByBlockID(id)
if nil == tree {
@ -614,7 +614,7 @@ func GetDoc(startID, endID, id string, index int, query string, queryTypes map[s
//pprof.StartCPUProfile(cpuProfile)
//defer pprof.StopCPUProfile()
WaitForWritingFiles() // 写入数据时阻塞,避免获取到的数据不一致
FlushTxQueue() // 写入数据时阻塞,避免获取到的数据不一致
inputIndex := index
tree, err := LoadTreeByBlockID(id)
@ -1121,7 +1121,7 @@ func DuplicateDoc(tree *parse.Tree) {
resetTree(tree, "Duplicated", false)
createTreeTx(tree)
WaitForWritingFiles()
FlushTxQueue()
// 复制为副本时将该副本块插入到数据库中 https://github.com/siyuan-note/siyuan/issues/11959
ast.Walk(tree.Root, func(n *ast.Node, entering bool) ast.WalkStatus {
@ -1170,7 +1170,7 @@ func CreateDocByMd(boxID, p, title, md string, sorts []string) (tree *parse.Tree
return
}
WaitForWritingFiles()
FlushTxQueue()
ChangeFileTreeSort(box.ID, sorts)
return
}
@ -1185,7 +1185,7 @@ func CreateWithMarkdown(tags, boxID, hPath, md, parentID, id string, withMath bo
return
}
WaitForWritingFiles()
FlushTxQueue()
luteEngine := util.NewLute()
if withMath {
luteEngine.SetInlineMath(true)
@ -1206,7 +1206,7 @@ func CreateWithMarkdown(tags, boxID, hPath, md, parentID, id string, withMath bo
nameValues["tags"] = tags
SetBlockAttrs(retID, nameValues)
WaitForWritingFiles()
FlushTxQueue()
return
}
@ -1231,7 +1231,7 @@ func CreateDailyNote(boxID string) (p string, existed bool, err error) {
return
}
WaitForWritingFiles()
FlushTxQueue()
existRoot := treenode.GetBlockTreeRootByHPath(box.ID, hPath)
if nil != existRoot {
@ -1305,7 +1305,7 @@ func CreateDailyNote(boxID string) (p string, existed bool, err error) {
}
IncSync()
WaitForWritingFiles()
FlushTxQueue()
tree, err := LoadTreeByBlockID(id)
if err != nil {
@ -1451,7 +1451,7 @@ func MoveDocs(fromPaths []string, toBoxID, toPath string, callback interface{})
defer util.PushClearProgress()
}
WaitForWritingFiles()
FlushTxQueue()
luteEngine := util.NewLute()
count := 0
for fromPath, fromBox := range pathsBoxes {
@ -1635,7 +1635,7 @@ func RemoveDoc(boxID, p string) {
return
}
WaitForWritingFiles()
FlushTxQueue()
luteEngine := util.NewLute()
removeDoc(box, p, luteEngine)
IncSync()
@ -1648,7 +1648,7 @@ func RemoveDocs(paths []string) {
paths = util.FilterSelfChildDocs(paths)
pathsBoxes := getBoxesByPaths(paths)
WaitForWritingFiles()
FlushTxQueue()
luteEngine := util.NewLute()
for p, box := range pathsBoxes {
removeDoc(box, p, luteEngine)
@ -1772,7 +1772,7 @@ func RenameDoc(boxID, p, title string) (err error) {
return
}
WaitForWritingFiles()
FlushTxQueue()
luteEngine := util.NewLute()
tree, err := filesys.LoadTree(box.ID, p, luteEngine)
if err != nil {
@ -1928,7 +1928,7 @@ func createDoc(boxID, p, title, dom string) (tree *parse.Tree, err error) {
transaction := &Transaction{DoOperations: []*Operation{{Action: "create", Data: tree}}}
PerformTransactions(&[]*Transaction{transaction})
WaitForWritingFiles()
FlushTxQueue()
return
}
@ -2000,7 +2000,7 @@ func ChangeFileTreeSort(boxID string, paths []string) {
return
}
WaitForWritingFiles()
FlushTxQueue()
box := Conf.Box(boxID)
sortIDs := map[string]int{}
max := 0

View file

@ -202,7 +202,7 @@ func resetFlashcards(deckID string, blockIDs []string) {
}
PerformTransactions(&transactions)
WaitForWritingFiles()
FlushTxQueue()
}
func GetFlashcardNotebooks() (ret []*Box) {

View file

@ -33,7 +33,7 @@ func AutoSpace(rootID string) (err error) {
util.PushProtyleLoading(rootID, Conf.Language(116))
defer util.PushReloadProtyle(rootID)
WaitForWritingFiles()
FlushTxQueue()
generateOpTypeHistory(tree, HistoryOpFormat)
luteEngine := NewLute()

View file

@ -63,7 +63,7 @@ func generateFileHistory() {
return
}
WaitForWritingFiles()
FlushTxQueue()
// 生成文档历史
for _, box := range Conf.GetOpenedBoxes() {
@ -225,7 +225,7 @@ func RollbackDocHistory(boxID, historyPath string) (err error) {
return
}
WaitForWritingFiles()
FlushTxQueue()
srcPath := historyPath
var destPath, parentHPath string

View file

@ -660,7 +660,7 @@ func ImportFromLocalPath(boxID, localPath string, toPath string) (err error) {
lockSync()
defer unlockSync()
WaitForWritingFiles()
FlushTxQueue()
var baseHPath, baseTargetPath, boxLocalPath string
if "/" == toPath {

View file

@ -45,7 +45,7 @@ func CreateBox(name string) (id string, err error) {
name = Conf.language(105)
}
WaitForWritingFiles()
FlushTxQueue()
createDocLock.Lock()
defer createDocLock.Unlock()
@ -106,7 +106,7 @@ func RemoveBox(boxID string) (err error) {
return errors.New(fmt.Sprintf("can not remove [%s] caused by it is a reserved file", boxID))
}
WaitForWritingFiles()
FlushTxQueue()
isUserGuide := IsUserGuide(boxID)
createDocLock.Lock()
defer createDocLock.Unlock()
@ -147,7 +147,7 @@ func RemoveBox(boxID string) (err error) {
}
func Unmount(boxID string) {
WaitForWritingFiles()
FlushTxQueue()
unmount0(boxID)
evt := util.NewCmdResult("unmount", 0, util.PushModeBroadcast)
@ -178,7 +178,7 @@ func Mount(boxID string) (alreadyMount bool, err error) {
boxLock.Store(boxID, true)
defer boxLock.Delete(boxID)
WaitForWritingFiles()
FlushTxQueue()
isUserGuide := IsUserGuide(boxID)
localPath := filepath.Join(util.DataDir, boxID)

View file

@ -210,7 +210,7 @@ func (tx *Transaction) doMoveOutlineHeading(operation *Operation) (ret *TxErr) {
func Outline(rootID string, preview bool) (ret []*Path, err error) {
time.Sleep(util.FrontendQueueInterval)
WaitForWritingFiles()
FlushTxQueue()
ret = []*Path{}
tree, _ := LoadTreeByBlockID(rootID)

View file

@ -628,7 +628,7 @@ func checkoutRepo(id string) {
}
util.PushEndlessProgress(Conf.Language(63))
WaitForWritingFiles()
FlushTxQueue()
CloseWatchAssets()
defer WatchAssets()
CloseWatchEmojis()
@ -962,7 +962,7 @@ func IndexRepo(memo string) (err error) {
start := time.Now()
latest, _ := repo.Latest()
WaitForWritingFiles()
FlushTxQueue()
index, err := repo.Index(memo, map[string]interface{}{
eventbus.CtxPushMsg: eventbus.CtxPushMsgToStatusBarAndProgress,
})

View file

@ -206,7 +206,7 @@ type Tag struct {
type Tags []*Tag
func BuildTags() (ret *Tags) {
WaitForWritingFiles()
FlushTxQueue()
sql.FlushQueue()
ret = &Tags{}

View file

@ -56,19 +56,9 @@ func IsMoveOutlineHeading(transactions *[]*Transaction) bool {
return false
}
func WaitForWritingFiles() {
var printLog bool
var lastPrintLog bool
for i := 0; isWritingFiles(); i++ {
time.Sleep(5 * time.Millisecond)
if 2000 < i && !printLog { // 10s 后打日志
logging.LogWarnf("file is writing: \n%s", logging.ShortStack())
printLog = true
}
if 12000 < i && !lastPrintLog { // 60s 后打日志
logging.LogWarnf("file is still writing")
lastPrintLog = true
}
func FlushTxQueue() {
for 0 < len(txQueue) || isFlushing {
time.Sleep(10 * time.Millisecond)
}
}
@ -78,20 +68,17 @@ var (
isFlushing = false
)
func isWritingFiles() bool {
time.Sleep(time.Duration(50) * time.Millisecond)
return 0 < len(txQueue) || isFlushing
func init() {
go flushQueue()
}
func init() {
go func() {
for {
select {
case tx := <-txQueue:
flushTx(tx)
}
func flushQueue() {
for {
select {
case tx := <-txQueue:
flushTx(tx)
}
}()
}
}
func flushTx(tx *Transaction) {