瀏覽代碼

Merge pull request #7542 from unclejack/refactor_archive_buffering

Refactor archive buffering
Alexandr Morozov 11 年之前
父節點
當前提交
58dc474e65

+ 31 - 11
archive/archive.go

@@ -19,6 +19,7 @@ import (
 	"github.com/docker/docker/vendor/src/code.google.com/p/go/src/pkg/archive/tar"
 	"github.com/docker/docker/vendor/src/code.google.com/p/go/src/pkg/archive/tar"
 
 
 	"github.com/docker/docker/pkg/log"
 	"github.com/docker/docker/pkg/log"
+	"github.com/docker/docker/pkg/pools"
 	"github.com/docker/docker/pkg/system"
 	"github.com/docker/docker/pkg/system"
 	"github.com/docker/docker/utils"
 	"github.com/docker/docker/utils"
 )
 )
@@ -80,7 +81,8 @@ func xzDecompress(archive io.Reader) (io.ReadCloser, error) {
 }
 }
 
 
 func DecompressStream(archive io.Reader) (io.ReadCloser, error) {
 func DecompressStream(archive io.Reader) (io.ReadCloser, error) {
-	buf := bufio.NewReader(archive)
+	p := pools.BufioReader32KPool
+	buf := p.Get(archive)
 	bs, err := buf.Peek(10)
 	bs, err := buf.Peek(10)
 	if err != nil {
 	if err != nil {
 		return nil, err
 		return nil, err
@@ -88,28 +90,44 @@ func DecompressStream(archive io.Reader) (io.ReadCloser, error) {
 	log.Debugf("[tar autodetect] n: %v", bs)
 	log.Debugf("[tar autodetect] n: %v", bs)
 
 
 	compression := DetectCompression(bs)
 	compression := DetectCompression(bs)
-
 	switch compression {
 	switch compression {
 	case Uncompressed:
 	case Uncompressed:
-		return ioutil.NopCloser(buf), nil
+		readBufWrapper := p.NewReadCloserWrapper(buf, buf)
+		return readBufWrapper, nil
 	case Gzip:
 	case Gzip:
-		return gzip.NewReader(buf)
+		gzReader, err := gzip.NewReader(buf)
+		if err != nil {
+			return nil, err
+		}
+		readBufWrapper := p.NewReadCloserWrapper(buf, gzReader)
+		return readBufWrapper, nil
 	case Bzip2:
 	case Bzip2:
-		return ioutil.NopCloser(bzip2.NewReader(buf)), nil
+		bz2Reader := bzip2.NewReader(buf)
+		readBufWrapper := p.NewReadCloserWrapper(buf, bz2Reader)
+		return readBufWrapper, nil
 	case Xz:
 	case Xz:
-		return xzDecompress(buf)
+		xzReader, err := xzDecompress(buf)
+		if err != nil {
+			return nil, err
+		}
+		readBufWrapper := p.NewReadCloserWrapper(buf, xzReader)
+		return readBufWrapper, nil
 	default:
 	default:
 		return nil, fmt.Errorf("Unsupported compression format %s", (&compression).Extension())
 		return nil, fmt.Errorf("Unsupported compression format %s", (&compression).Extension())
 	}
 	}
 }
 }
 
 
 func CompressStream(dest io.WriteCloser, compression Compression) (io.WriteCloser, error) {
 func CompressStream(dest io.WriteCloser, compression Compression) (io.WriteCloser, error) {
-
+	p := pools.BufioWriter32KPool
+	buf := p.Get(dest)
 	switch compression {
 	switch compression {
 	case Uncompressed:
 	case Uncompressed:
-		return utils.NopWriteCloser(dest), nil
+		writeBufWrapper := p.NewWriteCloserWrapper(buf, buf)
+		return writeBufWrapper, nil
 	case Gzip:
 	case Gzip:
-		return gzip.NewWriter(dest), nil
+		gzWriter := gzip.NewWriter(dest)
+		writeBufWrapper := p.NewWriteCloserWrapper(buf, gzWriter)
+		return writeBufWrapper, nil
 	case Bzip2, Xz:
 	case Bzip2, Xz:
 		// archive/bzip2 does not support writing, and there is no xz support at all
 		// archive/bzip2 does not support writing, and there is no xz support at all
 		// However, this is not a problem as docker only currently generates gzipped tars
 		// However, this is not a problem as docker only currently generates gzipped tars
@@ -337,7 +355,8 @@ func TarWithOptions(srcPath string, options *TarOptions) (io.ReadCloser, error)
 			options.Includes = []string{"."}
 			options.Includes = []string{"."}
 		}
 		}
 
 
-		twBuf := bufio.NewWriterSize(nil, twBufSize)
+		twBuf := pools.BufioWriter32KPool.Get(nil)
+		defer pools.BufioWriter32KPool.Put(twBuf)
 
 
 		for _, include := range options.Includes {
 		for _, include := range options.Includes {
 			filepath.Walk(filepath.Join(srcPath, include), func(filePath string, f os.FileInfo, err error) error {
 			filepath.Walk(filepath.Join(srcPath, include), func(filePath string, f os.FileInfo, err error) error {
@@ -411,7 +430,8 @@ func Untar(archive io.Reader, dest string, options *TarOptions) error {
 	defer decompressedArchive.Close()
 	defer decompressedArchive.Close()
 
 
 	tr := tar.NewReader(decompressedArchive)
 	tr := tar.NewReader(decompressedArchive)
-	trBuf := bufio.NewReaderSize(nil, trBufSize)
+	trBuf := pools.BufioReader32KPool.Get(nil)
+	defer pools.BufioReader32KPool.Put(trBuf)
 
 
 	var dirs []*tar.Header
 	var dirs []*tar.Header
 
 

+ 3 - 2
archive/changes.go

@@ -1,7 +1,6 @@
 package archive
 package archive
 
 
 import (
 import (
-	"bufio"
 	"bytes"
 	"bytes"
 	"fmt"
 	"fmt"
 	"io"
 	"io"
@@ -14,6 +13,7 @@ import (
 	"github.com/docker/docker/vendor/src/code.google.com/p/go/src/pkg/archive/tar"
 	"github.com/docker/docker/vendor/src/code.google.com/p/go/src/pkg/archive/tar"
 
 
 	"github.com/docker/docker/pkg/log"
 	"github.com/docker/docker/pkg/log"
+	"github.com/docker/docker/pkg/pools"
 	"github.com/docker/docker/pkg/system"
 	"github.com/docker/docker/pkg/system"
 )
 )
 
 
@@ -345,7 +345,8 @@ func ExportChanges(dir string, changes []Change) (Archive, error) {
 	tw := tar.NewWriter(writer)
 	tw := tar.NewWriter(writer)
 
 
 	go func() {
 	go func() {
-		twBuf := bufio.NewWriterSize(nil, twBufSize)
+		twBuf := pools.BufioWriter32KPool.Get(nil)
+		defer pools.BufioWriter32KPool.Put(twBuf)
 		// In general we log errors here but ignore them because
 		// In general we log errors here but ignore them because
 		// during e.g. a diff operation the container can continue
 		// during e.g. a diff operation the container can continue
 		// mutating the filesystem and we can see transient errors
 		// mutating the filesystem and we can see transient errors

+ 0 - 4
archive/common.go

@@ -1,4 +0,0 @@
-package archive
-
-const twBufSize = 32 * 1024
-const trBufSize = 32 * 1024

+ 4 - 2
archive/diff.go

@@ -1,7 +1,6 @@
 package archive
 package archive
 
 
 import (
 import (
-	"bufio"
 	"fmt"
 	"fmt"
 	"io"
 	"io"
 	"io/ioutil"
 	"io/ioutil"
@@ -11,6 +10,8 @@ import (
 	"syscall"
 	"syscall"
 
 
 	"github.com/docker/docker/vendor/src/code.google.com/p/go/src/pkg/archive/tar"
 	"github.com/docker/docker/vendor/src/code.google.com/p/go/src/pkg/archive/tar"
+
+	"github.com/docker/docker/pkg/pools"
 )
 )
 
 
 // Linux device nodes are a bit weird due to backwards compat with 16 bit device nodes.
 // Linux device nodes are a bit weird due to backwards compat with 16 bit device nodes.
@@ -33,7 +34,8 @@ func ApplyLayer(dest string, layer ArchiveReader) error {
 	}
 	}
 
 
 	tr := tar.NewReader(layer)
 	tr := tar.NewReader(layer)
-	trBuf := bufio.NewReaderSize(nil, trBufSize)
+	trBuf := pools.BufioReader32KPool.Get(tr)
+	defer pools.BufioReader32KPool.Put(trBuf)
 
 
 	var dirs []*tar.Header
 	var dirs []*tar.Header
 
 

+ 3 - 2
daemon/attach.go

@@ -8,6 +8,7 @@ import (
 	"time"
 	"time"
 
 
 	"github.com/docker/docker/engine"
 	"github.com/docker/docker/engine"
+	"github.com/docker/docker/pkg/ioutils"
 	"github.com/docker/docker/pkg/jsonlog"
 	"github.com/docker/docker/pkg/jsonlog"
 	"github.com/docker/docker/pkg/log"
 	"github.com/docker/docker/pkg/log"
 	"github.com/docker/docker/utils"
 	"github.com/docker/docker/utils"
@@ -195,7 +196,7 @@ func (daemon *Daemon) Attach(container *Container, stdin io.ReadCloser, stdinClo
 			if cStdout, err := container.StdoutPipe(); err != nil {
 			if cStdout, err := container.StdoutPipe(); err != nil {
 				log.Errorf("attach: stdout pipe: %s", err)
 				log.Errorf("attach: stdout pipe: %s", err)
 			} else {
 			} else {
-				io.Copy(&utils.NopWriter{}, cStdout)
+				io.Copy(&ioutils.NopWriter{}, cStdout)
 			}
 			}
 		}()
 		}()
 	}
 	}
@@ -234,7 +235,7 @@ func (daemon *Daemon) Attach(container *Container, stdin io.ReadCloser, stdinClo
 			if cStderr, err := container.StderrPipe(); err != nil {
 			if cStderr, err := container.StderrPipe(); err != nil {
 				log.Errorf("attach: stdout pipe: %s", err)
 				log.Errorf("attach: stdout pipe: %s", err)
 			} else {
 			} else {
-				io.Copy(&utils.NopWriter{}, cStderr)
+				io.Copy(&ioutils.NopWriter{}, cStderr)
 			}
 			}
 		}()
 		}()
 	}
 	}

+ 8 - 7
daemon/container.go

@@ -24,6 +24,7 @@ import (
 	"github.com/docker/docker/links"
 	"github.com/docker/docker/links"
 	"github.com/docker/docker/nat"
 	"github.com/docker/docker/nat"
 	"github.com/docker/docker/pkg/broadcastwriter"
 	"github.com/docker/docker/pkg/broadcastwriter"
+	"github.com/docker/docker/pkg/ioutils"
 	"github.com/docker/docker/pkg/log"
 	"github.com/docker/docker/pkg/log"
 	"github.com/docker/docker/pkg/networkfs/etchosts"
 	"github.com/docker/docker/pkg/networkfs/etchosts"
 	"github.com/docker/docker/pkg/networkfs/resolvconf"
 	"github.com/docker/docker/pkg/networkfs/resolvconf"
@@ -366,25 +367,25 @@ func (streamConfig *StreamConfig) StdinPipe() (io.WriteCloser, error) {
 func (streamConfig *StreamConfig) StdoutPipe() (io.ReadCloser, error) {
 func (streamConfig *StreamConfig) StdoutPipe() (io.ReadCloser, error) {
 	reader, writer := io.Pipe()
 	reader, writer := io.Pipe()
 	streamConfig.stdout.AddWriter(writer, "")
 	streamConfig.stdout.AddWriter(writer, "")
-	return utils.NewBufReader(reader), nil
+	return ioutils.NewBufReader(reader), nil
 }
 }
 
 
 func (streamConfig *StreamConfig) StderrPipe() (io.ReadCloser, error) {
 func (streamConfig *StreamConfig) StderrPipe() (io.ReadCloser, error) {
 	reader, writer := io.Pipe()
 	reader, writer := io.Pipe()
 	streamConfig.stderr.AddWriter(writer, "")
 	streamConfig.stderr.AddWriter(writer, "")
-	return utils.NewBufReader(reader), nil
+	return ioutils.NewBufReader(reader), nil
 }
 }
 
 
 func (streamConfig *StreamConfig) StdoutLogPipe() io.ReadCloser {
 func (streamConfig *StreamConfig) StdoutLogPipe() io.ReadCloser {
 	reader, writer := io.Pipe()
 	reader, writer := io.Pipe()
 	streamConfig.stdout.AddWriter(writer, "stdout")
 	streamConfig.stdout.AddWriter(writer, "stdout")
-	return utils.NewBufReader(reader)
+	return ioutils.NewBufReader(reader)
 }
 }
 
 
 func (streamConfig *StreamConfig) StderrLogPipe() io.ReadCloser {
 func (streamConfig *StreamConfig) StderrLogPipe() io.ReadCloser {
 	reader, writer := io.Pipe()
 	reader, writer := io.Pipe()
 	streamConfig.stderr.AddWriter(writer, "stderr")
 	streamConfig.stderr.AddWriter(writer, "stderr")
-	return utils.NewBufReader(reader)
+	return ioutils.NewBufReader(reader)
 }
 }
 
 
 func (container *Container) buildHostnameFile() error {
 func (container *Container) buildHostnameFile() error {
@@ -655,7 +656,7 @@ func (container *Container) ExportRw() (archive.Archive, error) {
 		container.Unmount()
 		container.Unmount()
 		return nil, err
 		return nil, err
 	}
 	}
-	return utils.NewReadCloserWrapper(archive, func() error {
+	return ioutils.NewReadCloserWrapper(archive, func() error {
 			err := archive.Close()
 			err := archive.Close()
 			container.Unmount()
 			container.Unmount()
 			return err
 			return err
@@ -673,7 +674,7 @@ func (container *Container) Export() (archive.Archive, error) {
 		container.Unmount()
 		container.Unmount()
 		return nil, err
 		return nil, err
 	}
 	}
-	return utils.NewReadCloserWrapper(archive, func() error {
+	return ioutils.NewReadCloserWrapper(archive, func() error {
 			err := archive.Close()
 			err := archive.Close()
 			container.Unmount()
 			container.Unmount()
 			return err
 			return err
@@ -809,7 +810,7 @@ func (container *Container) Copy(resource string) (io.ReadCloser, error) {
 		container.Unmount()
 		container.Unmount()
 		return nil, err
 		return nil, err
 	}
 	}
-	return utils.NewReadCloserWrapper(archive, func() error {
+	return ioutils.NewReadCloserWrapper(archive, func() error {
 			err := archive.Close()
 			err := archive.Close()
 			container.Unmount()
 			container.Unmount()
 			return err
 			return err

+ 3 - 2
daemon/daemon.go

@@ -28,6 +28,7 @@ import (
 	"github.com/docker/docker/image"
 	"github.com/docker/docker/image"
 	"github.com/docker/docker/pkg/broadcastwriter"
 	"github.com/docker/docker/pkg/broadcastwriter"
 	"github.com/docker/docker/pkg/graphdb"
 	"github.com/docker/docker/pkg/graphdb"
+	"github.com/docker/docker/pkg/ioutils"
 	"github.com/docker/docker/pkg/log"
 	"github.com/docker/docker/pkg/log"
 	"github.com/docker/docker/pkg/namesgenerator"
 	"github.com/docker/docker/pkg/namesgenerator"
 	"github.com/docker/docker/pkg/networkfs/resolvconf"
 	"github.com/docker/docker/pkg/networkfs/resolvconf"
@@ -201,7 +202,7 @@ func (daemon *Daemon) register(container *Container, updateSuffixarray bool) err
 	if container.Config.OpenStdin {
 	if container.Config.OpenStdin {
 		container.stdin, container.stdinPipe = io.Pipe()
 		container.stdin, container.stdinPipe = io.Pipe()
 	} else {
 	} else {
-		container.stdinPipe = utils.NopWriteCloser(ioutil.Discard) // Silently drop stdin
+		container.stdinPipe = ioutils.NopWriteCloser(ioutil.Discard) // Silently drop stdin
 	}
 	}
 	// done
 	// done
 	daemon.containers.Add(container.ID, container)
 	daemon.containers.Add(container.ID, container)
@@ -965,7 +966,7 @@ func (daemon *Daemon) Diff(container *Container) (archive.Archive, error) {
 	if err != nil {
 	if err != nil {
 		return nil, err
 		return nil, err
 	}
 	}
-	return utils.NewReadCloserWrapper(archive, func() error {
+	return ioutils.NewReadCloserWrapper(archive, func() error {
 		err := archive.Close()
 		err := archive.Close()
 		daemon.driver.Put(container.ID)
 		daemon.driver.Put(container.ID)
 		return err
 		return err

+ 2 - 1
engine/engine.go

@@ -10,6 +10,7 @@ import (
 	"sync"
 	"sync"
 	"time"
 	"time"
 
 
+	"github.com/docker/docker/pkg/ioutils"
 	"github.com/docker/docker/utils"
 	"github.com/docker/docker/utils"
 )
 )
 
 
@@ -123,7 +124,7 @@ func (eng *Engine) Job(name string, args ...string) *Job {
 		env:    &Env{},
 		env:    &Env{},
 	}
 	}
 	if eng.Logging {
 	if eng.Logging {
-		job.Stderr.Add(utils.NopWriteCloser(eng.Stderr))
+		job.Stderr.Add(ioutils.NopWriteCloser(eng.Stderr))
 	}
 	}
 
 
 	// Catchall is shadowed by specific Register.
 	// Catchall is shadowed by specific Register.

+ 3 - 2
image/image.go

@@ -11,6 +11,7 @@ import (
 
 
 	"github.com/docker/docker/archive"
 	"github.com/docker/docker/archive"
 	"github.com/docker/docker/daemon/graphdriver"
 	"github.com/docker/docker/daemon/graphdriver"
+	"github.com/docker/docker/pkg/ioutils"
 	"github.com/docker/docker/pkg/log"
 	"github.com/docker/docker/pkg/log"
 	"github.com/docker/docker/runconfig"
 	"github.com/docker/docker/runconfig"
 	"github.com/docker/docker/utils"
 	"github.com/docker/docker/utils"
@@ -198,7 +199,7 @@ func (img *Image) TarLayer() (arch archive.Archive, err error) {
 		if err != nil {
 		if err != nil {
 			return nil, err
 			return nil, err
 		}
 		}
-		return utils.NewReadCloserWrapper(archive, func() error {
+		return ioutils.NewReadCloserWrapper(archive, func() error {
 			err := archive.Close()
 			err := archive.Close()
 			driver.Put(img.ID)
 			driver.Put(img.ID)
 			return err
 			return err
@@ -218,7 +219,7 @@ func (img *Image) TarLayer() (arch archive.Archive, err error) {
 	if err != nil {
 	if err != nil {
 		return nil, err
 		return nil, err
 	}
 	}
-	return utils.NewReadCloserWrapper(archive, func() error {
+	return ioutils.NewReadCloserWrapper(archive, func() error {
 		err := archive.Close()
 		err := archive.Close()
 		driver.Put(img.ID)
 		driver.Put(img.ID)
 		return err
 		return err

+ 2 - 1
integration/runtime_test.go

@@ -20,6 +20,7 @@ import (
 	"github.com/docker/docker/engine"
 	"github.com/docker/docker/engine"
 	"github.com/docker/docker/image"
 	"github.com/docker/docker/image"
 	"github.com/docker/docker/nat"
 	"github.com/docker/docker/nat"
+	"github.com/docker/docker/pkg/ioutils"
 	"github.com/docker/docker/pkg/log"
 	"github.com/docker/docker/pkg/log"
 	"github.com/docker/docker/reexec"
 	"github.com/docker/docker/reexec"
 	"github.com/docker/docker/runconfig"
 	"github.com/docker/docker/runconfig"
@@ -141,7 +142,7 @@ func setupBaseImage() {
 	if err := job.Run(); err != nil || img.Get("Id") != unitTestImageID {
 	if err := job.Run(); err != nil || img.Get("Id") != unitTestImageID {
 		// Retrieve the Image
 		// Retrieve the Image
 		job = eng.Job("pull", unitTestImageName)
 		job = eng.Job("pull", unitTestImageName)
-		job.Stdout.Add(utils.NopWriteCloser(os.Stdout))
+		job.Stdout.Add(ioutils.NopWriteCloser(os.Stdout))
 		if err := job.Run(); err != nil {
 		if err := job.Run(); err != nil {
 			log.Fatalf("Unable to pull the test image: %s", err)
 			log.Fatalf("Unable to pull the test image: %s", err)
 		}
 		}

+ 114 - 0
pkg/ioutils/readers.go

@@ -0,0 +1,114 @@
+package ioutils
+
+import (
+	"bytes"
+	"io"
+	"sync"
+)
+
+type readCloserWrapper struct {
+	io.Reader
+	closer func() error
+}
+
+func (r *readCloserWrapper) Close() error {
+	return r.closer()
+}
+
+func NewReadCloserWrapper(r io.Reader, closer func() error) io.ReadCloser {
+	return &readCloserWrapper{
+		Reader: r,
+		closer: closer,
+	}
+}
+
+type readerErrWrapper struct {
+	reader io.Reader
+	closer func()
+}
+
+func (r *readerErrWrapper) Read(p []byte) (int, error) {
+	n, err := r.reader.Read(p)
+	if err != nil {
+		r.closer()
+	}
+	return n, err
+}
+
+func NewReaderErrWrapper(r io.Reader, closer func()) io.Reader {
+	return &readerErrWrapper{
+		reader: r,
+		closer: closer,
+	}
+}
+
+type bufReader struct {
+	sync.Mutex
+	buf      *bytes.Buffer
+	reader   io.Reader
+	err      error
+	wait     sync.Cond
+	drainBuf []byte
+}
+
+func NewBufReader(r io.Reader) *bufReader {
+	reader := &bufReader{
+		buf:      &bytes.Buffer{},
+		drainBuf: make([]byte, 1024),
+		reader:   r,
+	}
+	reader.wait.L = &reader.Mutex
+	go reader.drain()
+	return reader
+}
+
+func NewBufReaderWithDrainbufAndBuffer(r io.Reader, drainBuffer []byte, buffer *bytes.Buffer) *bufReader {
+	reader := &bufReader{
+		buf:      buffer,
+		drainBuf: drainBuffer,
+		reader:   r,
+	}
+	reader.wait.L = &reader.Mutex
+	go reader.drain()
+	return reader
+}
+
+func (r *bufReader) drain() {
+	for {
+		n, err := r.reader.Read(r.drainBuf)
+		r.Lock()
+		if err != nil {
+			r.err = err
+		} else {
+			r.buf.Write(r.drainBuf[0:n])
+		}
+		r.wait.Signal()
+		r.Unlock()
+		if err != nil {
+			break
+		}
+	}
+}
+
+func (r *bufReader) Read(p []byte) (n int, err error) {
+	r.Lock()
+	defer r.Unlock()
+	for {
+		n, err = r.buf.Read(p)
+		if n > 0 {
+			return n, err
+		}
+		if r.err != nil {
+			return 0, r.err
+		}
+		r.wait.Wait()
+	}
+}
+
+func (r *bufReader) Close() error {
+	closer, ok := r.reader.(io.ReadCloser)
+	if !ok {
+		return nil
+	}
+	return closer.Close()
+}

+ 34 - 0
pkg/ioutils/readers_test.go

@@ -0,0 +1,34 @@
+package ioutils
+
+import (
+	"bytes"
+	"io"
+	"io/ioutil"
+	"testing"
+)
+
+func TestBufReader(t *testing.T) {
+	reader, writer := io.Pipe()
+	bufreader := NewBufReader(reader)
+
+	// Write everything down to a Pipe
+	// Usually, a pipe should block but because of the buffered reader,
+	// the writes will go through
+	done := make(chan bool)
+	go func() {
+		writer.Write([]byte("hello world"))
+		writer.Close()
+		done <- true
+	}()
+
+	// Drain the reader *after* everything has been written, just to verify
+	// it is indeed buffering
+	<-done
+	output, err := ioutil.ReadAll(bufreader)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if !bytes.Equal(output, []byte("hello world")) {
+		t.Error(string(output))
+	}
+}

+ 39 - 0
pkg/ioutils/writers.go

@@ -0,0 +1,39 @@
+package ioutils
+
+import "io"
+
+type NopWriter struct{}
+
+func (*NopWriter) Write(buf []byte) (int, error) {
+	return len(buf), nil
+}
+
+type nopWriteCloser struct {
+	io.Writer
+}
+
+func (w *nopWriteCloser) Close() error { return nil }
+
+func NopWriteCloser(w io.Writer) io.WriteCloser {
+	return &nopWriteCloser{w}
+}
+
+type NopFlusher struct{}
+
+func (f *NopFlusher) Flush() {}
+
+type writeCloserWrapper struct {
+	io.Writer
+	closer func() error
+}
+
+func (r *writeCloserWrapper) Close() error {
+	return r.closer()
+}
+
+func NewWriteCloserWrapper(r io.Writer, closer func() error) io.WriteCloser {
+	return &writeCloserWrapper{
+		Writer: r,
+		closer: closer,
+	}
+}

+ 111 - 0
pkg/pools/pools.go

@@ -0,0 +1,111 @@
+// +build go1.3
+
+// Package pools provides a collection of pools which provide various
+// data types with buffers. These can be used to lower the number of
+// memory allocations and reuse buffers.
+//
+// New pools should be added to this package to allow them to be
+// shared across packages.
+//
+// Utility functions which operate on pools should be added to this
+// package to allow them to be reused.
+package pools
+
+import (
+	"bufio"
+	"io"
+	"sync"
+
+	"github.com/docker/docker/pkg/ioutils"
+)
+
+var (
+	// Pool which returns bufio.Reader with a 32K buffer
+	BufioReader32KPool *BufioReaderPool
+	// Pool which returns bufio.Writer with a 32K buffer
+	BufioWriter32KPool *BufioWriterPool
+)
+
+const buffer32K = 32 * 1024
+
+type BufioReaderPool struct {
+	pool sync.Pool
+}
+
+func init() {
+	BufioReader32KPool = newBufioReaderPoolWithSize(buffer32K)
+	BufioWriter32KPool = newBufioWriterPoolWithSize(buffer32K)
+}
+
+// newBufioReaderPoolWithSize is unexported because new pools should be
+// added here to be shared where required.
+func newBufioReaderPoolWithSize(size int) *BufioReaderPool {
+	pool := sync.Pool{
+		New: func() interface{} { return bufio.NewReaderSize(nil, size) },
+	}
+	return &BufioReaderPool{pool: pool}
+}
+
+// Get returns a bufio.Reader which reads from r. The buffer size is that of the pool.
+func (bufPool *BufioReaderPool) Get(r io.Reader) *bufio.Reader {
+	buf := bufPool.pool.Get().(*bufio.Reader)
+	buf.Reset(r)
+	return buf
+}
+
+// Put puts the bufio.Reader back into the pool.
+func (bufPool *BufioReaderPool) Put(b *bufio.Reader) {
+	b.Reset(nil)
+	bufPool.pool.Put(b)
+}
+
+// NewReadCloserWrapper returns a wrapper which puts the bufio.Reader back
+// into the pool and closes the reader if it's an io.ReadCloser.
+func (bufPool *BufioReaderPool) NewReadCloserWrapper(buf *bufio.Reader, r io.Reader) io.ReadCloser {
+	return ioutils.NewReadCloserWrapper(r, func() error {
+		if readCloser, ok := r.(io.ReadCloser); ok {
+			readCloser.Close()
+		}
+		bufPool.Put(buf)
+		return nil
+	})
+}
+
+type BufioWriterPool struct {
+	pool sync.Pool
+}
+
+// newBufioWriterPoolWithSize is unexported because new pools should be
+// added here to be shared where required.
+func newBufioWriterPoolWithSize(size int) *BufioWriterPool {
+	pool := sync.Pool{
+		New: func() interface{} { return bufio.NewWriterSize(nil, size) },
+	}
+	return &BufioWriterPool{pool: pool}
+}
+
+// Get returns a bufio.Writer which writes to w. The buffer size is that of the pool.
+func (bufPool *BufioWriterPool) Get(w io.Writer) *bufio.Writer {
+	buf := bufPool.pool.Get().(*bufio.Writer)
+	buf.Reset(w)
+	return buf
+}
+
+// Put puts the bufio.Writer back into the pool.
+func (bufPool *BufioWriterPool) Put(b *bufio.Writer) {
+	b.Reset(nil)
+	bufPool.pool.Put(b)
+}
+
+// NewWriteCloserWrapper returns a wrapper which puts the bufio.Writer back
+// into the pool and closes the writer if it's an io.Writecloser.
+func (bufPool *BufioWriterPool) NewWriteCloserWrapper(buf *bufio.Writer, w io.Writer) io.WriteCloser {
+	return ioutils.NewWriteCloserWrapper(w, func() error {
+		buf.Flush()
+		if writeCloser, ok := w.(io.WriteCloser); ok {
+			writeCloser.Close()
+		}
+		bufPool.Put(buf)
+		return nil
+	})
+}

+ 73 - 0
pkg/pools/pools_nopool.go

@@ -0,0 +1,73 @@
+// +build !go1.3
+
+package pools
+
+import (
+	"bufio"
+	"io"
+
+	"github.com/docker/docker/pkg/ioutils"
+)
+
+var (
+	BufioReader32KPool *BufioReaderPool
+	BufioWriter32KPool *BufioWriterPool
+)
+
+const buffer32K = 32 * 1024
+
+type BufioReaderPool struct {
+	size int
+}
+
+func init() {
+	BufioReader32KPool = newBufioReaderPoolWithSize(buffer32K)
+	BufioWriter32KPool = newBufioWriterPoolWithSize(buffer32K)
+}
+
+func newBufioReaderPoolWithSize(size int) *BufioReaderPool {
+	return &BufioReaderPool{size: size}
+}
+
+func (bufPool *BufioReaderPool) Get(r io.Reader) *bufio.Reader {
+	return bufio.NewReaderSize(r, bufPool.size)
+}
+
+func (bufPool *BufioReaderPool) Put(b *bufio.Reader) {
+	b.Reset(nil)
+}
+
+func (bufPool *BufioReaderPool) NewReadCloserWrapper(buf *bufio.Reader, r io.Reader) io.ReadCloser {
+	return ioutils.NewReadCloserWrapper(r, func() error {
+		if readCloser, ok := r.(io.ReadCloser); ok {
+			return readCloser.Close()
+		}
+		return nil
+	})
+}
+
+type BufioWriterPool struct {
+	size int
+}
+
+func newBufioWriterPoolWithSize(size int) *BufioWriterPool {
+	return &BufioWriterPool{size: size}
+}
+
+func (bufPool *BufioWriterPool) Get(w io.Writer) *bufio.Writer {
+	return bufio.NewWriterSize(w, bufPool.size)
+}
+
+func (bufPool *BufioWriterPool) Put(b *bufio.Writer) {
+	b.Reset(nil)
+}
+
+func (bufPool *BufioWriterPool) NewWriteCloserWrapper(buf *bufio.Writer, w io.Writer) io.WriteCloser {
+	return ioutils.NewWriteCloserWrapper(w, func() error {
+		buf.Flush()
+		if writeCloser, ok := w.(io.WriteCloser); ok {
+			return writeCloser.Close()
+		}
+		return nil
+	})
+}

+ 2 - 96
utils/utils.go

@@ -20,6 +20,7 @@ import (
 	"syscall"
 	"syscall"
 
 
 	"github.com/docker/docker/dockerversion"
 	"github.com/docker/docker/dockerversion"
+	"github.com/docker/docker/pkg/ioutils"
 	"github.com/docker/docker/pkg/log"
 	"github.com/docker/docker/pkg/log"
 )
 )
 
 
@@ -157,81 +158,6 @@ func DockerInitPath(localCopy string) string {
 	return ""
 	return ""
 }
 }
 
 
-type NopWriter struct{}
-
-func (*NopWriter) Write(buf []byte) (int, error) {
-	return len(buf), nil
-}
-
-type nopWriteCloser struct {
-	io.Writer
-}
-
-func (w *nopWriteCloser) Close() error { return nil }
-
-func NopWriteCloser(w io.Writer) io.WriteCloser {
-	return &nopWriteCloser{w}
-}
-
-type bufReader struct {
-	sync.Mutex
-	buf    *bytes.Buffer
-	reader io.Reader
-	err    error
-	wait   sync.Cond
-}
-
-func NewBufReader(r io.Reader) *bufReader {
-	reader := &bufReader{
-		buf:    &bytes.Buffer{},
-		reader: r,
-	}
-	reader.wait.L = &reader.Mutex
-	go reader.drain()
-	return reader
-}
-
-func (r *bufReader) drain() {
-	buf := make([]byte, 1024)
-	for {
-		n, err := r.reader.Read(buf)
-		r.Lock()
-		if err != nil {
-			r.err = err
-		} else {
-			r.buf.Write(buf[0:n])
-		}
-		r.wait.Signal()
-		r.Unlock()
-		if err != nil {
-			break
-		}
-	}
-}
-
-func (r *bufReader) Read(p []byte) (n int, err error) {
-	r.Lock()
-	defer r.Unlock()
-	for {
-		n, err = r.buf.Read(p)
-		if n > 0 {
-			return n, err
-		}
-		if r.err != nil {
-			return 0, r.err
-		}
-		r.wait.Wait()
-	}
-}
-
-func (r *bufReader) Close() error {
-	closer, ok := r.reader.(io.ReadCloser)
-	if !ok {
-		return nil
-	}
-	return closer.Close()
-}
-
 func GetTotalUsedFds() int {
 func GetTotalUsedFds() int {
 	if fds, err := ioutil.ReadDir(fmt.Sprintf("/proc/%d/fd", os.Getpid())); err != nil {
 	if fds, err := ioutil.ReadDir(fmt.Sprintf("/proc/%d/fd", os.Getpid())); err != nil {
 		log.Errorf("Error opening /proc/%d/fd: %s", os.Getpid(), err)
 		log.Errorf("Error opening /proc/%d/fd: %s", os.Getpid(), err)
@@ -340,10 +266,6 @@ func CopyDirectory(source, dest string) error {
 	return nil
 	return nil
 }
 }
 
 
-type NopFlusher struct{}
-
-func (f *NopFlusher) Flush() {}
-
 type WriteFlusher struct {
 type WriteFlusher struct {
 	sync.Mutex
 	sync.Mutex
 	w       io.Writer
 	w       io.Writer
@@ -370,7 +292,7 @@ func NewWriteFlusher(w io.Writer) *WriteFlusher {
 	if f, ok := w.(http.Flusher); ok {
 	if f, ok := w.(http.Flusher); ok {
 		flusher = f
 		flusher = f
 	} else {
 	} else {
-		flusher = &NopFlusher{}
+		flusher = &ioutils.NopFlusher{}
 	}
 	}
 	return &WriteFlusher{w: w, flusher: flusher}
 	return &WriteFlusher{w: w, flusher: flusher}
 }
 }
@@ -527,22 +449,6 @@ func CopyFile(src, dst string) (int64, error) {
 	return io.Copy(df, sf)
 	return io.Copy(df, sf)
 }
 }
 
 
-type readCloserWrapper struct {
-	io.Reader
-	closer func() error
-}
-
-func (r *readCloserWrapper) Close() error {
-	return r.closer()
-}
-
-func NewReadCloserWrapper(r io.Reader, closer func() error) io.ReadCloser {
-	return &readCloserWrapper{
-		Reader: r,
-		closer: closer,
-	}
-}
-
 // ReplaceOrAppendValues returns the defaults with the overrides either
 // ReplaceOrAppendValues returns the defaults with the overrides either
 // replaced by env key or appended to the list
 // replaced by env key or appended to the list
 func ReplaceOrAppendEnvValues(defaults, overrides []string) []string {
 func ReplaceOrAppendEnvValues(defaults, overrides []string) []string {

+ 0 - 29
utils/utils_test.go

@@ -1,39 +1,10 @@
 package utils
 package utils
 
 
 import (
 import (
-	"bytes"
-	"io"
-	"io/ioutil"
 	"os"
 	"os"
 	"testing"
 	"testing"
 )
 )
 
 
-func TestBufReader(t *testing.T) {
-	reader, writer := io.Pipe()
-	bufreader := NewBufReader(reader)
-
-	// Write everything down to a Pipe
-	// Usually, a pipe should block but because of the buffered reader,
-	// the writes will go through
-	done := make(chan bool)
-	go func() {
-		writer.Write([]byte("hello world"))
-		writer.Close()
-		done <- true
-	}()
-
-	// Drain the reader *after* everything has been written, just to verify
-	// it is indeed buffering
-	<-done
-	output, err := ioutil.ReadAll(bufreader)
-	if err != nil {
-		t.Fatal(err)
-	}
-	if !bytes.Equal(output, []byte("hello world")) {
-		t.Error(string(output))
-	}
-}
-
 func TestCheckLocalDns(t *testing.T) {
 func TestCheckLocalDns(t *testing.T) {
 	for resolv, result := range map[string]bool{`# Dynamic
 	for resolv, result := range map[string]bool{`# Dynamic
 nameserver 10.0.2.3
 nameserver 10.0.2.3