Przeglądaj źródła

Merge pull request #17877 from aaronlehmann/capped-bytespipe

Cap the amount of buffering done by BytesPipe
unclejack 9 lat temu
rodzic
commit
13963957eb

+ 6 - 6
daemon/container.go

@@ -242,15 +242,15 @@ func (streamConfig *streamConfig) StdinPipe() io.WriteCloser {
 }
 
 func (streamConfig *streamConfig) StdoutPipe() io.ReadCloser {
-	reader, writer := io.Pipe()
-	streamConfig.stdout.Add(writer)
-	return ioutils.NewBufReader(reader)
+	bytesPipe := ioutils.NewBytesPipe(nil)
+	streamConfig.stdout.Add(bytesPipe)
+	return bytesPipe
 }
 
 func (streamConfig *streamConfig) StderrPipe() io.ReadCloser {
-	reader, writer := io.Pipe()
-	streamConfig.stderr.Add(writer)
-	return ioutils.NewBufReader(reader)
+	bytesPipe := ioutils.NewBytesPipe(nil)
+	streamConfig.stderr.Add(bytesPipe)
+	return bytesPipe
 }
 
 // ExitOnNext signals to the monitor that it should not restart the container

+ 72 - 9
pkg/ioutils/bytespipe.go

@@ -1,16 +1,32 @@
 package ioutils
 
+import (
+	"errors"
+	"io"
+	"sync"
+)
+
+// maxCap is the highest capacity to use in byte slices that buffer data.
 const maxCap = 1e6
 
-// BytesPipe is io.ReadWriter which works similarly to pipe(queue).
-// All written data could be read only once. Also BytesPipe is allocating
-// and releasing new byte slices to adjust to current needs, so there won't be
-// overgrown buffer after high load peak.
-// BytesPipe isn't goroutine-safe, caller must synchronize it if needed.
+// blockThreshold is the minimum number of bytes in the buffer which will cause
+// a write to BytesPipe to block when allocating a new slice.
+const blockThreshold = 1e6
+
+// ErrClosed is returned when Write is called on a closed BytesPipe.
+var ErrClosed = errors.New("write to closed BytesPipe")
+
+// BytesPipe is io.ReadWriteCloser which works similarly to pipe(queue).
+// All written data may be read at most once. Also, BytesPipe allocates
+// and releases new byte slices to adjust to current needs, so the buffer
+// won't be overgrown after peak loads.
 type BytesPipe struct {
+	mu       sync.Mutex
+	wait     *sync.Cond
 	buf      [][]byte // slice of byte-slices of buffered data
 	lastRead int      // index in the first slice to a read point
 	bufLen   int      // length of data buffered over the slices
+	closeErr error    // error to return from next Read. set to nil if not closed.
 }
 
 // NewBytesPipe creates new BytesPipe, initialized by specified slice.
