소스 검색

pkg/ioutils: avoid huge Buffer growth in bufreader

Signed-off-by: Cristian Staretu <cristian.staretu@gmail.com>
unclejack 10 년 전
부모
커밋
e5ea2b2357
1개의 변경된 파일에 111개의 추가작업 그리고 8개의 삭제
  1. 111 8
      pkg/ioutils/readers.go

+ 111 - 8
pkg/ioutils/readers.go

@@ -2,8 +2,11 @@ package ioutils
 
 import (
 	"bytes"
+	"crypto/rand"
 	"io"
+	"math/big"
 	"sync"
+	"time"
 )
 
 type readCloserWrapper struct {
@@ -42,20 +45,40 @@ func NewReaderErrWrapper(r io.Reader, closer func()) io.Reader {
 	}
 }
 
+// bufReader allows the underlying reader to continue to produce
+// output by pre-emptively reading from the wrapped reader.
+// This is achieved by buffering this data in bufReader's
+// expanding buffer.
 type bufReader struct {
 	sync.Mutex
-	buf      *bytes.Buffer
-	reader   io.Reader
-	err      error
-	wait     sync.Cond
-	drainBuf []byte
+	buf                  *bytes.Buffer
+	reader               io.Reader
+	err                  error
+	wait                 sync.Cond
+	drainBuf             []byte
+	reuseBuf             []byte
+	maxReuse             int64
+	resetTimeout         time.Duration
+	bufLenResetThreshold int64
+	maxReadDataReset     int64
 }
 
 func NewBufReader(r io.Reader) *bufReader {
+	var timeout int
+	if randVal, err := rand.Int(rand.Reader, big.NewInt(120)); err == nil {
+		timeout = int(randVal.Int64()) + 180
+	} else {
+		timeout = 300
+	}
 	reader := &bufReader{
-		buf:      &bytes.Buffer{},
-		drainBuf: make([]byte, 1024),
-		reader:   r,
+		buf:                  &bytes.Buffer{},
+		drainBuf:             make([]byte, 1024),
+		reuseBuf:             make([]byte, 4096),
+		maxReuse:             1000,
+		resetTimeout:         time.Second * time.Duration(timeout),
+		bufLenResetThreshold: 100 * 1024,
+		maxReadDataReset:     10 * 1024 * 1024,
+		reader:               r,
 	}
 	reader.wait.L = &reader.Mutex
 	go reader.drain()
@@ -74,14 +97,94 @@ func NewBufReaderWithDrainbufAndBuffer(r io.Reader, drainBuffer []byte, buffer *
 }
 
 func (r *bufReader) drain() {
+	var (
+		duration       time.Duration
+		lastReset      time.Time
+		now            time.Time
+		reset          bool
+		bufLen         int64
+		dataSinceReset int64
+		maxBufLen      int64
+		reuseBufLen    int64
+		reuseCount     int64
+	)
+	reuseBufLen = int64(len(r.reuseBuf))
+	lastReset = time.Now()
 	for {
 		n, err := r.reader.Read(r.drainBuf)
+		dataSinceReset += int64(n)
 		r.Lock()
+		bufLen = int64(r.buf.Len())
+		if bufLen > maxBufLen {
+			maxBufLen = bufLen
+		}
+
+		// Avoid unbounded growth of the buffer over time.
+		// This has been discovered to be the only non-intrusive
+		// solution to the unbounded growth of the buffer.
+		// Alternative solutions such as compression, multiple
+		// buffers, channels and other similar pieces of code
+		// were reducing throughput, overall Docker performance
+		// or simply crashed Docker.
+		// This solution releases the buffer when specific
+		// conditions are met to avoid the continuous resizing
+		// of the buffer for long lived containers.
+		//
+		// Move data to the front of the buffer if it's
+		// smaller than what reuseBuf can store
+		if bufLen > 0 && reuseBufLen >= bufLen {
+			n, _ := r.buf.Read(r.reuseBuf)
+			r.buf.Write(r.reuseBuf[0:n])
+			// Take action if the buffer has been reused too many
+			// times and if there's data in the buffer.
+			// The timeout is also used as means to avoid doing
+			// these operations more often or less often than
+			// required.
+			// The various conditions try to detect heavy activity
+			// in the buffer which might be indicators of heavy
+			// growth of the buffer.
+		} else if reuseCount >= r.maxReuse && bufLen > 0 {
+			now = time.Now()
+			duration = now.Sub(lastReset)
+			timeoutReached := duration >= r.resetTimeout
+
+			// The timeout has been reached and the
+			// buffered data couldn't be moved to the front
+			// of the buffer, so the buffer gets reset.
+			if timeoutReached && bufLen > reuseBufLen {
+				reset = true
+			}
+			// The amount of buffered data is too high now,
+			// reset the buffer.
+			if timeoutReached && maxBufLen >= r.bufLenResetThreshold {
+				reset = true
+			}
+			// Reset the buffer if a certain amount of
+			// data has gone through the buffer since the
+			// last reset.
+			if timeoutReached && dataSinceReset >= r.maxReadDataReset {
+				reset = true
+			}
+			// The buffered data is moved to a fresh buffer,
+			// swap the old buffer with the new one and
+			// reset all counters.
+			if reset {
+				newbuf := &bytes.Buffer{}
+				newbuf.ReadFrom(r.buf)
+				r.buf = newbuf
+				lastReset = now
+				reset = false
+				dataSinceReset = 0
+				maxBufLen = 0
+				reuseCount = 0
+			}
+		}
 		if err != nil {
			r.err = err
 		} else {
 			r.buf.Write(r.drainBuf[0:n])
 		}
+		reuseCount++
 		r.wait.Signal()
 		r.Unlock()
 		if err != nil {