浏览代码

Merge pull request #12505 from ZJU-SEL/remove_job_from_push

remove job from push
Alexander Morozov 10 年之前
父节点
当前提交
2351b87551
共有 3 个文件被更改,包括 33 次插入34 次删除
  1. 16 15
      api/server/server.go
  2. 17 18
      graph/push.go
  3. 0 1
      graph/service.go

+ 16 - 15
api/server/server.go

@@ -858,25 +858,26 @@ func (s *Server) postImagesPush(eng *engine.Engine, version version.Version, w h
 		}
 		}
 	}
 	}
 
 
-	job := eng.Job("push", vars["name"])
-	job.SetenvJson("metaHeaders", metaHeaders)
-	job.SetenvJson("authConfig", authConfig)
-	job.Setenv("tag", r.Form.Get("tag"))
-	if version.GreaterThan("1.0") {
-		job.SetenvBool("json", true)
-		streamJSON(job.Stdout, w, true)
-	} else {
-		job.Stdout.Add(utils.NewWriteFlusher(w))
+	useJSON := version.GreaterThan("1.0")
+	name := vars["name"]
+
+	imagePushConfig := &graph.ImagePushConfig{
+		MetaHeaders: metaHeaders,
+		AuthConfig:  authConfig,
+		Tag:         r.Form.Get("tag"),
+		OutStream:   utils.NewWriteFlusher(w),
+		Json:        useJSON,
+	}
+	if useJSON {
+		w.Header().Set("Content-Type", "application/json")
 	}
 	}
 
 
-	if err := job.Run(); err != nil {
-		if !job.Stdout.Used() {
-			return err
-		}
-		sf := streamformatter.NewStreamFormatter(version.GreaterThan("1.0"))
-		w.Write(sf.FormatError(err))
+	if err := s.daemon.Repositories().Push(name, imagePushConfig); err != nil {
+		sf := streamformatter.NewStreamFormatter(useJSON)
+		return fmt.Errorf(string(sf.FormatError(err)))
 	}
 	}
 	return nil
 	return nil
