浏览代码

move pull and import to a job

Docker-DCO-1.1-Signed-off-by: Victor Vieux <victor.vieux@docker.com> (github: vieux)
Victor Vieux 11 年之前
父节点
当前提交
9dcbdbc4b1
共有 8 个文件被更改,包括 107 次插入45 次删除
  1. 20 18
      api.go
  2. 6 1
      buildfile.go
  3. 5 2
      engine/engine.go
  4. 6 2
      engine/job.go
  5. 3 1
      integration/runtime_test.go
  6. 4 3
      integration/sorter_test.go
  7. 59 18
      server.go
  8. 4 0
      utils/streamformatter.go

+ 20 - 18
api.go

@@ -413,11 +413,11 @@ func postImagesCreate(srv *Server, version float64, w http.ResponseWriter, r *ht
 		return err
 		return err
 	}
 	}
 
 
-	src := r.Form.Get("fromSrc")
-	image := r.Form.Get("fromImage")
-	tag := r.Form.Get("tag")
-	repo := r.Form.Get("repo")
-
+	var (
+		image = r.Form.Get("fromImage")
+		tag   = r.Form.Get("tag")
+		job   *engine.Job
+	)
 	authEncoded := r.Header.Get("X-Registry-Auth")
 	authEncoded := r.Header.Get("X-Registry-Auth")
 	authConfig := &auth.AuthConfig{}
 	authConfig := &auth.AuthConfig{}
 	if authEncoded != "" {
 	if authEncoded != "" {
@@ -431,7 +431,6 @@ func postImagesCreate(srv *Server, version float64, w http.ResponseWriter, r *ht
 	if version > 1.0 {
 	if version > 1.0 {
 		w.Header().Set("Content-Type", "application/json")
 		w.Header().Set("Content-Type", "application/json")
 	}
 	}
-	sf := utils.NewStreamFormatter(version > 1.0)
 	if image != "" { //pull
 	if image != "" { //pull
 		metaHeaders := map[string][]string{}
 		metaHeaders := map[string][]string{}
 		for k, v := range r.Header {
 		for k, v := range r.Header {
@@ -439,22 +438,25 @@ func postImagesCreate(srv *Server, version float64, w http.ResponseWriter, r *ht
 				metaHeaders[k] = v
 				metaHeaders[k] = v
 			}
 			}
 		}
 		}
-		if err := srv.ImagePull(image, tag, w, sf, authConfig, metaHeaders, version > 1.3); err != nil {
-			if sf.Used() {
-				w.Write(sf.FormatError(err))
-				return nil
-			}
-			return err
-		}
+		job = srv.Eng.Job("pull", r.Form.Get("fromImage"), tag)
+		job.SetenvBool("parallel", version > 1.3)
+		job.SetenvJson("metaHeaders", metaHeaders)
+		job.SetenvJson("authConfig", authConfig)
 	} else { //import
 	} else { //import
-		if err := srv.ImageImport(src, repo, tag, r.Body, w, sf); err != nil {
-			if sf.Used() {
-				w.Write(sf.FormatError(err))
-				return nil
-			}
+		job = srv.Eng.Job("import", r.Form.Get("fromSrc"), r.Form.Get("repo"), tag)
+		job.Stdin.Add(r.Body)
+	}
+
+	job.SetenvBool("json", version > 1.0)
+	job.Stdout.Add(w)
+	if err := job.Run(); err != nil {
+		if !job.Stdout.Used() {
 			return err
 			return err
 		}
 		}
+		sf := utils.NewStreamFormatter(version > 1.0)
+		w.Write(sf.FormatError(err))
 	}
 	}
+
 	return nil
 	return nil
 }
 }
 
 

+ 6 - 1
buildfile.go

@@ -84,7 +84,12 @@ func (b *buildFile) CmdFrom(name string) error {
 				resolvedAuth := b.configFile.ResolveAuthConfig(endpoint)
 				resolvedAuth := b.configFile.ResolveAuthConfig(endpoint)
 				pullRegistryAuth = &resolvedAuth
 				pullRegistryAuth = &resolvedAuth
 			}
 			}
