|
@@ -17,17 +17,11 @@
|
|
|
package sql
|
|
|
|
|
|
import (
|
|
|
- "bytes"
|
|
|
- "crypto/sha256"
|
|
|
- "database/sql"
|
|
|
- "fmt"
|
|
|
"path"
|
|
|
"sync"
|
|
|
"time"
|
|
|
|
|
|
- "github.com/88250/lute/ast"
|
|
|
"github.com/88250/lute/parse"
|
|
|
- "github.com/emirpasic/gods/sets/hashset"
|
|
|
"github.com/siyuan-note/eventbus"
|
|
|
"github.com/siyuan-note/logging"
|
|
|
"github.com/siyuan-note/siyuan/kernel/task"
|
|
@@ -35,19 +29,20 @@ import (
|
|
|
)
|
|
|
|
|
|
var (
|
|
|
- operationQueue []*treeQueueOperation
|
|
|
- upsertTreeQueueLock = sync.Mutex{}
|
|
|
+ operationQueue []*dbQueueOperation
|
|
|
+ dbQueueLock = sync.Mutex{}
|
|
|
|
|
|
txLock = sync.Mutex{}
|
|
|
)
|
|
|
|
|
|
-type treeQueueOperation struct {
|
|
|
+type dbQueueOperation struct {
|
|
|
inQueueTime time.Time
|
|
|
- action string // upsert/delete/delete_id/rename
|
|
|
+ action string // upsert/delete/delete_id/rename/delete_box/delete_box_refs/insert_refs
|
|
|
|
|
|
- upsertTree *parse.Tree // upsert
|
|
|
+ upsertTree *parse.Tree // upsert/insert_refs
|
|
|
removeTreeBox, removeTreePath string // delete
|
|
|
removeTreeIDBox, removeTreeID string // delete_id
|
|
|
+ box string // delete_box/delete_box_refs
|
|
|
renameTree *parse.Tree // rename
|
|
|
renameTreeOldHPath string // rename
|
|
|
}
|
|
@@ -88,8 +83,8 @@ func IsEmptyQueue() bool {
|
|
|
}
|
|
|
|
|
|
func ClearQueue() {
|
|
|
- upsertTreeQueueLock.Lock()
|
|
|
- defer upsertTreeQueueLock.Unlock()
|
|
|
+ dbQueueLock.Lock()
|
|
|
+ defer dbQueueLock.Unlock()
|
|
|
operationQueue = nil
|
|
|
}
|
|
|
|
|
@@ -108,7 +103,6 @@ func FlushQueue() {
|
|
|
}
|
|
|
|
|
|
context := map[string]interface{}{eventbus.CtxPushMsg: eventbus.CtxPushMsgToStatusBar}
|
|
|
- boxes := hashset.New()
|
|
|
for _, op := range ops {
|
|
|
switch op.action {
|
|
|
case "upsert":
|
|
@@ -116,17 +110,21 @@ func FlushQueue() {
|
|
|
if err = upsertTree(tx, tree, context); nil != err {
|
|
|
logging.LogErrorf("upsert tree [%s] into database failed: %s", tree.Box+tree.Path, err)
|
|
|
}
|
|
|
- boxes.Add(op.upsertTree.Box)
|
|
|
case "delete":
|
|
|
batchDeleteByPathPrefix(tx, op.removeTreeBox, op.removeTreePath)
|
|
|
- boxes.Add(op.removeTreeBox)
|
|
|
case "delete_id":
|
|
|
DeleteByRootID(tx, op.removeTreeID)
|
|
|
- boxes.Add(op.removeTreeIDBox)
|
|
|
case "rename":
|
|
|
batchUpdateHPath(tx, op.renameTree.Box, op.renameTree.ID, op.renameTreeOldHPath, op.renameTree.HPath)
|
|
|
updateRootContent(tx, path.Base(op.renameTree.HPath), op.renameTree.Root.IALAttr("updated"), op.renameTree.ID)
|
|
|
- boxes.Add(op.renameTree.Box)
|
|
|
+ case "delete_box":
|
|
|
+ DeleteByBoxTx(tx, op.box)
|
|
|
+ case "delete_box_refs":
|
|
|
+ DeleteRefsByBoxTx(tx, op.box)
|
|
|
+ case "insert_refs":
|
|
|
+ InsertRefs(tx, op.upsertTree)
|
|
|
+ case "update_refs":
|
|
|
+ UpsertRefs(tx, op.upsertTree)
|
|
|
default:
|
|
|
logging.LogErrorf("unknown operation [%s]", op.action)
|
|
|
}
|
|
@@ -136,39 +134,78 @@ func FlushQueue() {
|
|
|
if 5000 < elapsed {
|
|
|
logging.LogInfof("op tx [%dms]", elapsed)
|
|
|
}
|
|
|
+}
|
|
|
|
|
|
- start = time.Now()
|
|
|
- tx, err = BeginTx()
|
|
|
- if nil != err {
|
|
|
- return
|
|
|
+func mergeUpsertTrees() (ops []*dbQueueOperation) {
|
|
|
+ dbQueueLock.Lock()
|
|
|
+ defer dbQueueLock.Unlock()
|
|
|
+
|
|
|
+ ops = operationQueue
|
|
|
+ operationQueue = nil
|
|
|
+ return
|
|
|
+}
|
|
|
+
|
|
|
+func UpdateRefsTreeQueue(tree *parse.Tree) {
|
|
|
+ dbQueueLock.Lock()
|
|
|
+ defer dbQueueLock.Unlock()
|
|
|
+
|
|
|
+ newOp := &dbQueueOperation{upsertTree: tree, inQueueTime: time.Now(), action: "update_refs"}
|
|
|
+ for i, op := range operationQueue {
|
|
|
+ if "update_refs" == op.action && op.upsertTree.ID == tree.ID {
|
|
|
+ operationQueue[i] = newOp
|
|
|
+ return
|
|
|
+ }
|
|
|
}
|
|
|
- for _, box := range boxes.Values() {
|
|
|
- if !ast.IsNodeIDPattern(box.(string)) {
|
|
|
- continue
|
|
|
+ operationQueue = append(operationQueue, newOp)
|
|
|
+}
|
|
|
+
|
|
|
+func InsertRefsTreeQueue(tree *parse.Tree) {
|
|
|
+ dbQueueLock.Lock()
|
|
|
+ defer dbQueueLock.Unlock()
|
|
|
+
|
|
|
+ newOp := &dbQueueOperation{upsertTree: tree, inQueueTime: time.Now(), action: "insert_refs"}
|
|
|
+ for i, op := range operationQueue {
|
|
|
+ if "insert_refs" == op.action && op.upsertTree.ID == tree.ID {
|
|
|
+ operationQueue[i] = newOp
|
|
|
+ return
|
|
|
}
|
|
|
- updateBoxHash(tx, box.(string))
|
|
|
}
|
|
|
- CommitTx(tx)
|
|
|
- elapsed = time.Now().Sub(start).Milliseconds()
|
|
|
- if 1000 < elapsed {
|
|
|
- logging.LogInfof("hash tx [%dms]", elapsed)
|
|
|
+ operationQueue = append(operationQueue, newOp)
|
|
|
+}
|
|
|
+
|
|
|
+func DeleteBoxRefsQueue(boxID string) {
|
|
|
+ dbQueueLock.Lock()
|
|
|
+ defer dbQueueLock.Unlock()
|
|
|
+
|
|
|
+ newOp := &dbQueueOperation{box: boxID, inQueueTime: time.Now(), action: "delete_box_refs"}
|
|
|
+ for i, op := range operationQueue {
|
|
|
+ if "delete_box_refs" == op.action && op.box == boxID {
|
|
|
+ operationQueue[i] = newOp
|
|
|
+ return
|
|
|
+ }
|
|
|
}
|
|
|
+ operationQueue = append(operationQueue, newOp)
|
|
|
}
|
|
|
|
|
|
-func mergeUpsertTrees() (ops []*treeQueueOperation) {
|
|
|
- upsertTreeQueueLock.Lock()
|
|
|
- defer upsertTreeQueueLock.Unlock()
|
|
|
+func DeleteBoxQueue(boxID string) {
|
|
|
+ dbQueueLock.Lock()
|
|
|
+ defer dbQueueLock.Unlock()
|
|
|
|
|
|
- ops = operationQueue
|
|
|
- operationQueue = nil
|
|
|
- return
|
|
|
+ newOp := &dbQueueOperation{box: boxID, inQueueTime: time.Now(), action: "delete_box"}
|
|
|
+ for i, op := range operationQueue {
|
|
|
+ if "delete_box" == op.action && op.box == boxID {
|
|
|
+ operationQueue[i] = newOp
|
|
|
+ return
|
|
|
+ }
|
|
|
+ }
|
|
|
+ operationQueue = append(operationQueue, newOp)
|
|
|
}
|
|
|
|
|
|
func UpsertTreeQueue(tree *parse.Tree) {
|
|
|
- upsertTreeQueueLock.Lock()
|
|
|
- defer upsertTreeQueueLock.Unlock()
|
|
|
+ dbQueueLock.Lock()
|
|
|
+ defer dbQueueLock.Unlock()
|
|
|
|
|
|
- newOp := &treeQueueOperation{upsertTree: tree, inQueueTime: time.Now(), action: "upsert"}
|
|
|
+ newOp := &dbQueueOperation{upsertTree: tree, inQueueTime: time.Now(), action: "upsert"}
|
|
|
for i, op := range operationQueue {
|
|
|
if "upsert" == op.action && op.upsertTree.ID == tree.ID { // 相同树则覆盖
|
|
|
operationQueue[i] = newOp
|
|
@@ -179,10 +216,10 @@ func UpsertTreeQueue(tree *parse.Tree) {
|
|
|
}
|
|
|
|
|
|
func RenameTreeQueue(tree *parse.Tree, oldHPath string) {
|
|
|
- upsertTreeQueueLock.Lock()
|
|
|
- defer upsertTreeQueueLock.Unlock()
|
|
|
+ dbQueueLock.Lock()
|
|
|
+ defer dbQueueLock.Unlock()
|
|
|
|
|
|
- newOp := &treeQueueOperation{
|
|
|
+ newOp := &dbQueueOperation{
|
|
|
renameTree: tree,
|
|
|
renameTreeOldHPath: oldHPath,
|
|
|
inQueueTime: time.Now(),
|
|
@@ -197,10 +234,10 @@ func RenameTreeQueue(tree *parse.Tree, oldHPath string) {
|
|
|
}
|
|
|
|
|
|
func RemoveTreeQueue(box, rootID string) {
|
|
|
- upsertTreeQueueLock.Lock()
|
|
|
- defer upsertTreeQueueLock.Unlock()
|
|
|
+ dbQueueLock.Lock()
|
|
|
+ defer dbQueueLock.Unlock()
|
|
|
|
|
|
- var tmp []*treeQueueOperation
|
|
|
+ var tmp []*dbQueueOperation
|
|
|
// 将已有的 upsert 操作去重
|
|
|
for _, op := range operationQueue {
|
|
|
if "upsert" == op.action && op.upsertTree.ID != rootID {
|
|
@@ -209,15 +246,15 @@ func RemoveTreeQueue(box, rootID string) {
|
|
|
}
|
|
|
operationQueue = tmp
|
|
|
|
|
|
- newOp := &treeQueueOperation{removeTreeIDBox: box, removeTreeID: rootID, inQueueTime: time.Now(), action: "delete_id"}
|
|
|
+ newOp := &dbQueueOperation{removeTreeIDBox: box, removeTreeID: rootID, inQueueTime: time.Now(), action: "delete_id"}
|
|
|
operationQueue = append(operationQueue, newOp)
|
|
|
}
|
|
|
|
|
|
func RemoveTreePathQueue(treeBox, treePathPrefix string) {
|
|
|
- upsertTreeQueueLock.Lock()
|
|
|
- defer upsertTreeQueueLock.Unlock()
|
|
|
+ dbQueueLock.Lock()
|
|
|
+ defer dbQueueLock.Unlock()
|
|
|
|
|
|
- var tmp []*treeQueueOperation
|
|
|
+ var tmp []*dbQueueOperation
|
|
|
// 将已有的 upsert 操作去重
|
|
|
for _, op := range operationQueue {
|
|
|
if "upsert" == op.action && (op.removeTreeBox != treeBox || op.upsertTree.Path != treePathPrefix) {
|
|
@@ -226,31 +263,6 @@ func RemoveTreePathQueue(treeBox, treePathPrefix string) {
|
|
|
}
|
|
|
operationQueue = tmp
|
|
|
|
|
|
- newOp := &treeQueueOperation{removeTreeBox: treeBox, removeTreePath: treePathPrefix, inQueueTime: time.Now(), action: "delete"}
|
|
|
+ newOp := &dbQueueOperation{removeTreeBox: treeBox, removeTreePath: treePathPrefix, inQueueTime: time.Now(), action: "delete"}
|
|
|
operationQueue = append(operationQueue, newOp)
|
|
|
}
|
|
|
-
|
|
|
-func updateBoxHash(tx *sql.Tx, boxID string) {
|
|
|
- sum := boxChecksum(boxID)
|
|
|
- PutBoxHash(tx, boxID, sum)
|
|
|
-}
|
|
|
-
|
|
|
-func boxChecksum(box string) (ret string) {
|
|
|
- rows, err := query("SELECT hash FROM blocks WHERE type = 'd' AND box = ? ORDER BY id DESC", box)
|
|
|
- if nil != err {
|
|
|
- logging.LogErrorf("sql query failed: %s", err)
|
|
|
- return
|
|
|
- }
|
|
|
- defer rows.Close()
|
|
|
- buf := bytes.Buffer{}
|
|
|
- for rows.Next() {
|
|
|
- var hash string
|
|
|
- if err = rows.Scan(&hash); nil != err {
|
|
|
- logging.LogErrorf("query scan field failed: %s", err)
|
|
|
- return
|
|
|
- }
|
|
|
- buf.WriteString(hash)
|
|
|
- }
|
|
|
- ret = fmt.Sprintf("%x", sha256.Sum256(buf.Bytes()))
|
|
|
- return
|
|
|
-}
|