|
@@ -20,11 +20,11 @@ import (
|
|
|
"database/sql"
|
|
|
"errors"
|
|
|
"fmt"
|
|
|
+ "github.com/siyuan-note/eventbus"
|
|
|
"runtime/debug"
|
|
|
"sync"
|
|
|
"time"
|
|
|
|
|
|
- "github.com/siyuan-note/eventbus"
|
|
|
"github.com/siyuan-note/logging"
|
|
|
"github.com/siyuan-note/siyuan/kernel/task"
|
|
|
"github.com/siyuan-note/siyuan/kernel/util"
|
|
@@ -39,7 +39,8 @@ var (
|
|
|
|
|
|
type historyDBQueueOperation struct {
|
|
|
inQueueTime time.Time
|
|
|
- action string // index/deletePathPrefix
|
|
|
+ action string // index/deletePathPrefix
|
|
|
+ context map[string]interface{} // 消息推送上下文
|
|
|
|
|
|
histories []*History // index
|
|
|
pathPrefix string // deletePathPrefix
|
|
@@ -59,7 +60,6 @@ func FlushHistoryQueue() {
|
|
|
defer txLock.Unlock()
|
|
|
start := time.Now()
|
|
|
|
|
|
- context := map[string]interface{}{eventbus.CtxPushMsg: eventbus.CtxPushMsgToStatusBarAndProgress}
|
|
|
total := len(ops)
|
|
|
for i, op := range ops {
|
|
|
if util.IsExiting {
|
|
@@ -71,9 +71,9 @@ func FlushHistoryQueue() {
|
|
|
return
|
|
|
}
|
|
|
|
|
|
- context["current"] = i
|
|
|
- context["total"] = total
|
|
|
- if err = execHistoryOp(op, tx, context); nil != err {
|
|
|
+ op.context["current"] = i
|
|
|
+ op.context["total"] = total
|
|
|
+ if err = execHistoryOp(op, tx); nil != err {
|
|
|
tx.Rollback()
|
|
|
logging.LogErrorf("queue operation failed: %s", err)
|
|
|
continue
|
|
@@ -99,12 +99,12 @@ func FlushHistoryQueue() {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-func execHistoryOp(op *historyDBQueueOperation, tx *sql.Tx, context map[string]interface{}) (err error) {
|
|
|
+func execHistoryOp(op *historyDBQueueOperation, tx *sql.Tx) (err error) {
|
|
|
switch op.action {
|
|
|
case "index":
|
|
|
- err = insertHistories(tx, op.histories, context)
|
|
|
+ err = insertHistories(tx, op.histories, op.context)
|
|
|
case "deletePathPrefix":
|
|
|
- err = deleteHistoriesByPathPrefix(tx, op.pathPrefix, context)
|
|
|
+ err = deleteHistoriesByPathPrefix(tx, op.pathPrefix, op.context)
|
|
|
default:
|
|
|
msg := fmt.Sprintf("unknown history operation [%s]", op.action)
|
|
|
logging.LogErrorf(msg)
|
|
@@ -117,15 +117,17 @@ func DeleteHistoriesByPathPrefixQueue(pathPrefix string) {
|
|
|
historyDBQueueLock.Lock()
|
|
|
defer historyDBQueueLock.Unlock()
|
|
|
|
|
|
- newOp := &historyDBQueueOperation{inQueueTime: time.Now(), action: "deletePathPrefix", pathPrefix: pathPrefix}
|
|
|
+ newOp := &historyDBQueueOperation{inQueueTime: time.Now(), action: "deletePathPrefix", pathPrefix: pathPrefix,
|
|
|
+ context: map[string]interface{}{eventbus.CtxPushMsg: eventbus.CtxPushMsgToStatusBar}}
|
|
|
historyOperationQueue = append(historyOperationQueue, newOp)
|
|
|
}
|
|
|
|
|
|
-func IndexHistoriesQueue(histories []*History) {
|
|
|
+func IndexHistoriesQueue(histories []*History, context map[string]interface{}) {
|
|
|
historyDBQueueLock.Lock()
|
|
|
defer historyDBQueueLock.Unlock()
|
|
|
|
|
|
- newOp := &historyDBQueueOperation{inQueueTime: time.Now(), action: "index", histories: histories}
|
|
|
+ newOp := &historyDBQueueOperation{inQueueTime: time.Now(), action: "index", histories: histories,
|
|
|
+ context: context}
|
|
|
historyOperationQueue = append(historyOperationQueue, newOp)
|
|
|
}
|
|
|
|