-			if err := b.srv.ImagePull(remote, tag, b.outOld, b.sf, pullRegistryAuth, nil, true); err != nil {
+			job := b.srv.Eng.Job("pull", remote, tag)
+			job.SetenvBool("json", b.sf.Json())
+			job.SetenvBool("parallel", true)
+			job.SetenvJson("authConfig", pullRegistryAuth)
+			job.Stdout.Add(b.outOld)
+			if err := job.Run(); err != nil {
 				return err
 				return err
 			}
 			}
 			image, err = b.runtime.repositories.LookupImage(name)
 			image, err = b.runtime.repositories.LookupImage(name)

+ 5 - 2
engine/engine.go

@@ -137,6 +137,9 @@ func (eng *Engine) Job(name string, args ...string) *Job {
 }
 }
 
 
 func (eng *Engine) Logf(format string, args ...interface{}) (n int, err error) {
 func (eng *Engine) Logf(format string, args ...interface{}) (n int, err error) {
-	prefixedFormat := fmt.Sprintf("[%s] %s\n", eng, strings.TrimRight(format, "\n"))
-	return fmt.Fprintf(eng.Stderr, prefixedFormat, args...)
+	if os.Getenv("TEST") == "" {
+		prefixedFormat := fmt.Sprintf("[%s] %s\n", eng, strings.TrimRight(format, "\n"))
+		return fmt.Fprintf(eng.Stderr, prefixedFormat, args...)
+	}
+	return 0, nil
 }
 }

+ 6 - 2
engine/job.go

@@ -3,6 +3,7 @@ package engine
 import (
 import (
 	"fmt"
 	"fmt"
 	"io"
 	"io"
+	"os"
 	"strings"
 	"strings"
 	"time"
 	"time"
 )
 )
