Pārlūkot izejas kodu

Merge pull request #43112 from ningmingxiao/dev1

fix blockThreshold full deadlock bug
Bjorn Neergaard 2 gadi atpakaļ
vecāks
revīzija
b34ffc99b8
2 mainītis faili ar 70 papildinājumiem un 5 dzēšanām
  1. 11 5
      pkg/ioutils/bytespipe.go
  2. 59 0
      pkg/ioutils/bytespipe_test.go

+ 11 - 5
pkg/ioutils/bytespipe.go

@@ -29,11 +29,12 @@ var (
 // and releases new byte slices to adjust to current needs, so the buffer
 // won't be overgrown after peak loads.
 type BytesPipe struct {
-	mu       sync.Mutex
-	wait     *sync.Cond
-	buf      []*fixedBuffer
-	bufLen   int
-	closeErr error // error to return from next Read. set to nil if not closed.
+	mu        sync.Mutex
+	wait      *sync.Cond
+	buf       []*fixedBuffer
+	bufLen    int
+	closeErr  error // error to return from next Read. set to nil if not closed.
+	readBlock bool  // check read BytesPipe is Wait() or not
 }
 
 // NewBytesPipe creates new BytesPipe, initialized by specified slice.
@@ -85,6 +86,9 @@ loop0:
 
 		// make sure the buffer doesn't grow too big from this write
 		for bp.bufLen >= blockThreshold {
+			if bp.readBlock {
+				bp.wait.Broadcast()
+			}
 			bp.wait.Wait()
 			if bp.closeErr != nil {
 				continue loop0
@@ -129,7 +133,9 @@ func (bp *BytesPipe) Read(p []byte) (n int, err error) {
 		if bp.closeErr != nil {
 			return 0, bp.closeErr
 		}
+		bp.readBlock = true
 		bp.wait.Wait()
+		bp.readBlock = false
 		if bp.bufLen == 0 && bp.closeErr != nil {
 			return 0, bp.closeErr
 		}

+ 59 - 0
pkg/ioutils/bytespipe_test.go

@@ -60,6 +60,65 @@ func TestBytesPipeWrite(t *testing.T) {
 	}
 }
 
+// Regression test for #41941.
+func TestBytesPipeDeadlock(t *testing.T) {
+	bp := NewBytesPipe()
+	bp.buf = []*fixedBuffer{getBuffer(blockThreshold)}
+
+	rd := make(chan error)
+	go func() {
+		n, err := bp.Read(make([]byte, 1))
+		t.Logf("Read n=%d, err=%v", n, err)
+		if n != 1 {
+			t.Errorf("short read: got %d, want 1", n)
+		}
+		rd <- err
+	}()
+
+	wr := make(chan error)
+	go func() {
+		const writeLen int = blockThreshold + 1
+		time.Sleep(time.Millisecond)
+		n, err := bp.Write(make([]byte, writeLen))
+		t.Logf("Write n=%d, err=%v", n, err)
+		if n != writeLen {
+			t.Errorf("short write: got %d, want %d", n, writeLen)
+		}
+		wr <- err
+	}()
+
+	timer := time.NewTimer(time.Second)
+	defer timer.Stop()
+	select {
+	case <-timer.C:
+		t.Fatal("deadlock! Neither Read() nor Write() returned.")
+	case rerr := <-rd:
+		if rerr != nil {
+			t.Fatal(rerr)
+		}
+		select {
+		case <-timer.C:
+			t.Fatal("deadlock! Write() did not return.")
+		case werr := <-wr:
+			if werr != nil {
+				t.Fatal(werr)
+			}
+		}
+	case werr := <-wr:
+		if werr != nil {
+			t.Fatal(werr)
+		}
+		select {
+		case <-timer.C:
+			t.Fatal("deadlock! Read() did not return.")
+		case rerr := <-rd:
+			if rerr != nil {
+				t.Fatal(rerr)
+			}
+		}
+	}
+}
+
 // Write and read in different speeds/chunk sizes and check valid data is read.
 func TestBytesPipeWriteRandomChunks(t *testing.T) {
 	cases := []struct{ iterations, writesPerLoop, readsPerLoop int }{