fix blockThreshold full bug

Signed-off-by: ningmingxiao <ning.mingxiao@zte.com.cn>
(cherry picked from commit dcfe23a038)
Signed-off-by: Cory Snider <csnider@mirantis.com>
This commit is contained in:
ningmingxiao 2021-12-30 23:10:48 +08:00 committed by Cory Snider
parent 941a07b339
commit 3da45c0fe7

View file

@ -29,11 +29,12 @@ var (
// and releases new byte slices to adjust to current needs, so the buffer
// won't be overgrown after peak loads.
type BytesPipe struct {
mu sync.Mutex
wait *sync.Cond
buf []*fixedBuffer
bufLen int
closeErr error // error to return from next Read. set to nil if not closed.
mu sync.Mutex
wait *sync.Cond
buf []*fixedBuffer
bufLen int
closeErr error // error to return from next Read. set to nil if not closed.
readBlock bool // check read BytesPipe is Wait() or not
}
// NewBytesPipe creates new BytesPipe, initialized by specified slice.
@ -85,6 +86,9 @@ loop0:
// make sure the buffer doesn't grow too big from this write
for bp.bufLen >= blockThreshold {
if bp.readBlock {
bp.wait.Broadcast()
}
bp.wait.Wait()
if bp.closeErr != nil {
continue loop0
@ -129,7 +133,9 @@ func (bp *BytesPipe) Read(p []byte) (n int, err error) {
if bp.closeErr != nil {
return 0, bp.closeErr
}
bp.readBlock = true
bp.wait.Wait()
bp.readBlock = false
if bp.bufLen == 0 && bp.closeErr != nil {
return 0, bp.closeErr
}