@@ -176,8 +177,11 @@ func (job *Job) Environ() map[string]string {
 }
 }
 
 
 func (job *Job) Logf(format string, args ...interface{}) (n int, err error) {
 func (job *Job) Logf(format string, args ...interface{}) (n int, err error) {
-	prefixedFormat := fmt.Sprintf("[%s] %s\n", job, strings.TrimRight(format, "\n"))
-	return fmt.Fprintf(job.Stderr, prefixedFormat, args...)
+	if os.Getenv("TEST") == "" {
+		prefixedFormat := fmt.Sprintf("[%s] %s\n", job, strings.TrimRight(format, "\n"))
+		return fmt.Fprintf(job.Stderr, prefixedFormat, args...)
+	}
+	return 0, nil
 }
 }
 
 
 func (job *Job) Printf(format string, args ...interface{}) (n int, err error) {
 func (job *Job) Printf(format string, args ...interface{}) (n int, err error) {

+ 3 - 1
integration/runtime_test.go

@@ -137,7 +137,9 @@ func setupBaseImage() {
 	// If the unit test is not found, try to download it.
 	// If the unit test is not found, try to download it.
 	if img, err := srv.ImageInspect(unitTestImageName); err != nil || img.ID != unitTestImageID {
 	if img, err := srv.ImageInspect(unitTestImageName); err != nil || img.ID != unitTestImageID {
 		// Retrieve the Image
 		// Retrieve the Image
-		if err := srv.ImagePull(unitTestImageName, "", os.Stdout, utils.NewStreamFormatter(false), nil, nil, true); err != nil {
+		job = eng.Job("pull", unitTestImageName)
+		job.Stdout.Add(utils.NopWriteCloser(os.Stdout))
+		if err := job.Run(); err != nil {
 			log.Fatalf("Unable to pull the test image: %s", err)
 			log.Fatalf("Unable to pull the test image: %s", err)
 		}
 		}
 	}
 	}

+ 4 - 3
integration/sorter_test.go

@@ -2,8 +2,6 @@ package docker
 
 
 import (
 import (
 	"github.com/dotcloud/docker"
 	"github.com/dotcloud/docker"
-	"github.com/dotcloud/docker/utils"
-	"io/ioutil"
 	"testing"
 	"testing"
 	"time"
 	"time"
 )
 )
@@ -53,5 +51,8 @@ func generateImage(name string, srv *docker.Server) error {
 	if err != nil {
 	if err != nil {
 		return err
 		return err
 	}
 	}
-	return srv.ImageImport("-", "repo", name, archive, ioutil.Discard, utils.NewStreamFormatter(true))
+	job := srv.Eng.Job("import", "-", "repo", name)
+	job.Stdin.Add(archive)
+	job.SetenvBool("json", true)
+	return job.Run()
 }
 }

+ 59 - 18
server.go

@@ -97,6 +97,8 @@ func jobInitApi(job *engine.Job) engine.Status {
 		"top":              srv.ContainerTop,
 		"top":              srv.ContainerTop,
 		"load":             srv.ImageLoad,
 		"load":             srv.ImageLoad,
 		"build":            srv.Build,
 		"build":            srv.Build,
+		"pull":             srv.ImagePull,
+		"import":           srv.ImageImport,
 	} {
 	} {
 		if err := job.Eng.Register(name, handler); err != nil {
 		if err := job.Eng.Register(name, handler); err != nil {
 			job.Error(err)
 			job.Error(err)
@@ -1312,8 +1314,25 @@ func (srv *Server) poolRemove(kind, key string) error {
 	return nil
 	return nil
 }
 }
 
 
-func (srv *Server) ImagePull(localName string, tag string, out io.Writer, sf *utils.StreamFormatter, authConfig *auth.AuthConfig, metaHeaders map[string][]string, parallel bool) error {
-	out = utils.NewWriteFlusher(out)
+func (srv *Server) ImagePull(job *engine.Job) engine.Status {
+	if n := len(job.Args); n != 1 && n != 2 {
+		job.Errorf("Usage: %s IMAGE [TAG]", job.Name)
+		return engine.StatusErr
+	}
+	var (
+		localName   = job.Args[0]
+		tag         string
+		sf          = utils.NewStreamFormatter(job.GetenvBool("json"))
+		out         = utils.NewWriteFlusher(job.Stdout)
+		authConfig  *auth.AuthConfig
+		metaHeaders map[string][]string
+	)
+	if len(job.Args) > 1 {
+		tag = job.Args[1]
+	}
+
+	job.GetenvJson("authConfig", authConfig)
+	job.GetenvJson("metaHeaders", metaHeaders)
 
 
 	c, err := srv.poolAdd("pull", localName+":"+tag)
 	c, err := srv.poolAdd("pull", localName+":"+tag)
 	if err != nil {
 	if err != nil {
@@ -1321,21 +1340,24 @@ func (srv *Server) ImagePull(localName string, tag string, out io.Writer, sf *ut
 			// Another pull of the same repository is already taking place; just wait for it to finish
 			// Another pull of the same repository is already taking place; just wait for it to finish
 			out.Write(sf.FormatStatus("", "Repository %s already being pulled by another client. Waiting.", localName))
 			out.Write(sf.FormatStatus("", "Repository %s already being pulled by another client. Waiting.", localName))
 			<-c
 			<-c
-			return nil
+			return engine.StatusOK
 		}
 		}
-		return err
+		job.Error(err)
+		return engine.StatusErr
 	}
 	}
 	defer srv.poolRemove("pull", localName+":"+tag)
 	defer srv.poolRemove("pull", localName+":"+tag)
 
 
 	// Resolve the Repository name from fqn to endpoint + name
 	// Resolve the Repository name from fqn to endpoint + name
 	endpoint, remoteName, err := registry.ResolveRepositoryName(localName)
 	endpoint, remoteName, err := registry.ResolveRepositoryName(localName)
 	if err != nil {
 	if err != nil {
-		return err
+		job.Error(err)
+		return engine.StatusErr
 	}
 	}
 
 
 	r, err := registry.NewRegistry(authConfig, srv.HTTPRequestFactory(metaHeaders), endpoint)
 	r, err := registry.NewRegistry(authConfig, srv.HTTPRequestFactory(metaHeaders), endpoint)
 	if err != nil {
 	if err != nil {
-		return err
+		job.Error(err)
+		return engine.StatusErr
 	}
 	}
 
 
 	if endpoint == auth.IndexServerAddress() {
 	if endpoint == auth.IndexServerAddress() {
@@ -1343,11 +1365,12 @@ func (srv *Server) ImagePull(localName string, tag string, out io.Writer, sf *ut
 		localName = remoteName
 		localName = remoteName
 	}
 	}
 
 
-	if err = srv.pullRepository(r, out, localName, remoteName, tag, sf, parallel); err != nil {
-		return err
+	if err = srv.pullRepository(r, out, localName, remoteName, tag, sf, job.GetenvBool("parallel")); err != nil {
+		job.Error(err)
+		return engine.StatusErr
 	}
 	}
 
 
-	return nil
+	return engine.StatusOK
 }
 }
 
 
 // Retrieve the all the images to be uploaded in the correct order
 // Retrieve the all the images to be uploaded in the correct order
@@ -1551,16 +1574,31 @@ func (srv *Server) ImagePush(localName string, out io.Writer, sf *utils.StreamFo
 	return nil
 	return nil
 }
 }
 
 
-func (srv *Server) ImageImport(src, repo, tag string, in io.Reader, out io.Writer, sf *utils.StreamFormatter) error {
-	var archive io.Reader
-	var resp *http.Response
+func (srv *Server) ImageImport(job *engine.Job) engine.Status {
+	if n := len(job.Args); n != 2 && n != 3 {
+		job.Errorf("Usage: %s SRC REPO [TAG]", job.Name)
+		return engine.StatusErr
+	}
+	var (
+		src     = job.Args[0]
+		repo    = job.Args[1]
+		tag     string
+		sf      = utils.NewStreamFormatter(job.GetenvBool("json"))
+		out     = utils.NewWriteFlusher(job.Stdout)
+		archive io.Reader
+		resp    *http.Response
+	)
+	if len(job.Args) > 2 {
+		tag = job.Args[2]
+	}
 
 
 	if src == "-" {
 	if src == "-" {
-		archive = in
+		archive = job.Stdin
 	} else {
 	} else {
 		u, err := url.Parse(src)
 		u, err := url.Parse(src)
 		if err != nil {
 		if err != nil {
-			return err
+			job.Error(err)
+			return engine.StatusErr
 		}
 		}
 		if u.Scheme == "" {
 		if u.Scheme == "" {
 			u.Scheme = "http"
 			u.Scheme = "http"
@@ -1572,22 +1610,25 @@ func (srv *Server) ImageImport(src, repo, tag string, in io.Reader, out io.Write
 		// If curl is not available, fallback to http.Get()
 		// If curl is not available, fallback to http.Get()
 		resp, err = utils.Download(u.String())
 		resp, err = utils.Download(u.String())
 		if err != nil {
 		if err != nil {
-			return err
+			job.Error(err)
+			return engine.StatusErr
 		}
 		}
 		archive = utils.ProgressReader(resp.Body, int(resp.ContentLength), out, sf, true, "", "Importing")
 		archive = utils.ProgressReader(resp.Body, int(resp.ContentLength), out, sf, true, "", "Importing")
 	}
 	}
 	img, err := srv.runtime.graph.Create(archive, nil, "Imported from "+src, "", nil)
 	img, err := srv.runtime.graph.Create(archive, nil, "Imported from "+src, "", nil)
 	if err != nil {
 	if err != nil {
-		return err
+		job.Error(err)
+		return engine.StatusErr
 	}
 	}
 	// Optionally register the image at REPO/TAG
 	// Optionally register the image at REPO/TAG
 	if repo != "" {
 	if repo != "" {
 		if err := srv.runtime.repositories.Set(repo, tag, img.ID, true); err != nil {
 		if err := srv.runtime.repositories.Set(repo, tag, img.ID, true); err != nil {
-			return err
+			job.Error(err)
+			return engine.StatusErr
 		}
 		}
 	}
 	}
 	out.Write(sf.FormatStatus("", img.ID))
 	out.Write(sf.FormatStatus("", img.ID))
-	return nil
+	return engine.StatusOK
 }
 }
 
 
 func (srv *Server) ContainerCreate(job *engine.Job) engine.Status {
 func (srv *Server) ContainerCreate(job *engine.Job) engine.Status {

+ 4 - 0
utils/streamformatter.go

@@ -82,3 +82,7 @@ func (sf *StreamFormatter) FormatProgress(id, action string, progress *JSONProgr
 func (sf *StreamFormatter) Used() bool {
 func (sf *StreamFormatter) Used() bool {
 	return sf.used
 	return sf.used
 }
 }
+
+func (sf *StreamFormatter) Json() bool {
+	return sf.json
+}