Merge remote-tracking branch 'origin/dev' into dev

This commit is contained in:
Vanessa 2023-01-26 20:16:08 +08:00
commit 77e178051a
4 changed files with 80 additions and 14 deletions

View file

@ -69,20 +69,23 @@ func autoFixIndex() {
for _, root := range roots {
rootMap[root.ID] = root
}
var toRemoveRootIDs []string
var deletes int
for _, rootID := range duplicatedRootIDs {
root := rootMap[rootID]
if nil == root {
continue
}
//logging.LogWarnf("exist more than one tree [%s], reindex it", rootID)
sql.RemoveTreeQueue(root.Box, rootID)
deletes++
toRemoveRootIDs = append(toRemoveRootIDs, rootID)
if util.IsExiting {
break
}
}
toRemoveRootIDs = gulu.Str.RemoveDuplicatedElem(toRemoveRootIDs)
sql.BatchRemoveTreeQueue(toRemoveRootIDs)
if 0 < deletes {
logging.LogWarnf("exist more than one tree duplicated [%d], reindex it", deletes)
}
@ -203,17 +206,20 @@ func reindexTreeByUpdated(rootUpdatedMap, dbRootUpdatedMap map[string]string) {
for _, block := range blocks {
roots[block.RootID] = block
}
var toRemoveRootIDs []string
for id, root := range roots {
if nil == root {
continue
}
logging.LogWarnf("tree [%s] is not in block tree, remove it from [%s]", id, root.Box)
sql.RemoveTreeQueue(root.Box, root.ID)
toRemoveRootIDs = append(toRemoveRootIDs, id)
if util.IsExiting {
break
}
}
toRemoveRootIDs = gulu.Str.RemoveDuplicatedElem(toRemoveRootIDs)
//logging.LogWarnf("tree [%s] is not in block tree, remove it from [%s]", id, root.Box)
sql.BatchRemoveTreeQueue(toRemoveRootIDs)
}
func reindexTreeByPath(box, p string, i, size int) {

View file

@ -20,6 +20,7 @@ import (
"bytes"
"database/sql"
"errors"
"fmt"
"os"
"path/filepath"
"regexp"
@ -982,6 +983,42 @@ func deleteByRootID(tx *sql.Tx, rootID string, context map[string]interface{}) (
return
}
func batchDeleteByRootIDs(tx *sql.Tx, rootIDs []string, context map[string]interface{}) (err error) {
ids := strings.Join(rootIDs, "','")
ids = "('" + ids + "')"
stmt := "DELETE FROM blocks WHERE root_id IN " + ids
if err = execStmtTx(tx, stmt); nil != err {
return
}
stmt = "DELETE FROM blocks_fts WHERE root_id IN " + ids
if err = execStmtTx(tx, stmt); nil != err {
return
}
stmt = "DELETE FROM blocks_fts_case_insensitive WHERE root_id IN " + ids
if err = execStmtTx(tx, stmt); nil != err {
return
}
stmt = "DELETE FROM spans WHERE root_id IN " + ids
if err = execStmtTx(tx, stmt); nil != err {
return
}
stmt = "DELETE FROM assets WHERE root_id IN " + ids
if err = execStmtTx(tx, stmt); nil != err {
return
}
stmt = "DELETE FROM refs WHERE root_id IN " + ids
if err = execStmtTx(tx, stmt); nil != err {
return
}
stmt = "DELETE FROM file_annotation_refs WHERE root_id IN " + ids
if err = execStmtTx(tx, stmt); nil != err {
return
}
ClearBlockCache()
eventbus.Publish(eventbus.EvtSQLDeleteBlocks, context, fmt.Sprintf("%d", len(rootIDs)))
return
}
func batchDeleteByPathPrefix(tx *sql.Tx, boxID, pathPrefix string) (err error) {
stmt := "DELETE FROM blocks WHERE box = ? AND path LIKE ?"
if err = execStmtTx(tx, stmt, boxID, pathPrefix+"%"); nil != err {

View file

@ -41,12 +41,13 @@ var (
type dbQueueOperation struct {
inQueueTime time.Time
action string // upsert/delete/delete_id/rename/delete_box/delete_box_refs/insert_refs/index
action string // upsert/delete/delete_id/rename/delete_box/delete_box_refs/insert_refs/index/delete_ids
indexPath string // index
upsertTree *parse.Tree // upsert/insert_refs
removeTreeBox, removeTreePath string // delete
removeTreeIDBox, removeTreeID string // delete_id
removeTreeIDs []string // delete_ids
box string // delete_box/delete_box_refs/index
renameTree *parse.Tree // rename
renameTreeOldHPath string // rename
@ -148,6 +149,8 @@ func execOp(op *dbQueueOperation, tx *sql.Tx, context map[string]interface{}) (e
err = batchDeleteByPathPrefix(tx, op.removeTreeBox, op.removeTreePath)
case "delete_id":
err = deleteByRootID(tx, op.removeTreeID, context)
case "delete_ids":
err = batchDeleteByRootIDs(tx, op.removeTreeIDs, context)
case "rename":
err = batchUpdateHPath(tx, op.renameTree.Box, op.renameTree.ID, op.renameTreeOldHPath, op.renameTree.HPath)
if nil != err {
@ -295,6 +298,14 @@ func RemoveTreeQueue(box, rootID string) {
operationQueue = append(operationQueue, newOp)
}
func BatchRemoveTreeQueue(rootIDs []string) {
dbQueueLock.Lock()
defer dbQueueLock.Unlock()
newOp := &dbQueueOperation{removeTreeIDs: rootIDs, inQueueTime: time.Now(), action: "delete_ids"}
operationQueue = append(operationQueue, newOp)
}
func RemoveTreePathQueue(treeBox, treePathPrefix string) {
dbQueueLock.Lock()
defer dbQueueLock.Unlock()

View file

@ -29,6 +29,7 @@ import (
"github.com/88250/lute/ast"
"github.com/88250/lute/parse"
"github.com/dustin/go-humanize"
"github.com/panjf2000/ants/v2"
util2 "github.com/siyuan-note/dejavu/util"
"github.com/siyuan-note/logging"
"github.com/siyuan-note/siyuan/kernel/util"
@ -425,11 +426,13 @@ func InitBlockTree(force bool) {
}
size := uint64(0)
for _, entry := range entries {
if !strings.HasSuffix(entry.Name(), ".msgpack") {
continue
}
poolSize := runtime.NumCPU()
waitGroup := &sync.WaitGroup{}
p, _ := ants.NewPoolWithFunc(poolSize, func(arg interface{}) {
defer waitGroup.Done()
entry := arg.(os.DirEntry)
p := filepath.Join(util.BlockTreePath, entry.Name())
var fh *os.File
fh, err = os.OpenFile(p, os.O_RDWR, 0644)
@ -461,13 +464,22 @@ func InitBlockTree(force bool) {
name := entry.Name()[0:strings.Index(entry.Name(), ".")]
blockTrees.Store(name, &btSlice{data: sliceData, changed: time.Time{}, m: &sync.Mutex{}})
size += uint64(len(data))
})
for _, entry := range entries {
if !strings.HasSuffix(entry.Name(), ".msgpack") {
continue
}
waitGroup.Add(1)
p.Invoke(entry)
}
waitGroup.Wait()
p.Release()
runtime.GC()
if elapsed := time.Since(start).Seconds(); 2 < elapsed {
logging.LogWarnf("read block tree [%s] to [%s], elapsed [%.2fs]", humanize.Bytes((size)), util.BlockTreePath, elapsed)
}
elapsed := time.Since(start).Seconds()
logging.LogInfof("read block tree [%s] to [%s], elapsed [%.2fs]", humanize.Bytes((size)), util.BlockTreePath, elapsed)
return
}