فهرست منبع

remove job from pull and import

Closes #12396

Signed-off-by: Simei He <hesimei@zju.edu.cn>

Signed-off-by: Alexander Morozov <lk4d4@docker.com>
Simei He 10 سال پیش
والد
کامیت
6e38a53f96
7فایلهای تغییر یافته به همراه89 افزوده شده و 71 حذف شده
  1. 35 19
      api/server/server.go
  2. 12 6
      builder/internals.go
  3. 18 19
      graph/import.go
  4. 15 22
      graph/pull.go
  5. 0 2
      graph/service.go
  6. 1 0
      integration-cli/utils.go
  7. 8 3
      integration/runtime_test.go

+ 35 - 19
api/server/server.go

@@ -639,7 +639,6 @@ func postImagesCreate(eng *engine.Engine, version version.Version, w http.Respon
 		image = r.Form.Get("fromImage")
 		repo  = r.Form.Get("repo")
 		tag   = r.Form.Get("tag")
-		job   *engine.Job
 	)
 	authEncoded := r.Header.Get("X-Registry-Auth")
 	authConfig := &registry.AuthConfig{}
@@ -651,6 +650,9 @@ func postImagesCreate(eng *engine.Engine, version version.Version, w http.Respon
 			authConfig = &registry.AuthConfig{}
 		}
 	}
+
+	d := getDaemon(eng)
+
 	if image != "" { //pull
 		if tag == "" {
 			image, tag = parsers.ParseRepositoryTag(image)
@@ -661,31 +663,45 @@ func postImagesCreate(eng *engine.Engine, version version.Version, w http.Respon
 				metaHeaders[k] = v
 			}
 		}
-		job = eng.Job("pull", image, tag)
-		job.SetenvBool("parallel", version.GreaterThan("1.3"))
-		job.SetenvJson("metaHeaders", metaHeaders)
-		job.SetenvJson("authConfig", authConfig)
+
+		imagePullConfig := &graph.ImagePullConfig{
+			Parallel:    version.GreaterThan("1.3"),
+			MetaHeaders: metaHeaders,
+			AuthConfig:  authConfig,
+			OutStream:   utils.NewWriteFlusher(w),
+		}
+		if version.GreaterThan("1.0") {
+			imagePullConfig.Json = true
+			w.Header().Set("Content-Type", "application/json")
+		} else {
+			imagePullConfig.Json = false
+		}
+
+		if err := d.Repositories().Pull(image, tag, imagePullConfig, eng); err != nil {
+			return err
+		}
 	} else { //import
 		if tag == "" {
 			repo, tag = parsers.ParseRepositoryTag(repo)
 		}
-		job = eng.Job("import", r.Form.Get("fromSrc"), repo, tag)
-		job.Stdin.Add(r.Body)
-		job.SetenvList("changes", r.Form["changes"])
-	}
 
