Przeglądaj źródła

Merge pull request #16932 from c0b/fix-race-16924

Fix race #16924 [panic: runtime error: slice bounds out of range] docker daemon crash of racing
Vincent Batts 9 lat temu
rodzic
commit
d35a1f2868

+ 25 - 45
pkg/archive/archive.go

@@ -20,6 +20,7 @@ import (
 	"github.com/Sirupsen/logrus"
 	"github.com/Sirupsen/logrus"
 	"github.com/docker/docker/pkg/fileutils"
 	"github.com/docker/docker/pkg/fileutils"
 	"github.com/docker/docker/pkg/idtools"
 	"github.com/docker/docker/pkg/idtools"
+	"github.com/docker/docker/pkg/ioutils"
 	"github.com/docker/docker/pkg/pools"
 	"github.com/docker/docker/pkg/pools"
 	"github.com/docker/docker/pkg/promise"
 	"github.com/docker/docker/pkg/promise"
 	"github.com/docker/docker/pkg/system"
 	"github.com/docker/docker/pkg/system"
@@ -116,10 +117,10 @@ func DetectCompression(source []byte) Compression {
 	return Uncompressed
 	return Uncompressed
 }
 }
 
 
-func xzDecompress(archive io.Reader) (io.ReadCloser, error) {
+func xzDecompress(archive io.Reader) (io.ReadCloser, <-chan struct{}, error) {
 	args := []string{"xz", "-d", "-c", "-q"}
 	args := []string{"xz", "-d", "-c", "-q"}
 
 
-	return CmdStream(exec.Command(args[0], args[1:]...), archive)
+	return cmdStream(exec.Command(args[0], args[1:]...), archive)
 }
 }
 
 
 // DecompressStream decompress the archive and returns a ReaderCloser with the decompressed archive.
 // DecompressStream decompress the archive and returns a ReaderCloser with the decompressed archive.
@@ -148,12 +149,15 @@ func DecompressStream(archive io.Reader) (io.ReadCloser, error) {
 		readBufWrapper := p.NewReadCloserWrapper(buf, bz2Reader)
 		readBufWrapper := p.NewReadCloserWrapper(buf, bz2Reader)
 		return readBufWrapper, nil
 		return readBufWrapper, nil
 	case Xz:
 	case Xz:
-		xzReader, err := xzDecompress(buf)
+		xzReader, chdone, err := xzDecompress(buf)
 		if err != nil {
 		if err != nil {
 			return nil, err
 			return nil, err
 		}
 		}
 		readBufWrapper := p.NewReadCloserWrapper(buf, xzReader)
 		readBufWrapper := p.NewReadCloserWrapper(buf, xzReader)
-		return readBufWrapper, nil
+		return ioutils.NewReadCloserWrapper(readBufWrapper, func() error {
+			<-chdone
+			return readBufWrapper.Close()
+		}), nil
 	default:
 	default:
 		return nil, fmt.Errorf("Unsupported compression format %s", (&compression).Extension())
 		return nil, fmt.Errorf("Unsupported compression format %s", (&compression).Extension())
 	}
 	}
@@ -925,57 +929,33 @@ func CopyFileWithTar(src, dst string) (err error) {
 	return defaultArchiver.CopyFileWithTar(src, dst)
 	return defaultArchiver.CopyFileWithTar(src, dst)
 }
 }
 
 
-// CmdStream executes a command, and returns its stdout as a stream.
+// cmdStream executes a command, and returns its stdout as a stream.
 // If the command fails to run or doesn't complete successfully, an error
 // If the command fails to run or doesn't complete successfully, an error
 // will be returned, including anything written on stderr.
 // will be returned, including anything written on stderr.
