diff --git a/Dockerfile b/Dockerfile index 2a8f9181e6..999a0f9671 100644 --- a/Dockerfile +++ b/Dockerfile @@ -62,6 +62,7 @@ RUN apt-get update && apt-get install -y \ libudev-dev \ mercurial \ net-tools \ + pigz \ pkg-config \ protobuf-compiler \ protobuf-c-compiler \ diff --git a/Dockerfile.aarch64 b/Dockerfile.aarch64 index fde0c706fb..32bb26504a 100644 --- a/Dockerfile.aarch64 +++ b/Dockerfile.aarch64 @@ -52,6 +52,7 @@ RUN apt-get update && apt-get install -y \ libudev-dev \ mercurial \ net-tools \ + pigz \ pkg-config \ protobuf-compiler \ protobuf-c-compiler \ diff --git a/Dockerfile.armhf b/Dockerfile.armhf index b5a4d938a4..85266f9390 100644 --- a/Dockerfile.armhf +++ b/Dockerfile.armhf @@ -45,6 +45,7 @@ RUN apt-get update && apt-get install -y \ libtool \ libudev-dev \ mercurial \ + pigz \ pkg-config \ python-backports.ssl-match-hostname \ python-dev \ diff --git a/Dockerfile.e2e b/Dockerfile.e2e index 4dec3515a7..6c96c033ab 100644 --- a/Dockerfile.e2e +++ b/Dockerfile.e2e @@ -47,6 +47,7 @@ RUN apk add --update \ g++ \ git \ iptables \ + pigz \ tar \ xz \ && rm -rf /var/cache/apk/* diff --git a/Dockerfile.ppc64le b/Dockerfile.ppc64le index 106a3a6f37..15ee5f6d7a 100644 --- a/Dockerfile.ppc64le +++ b/Dockerfile.ppc64le @@ -46,6 +46,7 @@ RUN apt-get update && apt-get install -y \ libtool \ libudev-dev \ mercurial \ + pigz \ pkg-config \ python-backports.ssl-match-hostname \ python-dev \ diff --git a/Dockerfile.s390x b/Dockerfile.s390x index e2f81a5d7d..57b7784420 100644 --- a/Dockerfile.s390x +++ b/Dockerfile.s390x @@ -42,6 +42,7 @@ RUN apt-get update && apt-get install -y \ libtool \ libudev-dev \ mercurial \ + pigz \ pkg-config \ python-backports.ssl-match-hostname \ python-dev \ diff --git a/Dockerfile.simple b/Dockerfile.simple index d048d909cd..578bbb2196 100644 --- a/Dockerfile.simple +++ b/Dockerfile.simple @@ -28,6 +28,7 @@ RUN apt-get update && apt-get install -y --no-install-recommends \ e2fsprogs \ iptables \ pkg-config \ + pigz \ procps \ xfsprogs \ xz-utils \ diff --git a/pkg/archive/archive.go b/pkg/archive/archive.go index a0c25937c8..5f7f562677 100644 --- a/pkg/archive/archive.go +++ b/pkg/archive/archive.go @@ -6,6 +6,7 @@ import ( "bytes" "compress/bzip2" "compress/gzip" + "context" "fmt" "io" "io/ioutil" @@ -13,6 +14,7 @@ import ( "os/exec" "path/filepath" "runtime" + "strconv" "strings" "syscall" @@ -24,6 +26,17 @@ import ( "github.com/sirupsen/logrus" ) +var unpigzPath string + +func init() { + if path, err := exec.LookPath("unpigz"); err != nil { + logrus.Debug("unpigz binary not found in PATH, falling back to go gzip library") + } else { + logrus.Debugf("Using unpigz binary found at path %s", path) + unpigzPath = path + } +} + type ( // Compression is the state represents if compressed or not. Compression int @@ -136,10 +149,34 @@ func DetectCompression(source []byte) Compression { return Uncompressed } -func xzDecompress(archive io.Reader) (io.ReadCloser, <-chan struct{}, error) { +func xzDecompress(ctx context.Context, archive io.Reader) (io.ReadCloser, error) { args := []string{"xz", "-d", "-c", "-q"} - return cmdStream(exec.Command(args[0], args[1:]...), archive) + return cmdStream(exec.CommandContext(ctx, args[0], args[1:]...), archive) +} + +func gzDecompress(ctx context.Context, buf io.Reader) (io.ReadCloser, error) { + if unpigzPath == "" { + return gzip.NewReader(buf) + } + + disablePigzEnv := os.Getenv("MOBY_DISABLE_PIGZ") + if disablePigzEnv != "" { + if disablePigz, err := strconv.ParseBool(disablePigzEnv); err != nil { + return nil, err + } else if disablePigz { + return gzip.NewReader(buf) + } + } + + return cmdStream(exec.CommandContext(ctx, unpigzPath, "-d", "-c"), buf) +} + +func wrapReadCloser(readBuf io.ReadCloser, cancel context.CancelFunc) io.ReadCloser { + return ioutils.NewReadCloserWrapper(readBuf, func() error { + cancel() + return readBuf.Close() + }) } // DecompressStream decompresses the archive and returns a ReaderCloser with the decompressed archive. @@ -163,26 +200,29 @@ func DecompressStream(archive io.Reader) (io.ReadCloser, error) { readBufWrapper := p.NewReadCloserWrapper(buf, buf) return readBufWrapper, nil case Gzip: - gzReader, err := gzip.NewReader(buf) + ctx, cancel := context.WithCancel(context.Background()) + + gzReader, err := gzDecompress(ctx, buf) if err != nil { + cancel() return nil, err } readBufWrapper := p.NewReadCloserWrapper(buf, gzReader) - return readBufWrapper, nil + return wrapReadCloser(readBufWrapper, cancel), nil case Bzip2: bz2Reader := bzip2.NewReader(buf) readBufWrapper := p.NewReadCloserWrapper(buf, bz2Reader) return readBufWrapper, nil case Xz: - xzReader, chdone, err := xzDecompress(buf) + ctx, cancel := context.WithCancel(context.Background()) + + xzReader, err := xzDecompress(ctx, buf) if err != nil { + cancel() return nil, err } readBufWrapper := p.NewReadCloserWrapper(buf, xzReader) - return ioutils.NewReadCloserWrapper(readBufWrapper, func() error { - <-chdone - return readBufWrapper.Close() - }), nil + return wrapReadCloser(readBufWrapper, cancel), nil default: return nil, fmt.Errorf("Unsupported compression format %s", (&compression).Extension()) } @@ -1163,8 +1203,7 @@ func remapIDs(idMappings *idtools.IDMappings, hdr *tar.Header) error { // cmdStream executes a command, and returns its stdout as a stream. // If the command fails to run or doesn't complete successfully, an error // will be returned, including anything written on stderr. -func cmdStream(cmd *exec.Cmd, input io.Reader) (io.ReadCloser, <-chan struct{}, error) { - chdone := make(chan struct{}) +func cmdStream(cmd *exec.Cmd, input io.Reader) (io.ReadCloser, error) { cmd.Stdin = input pipeR, pipeW := io.Pipe() cmd.Stdout = pipeW @@ -1173,7 +1212,7 @@ func cmdStream(cmd *exec.Cmd, input io.Reader) (io.ReadCloser, <-chan struct{}, // Run the command and return the pipe if err := cmd.Start(); err != nil { - return nil, nil, err + return nil, err } // Copy stdout to the returned pipe @@ -1183,10 +1222,9 @@ func cmdStream(cmd *exec.Cmd, input io.Reader) (io.ReadCloser, <-chan struct{}, } else { pipeW.Close() } - close(chdone) }() - return pipeR, chdone, nil + return pipeR, nil } // NewTempArchive reads the content of src into a temporary file, and returns the contents diff --git a/pkg/archive/archive_test.go b/pkg/archive/archive_test.go index 989557c53c..0e94f41c6d 100644 --- a/pkg/archive/archive_test.go +++ b/pkg/archive/archive_test.go @@ -3,6 +3,7 @@ package archive import ( "archive/tar" "bytes" + "compress/gzip" "fmt" "io" "io/ioutil" @@ -15,6 +16,7 @@ import ( "time" "github.com/docker/docker/pkg/idtools" + "github.com/docker/docker/pkg/ioutils" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -87,7 +89,7 @@ func TestIsArchivePathTar(t *testing.T) { } } -func testDecompressStream(t *testing.T, ext, compressCommand string) { +func testDecompressStream(t *testing.T, ext, compressCommand string) io.Reader { cmd := exec.Command("sh", "-c", fmt.Sprintf("touch /tmp/archive && %s /tmp/archive", compressCommand)) output, err := cmd.CombinedOutput() @@ -111,6 +113,8 @@ func testDecompressStream(t *testing.T, ext, compressCommand string) { if err = r.Close(); err != nil { t.Fatalf("Failed to close the decompressed stream: %v ", err) } + + return r } func TestDecompressStreamGzip(t *testing.T) { @@ -206,7 +210,7 @@ func TestExtensionXz(t *testing.T) { func TestCmdStreamLargeStderr(t *testing.T) { cmd := exec.Command("sh", "-c", "dd if=/dev/zero bs=1k count=1000 of=/dev/stderr; echo hello") - out, _, err := cmdStream(cmd, nil) + out, err := cmdStream(cmd, nil) if err != nil { t.Fatalf("Failed to start command: %s", err) } @@ -231,7 +235,7 @@ func TestCmdStreamBad(t *testing.T) { t.Skip("Failing on Windows CI machines") } badCmd := exec.Command("sh", "-c", "echo hello; echo >&2 error couldn\\'t reverse the phase pulser; exit 1") - out, _, err := cmdStream(badCmd, nil) + out, err := cmdStream(badCmd, nil) if err != nil { t.Fatalf("Failed to start command: %s", err) } @@ -246,7 +250,7 @@ func TestCmdStreamBad(t *testing.T) { func TestCmdStreamGood(t *testing.T) { cmd := exec.Command("sh", "-c", "echo hello; exit 0") - out, _, err := cmdStream(cmd, nil) + out, err := cmdStream(cmd, nil) if err != nil { t.Fatal(err) } @@ -1318,3 +1322,38 @@ func readFileFromArchive(t *testing.T, archive io.ReadCloser, name string, expec assert.NoError(t, err) return string(content) } + +func TestDisablePigz(t *testing.T) { + _, err := exec.LookPath("unpigz") + if err != nil { + t.Log("Test will not check full path when Pigz not installed") + } + + os.Setenv("MOBY_DISABLE_PIGZ", "true") + defer os.Unsetenv("MOBY_DISABLE_PIGZ") + + r := testDecompressStream(t, "gz", "gzip -f") + // For the bufio pool + outsideReaderCloserWrapper := r.(*ioutils.ReadCloserWrapper) + // For the context canceller + contextReaderCloserWrapper := outsideReaderCloserWrapper.Reader.(*ioutils.ReadCloserWrapper) + + assert.IsType(t, &gzip.Reader{}, contextReaderCloserWrapper.Reader) +} + +func TestPigz(t *testing.T) { + r := testDecompressStream(t, "gz", "gzip -f") + // For the bufio pool + outsideReaderCloserWrapper := r.(*ioutils.ReadCloserWrapper) + // For the context canceller + contextReaderCloserWrapper := outsideReaderCloserWrapper.Reader.(*ioutils.ReadCloserWrapper) + + _, err := exec.LookPath("unpigz") + if err == nil { + t.Log("Tested whether Pigz is used, as it installed") + assert.IsType(t, &io.PipeReader{}, contextReaderCloserWrapper.Reader) + } else { + t.Log("Tested whether Pigz is not used, as it not installed") + assert.IsType(t, &gzip.Reader{}, contextReaderCloserWrapper.Reader) + } +} diff --git a/pkg/ioutils/readers.go b/pkg/ioutils/readers.go index 63f3c07f46..168fa1d2d0 100644 --- a/pkg/ioutils/readers.go +++ b/pkg/ioutils/readers.go @@ -8,18 +8,22 @@ import ( "golang.org/x/net/context" ) -type readCloserWrapper struct { +// ReadCloserWrapper wraps an io.Reader, and implements an io.ReadCloser +// It calls the given callback function when closed. It should be constructed +// with NewReadCloserWrapper +type ReadCloserWrapper struct { io.Reader closer func() error } -func (r *readCloserWrapper) Close() error { +// Close calls back the passed closer function +func (r *ReadCloserWrapper) Close() error { return r.closer() } // NewReadCloserWrapper returns a new io.ReadCloser. func NewReadCloserWrapper(r io.Reader, closer func() error) io.ReadCloser { - return &readCloserWrapper{ + return &ReadCloserWrapper{ Reader: r, closer: closer, }