-	if version.GreaterThan("1.0") {
-		job.SetenvBool("json", true)
-		streamJSON(job, w, true)
-	} else {
-		job.Stdout.Add(utils.NewWriteFlusher(w))
-	}
-	if err := job.Run(); err != nil {
-		if !job.Stdout.Used() {
+		src := r.Form.Get("fromSrc")
+		imageImportConfig := &graph.ImageImportConfig{
+			Changes:   r.Form["changes"],
+			InConfig:  r.Body,
+			OutStream: utils.NewWriteFlusher(w),
+		}
+		if version.GreaterThan("1.0") {
+			imageImportConfig.Json = true
+			w.Header().Set("Content-Type", "application/json")
+		} else {
+			imageImportConfig.Json = false
+		}
+
+		if err := d.Repositories().Import(src, repo, tag, imageImportConfig, eng); err != nil {
 			return err
 		}
-		sf := streamformatter.NewStreamFormatter(version.GreaterThan("1.0"))
-		w.Write(sf.FormatError(err))
+
 	}
 
 	return nil

+ 12 - 6
builder/internals.go

@@ -22,6 +22,7 @@ import (
 	"github.com/Sirupsen/logrus"
 	"github.com/docker/docker/builder/parser"
 	"github.com/docker/docker/daemon"
+	"github.com/docker/docker/graph"
 	imagepkg "github.com/docker/docker/image"
 	"github.com/docker/docker/pkg/archive"
 	"github.com/docker/docker/pkg/chrootarchive"
@@ -434,7 +435,7 @@ func (b *Builder) pullImage(name string) (*imagepkg.Image, error) {
 	if tag == "" {
 		tag = "latest"
 	}
-	job := b.Engine.Job("pull", remote, tag)
+
 	pullRegistryAuth := b.AuthConfig
 	if len(b.AuthConfigFile.Configs) > 0 {
 		// The request came with a full auth config file, we prefer to use that
@@ -445,13 +446,18 @@ func (b *Builder) pullImage(name string) (*imagepkg.Image, error) {
 		resolvedAuth := b.AuthConfigFile.ResolveAuthConfig(repoInfo.Index)
 		pullRegistryAuth = &resolvedAuth
 	}
-	job.SetenvBool("json", b.StreamFormatter.Json())
-	job.SetenvBool("parallel", true)
-	job.SetenvJson("authConfig", pullRegistryAuth)
-	job.Stdout.Add(ioutils.NopWriteCloser(b.OutOld))
-	if err := job.Run(); err != nil {
+
+	imagePullConfig := &graph.ImagePullConfig{
+		Parallel:   true,
+		AuthConfig: pullRegistryAuth,
+		OutStream:  ioutils.NopWriteCloser(b.OutOld),
+		Json:       b.StreamFormatter.Json(),
+	}
+
+	if err := b.Daemon.Repositories().Pull(remote, tag, imagePullConfig, b.Engine); err != nil {
 		return nil, err
 	}
+
 	image, err := b.Daemon.Repositories().LookupImage(name)
 	if err != nil {
 		return nil, err

+ 18 - 19
graph/import.go

@@ -3,7 +3,7 @@ package graph
 import (
 	"bytes"
 	"encoding/json"
-	"fmt"
+	"io"
 	"net/http"
 	"net/url"
 
@@ -16,26 +16,25 @@ import (
 	"github.com/docker/docker/utils"
 )
 
-func (s *TagStore) CmdImport(job *engine.Job) error {
-	if n := len(job.Args); n != 2 && n != 3 {
-		return fmt.Errorf("Usage: %s SRC REPO [TAG]", job.Name)
-	}
+type ImageImportConfig struct {
+	Changes   []string
+	InConfig  io.ReadCloser
+	Json      bool
+	OutStream io.Writer
+	//OutStream WriteFlusher
+}
+
+func (s *TagStore) Import(src string, repo string, tag string, imageImportConfig *ImageImportConfig, eng *engine.Engine) error {
 	var (
-		src          = job.Args[0]
-		repo         = job.Args[1]
-		tag          string
-		sf           = streamformatter.NewStreamFormatter(job.GetenvBool("json"))
+		sf           = streamformatter.NewStreamFormatter(imageImportConfig.Json)
 		archive      archive.ArchiveReader
 		resp         *http.Response
 		stdoutBuffer = bytes.NewBuffer(nil)
 		newConfig    runconfig.Config
 	)
-	if len(job.Args) > 2 {
-		tag = job.Args[2]
-	}
 
 	if src == "-" {
-		archive = job.Stdin
+		archive = imageImportConfig.InConfig
 	} else {
 		u, err := url.Parse(src)
 		if err != nil {
@@ -46,14 +45,14 @@ func (s *TagStore) CmdImport(job *engine.Job) error {
 			u.Host = src
 			u.Path = ""
 		}
-		job.Stdout.Write(sf.FormatStatus("", "Downloading from %s", u))
+		imageImportConfig.OutStream.Write(sf.FormatStatus("", "Downloading from %s", u))
 		resp, err = httputils.Download(u.String())
 		if err != nil {
 			return err
 		}
 		progressReader := progressreader.New(progressreader.Config{
 			In:        resp.Body,
-			Out:       job.Stdout,
+			Out:       imageImportConfig.OutStream,
 			Formatter: sf,
 			Size:      int(resp.ContentLength),
 			NewLines:  true,
@@ -64,11 +63,11 @@ func (s *TagStore) CmdImport(job *engine.Job) error {
 		archive = progressReader
 	}
 
-	buildConfigJob := job.Eng.Job("build_config")
+	buildConfigJob := eng.Job("build_config")
 	buildConfigJob.Stdout.Add(stdoutBuffer)
-	buildConfigJob.Setenv("changes", job.Getenv("changes"))
+	buildConfigJob.SetenvList("changes", imageImportConfig.Changes)
 	// FIXME this should be remove when we remove deprecated config param
-	buildConfigJob.Setenv("config", job.Getenv("config"))
+	//buildConfigJob.Setenv("config", job.Getenv("config"))
 
 	if err := buildConfigJob.Run(); err != nil {
 		return err
@@ -87,7 +86,7 @@ func (s *TagStore) CmdImport(job *engine.Job) error {
 			return err
 		}
 	}
-	job.Stdout.Write(sf.FormatStatus("", img.ID))
+	imageImportConfig.OutStream.Write(sf.FormatStatus("", img.ID))
 	logID := img.ID
 	if tag != "" {
 		logID = utils.ImageReference(logID, tag)

+ 15 - 22
graph/pull.go

@@ -21,37 +21,30 @@ import (
 	"github.com/docker/docker/utils"
 )
 
-func (s *TagStore) CmdPull(job *engine.Job) error {
-	if n := len(job.Args); n != 1 && n != 2 {
-		return fmt.Errorf("Usage: %s IMAGE [TAG|DIGEST]", job.Name)
-	}
+type ImagePullConfig struct {
+	Parallel    bool
+	MetaHeaders map[string][]string
+	AuthConfig  *registry.AuthConfig
+	Json        bool
+	OutStream   io.Writer
+}
 
+func (s *TagStore) Pull(image string, tag string, imagePullConfig *ImagePullConfig, eng *engine.Engine) error {
 	var (
-		localName   = job.Args[0]
-		tag         string
-		sf          = streamformatter.NewStreamFormatter(job.GetenvBool("json"))
-		authConfig  = &registry.AuthConfig{}
-		metaHeaders map[string][]string
+		sf = streamformatter.NewStreamFormatter(imagePullConfig.Json)
 	)
 
 	// Resolve the Repository name from fqn to RepositoryInfo
-	repoInfo, err := s.registryService.ResolveRepository(localName)
+	repoInfo, err := s.registryService.ResolveRepository(image)
 	if err != nil {
 		return err
 	}
 
-	if len(job.Args) > 1 {
-		tag = job.Args[1]
-	}
-
-	job.GetenvJson("authConfig", authConfig)
-	job.GetenvJson("metaHeaders", &metaHeaders)
-
 	c, err := s.poolAdd("pull", utils.ImageReference(repoInfo.LocalName, tag))
 	if err != nil {
 		if c != nil {
 			// Another pull of the same repository is already taking place; just wait for it to finish
-			job.Stdout.Write(sf.FormatStatus("", "Repository %s already being pulled by another client. Waiting.", repoInfo.LocalName))
+			imagePullConfig.OutStream.Write(sf.FormatStatus("", "Repository %s already being pulled by another client. Waiting.", repoInfo.LocalName))
 			<-c
 			return nil
 		}
@@ -65,7 +58,7 @@ func (s *TagStore) CmdPull(job *engine.Job) error {
 		return err
 	}
 
-	r, err := registry.NewSession(authConfig, registry.HTTPRequestFactory(metaHeaders), endpoint, true)
+	r, err := registry.NewSession(imagePullConfig.AuthConfig, registry.HTTPRequestFactory(imagePullConfig.MetaHeaders), endpoint, true)
 	if err != nil {
 		return err
 	}
@@ -77,14 +70,14 @@ func (s *TagStore) CmdPull(job *engine.Job) error {
 
 	if len(repoInfo.Index.Mirrors) == 0 && (repoInfo.Index.Official || endpoint.Version == registry.APIVersion2) {
 		if repoInfo.Official {
-			j := job.Eng.Job("trust_update_base")
+			j := eng.Job("trust_update_base")
 			if err = j.Run(); err != nil {
 				logrus.Errorf("error updating trust base graph: %s", err)
 			}
 		}
 
 		logrus.Debugf("pulling v2 repository with local name %q", repoInfo.LocalName)
-		if err := s.pullV2Repository(job.Eng, r, job.Stdout, repoInfo, tag, sf, job.GetenvBool("parallel")); err == nil {
+		if err := s.pullV2Repository(eng, r, imagePullConfig.OutStream, repoInfo, tag, sf, imagePullConfig.Parallel); err == nil {
 			s.eventsService.Log("pull", logName, "")
 			return nil
 		} else if err != registry.ErrDoesNotExist && err != ErrV2RegistryUnavailable {
@@ -95,7 +88,7 @@ func (s *TagStore) CmdPull(job *engine.Job) error {
 	}
 
 	logrus.Debugf("pulling v1 repository with local name %q", repoInfo.LocalName)
-	if err = s.pullRepository(r, job.Stdout, repoInfo, tag, sf, job.GetenvBool("parallel")); err != nil {
+	if err = s.pullRepository(r, imagePullConfig.OutStream, repoInfo, tag, sf, imagePullConfig.Parallel); err != nil {
 		return err
 	}
 

+ 0 - 2
graph/service.go

@@ -19,8 +19,6 @@ func (s *TagStore) Install(eng *engine.Engine) error {
 		"image_export":   s.CmdImageExport,
 		"viz":            s.CmdViz,
 		"load":           s.CmdLoad,
-		"import":         s.CmdImport,
-		"pull":           s.CmdPull,
 		"push":           s.CmdPush,
 	} {
 		if err := eng.Register(name, handler); err != nil {

+ 1 - 0
integration-cli/utils.go

@@ -143,6 +143,7 @@ func runCommandPipelineWithOutput(cmds ...*exec.Cmd) (output string, exitCode in
 		if i > 0 {
 			prevCmd := cmds[i-1]
 			cmd.Stdin, err = prevCmd.StdoutPipe()
+
 			if err != nil {
 				return "", 0, fmt.Errorf("cannot set stdout pipe for %s: %v", cmd.Path, err)
 			}

+ 8 - 3
integration/runtime_test.go

@@ -27,6 +27,7 @@ import (
 	"github.com/docker/docker/pkg/ioutils"
 	"github.com/docker/docker/pkg/reexec"
 	"github.com/docker/docker/pkg/stringid"
+	"github.com/docker/docker/registry"
 	"github.com/docker/docker/runconfig"
 	"github.com/docker/docker/utils"
 )
@@ -132,9 +133,13 @@ func setupBaseImage() {
 	// If the unit test is not found, try to download it.
 	if err := job.Run(); err != nil || img.Get("Id") != unitTestImageID {
 		// Retrieve the Image
-		job = eng.Job("pull", unitTestImageName)
-		job.Stdout.Add(ioutils.NopWriteCloser(os.Stdout))
-		if err := job.Run(); err != nil {
+		imagePullConfig := &graph.ImagePullConfig{
+			Parallel:   true,
+			OutStream:  ioutils.NopWriteCloser(os.Stdout),
+			AuthConfig: &registry.AuthConfig{},
+		}
+		d := getDaemon(eng)
+		if err := d.Repositories().Pull(unitTestImageName, "", imagePullConfig, eng); err != nil {
 			logrus.Fatalf("Unable to pull the test image: %s", err)
 		}
 	}