@@ -20,15 +36,23 @@ func NewBytesPipe(buf []byte) *BytesPipe {
 	if cap(buf) == 0 {
 		buf = make([]byte, 0, 64)
 	}
-	return &BytesPipe{
+	bp := &BytesPipe{
 		buf: [][]byte{buf[:0]},
 	}
+	bp.wait = sync.NewCond(&bp.mu)
+	return bp
 }
 
 // Write writes p to BytesPipe.
 // It can allocate new []byte slices in a process of writing.
-func (bp *BytesPipe) Write(p []byte) (n int, err error) {
+func (bp *BytesPipe) Write(p []byte) (int, error) {
+	bp.mu.Lock()
+	defer bp.mu.Unlock()
+	written := 0
 	for {
+		if bp.closeErr != nil {
+			return written, ErrClosed
+		}
 		// write data to the last buffer
 		b := bp.buf[len(bp.buf)-1]
 		// copy data to the current empty allocated area
@@ -38,6 +62,8 @@ func (bp *BytesPipe) Write(p []byte) (n int, err error) {
 		// include written data in last buffer
 		bp.buf[len(bp.buf)-1] = b[:len(b)+n]
 
+		written += n
+
 		// if there was enough room to write all then break
 		if len(p) == n {
 			break
@@ -45,15 +71,40 @@ func (bp *BytesPipe) Write(p []byte) (n int, err error) {
 
 		// more data: write to the next slice
 		p = p[n:]
+
+		// block if too much data is still in the buffer
+		for bp.bufLen >= blockThreshold {
+			bp.wait.Wait()
+		}
+
 		// allocate slice that has twice the size of the last unless maximum reached
 		nextCap := 2 * cap(bp.buf[len(bp.buf)-1])
-		if maxCap < nextCap {
+		if nextCap > maxCap {
 			nextCap = maxCap
 		}
 		// add new byte slice to the buffers slice and continue writing
 		bp.buf = append(bp.buf, make([]byte, 0, nextCap))
 	}
-	return
+	bp.wait.Broadcast()
+	return written, nil
+}
+
+// CloseWithError causes further reads from a BytesPipe to return immediately.
+func (bp *BytesPipe) CloseWithError(err error) error {
+	bp.mu.Lock()
+	if err != nil {
+		bp.closeErr = err
+	} else {
+		bp.closeErr = io.EOF
+	}
+	bp.wait.Broadcast()
+	bp.mu.Unlock()
+	return nil
+}
+
+// Close causes further reads from a BytesPipe to return immediately.
+func (bp *BytesPipe) Close() error {
+	return bp.CloseWithError(nil)
 }
 
 func (bp *BytesPipe) len() int {
@@ -63,6 +114,17 @@ func (bp *BytesPipe) len() int {
 // Read reads bytes from BytesPipe.
 // Data could be read only once.
 func (bp *BytesPipe) Read(p []byte) (n int, err error) {
+	bp.mu.Lock()
+	defer bp.mu.Unlock()
+	if bp.len() == 0 {
+		if bp.closeErr != nil {
+			return 0, bp.closeErr
+		}
+		bp.wait.Wait()
+		if bp.len() == 0 && bp.closeErr != nil {
+			return 0, bp.closeErr
+		}
+	}
 	for {
 		read := copy(p, bp.buf[0][bp.lastRead:])
 		n += read
@@ -85,5 +147,6 @@ func (bp *BytesPipe) Read(p []byte) (n int, err error) {
 		bp.buf[0] = nil     // throw away old slice
 		bp.buf = bp.buf[1:] // switch to next
 	}
+	bp.wait.Broadcast()
 	return
 }

+ 34 - 17
pkg/ioutils/bytespipe_test.go

@@ -3,7 +3,9 @@ package ioutils
 import (
 	"crypto/sha1"
 	"encoding/hex"
+	"math/rand"
 	"testing"
+	"time"
 )
 
 func TestBytesPipeRead(t *testing.T) {
@@ -86,25 +88,32 @@ func TestBytesPipeWriteRandomChunks(t *testing.T) {
 		// write/read through buffer
 		buf := NewBytesPipe(nil)
 		hash.Reset()
-		for i := 0; i < c.iterations; i++ {
-			for w := 0; w < c.writesPerLoop; w++ {
-				buf.Write(testMessage[:writeChunks[(i*c.writesPerLoop+w)%len(writeChunks)]])
-			}
-			for r := 0; r < c.readsPerLoop; r++ {
-				p := make([]byte, readChunks[(i*c.readsPerLoop+r)%len(readChunks)])
+
+		done := make(chan struct{})
+
+		go func() {
+			// random delay before read starts
+			<-time.After(time.Duration(rand.Intn(10)) * time.Millisecond)
+			for i := 0; ; i++ {
+				p := make([]byte, readChunks[(c.iterations*c.readsPerLoop+i)%len(readChunks)])
 				n, _ := buf.Read(p)
+				if n == 0 {
+					break
+				}
 				hash.Write(p[:n])
 			}
-		}
-		// read rest of the data from buffer
-		for i := 0; ; i++ {
-			p := make([]byte, readChunks[(c.iterations*c.readsPerLoop+i)%len(readChunks)])
-			n, _ := buf.Read(p)
-			if n == 0 {
-				break
+
+			close(done)
+		}()
+
+		for i := 0; i < c.iterations; i++ {
+			for w := 0; w < c.writesPerLoop; w++ {
+				buf.Write(testMessage[:writeChunks[(i*c.writesPerLoop+w)%len(writeChunks)]])
 			}
-			hash.Write(p[:n])
 		}
+		buf.Close()
+		<-done
+
 		actual := hex.EncodeToString(hash.Sum(nil))
 
 		if expected != actual {
@@ -116,24 +125,32 @@ func TestBytesPipeWriteRandomChunks(t *testing.T) {
 
 func BenchmarkBytesPipeWrite(b *testing.B) {
 	for i := 0; i < b.N; i++ {
+		readBuf := make([]byte, 1024)
 		buf := NewBytesPipe(nil)
+		go func() {
+			var err error
+			for err == nil {
+				_, err = buf.Read(readBuf)
+			}
+		}()
 		for j := 0; j < 1000; j++ {
 			buf.Write([]byte("pretty short line, because why not?"))
 		}
+		buf.Close()
 	}
 }
 
 func BenchmarkBytesPipeRead(b *testing.B) {
-	rd := make([]byte, 1024)
+	rd := make([]byte, 512)
 	for i := 0; i < b.N; i++ {
 		b.StopTimer()
 		buf := NewBytesPipe(nil)
-		for j := 0; j < 1000; j++ {
+		for j := 0; j < 500; j++ {
 			buf.Write(make([]byte, 1024))
 		}
 		b.StartTimer()
 		for j := 0; j < 1000; j++ {
-			if n, _ := buf.Read(rd); n != 1024 {
+			if n, _ := buf.Read(rd); n != 512 {
 				b.Fatalf("Wrong number of bytes: %d", n)
 			}
 		}

+ 0 - 87
pkg/ioutils/readers.go

@@ -4,7 +4,6 @@ import (
 	"crypto/sha256"
 	"encoding/hex"
 	"io"
-	"sync"
 )
 
 type readCloserWrapper struct {
@@ -45,92 +44,6 @@ func NewReaderErrWrapper(r io.Reader, closer func()) io.Reader {
 	}
 }
 
-// bufReader allows the underlying reader to continue to produce
-// output by pre-emptively reading from the wrapped reader.
-// This is achieved by buffering this data in bufReader's
-// expanding buffer.
-type bufReader struct {
-	sync.Mutex
-	buf      io.ReadWriter
-	reader   io.Reader
-	err      error
-	wait     sync.Cond
-	drainBuf []byte
-}
-
-// NewBufReader returns a new bufReader.
-func NewBufReader(r io.Reader) io.ReadCloser {
-	reader := &bufReader{
-		buf:      NewBytesPipe(nil),
-		reader:   r,
-		drainBuf: make([]byte, 1024),
-	}
-	reader.wait.L = &reader.Mutex
-	go reader.drain()
-	return reader
-}
-
-// NewBufReaderWithDrainbufAndBuffer returns a BufReader with drainBuffer and buffer.
-func NewBufReaderWithDrainbufAndBuffer(r io.Reader, drainBuffer []byte, buffer io.ReadWriter) io.ReadCloser {
-	reader := &bufReader{
-		buf:      buffer,
-		drainBuf: drainBuffer,
-		reader:   r,
-	}
-	reader.wait.L = &reader.Mutex
-	go reader.drain()
-	return reader
-}
-
-func (r *bufReader) drain() {
-	for {
-		//Call to scheduler is made to yield from this goroutine.
-		//This avoids goroutine looping here when n=0,err=nil, fixes code hangs when run with GCC Go.
-		callSchedulerIfNecessary()
-		n, err := r.reader.Read(r.drainBuf)
-		r.Lock()
-		if err != nil {
-			r.err = err
-		} else {
-			if n == 0 {
-				// nothing written, no need to signal
-				r.Unlock()
-				continue
-			}
-			r.buf.Write(r.drainBuf[:n])
-		}
-		r.wait.Signal()
-		r.Unlock()
-		if err != nil {
-			break
-		}
-	}
-}
-
-func (r *bufReader) Read(p []byte) (n int, err error) {
-	r.Lock()
-	defer r.Unlock()
-	for {
-		n, err = r.buf.Read(p)
-		if n > 0 {
-			return n, err
-		}
-		if r.err != nil {
-			return 0, r.err
-		}
-		r.wait.Wait()
-	}
-}
-
-// Close closes the bufReader
-func (r *bufReader) Close() error {
-	closer, ok := r.reader.(io.ReadCloser)
-	if !ok {
-		return nil
-	}
-	return closer.Close()
-}
-
 // HashData returns the sha256 sum of src.
 func HashData(src io.Reader) (string, error) {
 	h := sha256.New()

+ 0 - 157
pkg/ioutils/readers_test.go

@@ -1,13 +1,9 @@
 package ioutils
 
 import (
-	"bytes"
 	"fmt"
-	"io"
-	"io/ioutil"
 	"strings"
 	"testing"
-	"time"
 )
 
 // Implement io.Reader
@@ -58,101 +54,6 @@ func TestReaderErrWrapperRead(t *testing.T) {
 	}
 }
 
-func TestNewBufReaderWithDrainbufAndBuffer(t *testing.T) {
-	reader, writer := io.Pipe()
-
-	drainBuffer := make([]byte, 1024)
-	buffer := NewBytesPipe(nil)
-	bufreader := NewBufReaderWithDrainbufAndBuffer(reader, drainBuffer, buffer)
-
-	// Write everything down to a Pipe
-	// Usually, a pipe should block but because of the buffered reader,
-	// the writes will go through
-	done := make(chan bool)
-	go func() {
-		writer.Write([]byte("hello world"))
-		writer.Close()
-		done <- true
-	}()
-
-	// Drain the reader *after* everything has been written, just to verify
-	// it is indeed buffering
-	select {
-	case <-done:
-	case <-time.After(1 * time.Second):
-		t.Fatal("timeout")
-	}
-
-	output, err := ioutil.ReadAll(bufreader)
-	if err != nil {
-		t.Fatal(err)
-	}
-	if !bytes.Equal(output, []byte("hello world")) {
-		t.Error(string(output))
-	}
-}
-
-func TestBufReader(t *testing.T) {
-	reader, writer := io.Pipe()
-	bufreader := NewBufReader(reader)
-
-	// Write everything down to a Pipe
-	// Usually, a pipe should block but because of the buffered reader,
-	// the writes will go through
-	done := make(chan bool)
-	go func() {
-		writer.Write([]byte("hello world"))
-		writer.Close()
-		done <- true
-	}()
-
-	// Drain the reader *after* everything has been written, just to verify
-	// it is indeed buffering
-	<-done
-	output, err := ioutil.ReadAll(bufreader)
-	if err != nil {
-		t.Fatal(err)
-	}
-	if !bytes.Equal(output, []byte("hello world")) {
-		t.Error(string(output))
-	}
-}
-
-func TestBufReaderCloseWithNonReaderCloser(t *testing.T) {
-	reader := strings.NewReader("buffer")
-	bufreader := NewBufReader(reader)
-
-	if err := bufreader.Close(); err != nil {
-		t.Fatal(err)
-	}
-
-}
-
-// implements io.ReadCloser
-type simpleReaderCloser struct {
-	err error
-}
-
-func (r *simpleReaderCloser) Read(p []byte) (n int, err error) {
-	return 0, r.err
-}
-
-func (r *simpleReaderCloser) Close() error {
-	r.err = io.EOF
-	return nil
-}
-
-func TestBufReaderCloseWithReaderCloser(t *testing.T) {
-	reader := &simpleReaderCloser{}
-	bufreader := NewBufReader(reader)
-
-	err := bufreader.Close()
-	if err != nil {
-		t.Fatal(err)
-	}
-
-}
-
 func TestHashData(t *testing.T) {
 	reader := strings.NewReader("hash-me")
 	actual, err := HashData(reader)
@@ -164,61 +65,3 @@ func TestHashData(t *testing.T) {
 		t.Fatalf("Expecting %s, got %s", expected, actual)
 	}
 }
-
-type repeatedReader struct {
-	readCount int
-	maxReads  int
-	data      []byte
-}
-
-func newRepeatedReader(max int, data []byte) *repeatedReader {
-	return &repeatedReader{0, max, data}
-}
-
-func (r *repeatedReader) Read(p []byte) (int, error) {
-	if r.readCount >= r.maxReads {
-		return 0, io.EOF
-	}
-	r.readCount++
-	n := copy(p, r.data)
-	return n, nil
-}
-
-func testWithData(data []byte, reads int) {
-	reader := newRepeatedReader(reads, data)
-	bufReader := NewBufReader(reader)
-	io.Copy(ioutil.Discard, bufReader)
-}
-
-func Benchmark1M10BytesReads(b *testing.B) {
-	reads := 1000000
-	readSize := int64(10)
-	data := make([]byte, readSize)
-	b.SetBytes(readSize * int64(reads))
-	b.ResetTimer()
-	for i := 0; i < b.N; i++ {
-		testWithData(data, reads)
-	}
-}
-
-func Benchmark1M1024BytesReads(b *testing.B) {
-	reads := 1000000
-	readSize := int64(1024)
-	data := make([]byte, readSize)
-	b.SetBytes(readSize * int64(reads))
-	b.ResetTimer()
-	for i := 0; i < b.N; i++ {
-		testWithData(data, reads)
-	}
-}
-
-func Benchmark10k32KBytesReads(b *testing.B) {
-	reads := 10000
-	readSize := int64(32 * 1024)
-	data := make([]byte, readSize)
-	b.SetBytes(readSize * int64(reads))
-	b.ResetTimer()
-	for i := 0; i < b.N; i++ {
-		testWithData(data, reads)
-	}
-}