Browse Source

Push flow

Signed-off-by: Derek McGowan <derek@mcgstyle.net> (github: dmcgowan)
Derek McGowan 10 years ago
parent
commit
188b56c836
6 changed files with 254 additions and 2 deletions
  1. 22 1
      api/client/commands.go
  2. 19 0
      api/server/server.go
  3. 116 0
      graph/manifest.go
  4. 90 0
      graph/push.go
  5. 1 0
      graph/service.go
  6. 6 1
      registry/session_v2.go

+ 22 - 1
api/client/commands.go

@@ -43,6 +43,7 @@ import (
 	"github.com/docker/docker/registry"
 	"github.com/docker/docker/runconfig"
 	"github.com/docker/docker/utils"
+	"github.com/docker/libtrust"
 )
 
 const (
@@ -1215,6 +1216,26 @@ func (cli *DockerCli) CmdPush(args ...string) error {
 
 	v := url.Values{}
 	v.Set("tag", tag)
+
+	body, _, err := readBody(cli.call("GET", "/images/"+remote+"/manifest?"+v.Encode(), nil, false))
+	if err != nil {
+		return err
+	}
+
+	js, err := libtrust.NewJSONSignature(body)
+	if err != nil {
+		return err
+	}
+	err = js.Sign(cli.key)
+	if err != nil {
+		return err
+	}
+
+	signedBody, err := js.PrettySignature("signatures")
+	if err != nil {
+		return err
+	}
+
 	push := func(authConfig registry.AuthConfig) error {
 		buf, err := json.Marshal(authConfig)
 		if err != nil {
@@ -1224,7 +1245,7 @@ func (cli *DockerCli) CmdPush(args ...string) error {
 			base64.URLEncoding.EncodeToString(buf),
 		}
 
-		return cli.stream("POST", "/images/"+remote+"/push?"+v.Encode(), nil, cli.out, map[string][]string{
+		return cli.stream("POST", "/images/"+remote+"/push?"+v.Encode(), bytes.NewReader(signedBody), cli.out, map[string][]string{
 			"X-Registry-Auth": registryAuthHeader,
 		})
 	}

+ 19 - 0
api/server/server.go

@@ -608,6 +608,18 @@ func getImagesSearch(eng *engine.Engine, version version.Version, w http.Respons
 	return job.Run()
 }
 
+func getImageManifest(eng *engine.Engine, version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
+	if err := parseForm(r); err != nil {
+		return err
+	}
+
+	job := eng.Job("image_manifest", vars["name"])
+	job.Setenv("tag", r.Form.Get("tag"))
+	job.Stdout.Add(utils.NewWriteFlusher(w))
+
+	return job.Run()
+}
+
 func postImagesPush(eng *engine.Engine, version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
 	if vars == nil {
 		return fmt.Errorf("Missing parameter")
@@ -639,9 +651,15 @@ func postImagesPush(eng *engine.Engine, version version.Version, w http.Response
 		}
 	}
 
+	manifest, err := ioutil.ReadAll(r.Body)
+	if err != nil {
+		return err
+	}
+
 	job := eng.Job("push", vars["name"])
 	job.SetenvJson("metaHeaders", metaHeaders)
 	job.SetenvJson("authConfig", authConfig)
+	job.Setenv("manifest", string(manifest))
 	job.Setenv("tag", r.Form.Get("tag"))
 	if version.GreaterThan("1.0") {
 		job.SetenvBool("json", true)
@@ -1294,6 +1312,7 @@ func createRouter(eng *engine.Engine, logging, enableCors bool, dockerVersion st
 			"/images/viz":                     getImagesViz,
 			"/images/search":                  getImagesSearch,
 			"/images/get":                     getImagesGet,
+			"/images/{name:.*}/manifest":      getImageManifest,
 			"/images/{name:.*}/get":           getImagesGet,
 			"/images/{name:.*}/history":       getImagesHistory,
 			"/images/{name:.*}/json":          getImagesByName,

+ 116 - 0
graph/manifest.go

@@ -0,0 +1,116 @@
+package graph
+
+import (
+	"encoding/json"
+	"fmt"
+	"io"
+	"io/ioutil"
+	"path"
+
+	"github.com/docker/docker/engine"
+	"github.com/docker/docker/pkg/tarsum"
+	"github.com/docker/docker/registry"
+	"github.com/docker/docker/runconfig"
+)
+
+func (s *TagStore) CmdManifest(job *engine.Job) engine.Status {
+	if len(job.Args) != 1 {
+		return job.Errorf("usage: %s NAME", job.Name)
+	}
+	name := job.Args[0]
+	tag := job.Getenv("tag")
+	if tag == "" {
+		tag = "latest"
+	}
+
+	// Resolve the Repository name from fqn to endpoint + name
+	_, remoteName, err := registry.ResolveRepositoryName(name)
+	if err != nil {
+		return job.Error(err)
+	}
+
+	manifest := &registry.ManifestData{
+		Name:          remoteName,
+		Tag:           tag,
+		SchemaVersion: 1,
+	}
+	localRepo, exists := s.Repositories[name]
+	if !exists {
+		return job.Errorf("Repo does not exist: %s", name)
+	}
+
+	layerId, exists := localRepo[tag]
+	if !exists {
+		return job.Errorf("Tag does not exist for %s: %s", name, tag)
+	}
+	tarsums := make([]string, 0, 4)
+	layersSeen := make(map[string]bool)
+
+	layer, err := s.graph.Get(layerId)
+	if err != nil {
+		return job.Error(err)
+	}
+	if layer.Config == nil {
+		return job.Errorf("Missing layer configuration")
+	}
+	manifest.Architecture = layer.Architecture
+	var metadata runconfig.Config
+	metadata = *layer.Config
+	history := make([]string, 0, cap(tarsums))
+
+	for ; layer != nil; layer, err = layer.GetParent() {
+		if err != nil {
+			return job.Error(err)
+		}
+
+		if layersSeen[layer.ID] {
+			break
+		}
+		if layer.Config != nil && metadata.Image != layer.ID {
+			err = runconfig.Merge(&metadata, layer.Config)
+			if err != nil {
+				return job.Error(err)
+			}
+		}
+
+		archive, err := layer.TarLayer()
+		if err != nil {
+			return job.Error(err)
+		}
+
+		tarSum, err := tarsum.NewTarSum(archive, true, tarsum.Version0)
+		if err != nil {
+			return job.Error(err)
+		}
+		if _, err := io.Copy(ioutil.Discard, tarSum); err != nil {
+			return job.Error(err)
+		}
+
+		tarId := tarSum.Sum(nil)
+		// Save tarsum to image json
+
+		tarsums = append(tarsums, tarId)
+
+		layersSeen[layer.ID] = true
+		jsonData, err := ioutil.ReadFile(path.Join(s.graph.Root, layer.ID, "json"))
+		if err != nil {
+			return job.Error(fmt.Errorf("Cannot retrieve the path for {%s}: %s", layer.ID, err))
+		}
+		history = append(history, string(jsonData))
+	}
+
+	manifest.BlobSums = tarsums
+	manifest.History = history
+
+	manifestBytes, err := json.MarshalIndent(manifest, "", "   ")
+	if err != nil {
+		return job.Error(err)
+	}
+
+	_, err = job.Stdout.Write(manifestBytes)
+	if err != nil {
+		return job.Error(err)
+	}
+
+	return engine.StatusOK
+}

+ 90 - 0
graph/push.go

@@ -1,15 +1,18 @@
 package graph
 
 import (
+	"bytes"
 	"fmt"
 	"io"
 	"io/ioutil"
 	"os"
 	"path"
+	"strings"
 	"sync"
 
 	log "github.com/Sirupsen/logrus"
 	"github.com/docker/docker/engine"
+	"github.com/docker/docker/image"
 	"github.com/docker/docker/pkg/archive"
 	"github.com/docker/docker/registry"
 	"github.com/docker/docker/utils"
@@ -267,6 +270,7 @@ func (s *TagStore) CmdPush(job *engine.Job) engine.Status {
 	}
 
 	tag := job.Getenv("tag")
+	manifestBytes := job.Getenv("manifest")
 	job.GetenvJson("authConfig", authConfig)
 	job.GetenvJson("metaHeaders", &metaHeaders)
 
@@ -286,6 +290,92 @@ func (s *TagStore) CmdPush(job *engine.Job) engine.Status {
 		return job.Error(err2)
 	}
 
+	var isOfficial bool
+	if endpoint.String() == registry.IndexServerAddress() {
+		isOfficial = isOfficialName(remoteName)
+		if isOfficial && strings.IndexRune(remoteName, '/') == -1 {
+			remoteName = "library/" + remoteName
+		}
+	}
+
+	if len(tag) == 0 {
+		tag = DEFAULTTAG
+	}
+	if isOfficial || endpoint.Version == registry.APIVersion2 {
+		j := job.Eng.Job("trust_update_base")
+		if err = j.Run(); err != nil {
+			return job.Errorf("error updating trust base graph: %s", err)
+		}
+
+		repoData, err := r.PushImageJSONIndex(remoteName, []*registry.ImgData{}, false, nil)
+		if err != nil {
+			return job.Error(err)
+		}
+
+		// try via manifest
+		manifest, verified, err := s.verifyManifest(job.Eng, []byte(manifestBytes))
+		if err != nil {
+			return job.Errorf("error verifying manifest: %s", err)
+		}
+
+		if len(manifest.FSLayers) != len(manifest.History) {
+			return job.Errorf("length of history not equal to number of layers")
+		}
+
+		if !verified {
+			log.Debugf("Pushing unverified image")
+		}
+
+		for i := len(manifest.FSLayers) - 1; i >= 0; i-- {
+			var (
+				sumStr  = manifest.FSLayers[i].BlobSum
+				imgJSON = []byte(manifest.History[i].V1Compatibility)
+			)
+
+			sumParts := strings.SplitN(sumStr, ":", 2)
+			if len(sumParts) < 2 {
+				return job.Errorf("Invalid checksum: %s", sumStr)
+			}
+			manifestSum := sumParts[1]
+
+			// for each layer, check if it exists ...
+			// XXX wait this requires having the TarSum of the layer.tar first
+			// skip this step for now. Just push the layer every time for this naive implementation
+			//shouldPush, err := r.PostV2ImageMountBlob(imageName, sumType, sum string, token []string)
+
+			img, err := image.NewImgJSON(imgJSON)
+			if err != nil {
+				return job.Errorf("Failed to parse json: %s", err)
+			}
+
+			img, err = s.graph.Get(img.ID)
+			if err != nil {
+				return job.Error(err)
+			}
+
+			arch, err := img.TarLayer()
+			if err != nil {
+				return job.Errorf("Could not get tar layer: %s", err)
+			}
+
+			_, err = r.PutV2ImageBlob(remoteName, sumParts[0], manifestSum, utils.ProgressReader(arch, int(img.Size), job.Stdout, sf, false, utils.TruncateID(img.ID), "Pushing"), repoData.Tokens)
+			if err != nil {
+				job.Stdout.Write(sf.FormatProgress(utils.TruncateID(img.ID), "Image push failed", nil))
+				return job.Error(err)
+			}
+			job.Stdout.Write(sf.FormatProgress(utils.TruncateID(img.ID), "Image successfully pushed", nil))
+		}
+
+		// push the manifest
+		err = r.PutV2ImageManifest(remoteName, tag, bytes.NewReader([]byte(manifestBytes)), repoData.Tokens)
+		if err != nil {
+			return job.Error(err)
+		}
+
+		// done, no fallback to V1
+		return engine.StatusOK
+	}
+
 	if err != nil {
 		reposLen := 1
 		if tag == "" {

+ 1 - 0
graph/service.go

@@ -25,6 +25,7 @@ func (s *TagStore) Install(eng *engine.Engine) error {
 		"import":         s.CmdImport,
 		"pull":           s.CmdPull,
 		"push":           s.CmdPush,
+		"image_manifest": s.CmdManifest,
 	} {
 		if err := eng.Register(name, handler); err != nil {
 			return fmt.Errorf("Could not register %q: %v", name, err)

+ 6 - 1
registry/session_v2.go

@@ -267,7 +267,7 @@ func (r *Session) GetV2ImageBlobReader(imageName, sumType, sum string, token []s
 // Push the image to the server for storage.
 // 'layer' is an uncompressed reader of the blob to be pushed.
 // The server will generate it's own checksum calculation.
-func (r *Session) PutV2ImageBlob(imageName, sumType string, blobRdr io.Reader, token []string) (serverChecksum string, err error) {
+func (r *Session) PutV2ImageBlob(imageName, sumType, sumStr string, blobRdr io.Reader, token []string) (serverChecksum string, err error) {
 	vars := map[string]string{
 		"imagename": imageName,
 		"sumtype":   sumType,
@@ -285,6 +285,7 @@ func (r *Session) PutV2ImageBlob(imageName, sumType string, blobRdr io.Reader, t
 		return "", err
 	}
 	setTokenAuth(req, token)
+	req.Header.Set("X-Tarsum", sumStr)
 	res, _, err := r.doRequest(req)
 	if err != nil {
 		return "", err
@@ -309,6 +310,10 @@ func (r *Session) PutV2ImageBlob(imageName, sumType string, blobRdr io.Reader, t
 		return "", fmt.Errorf("unable to decode PutV2ImageBlob JSON response: %s", err)
 	}
 
+	if sumInfo.Checksum != sumStr {
+		return "", fmt.Errorf("failed checksum comparison. serverChecksum: %q, localChecksum: %q", sumInfo.Checksum, sumStr)
+	}
+
 	// XXX this is a json struct from the registry, with its checksum
 	return sumInfo.Checksum, nil
 }