Cap the amount of buffering done by BytesPipe

Turn BytesPipe's Read and Write functions into blocking, goroutine-safe
functions. Add a CloseWithError function to propagate an error to the
Read function.

Adjust tests to work with the blocking Read and Write functions.

Remove BufReader, since now its users can use BytesPipe directly.

Signed-off-by: Aaron Lehmann <aaron.lehmann@docker.com>
Aaron Lehmann, 9 years ago
commit 64f8ee444d
5 changed files with 112 additions and 276 deletions
  1. daemon/container.go (+6 -6)
  2. pkg/ioutils/bytespipe.go (+72 -9)
  3. pkg/ioutils/bytespipe_test.go (+34 -17)
  4. pkg/ioutils/readers.go (+0 -87)
  5. pkg/ioutils/readers_test.go (+0 -157)
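For orientation, here is how the reworked BytesPipe is meant to be used
(a minimal sketch, assuming the github.com/docker/docker/pkg/ioutils
import path of this era; not part of the commit itself). Writes complete
without a reader attached, Read blocks until data arrives, and Close
makes Read return io.EOF once the buffer is drained:

    package main

    import (
    	"fmt"

    	"github.com/docker/docker/pkg/ioutils"
    )

    func main() {
    	bp := ioutils.NewBytesPipe(nil)

    	// Writer side: small writes return immediately; Write only blocks
    	// once roughly 1MB (blockThreshold) of unread data piles up.
    	go func() {
    		for i := 0; i < 3; i++ {
    			bp.Write([]byte("chunk\n"))
    		}
    		bp.Close() // readers drain the buffer, then see io.EOF
    	}()

    	// Reader side: Read blocks until data is written or the pipe closes.
    	p := make([]byte, 64)
    	for {
    		n, err := bp.Read(p)
    		fmt.Print(string(p[:n]))
    		if err != nil {
    			return // io.EOF after Close
    		}
    	}
    }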

+ 6 - 6
daemon/container.go

@@ -244,15 +244,15 @@ func (streamConfig *streamConfig) StdinPipe() io.WriteCloser {
 }
 
 func (streamConfig *streamConfig) StdoutPipe() io.ReadCloser {
-	reader, writer := io.Pipe()
-	streamConfig.stdout.Add(writer)
-	return ioutils.NewBufReader(reader)
+	bytesPipe := ioutils.NewBytesPipe(nil)
+	streamConfig.stdout.Add(bytesPipe)
+	return bytesPipe
 }
 
 func (streamConfig *streamConfig) StderrPipe() io.ReadCloser {
-	reader, writer := io.Pipe()
-	streamConfig.stderr.Add(writer)
-	return ioutils.NewBufReader(reader)
+	bytesPipe := ioutils.NewBytesPipe(nil)
+	streamConfig.stderr.Add(bytesPipe)
+	return bytesPipe
 }
 
 // ExitOnNext signals to the monitor that it should not restart the container
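For callers, the returned ReadCloser is now the buffer itself. A hedged
caller-side sketch (drainStdout is a hypothetical helper, not part of
this commit; it assumes the usual "io" and "os" imports in package
daemon): closing the pipe makes later writes from the container fail
with ioutils.ErrClosed instead of accumulating forever.

    // Hypothetical helper in package daemon, for illustration only.
    func drainStdout(sc *streamConfig) error {
    	stdout := sc.StdoutPipe()
    	defer stdout.Close() // later container writes get ioutils.ErrClosed

    	// io.Copy returns a nil error once the pipe is closed and drained.
    	_, err := io.Copy(os.Stdout, stdout)
    	return err
    }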

+ 72 - 9
pkg/ioutils/bytespipe.go

@@ -1,16 +1,32 @@
 package ioutils
 
+import (
+	"errors"
+	"io"
+	"sync"
+)
+
+// maxCap is the highest capacity to use in byte slices that buffer data.
 const maxCap = 1e6
 
-// BytesPipe is io.ReadWriter which works similarly to pipe(queue).
-// All written data could be read only once. Also BytesPipe is allocating
-// and releasing new byte slices to adjust to current needs, so there won't be
-// overgrown buffer after high load peak.
-// BytesPipe isn't goroutine-safe, caller must synchronize it if needed.
+// blockThreshold is the minimum number of bytes in the buffer which will cause
+// a write to BytesPipe to block when allocating a new slice.
+const blockThreshold = 1e6
+
+// ErrClosed is returned when Write is called on a closed BytesPipe.
+var ErrClosed = errors.New("write to closed BytesPipe")
+
+// BytesPipe is io.ReadWriteCloser which works similarly to pipe(queue).
+// All written data may be read at most once. Also, BytesPipe allocates
+// and releases new byte slices to adjust to current needs, so the buffer
+// won't be overgrown after peak loads.
 type BytesPipe struct {
+	mu       sync.Mutex
+	wait     *sync.Cond
 	buf      [][]byte // slice of byte-slices of buffered data
 	lastRead int      // index in the first slice to a read point
 	bufLen   int      // length of data buffered over the slices
+	closeErr error    // error to return from next Read. set to nil if not closed.
 }
 
 // NewBytesPipe creates new BytesPipe, initialized by specified slice.
@@ -20,15 +36,23 @@ func NewBytesPipe(buf []byte) *BytesPipe {
 	if cap(buf) == 0 {
 		buf = make([]byte, 0, 64)
 	}
-	return &BytesPipe{
+	bp := &BytesPipe{
 		buf: [][]byte{buf[:0]},
 	}
+	bp.wait = sync.NewCond(&bp.mu)
+	return bp
 }
 
 // Write writes p to BytesPipe.
 // It can allocate new []byte slices in a process of writing.
-func (bp *BytesPipe) Write(p []byte) (n int, err error) {
+func (bp *BytesPipe) Write(p []byte) (int, error) {
+	bp.mu.Lock()
+	defer bp.mu.Unlock()
+	written := 0
 	for {
+		if bp.closeErr != nil {
+			return written, ErrClosed
+		}
 		// write data to the last buffer
 		b := bp.buf[len(bp.buf)-1]
 		// copy data to the current empty allocated area
@@ -38,6 +62,8 @@ func (bp *BytesPipe) Write(p []byte) (n int, err error) {
 		// include written data in last buffer
 		bp.buf[len(bp.buf)-1] = b[:len(b)+n]
 
+		written += n
+
 		// if there was enough room to write all then break
 		if len(p) == n {
 			break
@@ -45,15 +71,40 @@ func (bp *BytesPipe) Write(p []byte) (n int, err error) {
 
 
 		// more data: write to the next slice
 		p = p[n:]
+
+		// block if too much data is still in the buffer
+		for bp.bufLen >= blockThreshold {
+			bp.wait.Wait()
+		}
+
 		// allocate slice that has twice the size of the last unless maximum reached
 		nextCap := 2 * cap(bp.buf[len(bp.buf)-1])
-		if maxCap < nextCap {
+		if nextCap > maxCap {
 			nextCap = maxCap
 		}
 		// add new byte slice to the buffers slice and continue writing
 		bp.buf = append(bp.buf, make([]byte, 0, nextCap))
 	}
-	return
+	bp.wait.Broadcast()
+	return written, nil
+}
+
+// CloseWithError causes further reads from a BytesPipe to return immediately.
+func (bp *BytesPipe) CloseWithError(err error) error {
+	bp.mu.Lock()
+	if err != nil {
+		bp.closeErr = err
+	} else {
+		bp.closeErr = io.EOF
+	}
+	bp.wait.Broadcast()
+	bp.mu.Unlock()
+	return nil
+}
+
+// Close causes further reads from a BytesPipe to return immediately.
+func (bp *BytesPipe) Close() error {
+	return bp.CloseWithError(nil)
 }
 
 func (bp *BytesPipe) len() int {
@@ -63,6 +114,17 @@ func (bp *BytesPipe) len() int {
 // Read reads bytes from BytesPipe.
 // Data could be read only once.
 func (bp *BytesPipe) Read(p []byte) (n int, err error) {
+	bp.mu.Lock()
+	defer bp.mu.Unlock()
+	if bp.len() == 0 {
+		if bp.closeErr != nil {
+			return 0, bp.closeErr
+		}
+		bp.wait.Wait()
+		if bp.len() == 0 && bp.closeErr != nil {
+			return 0, bp.closeErr
+		}
+	}
 	for {
 		read := copy(p, bp.buf[0][bp.lastRead:])
 		n += read
@@ -85,5 +147,6 @@ func (bp *BytesPipe) Read(p []byte) (n int, err error) {
 		bp.buf[0] = nil     // throw away old slice
 		bp.buf = bp.buf[1:] // switch to next
 	}
+	bp.wait.Broadcast()
 	return
 }
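
One behavior worth spelling out: an error passed to CloseWithError is
only surfaced by Read after the buffered data has been drained, and a
nil error is mapped to io.EOF. A small sketch (nothing here blocks: the
first Read finds buffered data, the second finds the stored error; the
error text is illustrative):

    package main

    import (
    	"errors"
    	"fmt"

    	"github.com/docker/docker/pkg/ioutils"
    )

    func main() {
    	bp := ioutils.NewBytesPipe(nil)
    	bp.Write([]byte("partial output"))
    	bp.CloseWithError(errors.New("container died")) // stored, not returned yet

    	p := make([]byte, 32)
    	n, err := bp.Read(p)              // buffered data is delivered first
    	fmt.Printf("%q %v\n", p[:n], err) // "partial output" <nil>

    	n, err = bp.Read(p) // drained: now the stored error surfaces
    	fmt.Println(n, err) // 0 container died

    	_, err = bp.Write([]byte("more")) // writes after close fail
    	fmt.Println(err)                  // write to closed BytesPipe
    }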

+ 34 - 17
pkg/ioutils/bytespipe_test.go

@@ -3,7 +3,9 @@ package ioutils
 import (
 	"crypto/sha1"
 	"encoding/hex"
+	"math/rand"
 	"testing"
+	"time"
 )
 
 func TestBytesPipeRead(t *testing.T) {
@@ -86,25 +88,32 @@ func TestBytesPipeWriteRandomChunks(t *testing.T) {
 		// write/read through buffer
 		buf := NewBytesPipe(nil)
 		hash.Reset()
-		for i := 0; i < c.iterations; i++ {
-			for w := 0; w < c.writesPerLoop; w++ {
-				buf.Write(testMessage[:writeChunks[(i*c.writesPerLoop+w)%len(writeChunks)]])
-			}
-			for r := 0; r < c.readsPerLoop; r++ {
-				p := make([]byte, readChunks[(i*c.readsPerLoop+r)%len(readChunks)])
+
+		done := make(chan struct{})
+
+		go func() {
+			// random delay before read starts
+			<-time.After(time.Duration(rand.Intn(10)) * time.Millisecond)
+			for i := 0; ; i++ {
+				p := make([]byte, readChunks[(c.iterations*c.readsPerLoop+i)%len(readChunks)])
 				n, _ := buf.Read(p)
+				if n == 0 {
+					break
+				}
 				hash.Write(p[:n])
 			}
-		}
-		// read rest of the data from buffer
-		for i := 0; ; i++ {
-			p := make([]byte, readChunks[(c.iterations*c.readsPerLoop+i)%len(readChunks)])
-			n, _ := buf.Read(p)
-			if n == 0 {
-				break
+
+			close(done)
+		}()
+
+		for i := 0; i < c.iterations; i++ {
+			for w := 0; w < c.writesPerLoop; w++ {
+				buf.Write(testMessage[:writeChunks[(i*c.writesPerLoop+w)%len(writeChunks)]])
 			}
-			hash.Write(p[:n])
 		}
+		buf.Close()
+		<-done
+
 		actual := hex.EncodeToString(hash.Sum(nil))
 
 		if expected != actual {
@@ -116,24 +125,32 @@ func TestBytesPipeWriteRandomChunks(t *testing.T) {
 
 
 func BenchmarkBytesPipeWrite(b *testing.B) {
 	for i := 0; i < b.N; i++ {
+		readBuf := make([]byte, 1024)
 		buf := NewBytesPipe(nil)
+		go func() {
+			var err error
+			for err == nil {
+				_, err = buf.Read(readBuf)
+			}
+		}()
 		for j := 0; j < 1000; j++ {
 			buf.Write([]byte("pretty short line, because why not?"))
 		}
+		buf.Close()
 	}
 }
 
 func BenchmarkBytesPipeRead(b *testing.B) {
-	rd := make([]byte, 1024)
+	rd := make([]byte, 512)
 	for i := 0; i < b.N; i++ {
 		b.StopTimer()
 		buf := NewBytesPipe(nil)
-		for j := 0; j < 1000; j++ {
+		for j := 0; j < 500; j++ {
 			buf.Write(make([]byte, 1024))
 		}
 		b.StartTimer()
 		for j := 0; j < 1000; j++ {
-			if n, _ := buf.Read(rd); n != 1024 {
+			if n, _ := buf.Read(rd); n != 512 {
 				b.Fatalf("Wrong number of bytes: %d", n)
 			}
 		}
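
Note why the read benchmark shrank: the setup loop now stays under
blockThreshold (500 * 1024 < 1e6 bytes), because an unread megabyte
would make Write block with no reader attached, and the timed loop
reads exactly the bytes written (1000 * 512 = 500 * 1024), because Read
blocks on an empty buffer. The write-side cap can be seen on its own
with a sketch like this (sizes are illustrative):

    package main

    import (
    	"fmt"
    	"io"
    	"io/ioutil"

    	"github.com/docker/docker/pkg/ioutils"
    )

    func main() {
    	bp := ioutils.NewBytesPipe(nil)
    	done := make(chan struct{})

    	go func() {
    		chunk := make([]byte, 32*1024)
    		for i := 0; i < 100; i++ { // ~3.2MB total, well past blockThreshold
    			bp.Write(chunk) // parks once ~1MB is unread, until the reader drains
    		}
    		bp.Close()
    		close(done)
    	}()

    	// Draining the pipe is what lets the writer finish.
    	n, _ := io.Copy(ioutil.Discard, bp)
    	<-done
    	fmt.Println(n) // 3276800
    }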

+ 0 - 87
pkg/ioutils/readers.go

@@ -4,7 +4,6 @@ import (
 	"crypto/sha256"
 	"encoding/hex"
 	"io"
-	"sync"
 )
 
 type readCloserWrapper struct {
@@ -45,92 +44,6 @@ func NewReaderErrWrapper(r io.Reader, closer func()) io.Reader {
 	}
 }
 
-// bufReader allows the underlying reader to continue to produce
-// output by pre-emptively reading from the wrapped reader.
-// This is achieved by buffering this data in bufReader's
-// expanding buffer.
-type bufReader struct {
-	sync.Mutex
-	buf      io.ReadWriter
-	reader   io.Reader
-	err      error
-	wait     sync.Cond
-	drainBuf []byte
-}
-
-// NewBufReader returns a new bufReader.
-func NewBufReader(r io.Reader) io.ReadCloser {
-	reader := &bufReader{
-		buf:      NewBytesPipe(nil),
-		reader:   r,
-		drainBuf: make([]byte, 1024),
-	}
-	reader.wait.L = &reader.Mutex
-	go reader.drain()
-	return reader
-}
-
-// NewBufReaderWithDrainbufAndBuffer returns a BufReader with drainBuffer and buffer.
-func NewBufReaderWithDrainbufAndBuffer(r io.Reader, drainBuffer []byte, buffer io.ReadWriter) io.ReadCloser {
-	reader := &bufReader{
-		buf:      buffer,
-		drainBuf: drainBuffer,
-		reader:   r,
-	}
-	reader.wait.L = &reader.Mutex
-	go reader.drain()
-	return reader
-}
-
-func (r *bufReader) drain() {
-	for {
-		//Call to scheduler is made to yield from this goroutine.
-		//This avoids goroutine looping here when n=0,err=nil, fixes code hangs when run with GCC Go.
-		callSchedulerIfNecessary()
-		n, err := r.reader.Read(r.drainBuf)
-		r.Lock()
-		if err != nil {
-			r.err = err
-		} else {
-			if n == 0 {
-				// nothing written, no need to signal
-				r.Unlock()
-				continue
-			}
-			r.buf.Write(r.drainBuf[:n])
-		}
-		r.wait.Signal()
-		r.Unlock()
-		if err != nil {
-			break
-		}
-	}
-}
-
-func (r *bufReader) Read(p []byte) (n int, err error) {
-	r.Lock()
-	defer r.Unlock()
-	for {
-		n, err = r.buf.Read(p)
-		if n > 0 {
-			return n, err
-		}
-		if r.err != nil {
-			return 0, r.err
-		}
-		r.wait.Wait()
-	}
-}
-
-// Close closes the bufReader
-func (r *bufReader) Close() error {
-	closer, ok := r.reader.(io.ReadCloser)
-	if !ok {
-		return nil
-	}
-	return closer.Close()
-}
-
 // HashData returns the sha256 sum of src.
 func HashData(src io.Reader) (string, error) {
 	h := sha256.New()
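
With bufReader gone, a caller that really does start from an io.Reader
can pump it into a BytesPipe itself, as the commit message suggests. A
hedged migration sketch (bufferedReader is a hypothetical name, not an
API from this commit; it relies on CloseWithError mapping a nil error
to io.EOF, as in the bytespipe.go diff above):

    // bufferedReader stands in for the removed ioutils.NewBufReader(src):
    // it still decouples producer from consumer, but buffering is now
    // capped at ~1MB of unread data instead of growing without bound.
    func bufferedReader(src io.Reader) io.ReadCloser {
    	bp := ioutils.NewBytesPipe(nil)
    	go func() {
    		_, err := io.Copy(bp, src)
    		bp.CloseWithError(err) // nil becomes io.EOF for readers
    	}()
    	return bp
    }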

+ 0 - 157
pkg/ioutils/readers_test.go

@@ -1,13 +1,9 @@
 package ioutils
 
 import (
-	"bytes"
 	"fmt"
-	"io"
-	"io/ioutil"
 	"strings"
 	"testing"
-	"time"
 )
 
 // Implement io.Reader
@@ -58,101 +54,6 @@ func TestReaderErrWrapperRead(t *testing.T) {
 	}
 }
 
-func TestNewBufReaderWithDrainbufAndBuffer(t *testing.T) {
-	reader, writer := io.Pipe()
-
-	drainBuffer := make([]byte, 1024)
-	buffer := NewBytesPipe(nil)
-	bufreader := NewBufReaderWithDrainbufAndBuffer(reader, drainBuffer, buffer)
-
-	// Write everything down to a Pipe
-	// Usually, a pipe should block but because of the buffered reader,
-	// the writes will go through
-	done := make(chan bool)
-	go func() {
-		writer.Write([]byte("hello world"))
-		writer.Close()
-		done <- true
-	}()
-
-	// Drain the reader *after* everything has been written, just to verify
-	// it is indeed buffering
-	select {
-	case <-done:
-	case <-time.After(1 * time.Second):
-		t.Fatal("timeout")
-	}
-
-	output, err := ioutil.ReadAll(bufreader)
-	if err != nil {
-		t.Fatal(err)
-	}
-	if !bytes.Equal(output, []byte("hello world")) {
-		t.Error(string(output))
-	}
-}
-
-func TestBufReader(t *testing.T) {
-	reader, writer := io.Pipe()
-	bufreader := NewBufReader(reader)
-
-	// Write everything down to a Pipe
-	// Usually, a pipe should block but because of the buffered reader,
-	// the writes will go through
-	done := make(chan bool)
-	go func() {
-		writer.Write([]byte("hello world"))
-		writer.Close()
-		done <- true
-	}()
-
-	// Drain the reader *after* everything has been written, just to verify
-	// it is indeed buffering
-	<-done
-	output, err := ioutil.ReadAll(bufreader)
-	if err != nil {
-		t.Fatal(err)
-	}
-	if !bytes.Equal(output, []byte("hello world")) {
-		t.Error(string(output))
-	}
-}
-
-func TestBufReaderCloseWithNonReaderCloser(t *testing.T) {
-	reader := strings.NewReader("buffer")
-	bufreader := NewBufReader(reader)
-
-	if err := bufreader.Close(); err != nil {
-		t.Fatal(err)
-	}
-
-}
-
-// implements io.ReadCloser
-type simpleReaderCloser struct {
-	err error
-}
-
-func (r *simpleReaderCloser) Read(p []byte) (n int, err error) {
-	return 0, r.err
-}
-
-func (r *simpleReaderCloser) Close() error {
-	r.err = io.EOF
-	return nil
-}
-
-func TestBufReaderCloseWithReaderCloser(t *testing.T) {
-	reader := &simpleReaderCloser{}
-	bufreader := NewBufReader(reader)
-
-	err := bufreader.Close()
-	if err != nil {
-		t.Fatal(err)
-	}
-
-}
-
 func TestHashData(t *testing.T) {
 	reader := strings.NewReader("hash-me")
 	actual, err := HashData(reader)
@@ -164,61 +65,3 @@ func TestHashData(t *testing.T) {
 		t.Fatalf("Expecting %s, got %s", expected, actual)
 	}
 }
-
-type repeatedReader struct {
-	readCount int
-	maxReads  int
-	data      []byte
-}
-
-func newRepeatedReader(max int, data []byte) *repeatedReader {
-	return &repeatedReader{0, max, data}
-}
-
-func (r *repeatedReader) Read(p []byte) (int, error) {
-	if r.readCount >= r.maxReads {
-		return 0, io.EOF
-	}
-	r.readCount++
-	n := copy(p, r.data)
-	return n, nil
-}
-
-func testWithData(data []byte, reads int) {
-	reader := newRepeatedReader(reads, data)
-	bufReader := NewBufReader(reader)
-	io.Copy(ioutil.Discard, bufReader)
-}
-
-func Benchmark1M10BytesReads(b *testing.B) {
-	reads := 1000000
-	readSize := int64(10)
-	data := make([]byte, readSize)
-	b.SetBytes(readSize * int64(reads))
-	b.ResetTimer()
-	for i := 0; i < b.N; i++ {
-		testWithData(data, reads)
-	}
-}
-
-func Benchmark1M1024BytesReads(b *testing.B) {
-	reads := 1000000
-	readSize := int64(1024)
-	data := make([]byte, readSize)
-	b.SetBytes(readSize * int64(reads))
-	b.ResetTimer()
-	for i := 0; i < b.N; i++ {
-		testWithData(data, reads)
-	}
-}
-
-func Benchmark10k32KBytesReads(b *testing.B) {
-	reads := 10000
-	readSize := int64(32 * 1024)
-	data := make([]byte, readSize)
-	b.SetBytes(readSize * int64(reads))
-	b.ResetTimer()
-	for i := 0; i < b.N; i++ {
-		testWithData(data, reads)
-	}
-}