Browse Source

fix blockThreshold full bug

Signed-off-by: ningmingxiao <ning.mingxiao@zte.com.cn>
ningmingxiao 3 years ago
parent
commit
dcfe23a038
1 changed files with 11 additions and 5 deletions
  1. 11 5
      pkg/ioutils/bytespipe.go

+ 11 - 5
pkg/ioutils/bytespipe.go

@@ -29,11 +29,12 @@ var (
 // and releases new byte slices to adjust to current needs, so the buffer
 // won't be overgrown after peak loads.
 type BytesPipe struct {
-	mu       sync.Mutex
-	wait     *sync.Cond
-	buf      []*fixedBuffer
-	bufLen   int
-	closeErr error // error to return from next Read. set to nil if not closed.
+	mu        sync.Mutex
+	wait      *sync.Cond
+	buf       []*fixedBuffer
+	bufLen    int
+	closeErr  error // error to return from next Read. set to nil if not closed.
+	readBlock bool  // check read BytesPipe is Wait() or not
 }
 
 // NewBytesPipe creates new BytesPipe, initialized by specified slice.
@@ -85,6 +86,9 @@ loop0:
 
 		// make sure the buffer doesn't grow too big from this write
 		for bp.bufLen >= blockThreshold {
+			if bp.readBlock {
+				bp.wait.Broadcast()
+			}
 			bp.wait.Wait()
 			if bp.closeErr != nil {
 				continue loop0
@@ -129,7 +133,9 @@ func (bp *BytesPipe) Read(p []byte) (n int, err error) {
 		if bp.closeErr != nil {
 			return 0, bp.closeErr
 		}
+		bp.readBlock = true
 		bp.wait.Wait()
+		bp.readBlock = false
 		if bp.bufLen == 0 && bp.closeErr != nil {
 			return 0, bp.closeErr
 		}