+
 }
 }
 
 
 func (s *Server) getImagesGet(eng *engine.Engine, version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
 func (s *Server) getImagesGet(eng *engine.Engine, version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error {

+ 17 - 18
graph/push.go

@@ -12,7 +12,6 @@ import (
 
 
 	"github.com/Sirupsen/logrus"
 	"github.com/Sirupsen/logrus"
 	"github.com/docker/distribution/digest"
 	"github.com/docker/distribution/digest"
-	"github.com/docker/docker/engine"
 	"github.com/docker/docker/image"
 	"github.com/docker/docker/image"
 	"github.com/docker/docker/pkg/progressreader"
 	"github.com/docker/docker/pkg/progressreader"
 	"github.com/docker/docker/pkg/streamformatter"
 	"github.com/docker/docker/pkg/streamformatter"
@@ -25,6 +24,14 @@ import (
 
 
 var ErrV2RegistryUnavailable = errors.New("error v2 registry unavailable")
 var ErrV2RegistryUnavailable = errors.New("error v2 registry unavailable")
 
 
+type ImagePushConfig struct {
+	MetaHeaders map[string][]string
+	AuthConfig  *registry.AuthConfig
+	Tag         string
+	Json        bool
+	OutStream   io.Writer
+}
+
 // Retrieve the all the images to be uploaded in the correct order
 // Retrieve the all the images to be uploaded in the correct order
 func (s *TagStore) getImageList(localRepo map[string]string, requestedTag string) ([]string, map[string][]string, error) {
 func (s *TagStore) getImageList(localRepo map[string]string, requestedTag string) ([]string, map[string][]string, error) {
 	var (
 	var (
@@ -486,15 +493,9 @@ func (s *TagStore) pushV2Image(r *registry.Session, img *image.Image, endpoint *
 }
 }
 
 
 // FIXME: Allow to interrupt current push when new push of same image is done.
 // FIXME: Allow to interrupt current push when new push of same image is done.
-func (s *TagStore) CmdPush(job *engine.Job) error {
-	if n := len(job.Args); n != 1 {
-		return fmt.Errorf("Usage: %s IMAGE", job.Name)
-	}
+func (s *TagStore) Push(localName string, imagePushConfig *ImagePushConfig) error {
 	var (
 	var (
-		localName   = job.Args[0]
-		sf          = streamformatter.NewStreamFormatter(job.GetenvBool("json"))
-		authConfig  = &registry.AuthConfig{}
-		metaHeaders map[string][]string
+		sf = streamformatter.NewStreamFormatter(imagePushConfig.Json)
 	)
 	)
 
 
 	// Resolve the Repository name from fqn to RepositoryInfo
 	// Resolve the Repository name from fqn to RepositoryInfo
@@ -503,10 +504,6 @@ func (s *TagStore) CmdPush(job *engine.Job) error {
 		return err
 		return err
 	}
 	}
 
 
-	tag := job.Getenv("tag")
-	job.GetenvJson("authConfig", authConfig)
-	job.GetenvJson("metaHeaders", &metaHeaders)
-
 	if _, err := s.poolAdd("push", repoInfo.LocalName); err != nil {
 	if _, err := s.poolAdd("push", repoInfo.LocalName); err != nil {
 		return err
 		return err
 	}
 	}
@@ -517,16 +514,18 @@ func (s *TagStore) CmdPush(job *engine.Job) error {
 		return err
 		return err
 	}
 	}
 
 
-	r, err := registry.NewSession(authConfig, registry.HTTPRequestFactory(metaHeaders), endpoint, false)
+	r, err := registry.NewSession(imagePushConfig.AuthConfig, registry.HTTPRequestFactory(imagePushConfig.MetaHeaders), endpoint, false)
 	if err != nil {
 	if err != nil {
 		return err
 		return err
 	}
 	}
 
 
 	reposLen := 1
 	reposLen := 1
-	if tag == "" {
+	if imagePushConfig.Tag == "" {
 		reposLen = len(s.Repositories[repoInfo.LocalName])
 		reposLen = len(s.Repositories[repoInfo.LocalName])
 	}
 	}
-	job.Stdout.Write(sf.FormatStatus("", "The push refers to a repository [%s] (len: %d)", repoInfo.CanonicalName, reposLen))
+
+	imagePushConfig.OutStream.Write(sf.FormatStatus("", "The push refers to a repository [%s] (len: %d)", repoInfo.CanonicalName, reposLen))
+
 	// If it fails, try to get the repository
 	// If it fails, try to get the repository
 	localRepo, exists := s.Repositories[repoInfo.LocalName]
 	localRepo, exists := s.Repositories[repoInfo.LocalName]
 	if !exists {
 	if !exists {
@@ -534,7 +533,7 @@ func (s *TagStore) CmdPush(job *engine.Job) error {
 	}
 	}
 
 
 	if repoInfo.Index.Official || endpoint.Version == registry.APIVersion2 {
 	if repoInfo.Index.Official || endpoint.Version == registry.APIVersion2 {
-		err := s.pushV2Repository(r, localRepo, job.Stdout, repoInfo, tag, sf)
+		err := s.pushV2Repository(r, localRepo, imagePushConfig.OutStream, repoInfo, imagePushConfig.Tag, sf)
 		if err == nil {
 		if err == nil {
 			s.eventsService.Log("push", repoInfo.LocalName, "")
 			s.eventsService.Log("push", repoInfo.LocalName, "")
 			return nil
 			return nil
@@ -545,7 +544,7 @@ func (s *TagStore) CmdPush(job *engine.Job) error {
 		}
 		}
 	}
 	}
 
 
-	if err := s.pushRepository(r, job.Stdout, repoInfo, localRepo, tag, sf); err != nil {
+	if err := s.pushRepository(r, imagePushConfig.OutStream, repoInfo, localRepo, imagePushConfig.Tag, sf); err != nil {
 		return err
 		return err
 	}
 	}
 	s.eventsService.Log("push", repoInfo.LocalName, "")
 	s.eventsService.Log("push", repoInfo.LocalName, "")

+ 0 - 1
graph/service.go

@@ -13,7 +13,6 @@ func (s *TagStore) Install(eng *engine.Engine) error {
 		"image_inspect": s.CmdLookup,
 		"image_inspect": s.CmdLookup,
 		"image_export":  s.CmdImageExport,
 		"image_export":  s.CmdImageExport,
 		"viz":           s.CmdViz,
 		"viz":           s.CmdViz,
-		"push":          s.CmdPush,
 	} {
 	} {
 		if err := eng.Register(name, handler); err != nil {
 		if err := eng.Register(name, handler); err != nil {
 			return fmt.Errorf("Could not register %q: %v", name, err)
 			return fmt.Errorf("Could not register %q: %v", name, err)