This commit is contained in:
Daniel 2023-10-04 11:45:41 +08:00
parent 828eeee4be
commit 56ace2e427
No known key found for this signature in database
GPG key ID: 86211BA83DF03017
2 changed files with 21 additions and 50 deletions

View file

@ -35,7 +35,6 @@ func StartCron() {
go every(2*time.Hour, model.StatJob)
go every(2*time.Hour, model.RefreshCheckJob)
go every(3*time.Second, model.FlushUpdateRefTextRenameDocJob)
go every(50*time.Millisecond, model.FlushTxJob)
go every(util.SQLFlushInterval, sql.FlushTxJob)
go every(util.SQLFlushInterval, sql.FlushHistoryTxJob)
go every(util.SQLFlushInterval, sql.FlushAssetContentTxJob)

View file

@ -21,7 +21,6 @@ import (
"errors"
"fmt"
"path/filepath"
"sort"
"strings"
"sync"
"time"
@ -65,11 +64,6 @@ func IsUnfoldHeading(transactions *[]*Transaction) bool {
return false
}
var (
txQueue []*Transaction
txQueueLock = sync.Mutex{}
)
func WaitForWritingFiles() {
var printLog bool
var lastPrintLog bool
@ -86,25 +80,34 @@ func WaitForWritingFiles() {
}
}
var (
txQueue = make(chan *Transaction, 7)
flushLock = sync.Mutex{}
)
func isWritingFiles() bool {
time.Sleep(time.Duration(20) * time.Millisecond)
return 0 < len(txQueue) || util.IsMutexLocked(&txQueueLock) || util.IsMutexLocked(&flushLock)
return 0 < len(txQueue) || util.IsMutexLocked(&flushLock)
}
func FlushTxJob() {
flushTx()
func init() {
go func() {
for {
select {
case tx := <-txQueue:
flushTx(tx)
}
}
}()
}
var flushLock = sync.Mutex{}
func flushTx() {
func flushTx(tx *Transaction) {
defer logging.Recover()
flushLock.Lock()
defer flushLock.Unlock()
currentTx := mergeTx()
start := time.Now()
if txErr := performTx(currentTx); nil != txErr {
if txErr := performTx(tx); nil != txErr {
switch txErr.code {
case TxErrCodeBlockNotFound:
util.PushTxErr("Transaction failed", txErr.code, nil)
@ -116,48 +119,17 @@ func flushTx() {
}
}
elapsed := time.Now().Sub(start).Milliseconds()
if 0 < len(currentTx.DoOperations) {
if 0 < len(tx.DoOperations) {
if 2000 < elapsed {
logging.LogWarnf("op tx [%dms]", elapsed)
}
}
}
func mergeTx() (ret *Transaction) {
txQueueLock.Lock()
defer txQueueLock.Unlock()
ret = &Transaction{}
var doOps []*Operation
for _, tx := range txQueue {
for _, op := range tx.DoOperations {
if l := len(doOps); 0 < l {
lastOp := doOps[l-1]
if "update" == lastOp.Action && "update" == op.Action && lastOp.ID == op.ID { // 连续相同的更新操作
lastOp.discard = true
}
}
doOps = append(doOps, op)
}
}
for _, op := range doOps {
if !op.discard {
ret.DoOperations = append(ret.DoOperations, op)
}
}
txQueue = nil
return
}
func PerformTransactions(transactions *[]*Transaction) {
txQueueLock.Lock()
txQueue = append(txQueue, *transactions...)
sort.Slice(txQueue, func(i, j int) bool {
return txQueue[i].Timestamp < txQueue[j].Timestamp
})
txQueueLock.Unlock()
for _, tx := range *transactions {
txQueue <- tx
}
return
}