|
@@ -64,8 +64,6 @@ func IsUnfoldHeading(transactions *[]*Transaction) bool {
|
|
|
var (
|
|
|
txQueue []*Transaction
|
|
|
txQueueLock = sync.Mutex{}
|
|
|
-
|
|
|
- currentTx *Transaction
|
|
|
)
|
|
|
|
|
|
func WaitForWritingFiles() {
|
|
@@ -86,20 +84,21 @@ func WaitForWritingFiles() {
|
|
|
|
|
|
func isWritingFiles() bool {
|
|
|
time.Sleep(time.Duration(20) * time.Millisecond)
|
|
|
- if 0 < len(txQueue) || util.IsMutexLocked(&txQueueLock) {
|
|
|
- return true
|
|
|
- }
|
|
|
- return nil != currentTx
|
|
|
+ return 0 < len(txQueue) || util.IsMutexLocked(&txQueueLock) || util.IsMutexLocked(&flushLock)
|
|
|
}
|
|
|
|
|
|
func FlushTxJob() {
|
|
|
flushTx()
|
|
|
}
|
|
|
|
|
|
+var flushLock = sync.Mutex{}
|
|
|
+
|
|
|
func flushTx() {
|
|
|
defer logging.Recover()
|
|
|
+ flushLock.Lock()
|
|
|
+ defer flushLock.Unlock()
|
|
|
|
|
|
- currentTx = mergeTx()
|
|
|
+ currentTx := mergeTx()
|
|
|
start := time.Now()
|
|
|
if txErr := performTx(currentTx); nil != txErr {
|
|
|
switch txErr.code {
|
|
@@ -119,7 +118,6 @@ func flushTx() {
|
|
|
logging.LogWarnf("op tx [%dms]", elapsed)
|
|
|
}
|
|
|
}
|
|
|
- currentTx = nil
|
|
|
}
|
|
|
|
|
|
func mergeTx() (ret *Transaction) {
|