-func CmdStream(cmd *exec.Cmd, input io.Reader) (io.ReadCloser, error) {
-	if input != nil {
-		stdin, err := cmd.StdinPipe()
-		if err != nil {
-			return nil, err
-		}
-		// Write stdin if any
-		go func() {
-			io.Copy(stdin, input)
-			stdin.Close()
-		}()
-	}
-	stdout, err := cmd.StdoutPipe()
-	if err != nil {
-		return nil, err
-	}
-	stderr, err := cmd.StderrPipe()
-	if err != nil {
-		return nil, err
-	}
+func cmdStream(cmd *exec.Cmd, input io.Reader) (io.ReadCloser, <-chan struct{}, error) {
+	chdone := make(chan struct{})
+	cmd.Stdin = input
 	pipeR, pipeW := io.Pipe()
 	pipeR, pipeW := io.Pipe()
-	errChan := make(chan []byte)
-	// Collect stderr, we will use it in case of an error
-	go func() {
-		errText, e := ioutil.ReadAll(stderr)
-		if e != nil {
-			errText = []byte("(...couldn't fetch stderr: " + e.Error() + ")")
-		}
-		errChan <- errText
-	}()
+	cmd.Stdout = pipeW
+	var errBuf bytes.Buffer
+	cmd.Stderr = &errBuf
+
+	// Run the command and return the pipe
+	if err := cmd.Start(); err != nil {
+		return nil, nil, err
+	}
+
 	// Copy stdout to the returned pipe
 	// Copy stdout to the returned pipe
 	go func() {
 	go func() {
-		_, err := io.Copy(pipeW, stdout)
-		if err != nil {
-			pipeW.CloseWithError(err)
-		}
-		errText := <-errChan
 		if err := cmd.Wait(); err != nil {
 		if err := cmd.Wait(); err != nil {
-			pipeW.CloseWithError(fmt.Errorf("%s: %s", err, errText))
+			pipeW.CloseWithError(fmt.Errorf("%s: %s", err, errBuf.String()))
 		} else {
 		} else {
 			pipeW.Close()
 			pipeW.Close()
 		}
 		}
+		close(chdone)
 	}()
 	}()
-	// Run the command and return the pipe
-	if err := cmd.Start(); err != nil {
-		return nil, err
-	}
-	return pipeR, nil
+
+	return pipeR, chdone, nil
 }
 }
 
 
 // NewTempArchive reads the content of src into a temporary file, and returns the contents
 // NewTempArchive reads the content of src into a temporary file, and returns the contents

+ 3 - 3
pkg/archive/archive_test.go

@@ -160,7 +160,7 @@ func TestExtensionXz(t *testing.T) {
 
 
 func TestCmdStreamLargeStderr(t *testing.T) {
 func TestCmdStreamLargeStderr(t *testing.T) {
 	cmd := exec.Command("/bin/sh", "-c", "dd if=/dev/zero bs=1k count=1000 of=/dev/stderr; echo hello")
 	cmd := exec.Command("/bin/sh", "-c", "dd if=/dev/zero bs=1k count=1000 of=/dev/stderr; echo hello")
-	out, err := CmdStream(cmd, nil)
+	out, _, err := cmdStream(cmd, nil)
 	if err != nil {
 	if err != nil {
 		t.Fatalf("Failed to start command: %s", err)
 		t.Fatalf("Failed to start command: %s", err)
 	}
 	}
@@ -181,7 +181,7 @@ func TestCmdStreamLargeStderr(t *testing.T) {
 
 
 func TestCmdStreamBad(t *testing.T) {
 func TestCmdStreamBad(t *testing.T) {
 	badCmd := exec.Command("/bin/sh", "-c", "echo hello; echo >&2 error couldn\\'t reverse the phase pulser; exit 1")
 	badCmd := exec.Command("/bin/sh", "-c", "echo hello; echo >&2 error couldn\\'t reverse the phase pulser; exit 1")
-	out, err := CmdStream(badCmd, nil)
+	out, _, err := cmdStream(badCmd, nil)
 	if err != nil {
 	if err != nil {
 		t.Fatalf("Failed to start command: %s", err)
 		t.Fatalf("Failed to start command: %s", err)
 	}
 	}
@@ -196,7 +196,7 @@ func TestCmdStreamBad(t *testing.T) {
 
 
 func TestCmdStreamGood(t *testing.T) {
 func TestCmdStreamGood(t *testing.T) {
 	cmd := exec.Command("/bin/sh", "-c", "echo hello; exit 0")
 	cmd := exec.Command("/bin/sh", "-c", "echo hello; exit 0")
-	out, err := CmdStream(cmd, nil)
+	out, _, err := cmdStream(cmd, nil)
 	if err != nil {
 	if err != nil {
 		t.Fatal(err)
 		t.Fatal(err)
 	}
 	}

+ 6 - 0
pkg/chrootarchive/archive_unix.go

@@ -8,6 +8,7 @@ import (
 	"flag"
 	"flag"
 	"fmt"
 	"fmt"
 	"io"
 	"io"
+	"io/ioutil"
 	"os"
 	"os"
 	"runtime"
 	"runtime"
 	"syscall"
 	"syscall"
@@ -79,6 +80,11 @@ func invokeUnpack(decompressedArchive io.Reader, dest string, options *archive.T
 	w.Close()
 	w.Close()
 
 
 	if err := cmd.Wait(); err != nil {
 	if err := cmd.Wait(); err != nil {
+		// when `xz -d -c -q | docker-untar ...` failed on docker-untar side,
+		// we need to exhaust `xz`'s output, otherwise the `xz` side will be
+		// pending on write pipe forever
+		io.Copy(ioutil.Discard, decompressedArchive)
+
 		return fmt.Errorf("Untar re-exec error: %v: output: %s", err, output)
 		return fmt.Errorf("Untar re-exec error: %v: output: %s", err, output)
 	}
 	}
 	return nil
 	return nil