|
@@ -72,13 +72,14 @@ func WaitForWritingFiles() {
|
|
}
|
|
}
|
|
|
|
|
|
var (
|
|
var (
|
|
- txQueue = make(chan *Transaction, 7)
|
|
|
|
- flushLock = sync.Mutex{}
|
|
|
|
|
|
+ txQueue = make(chan *Transaction, 7)
|
|
|
|
+ flushLock = sync.Mutex{}
|
|
|
|
+ isFlushing = false
|
|
)
|
|
)
|
|
|
|
|
|
func isWritingFiles() bool {
|
|
func isWritingFiles() bool {
|
|
time.Sleep(time.Duration(50) * time.Millisecond)
|
|
time.Sleep(time.Duration(50) * time.Millisecond)
|
|
- return 0 < len(txQueue)
|
|
|
|
|
|
+ return 0 < len(txQueue) || isFlushing
|
|
}
|
|
}
|
|
|
|
|
|
func init() {
|
|
func init() {
|
|
@@ -95,7 +96,11 @@ func init() {
|
|
func flushTx(tx *Transaction) {
|
|
func flushTx(tx *Transaction) {
|
|
defer logging.Recover()
|
|
defer logging.Recover()
|
|
flushLock.Lock()
|
|
flushLock.Lock()
|
|
- defer flushLock.Unlock()
|
|
|
|
|
|
+ isFlushing = true
|
|
|
|
+ defer func() {
|
|
|
|
+ isFlushing = false
|
|
|
|
+ flushLock.Unlock()
|
|
|
|
+ }()
|
|
|
|
|
|
start := time.Now()
|
|
start := time.Now()
|
|
if txErr := performTx(tx); nil != txErr {
|
|
if txErr := performTx(tx); nil != txErr {
|