Browse Source

'docker pull' and 'docker put' automatically detect tar compression (gzip, bzip2 or uncompressed). -j and -z flags are no longer required.

Solomon Hykes 12 years ago
parent
commit
f437f5b8b4
10 changed files with 190 additions and 90 deletions
  1. 2 2
      README.md
  2. 1 1
      fake/fake.go
  3. 2 1
      filesystem.go
  4. 23 0
      future/future.go
  5. 71 0
      image/archive.go
  6. 54 0
      image/archive_test.go
  7. 4 10
      image/image.go
  8. 27 36
      image/layers.go
  9. 6 21
      server/server.go
  10. 0 19
      utils.go

+ 2 - 2
README.md

@@ -157,12 +157,12 @@ Step by step host setup
 3. Type the following commands:
 3. Type the following commands:
 
 
         apt-get update
         apt-get update
-        apt-get install lxc wget
+        apt-get install lxc wget bsdtar
 
 
 4. Download the latest version of the [docker binaries](https://dl.dropbox.com/u/20637798/docker.tar.gz) (`wget https://dl.dropbox.com/u/20637798/docker.tar.gz`) (warning: this may not be the most up-to-date build)
 4. Download the latest version of the [docker binaries](https://dl.dropbox.com/u/20637798/docker.tar.gz) (`wget https://dl.dropbox.com/u/20637798/docker.tar.gz`) (warning: this may not be the most up-to-date build)
 5. Extract the contents of the tar file `tar -xf docker.tar.gz`
 5. Extract the contents of the tar file `tar -xf docker.tar.gz`
 6. Launch the docker daemon `./dockerd`
 6. Launch the docker daemon `./dockerd`
-7. Download a base image by running 'docker pull -j base'
+7. Download a base image by running 'docker pull base'
 
 
 
 
 Client installation
 Client installation

+ 1 - 1
fake/fake.go

@@ -14,7 +14,7 @@ func FakeTar() (io.Reader, error) {
 	content := []byte("Hello world!\n")
 	content := []byte("Hello world!\n")
 	buf := new(bytes.Buffer)
 	buf := new(bytes.Buffer)
 	tw := tar.NewWriter(buf)
 	tw := tar.NewWriter(buf)
-	for _, name := range []string {"/etc/postgres/postgres.conf", "/etc/passwd", "/var/log/postgres", "/var/log/postgres/postgres.conf"} {
+	for _, name := range []string {"hello", "etc/postgres/postgres.conf", "etc/passwd", "var/log/postgres/postgres.conf"} {
 		hdr := new(tar.Header)
 		hdr := new(tar.Header)
 		hdr.Size = int64(len(content))
 		hdr.Size = int64(len(content))
 		hdr.Name = name
 		hdr.Name = name

+ 2 - 1
filesystem.go

@@ -10,6 +10,7 @@ import (
 	"strings"
 	"strings"
 	"syscall"
 	"syscall"
 	"time"
 	"time"
+	"github.com/dotcloud/docker/image"
 )
 )
 
 
 type Filesystem struct {
 type Filesystem struct {
@@ -104,7 +105,7 @@ func (fs *Filesystem) Tar() (io.Reader, error) {
 	if err := fs.EnsureMounted(); err != nil {
 	if err := fs.EnsureMounted(); err != nil {
 		return nil, err
 		return nil, err
 	}
 	}
-	return Tar(fs.RootFS)
+	return image.Tar(fs.RootFS, image.Uncompressed)
 }
 }
 
 
 func (fs *Filesystem) EnsureMounted() error {
 func (fs *Filesystem) EnsureMounted() error {

+ 23 - 0
future/future.go

@@ -61,3 +61,26 @@ func Go(f func() error) chan error {
 	return ch
 	return ch
 }
 }
 
 
+// Pv wraps an io.Reader such that it is passed through unchanged,
+// but logs the number of bytes copied (comparable to the unix command pv)
+func Pv(src io.Reader, info io.Writer) io.Reader {
+	var totalBytes int
+	data := make([]byte, 2048)
+	r, w := io.Pipe()
+	go func() {
+		for {
+			if n, err := src.Read(data); err != nil {
+				w.CloseWithError(err)
+				return
+			} else {
+				totalBytes += n
+				fmt.Fprintf(info, "--> %d bytes\n", totalBytes)
+				if _, err = w.Write(data[:n]); err != nil {
+					return
+				}
+			}
+		}
+	}()
+	return r
+}
+

+ 71 - 0
image/archive.go

@@ -0,0 +1,71 @@
+package image
+
+import (
+	"io"
+	"io/ioutil"
+	"os/exec"
+	"errors"
+)
+
+type Compression uint32
+
+const (
+	Uncompressed	Compression = iota
+	Bzip2
+	Gzip
+)
+
+func (compression *Compression) Flag() string {
+	switch *compression {
+		case Bzip2: return "j"
+		case Gzip: return "z"
+	}
+	return ""
+}
+
+func Tar(path string, compression Compression) (io.Reader, error) {
+	cmd := exec.Command("bsdtar", "-f", "-", "-C", path, "-c" + compression.Flag(), ".")
+	return CmdStream(cmd)
+}
+
+func Untar(archive io.Reader, path string) error {
+	cmd := exec.Command("bsdtar", "-f", "-", "-C", path, "-x")
+	cmd.Stdin = archive
+	output, err := cmd.CombinedOutput()
+	if err != nil {
+		return errors.New(err.Error() + ": " + string(output))
+	}
+	return nil
+}
+
+func CmdStream(cmd *exec.Cmd) (io.Reader, error) {
+	stdout, err := cmd.StdoutPipe()
+	if err != nil {
+		return nil, err
+	}
+	stderr, err := cmd.StderrPipe()
+	if err != nil {
+		return nil, err
+	}
+	pipeR, pipeW := io.Pipe()
+	go func() {
+		_, err := io.Copy(pipeW, stdout)
+		if err != nil {
+			pipeW.CloseWithError(err)
+		}
+		errText, e := ioutil.ReadAll(stderr)
+		if e != nil {
+			errText = []byte("(...couldn't fetch stderr: " + e.Error() + ")")
+		}
+		if err := cmd.Wait(); err != nil {
+			// FIXME: can this block if stderr outputs more than the size of StderrPipe()'s buffer?
+			pipeW.CloseWithError(errors.New(err.Error() + ": " + string(errText)))
+		} else {
+			pipeW.Close()
+		}
+	}()
+	if err := cmd.Start(); err != nil {
+		return nil, err
+	}
+	return pipeR, nil
+}

+ 54 - 0
image/archive_test.go

@@ -0,0 +1,54 @@
+package image
+
+import (
+	"testing"
+	"os"
+	"os/exec"
+	"io/ioutil"
+)
+
+func TestCmdStreamBad(t *testing.T) {
+	badCmd := exec.Command("/bin/sh", "-c", "echo hello; echo >&2 error couldn\\'t reverse the phase pulser; exit 1")
+	out, err := CmdStream(badCmd)
+	if err != nil {
+		t.Fatalf("Failed to start command: " + err.Error())
+	}
+	if output, err := ioutil.ReadAll(out); err == nil {
+		t.Fatalf("Command should have failed")
+	} else if err.Error() != "exit status 1: error couldn't reverse the phase pulser\n" {
+		t.Fatalf("Wrong error value (%s)", err.Error())
+	} else if s := string(output); s != "hello\n" {
+		t.Fatalf("Command output should be '%s', not '%s'", "hello\\n", output)
+	}
+}
+
+func TestCmdStreamGood(t *testing.T) {
+	cmd := exec.Command("/bin/sh", "-c", "echo hello; exit 0")
+	out, err := CmdStream(cmd)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if output, err := ioutil.ReadAll(out); err != nil {
+		t.Fatalf("Command should not have failed (err=%s)", err)
+	} else if s := string(output); s != "hello\n" {
+		t.Fatalf("Command output should be '%s', not '%s'", "hello\\n", output)
+	}
+}
+
+func TestTarUntar(t *testing.T) {
+	archive, err := Tar(".", Uncompressed)
+	if err != nil {
+		t.Fatal(err)
+	}
+	tmp, err := ioutil.TempDir("", "docker-test-untar")
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer os.RemoveAll(tmp)
+	if err := Untar(archive, tmp); err != nil {
+		t.Fatal(err)
+	}
+	if _, err := os.Stat(tmp); err != nil {
+		t.Fatalf("Error stating %s: %s", tmp, err.Error())
+	}
+}

+ 4 - 10
image/image.go

@@ -44,16 +44,10 @@ func New(root string) (*Store, error) {
 	}, nil
 	}, nil
 }
 }
 
 
-type Compression uint32
-
-const (
-	Uncompressed	Compression = iota
-	Bzip2
-	Gzip
-)
-
-func (store *Store) Import(name string, archive io.Reader, stderr io.Writer, parent *Image, compression Compression) (*Image, error) {
-	layer, err := store.Layers.AddLayer(archive, stderr, compression)
+// Import creates a new image from the contents of `archive` and registers it in the store as `name`.
+// If `parent` is not nil, it will registered as the parent of the new image.
+func (store *Store) Import(name string, archive io.Reader, parent *Image) (*Image, error) {
+	layer, err := store.Layers.AddLayer(archive)
 	if err != nil {
 	if err != nil {
 		return nil, err
 		return nil, err
 	}
 	}

+ 27 - 36
image/layers.go

@@ -7,7 +7,6 @@ import (
 	"io"
 	"io"
 	"io/ioutil"
 	"io/ioutil"
 	"os"
 	"os"
-	"os/exec"
 	"github.com/dotcloud/docker/future"
 	"github.com/dotcloud/docker/future"
 )
 )
 
 
@@ -82,50 +81,42 @@ func (store *LayerStore) layerPath(id string) string {
 }
 }
 
 
 
 
-func (store *LayerStore) AddLayer(archive io.Reader, stderr io.Writer, compression Compression) (string, error) {
+func (store *LayerStore) AddLayer(archive io.Reader) (string, error) {
+	errors := make(chan error)
+	// Untar
 	tmp, err := store.Mktemp()
 	tmp, err := store.Mktemp()
 	defer os.RemoveAll(tmp)
 	defer os.RemoveAll(tmp)
 	if err != nil {
 	if err != nil {
 		return "", err
 		return "", err
 	}
 	}
-	extractFlags := "-x"
-	if compression == Bzip2 {
-		extractFlags += "j"
-	} else if compression == Gzip {
-		extractFlags += "z"
-	}
-	untarCmd := exec.Command("tar", "-C", tmp, extractFlags)
-	untarW, err := untarCmd.StdinPipe()
-	if err != nil {
-		return "", err
-	}
-	untarStderr, err := untarCmd.StderrPipe()
-	if err != nil {
-		return "", err
-	}
-	go io.Copy(stderr, untarStderr)
-	untarStdout, err := untarCmd.StdoutPipe()
-	if err != nil {
-		return "", err
-	}
-	go io.Copy(stderr, untarStdout)
-	untarCmd.Start()
+	untarR, untarW := io.Pipe()
+	go func() {
+		errors <- Untar(untarR, tmp)
+	}()
+	// Compute ID
+	var id string
 	hashR, hashW := io.Pipe()
 	hashR, hashW := io.Pipe()
-	job_copy := future.Go(func() error {
-		_, err := io.Copy(io.MultiWriter(hashW, untarW), archive)
-		hashW.Close()
-		untarW.Close()
-		return err
-	})
-	id, err := future.ComputeId(hashR)
+	go func() {
+		_id, err := future.ComputeId(hashR)
+		id = _id
+		errors <- err
+	}()
+	// Duplicate archive to each stream
+	_, err = io.Copy(io.MultiWriter(hashW, untarW), archive)
+	hashW.Close()
+	untarW.Close()
 	if err != nil {
 	if err != nil {
 		return "", err
 		return "", err
 	}
 	}
-	if err := untarCmd.Wait(); err != nil {
-		return "", err
-	}
-	if err := <-job_copy; err != nil {
-		return "", err
+	// Wait for goroutines
+	for i:=0; i<2; i+=1 {
+		select {
+			case err := <-errors: {
+				if err != nil {
+					return "", err
+				}
+			}
+		}
 	}
 	}
 	layer := store.layerPath(id)
 	layer := store.layerPath(id)
 	if !store.Exists(id) {
 	if !store.Exists(id) {

+ 6 - 21
server/server.go

@@ -348,17 +348,9 @@ func (srv *Server) CmdKill(stdin io.ReadCloser, stdout io.Writer, args ...string
 
 
 func (srv *Server) CmdPull(stdin io.ReadCloser, stdout io.Writer, args ...string) error {
 func (srv *Server) CmdPull(stdin io.ReadCloser, stdout io.Writer, args ...string) error {
 	cmd := rcli.Subcmd(stdout, "pull", "[OPTIONS] NAME", "Download a new image from a remote location")
 	cmd := rcli.Subcmd(stdout, "pull", "[OPTIONS] NAME", "Download a new image from a remote location")
-	fl_bzip2 := cmd.Bool("j", false, "Bzip2 compression")
-	fl_gzip := cmd.Bool("z", false, "Gzip compression")
 	if err := cmd.Parse(args); err != nil {
 	if err := cmd.Parse(args); err != nil {
 		return nil
 		return nil
 	}
 	}
-	var compression image.Compression
-	if *fl_bzip2 {
-		compression = image.Bzip2
-	} else if *fl_gzip {
-		compression = image.Gzip
-	}
 	name := cmd.Arg(0)
 	name := cmd.Arg(0)
 	if name == "" {
 	if name == "" {
 		return errors.New("Not enough arguments")
 		return errors.New("Not enough arguments")
@@ -375,12 +367,13 @@ func (srv *Server) CmdPull(stdin io.ReadCloser, stdout io.Writer, args ...string
 		u.Host = "s3.amazonaws.com"
 		u.Host = "s3.amazonaws.com"
 		u.Path = path.Join("/docker.io/images", u.Path)
 		u.Path = path.Join("/docker.io/images", u.Path)
 	}
 	}
-	fmt.Fprintf(stdout, "Downloading %s from %s...\n", name, u.String())
+	fmt.Fprintf(stdout, "Downloading from %s\n", u.String())
 	resp, err := http.Get(u.String())
 	resp, err := http.Get(u.String())
 	if err != nil {
 	if err != nil {
 		return err
 		return err
 	}
 	}
-	img, err := srv.images.Import(name, resp.Body, stdout, nil, compression)
+	fmt.Fprintf(stdout, "Unpacking to %s\n", name)
+	img, err := srv.images.Import(name, resp.Body, nil)
 	if err != nil {
 	if err != nil {
 		return err
 		return err
 	}
 	}
@@ -390,22 +383,14 @@ func (srv *Server) CmdPull(stdin io.ReadCloser, stdout io.Writer, args ...string
 
 
 func (srv *Server) CmdPut(stdin io.ReadCloser, stdout io.Writer, args ...string) error {
 func (srv *Server) CmdPut(stdin io.ReadCloser, stdout io.Writer, args ...string) error {
 	cmd := rcli.Subcmd(stdout, "put", "[OPTIONS] NAME", "Import a new image from a local archive.")
 	cmd := rcli.Subcmd(stdout, "put", "[OPTIONS] NAME", "Import a new image from a local archive.")
-	fl_bzip2 := cmd.Bool("j", false, "Bzip2 compression")
-	fl_gzip := cmd.Bool("z", false, "Gzip compression")
 	if err := cmd.Parse(args); err != nil {
 	if err := cmd.Parse(args); err != nil {
 		return nil
 		return nil
 	}
 	}
-	var compression image.Compression
-	if *fl_bzip2 {
-		compression = image.Bzip2
-	} else if *fl_gzip {
-		compression = image.Gzip
-	}
 	name := cmd.Arg(0)
 	name := cmd.Arg(0)
 	if name == "" {
 	if name == "" {
 		return errors.New("Not enough arguments")
 		return errors.New("Not enough arguments")
 	}
 	}
-	img, err := srv.images.Import(name, stdin, stdout, nil, compression)
+	img, err := srv.images.Import(name, stdin, nil)
 	if err != nil {
 	if err != nil {
 		return err
 		return err
 	}
 	}
@@ -558,13 +543,13 @@ func (srv *Server) CmdCommit(stdin io.ReadCloser, stdout io.Writer, args ...stri
 	}
 	}
 	if container := srv.containers.Get(containerName); container != nil {
 	if container := srv.containers.Get(containerName); container != nil {
 		// FIXME: freeze the container before copying it to avoid data corruption?
 		// FIXME: freeze the container before copying it to avoid data corruption?
-		rwTar, err := docker.Tar(container.Filesystem.RWPath)
+		rwTar, err := image.Tar(container.Filesystem.RWPath, image.Uncompressed)
 		if err != nil {
 		if err != nil {
 			return err
 			return err
 		}
 		}
 		// Create a new image from the container's base layers + a new layer from container changes
 		// Create a new image from the container's base layers + a new layer from container changes
 		parentImg := srv.images.Find(container.GetUserData("image"))
 		parentImg := srv.images.Find(container.GetUserData("image"))
-		img, err := srv.images.Import(imgName, rwTar, stdout, parentImg, image.Uncompressed)
+		img, err := srv.images.Import(imgName, rwTar, parentImg)
 		if err != nil {
 		if err != nil {
 			return err
 			return err
 		}
 		}

+ 0 - 19
utils.go

@@ -17,25 +17,6 @@ func Trunc(s string, maxlen int) string {
 	return s[:maxlen]
 	return s[:maxlen]
 }
 }
 
 
-// Tar generates a tar archive from a filesystem path, and returns it as a stream.
-// Path must point to a directory.
-
-func Tar(path string) (io.Reader, error) {
-	cmd := exec.Command("tar", "-C", path, "-c", ".")
-	output, err := cmd.StdoutPipe()
-	if err != nil {
-		return nil, err
-	}
-	if err := cmd.Start(); err != nil {
-		return nil, err
-	}
-	// FIXME: errors will not be passed because we don't wait for the command.
-	// Instead, consumers will hit EOF right away.
-	// This can be fixed by waiting for the process to exit, or for the first write
-	// on stdout, whichever comes first.
-	return output, nil
-}
-
 // Figure out the absolute path of our own binary
 // Figure out the absolute path of our own binary
 func SelfPath() string {
 func SelfPath() string {
 	path, err := exec.LookPath(os.Args[0])
 	path, err := exec.LookPath(os.Args[0])