Browse Source

Add distribution package

Signed-off-by: Aaron Lehmann <aaron.lehmann@docker.com>
Aaron Lehmann 9 years ago
parent
commit
694df3ff9f

+ 38 - 0
distribution/fixtures/validate_manifest/bad_manifest

@@ -0,0 +1,38 @@
+{
+   "schemaVersion": 2,
+   "name": "library/hello-world",
+   "tag": "latest",
+   "architecture": "amd64",
+   "fsLayers": [
+      {
+         "blobSum": "sha256:a3ed95caeb02ffe68cdd9fd84406680ae93d633cb16422d00e8a7c22955b46d4"
+      },
+      {
+         "blobSum": "sha256:03f4658f8b782e12230c1783426bd3bacce651ce582a4ffb6fbbfa2079428ecb"
+      }
+   ],
+   "history": [
+      {
+         "v1Compatibility": "{\"id\":\"af340544ed62de0680f441c71fa1a80cb084678fed42bae393e543faea3a572c\",\"parent\":\"535020c3e8add9d6bb06e5ac15a261e73d9b213d62fb2c14d752b8e189b2b912\",\"created\":\"2015-08-06T23:53:22.608577814Z\",\"container\":\"c2b715156f640c7ac7d98472ea24335aba5432a1323a3bb722697e6d37ef794f\",\"container_config\":{\"Hostname\":\"9aeb0006ffa7\",\"Domainname\":\"\",\"User\":\"\",\"AttachStdin\":false,\"AttachStdout\":false,\"AttachStderr\":false,\"PortSpecs\":null,\"ExposedPorts\":null,\"Tty\":false,\"OpenStdin\":false,\"StdinOnce\":false,\"Env\":null,\"Cmd\":[\"/bin/sh\",\"-c\",\"#(nop) CMD [\\\"/hello\\\"]\"],\"Image\":\"535020c3e8add9d6bb06e5ac15a261e73d9b213d62fb2c14d752b8e189b2b912\",\"Volumes\":null,\"VolumeDriver\":\"\",\"WorkingDir\":\"\",\"Entrypoint\":null,\"NetworkDisabled\":false,\"MacAddress\":\"\",\"OnBuild\":null,\"Labels\":{}},\"docker_version\":\"1.7.1\",\"config\":{\"Hostname\":\"9aeb0006ffa7\",\"Domainname\":\"\",\"User\":\"\",\"AttachStdin\":false,\"AttachStdout\":false,\"AttachStderr\":false,\"PortSpecs\":null,\"ExposedPorts\":null,\"Tty\":false,\"OpenStdin\":false,\"StdinOnce\":false,\"Env\":null,\"Cmd\":[\"/hello\"],\"Image\":\"535020c3e8add9d6bb06e5ac15a261e73d9b213d62fb2c14d752b8e189b2b912\",\"Volumes\":null,\"VolumeDriver\":\"\",\"WorkingDir\":\"\",\"Entrypoint\":null,\"NetworkDisabled\":false,\"MacAddress\":\"\",\"OnBuild\":null,\"Labels\":{}},\"architecture\":\"amd64\",\"os\":\"linux\",\"Size\":0}\n"
+      },
+      {
+         "v1Compatibility": "{\"id\":\"535020c3e8add9d6bb06e5ac15a261e73d9b213d62fb2c14d752b8e189b2b912\",\"created\":\"2015-08-06T23:53:22.241352727Z\",\"container\":\"9aeb0006ffa72a8287564caaea87625896853701459261d3b569e320c0c9d5dc\",\"container_config\":{\"Hostname\":\"9aeb0006ffa7\",\"Domainname\":\"\",\"User\":\"\",\"AttachStdin\":false,\"AttachStdout\":false,\"AttachStderr\":false,\"PortSpecs\":null,\"ExposedPorts\":null,\"Tty\":false,\"OpenStdin\":false,\"StdinOnce\":false,\"Env\":null,\"Cmd\":[\"/bin/sh\",\"-c\",\"#(nop) COPY file:4abd3bff60458ca3b079d7b131ce26b2719055a030dfa96ff827da2b7c7038a7 in /\"],\"Image\":\"\",\"Volumes\":null,\"VolumeDriver\":\"\",\"WorkingDir\":\"\",\"Entrypoint\":null,\"NetworkDisabled\":false,\"MacAddress\":\"\",\"OnBuild\":null,\"Labels\":null},\"docker_version\":\"1.7.1\",\"config\":{\"Hostname\":\"9aeb0006ffa7\",\"Domainname\":\"\",\"User\":\"\",\"AttachStdin\":false,\"AttachStdout\":false,\"AttachStderr\":false,\"PortSpecs\":null,\"ExposedPorts\":null,\"Tty\":false,\"OpenStdin\":false,\"StdinOnce\":false,\"Env\":null,\"Cmd\":null,\"Image\":\"\",\"Volumes\":null,\"VolumeDriver\":\"\",\"WorkingDir\":\"\",\"Entrypoint\":null,\"NetworkDisabled\":false,\"MacAddress\":\"\",\"OnBuild\":null,\"Labels\":null},\"architecture\":\"amd64\",\"os\":\"linux\",\"Size\":960}\n"
+      }
+   ],
+   "signatures": [
+      {
+         "header": {
+            "jwk": {
+               "crv": "P-256",
+               "kid": "OIH7:HQFS:44FK:45VB:3B53:OIAG:TPL4:ATF5:6PNE:MGHN:NHQX:2GE4",
+               "kty": "EC",
+               "x": "Cu_UyxwLgHzE9rvlYSmvVdqYCXY42E9eNhBb0xNv0SQ",
+               "y": "zUsjWJkeKQ5tv7S-hl1Tg71cd-CqnrtiiLxSi6N_yc8"
+            },
+            "alg": "ES256"
+         },
+         "signature": "Y6xaFz9Sy-OtcnKQS1Ilq3Dh8cu4h3nBTJCpOTF1XF7vKtcxxA_xMP8-SgDo869SJ3VsvgPL9-Xn-OoYG2rb1A",
+         "protected": "eyJmb3JtYXRMZW5ndGgiOjMxOTcsImZvcm1hdFRhaWwiOiJDbjAiLCJ0aW1lIjoiMjAxNS0wOS0xMVQwNDoxMzo0OFoifQ"
+      }
+   ]
+}

+ 46 - 0
distribution/fixtures/validate_manifest/extra_data_manifest

@@ -0,0 +1,46 @@
+{
+   "schemaVersion": 1,
+   "name": "library/hello-world",
+   "tag": "latest",
+   "architecture": "amd64",
+   "fsLayers": [
+      {
+         "blobSum": "sha256:a3ed95caeb02ffe68cdd9fd84406680ae93d633cb16422d00e8a7c22955b46d4"
+      },
+      {
+         "blobSum": "sha256:03f4658f8b782e12230c1783426bd3bacce651ce582a4ffb6fbbfa2079428ecb"
+      }
+   ],
+   "history": [
+      {
+         "v1Compatibility": "{\"id\":\"af340544ed62de0680f441c71fa1a80cb084678fed42bae393e543faea3a572c\",\"parent\":\"535020c3e8add9d6bb06e5ac15a261e73d9b213d62fb2c14d752b8e189b2b912\",\"created\":\"2015-08-06T23:53:22.608577814Z\",\"container\":\"c2b715156f640c7ac7d98472ea24335aba5432a1323a3bb722697e6d37ef794f\",\"container_config\":{\"Hostname\":\"9aeb0006ffa7\",\"Domainname\":\"\",\"User\":\"\",\"AttachStdin\":false,\"AttachStdout\":false,\"AttachStderr\":false,\"PortSpecs\":null,\"ExposedPorts\":null,\"Tty\":false,\"OpenStdin\":false,\"StdinOnce\":false,\"Env\":null,\"Cmd\":[\"/bin/sh\",\"-c\",\"#(nop) CMD [\\\"/hello\\\"]\"],\"Image\":\"535020c3e8add9d6bb06e5ac15a261e73d9b213d62fb2c14d752b8e189b2b912\",\"Volumes\":null,\"VolumeDriver\":\"\",\"WorkingDir\":\"\",\"Entrypoint\":null,\"NetworkDisabled\":false,\"MacAddress\":\"\",\"OnBuild\":null,\"Labels\":{}},\"docker_version\":\"1.7.1\",\"config\":{\"Hostname\":\"9aeb0006ffa7\",\"Domainname\":\"\",\"User\":\"\",\"AttachStdin\":false,\"AttachStdout\":false,\"AttachStderr\":false,\"PortSpecs\":null,\"ExposedPorts\":null,\"Tty\":false,\"OpenStdin\":false,\"StdinOnce\":false,\"Env\":null,\"Cmd\":[\"/hello\"],\"Image\":\"535020c3e8add9d6bb06e5ac15a261e73d9b213d62fb2c14d752b8e189b2b912\",\"Volumes\":null,\"VolumeDriver\":\"\",\"WorkingDir\":\"\",\"Entrypoint\":null,\"NetworkDisabled\":false,\"MacAddress\":\"\",\"OnBuild\":null,\"Labels\":{}},\"architecture\":\"amd64\",\"os\":\"linux\",\"Size\":0}\n"
+      },
+      {
+         "v1Compatibility": "{\"id\":\"535020c3e8add9d6bb06e5ac15a261e73d9b213d62fb2c14d752b8e189b2b912\",\"created\":\"2015-08-06T23:53:22.241352727Z\",\"container\":\"9aeb0006ffa72a8287564caaea87625896853701459261d3b569e320c0c9d5dc\",\"container_config\":{\"Hostname\":\"9aeb0006ffa7\",\"Domainname\":\"\",\"User\":\"\",\"AttachStdin\":false,\"AttachStdout\":false,\"AttachStderr\":false,\"PortSpecs\":null,\"ExposedPorts\":null,\"Tty\":false,\"OpenStdin\":false,\"StdinOnce\":false,\"Env\":null,\"Cmd\":[\"/bin/sh\",\"-c\",\"#(nop) COPY file:4abd3bff60458ca3b079d7b131ce26b2719055a030dfa96ff827da2b7c7038a7 in /\"],\"Image\":\"\",\"Volumes\":null,\"VolumeDriver\":\"\",\"WorkingDir\":\"\",\"Entrypoint\":null,\"NetworkDisabled\":false,\"MacAddress\":\"\",\"OnBuild\":null,\"Labels\":null},\"docker_version\":\"1.7.1\",\"config\":{\"Hostname\":\"9aeb0006ffa7\",\"Domainname\":\"\",\"User\":\"\",\"AttachStdin\":false,\"AttachStdout\":false,\"AttachStderr\":false,\"PortSpecs\":null,\"ExposedPorts\":null,\"Tty\":false,\"OpenStdin\":false,\"StdinOnce\":false,\"Env\":null,\"Cmd\":null,\"Image\":\"\",\"Volumes\":null,\"VolumeDriver\":\"\",\"WorkingDir\":\"\",\"Entrypoint\":null,\"NetworkDisabled\":false,\"MacAddress\":\"\",\"OnBuild\":null,\"Labels\":null},\"architecture\":\"amd64\",\"os\":\"linux\",\"Size\":960}\n"
+      }
+   ],
+   "fsLayers": [
+      {
+         "blobSum": "sha256:ffff95caeb02ffe68cdd9fd84406680ae93d633cb16422d00e8a7c22955b46d4"
+      },
+      {
+         "blobSum": "sha256:ffff658f8b782e12230c1783426bd3bacce651ce582a4ffb6fbbfa2079428ecb"
+      }
+   ],
+   "signatures": [
+      {
+         "header": {
+            "jwk": {
+               "crv": "P-256",
+               "kid": "OIH7:HQFS:44FK:45VB:3B53:OIAG:TPL4:ATF5:6PNE:MGHN:NHQX:2GE4",
+               "kty": "EC",
+               "x": "Cu_UyxwLgHzE9rvlYSmvVdqYCXY42E9eNhBb0xNv0SQ",
+               "y": "zUsjWJkeKQ5tv7S-hl1Tg71cd-CqnrtiiLxSi6N_yc8"
+            },
+            "alg": "ES256"
+         },
+         "signature": "Y6xaFz9Sy-OtcnKQS1Ilq3Dh8cu4h3nBTJCpOTF1XF7vKtcxxA_xMP8-SgDo869SJ3VsvgPL9-Xn-OoYG2rb1A",
+         "protected": "eyJmb3JtYXRMZW5ndGgiOjMxOTcsImZvcm1hdFRhaWwiOiJDbjAiLCJ0aW1lIjoiMjAxNS0wOS0xMVQwNDoxMzo0OFoifQ"
+      }
+   ]
+}

+ 38 - 0
distribution/fixtures/validate_manifest/good_manifest

@@ -0,0 +1,38 @@
+{
+   "schemaVersion": 1,
+   "name": "library/hello-world",
+   "tag": "latest",
+   "architecture": "amd64",
+   "fsLayers": [
+      {
+         "blobSum": "sha256:a3ed95caeb02ffe68cdd9fd84406680ae93d633cb16422d00e8a7c22955b46d4"
+      },
+      {
+         "blobSum": "sha256:03f4658f8b782e12230c1783426bd3bacce651ce582a4ffb6fbbfa2079428ecb"
+      }
+   ],
+   "history": [
+      {
+         "v1Compatibility": "{\"id\":\"af340544ed62de0680f441c71fa1a80cb084678fed42bae393e543faea3a572c\",\"parent\":\"535020c3e8add9d6bb06e5ac15a261e73d9b213d62fb2c14d752b8e189b2b912\",\"created\":\"2015-08-06T23:53:22.608577814Z\",\"container\":\"c2b715156f640c7ac7d98472ea24335aba5432a1323a3bb722697e6d37ef794f\",\"container_config\":{\"Hostname\":\"9aeb0006ffa7\",\"Domainname\":\"\",\"User\":\"\",\"AttachStdin\":false,\"AttachStdout\":false,\"AttachStderr\":false,\"PortSpecs\":null,\"ExposedPorts\":null,\"Tty\":false,\"OpenStdin\":false,\"StdinOnce\":false,\"Env\":null,\"Cmd\":[\"/bin/sh\",\"-c\",\"#(nop) CMD [\\\"/hello\\\"]\"],\"Image\":\"535020c3e8add9d6bb06e5ac15a261e73d9b213d62fb2c14d752b8e189b2b912\",\"Volumes\":null,\"VolumeDriver\":\"\",\"WorkingDir\":\"\",\"Entrypoint\":null,\"NetworkDisabled\":false,\"MacAddress\":\"\",\"OnBuild\":null,\"Labels\":{}},\"docker_version\":\"1.7.1\",\"config\":{\"Hostname\":\"9aeb0006ffa7\",\"Domainname\":\"\",\"User\":\"\",\"AttachStdin\":false,\"AttachStdout\":false,\"AttachStderr\":false,\"PortSpecs\":null,\"ExposedPorts\":null,\"Tty\":false,\"OpenStdin\":false,\"StdinOnce\":false,\"Env\":null,\"Cmd\":[\"/hello\"],\"Image\":\"535020c3e8add9d6bb06e5ac15a261e73d9b213d62fb2c14d752b8e189b2b912\",\"Volumes\":null,\"VolumeDriver\":\"\",\"WorkingDir\":\"\",\"Entrypoint\":null,\"NetworkDisabled\":false,\"MacAddress\":\"\",\"OnBuild\":null,\"Labels\":{}},\"architecture\":\"amd64\",\"os\":\"linux\",\"Size\":0}\n"
+      },
+      {
+         "v1Compatibility": "{\"id\":\"535020c3e8add9d6bb06e5ac15a261e73d9b213d62fb2c14d752b8e189b2b912\",\"created\":\"2015-08-06T23:53:22.241352727Z\",\"container\":\"9aeb0006ffa72a8287564caaea87625896853701459261d3b569e320c0c9d5dc\",\"container_config\":{\"Hostname\":\"9aeb0006ffa7\",\"Domainname\":\"\",\"User\":\"\",\"AttachStdin\":false,\"AttachStdout\":false,\"AttachStderr\":false,\"PortSpecs\":null,\"ExposedPorts\":null,\"Tty\":false,\"OpenStdin\":false,\"StdinOnce\":false,\"Env\":null,\"Cmd\":[\"/bin/sh\",\"-c\",\"#(nop) COPY file:4abd3bff60458ca3b079d7b131ce26b2719055a030dfa96ff827da2b7c7038a7 in /\"],\"Image\":\"\",\"Volumes\":null,\"VolumeDriver\":\"\",\"WorkingDir\":\"\",\"Entrypoint\":null,\"NetworkDisabled\":false,\"MacAddress\":\"\",\"OnBuild\":null,\"Labels\":null},\"docker_version\":\"1.7.1\",\"config\":{\"Hostname\":\"9aeb0006ffa7\",\"Domainname\":\"\",\"User\":\"\",\"AttachStdin\":false,\"AttachStdout\":false,\"AttachStderr\":false,\"PortSpecs\":null,\"ExposedPorts\":null,\"Tty\":false,\"OpenStdin\":false,\"StdinOnce\":false,\"Env\":null,\"Cmd\":null,\"Image\":\"\",\"Volumes\":null,\"VolumeDriver\":\"\",\"WorkingDir\":\"\",\"Entrypoint\":null,\"NetworkDisabled\":false,\"MacAddress\":\"\",\"OnBuild\":null,\"Labels\":null},\"architecture\":\"amd64\",\"os\":\"linux\",\"Size\":960}\n"
+      }
+   ],
+   "signatures": [
+      {
+         "header": {
+            "jwk": {
+               "crv": "P-256",
+               "kid": "OIH7:HQFS:44FK:45VB:3B53:OIAG:TPL4:ATF5:6PNE:MGHN:NHQX:2GE4",
+               "kty": "EC",
+               "x": "Cu_UyxwLgHzE9rvlYSmvVdqYCXY42E9eNhBb0xNv0SQ",
+               "y": "zUsjWJkeKQ5tv7S-hl1Tg71cd-CqnrtiiLxSi6N_yc8"
+            },
+            "alg": "ES256"
+         },
+         "signature": "Y6xaFz9Sy-OtcnKQS1Ilq3Dh8cu4h3nBTJCpOTF1XF7vKtcxxA_xMP8-SgDo869SJ3VsvgPL9-Xn-OoYG2rb1A",
+         "protected": "eyJmb3JtYXRMZW5ndGgiOjMxOTcsImZvcm1hdFRhaWwiOiJDbjAiLCJ0aW1lIjoiMjAxNS0wOS0xMVQwNDoxMzo0OFoifQ"
+      }
+   ]
+}

+ 100 - 0
distribution/metadata/blobsum_service.go

@@ -0,0 +1,100 @@
+package metadata
+
+import (
+	"encoding/json"
+
+	"github.com/docker/distribution/digest"
+	"github.com/docker/docker/layer"
+)
+
+// BlobSumService maps layer IDs to a set of known blobsums for
+// the layer.
+type BlobSumService struct {
+	store Store
+}
+
+// maxBlobSums is the number of blobsums to keep per layer DiffID.
+const maxBlobSums = 5
+
+// NewBlobSumService creates a new blobsum mapping service.
+func NewBlobSumService(store Store) *BlobSumService {
+	return &BlobSumService{
+		store: store,
+	}
+}
+
+func (blobserv *BlobSumService) diffIDNamespace() string {
+	return "blobsum-storage"
+}
+
+func (blobserv *BlobSumService) blobSumNamespace() string {
+	return "blobsum-lookup"
+}
+
+func (blobserv *BlobSumService) diffIDKey(diffID layer.DiffID) string {
+	return string(digest.Digest(diffID).Algorithm()) + "/" + digest.Digest(diffID).Hex()
+}
+
+func (blobserv *BlobSumService) blobSumKey(blobsum digest.Digest) string {
+	return string(blobsum.Algorithm()) + "/" + blobsum.Hex()
+}
+
+// GetBlobSums finds the blobsums associated with a layer DiffID.
+func (blobserv *BlobSumService) GetBlobSums(diffID layer.DiffID) ([]digest.Digest, error) {
+	jsonBytes, err := blobserv.store.Get(blobserv.diffIDNamespace(), blobserv.diffIDKey(diffID))
+	if err != nil {
+		return nil, err
+	}
+
+	var blobsums []digest.Digest
+	if err := json.Unmarshal(jsonBytes, &blobsums); err != nil {
+		return nil, err
+	}
+
+	return blobsums, nil
+}
+
+// GetDiffID finds a layer DiffID from a blobsum hash.
+func (blobserv *BlobSumService) GetDiffID(blobsum digest.Digest) (layer.DiffID, error) {
+	diffIDBytes, err := blobserv.store.Get(blobserv.blobSumNamespace(), blobserv.blobSumKey(blobsum))
+	if err != nil {
+		return layer.DiffID(""), err
+	}
+
+	return layer.DiffID(diffIDBytes), nil
+}
+
+// Add associates a blobsum with a layer DiffID. If too many blobsums are
+// present, the oldest one is dropped.
+func (blobserv *BlobSumService) Add(diffID layer.DiffID, blobsum digest.Digest) error {
+	oldBlobSums, err := blobserv.GetBlobSums(diffID)
+	if err != nil {
+		oldBlobSums = nil
+	}
+	newBlobSums := make([]digest.Digest, 0, len(oldBlobSums)+1)
+
+	// Copy all other blobsums to new slice
+	for _, oldSum := range oldBlobSums {
+		if oldSum != blobsum {
+			newBlobSums = append(newBlobSums, oldSum)
+		}
+	}
+
+	newBlobSums = append(newBlobSums, blobsum)
+
+	if len(newBlobSums) > maxBlobSums {
+		newBlobSums = newBlobSums[len(newBlobSums)-maxBlobSums:]
+	}
+
+	jsonBytes, err := json.Marshal(newBlobSums)
+	if err != nil {
+		return err
+	}
+
+	err = blobserv.store.Set(blobserv.diffIDNamespace(), blobserv.diffIDKey(diffID), jsonBytes)
+	if err != nil {
+		return err
+	}
+
+	return blobserv.store.Set(blobserv.blobSumNamespace(), blobserv.blobSumKey(blobsum), []byte(diffID))
+}

+ 105 - 0
distribution/metadata/blobsum_service_test.go

@@ -0,0 +1,105 @@
+package metadata
+
+import (
+	"io/ioutil"
+	"os"
+	"reflect"
+	"testing"
+
+	"github.com/docker/distribution/digest"
+	"github.com/docker/docker/layer"
+)
+
+func TestBlobSumService(t *testing.T) {
+	tmpDir, err := ioutil.TempDir("", "blobsum-storage-service-test")
+	if err != nil {
+		t.Fatalf("could not create temp dir: %v", err)
+	}
+	defer os.RemoveAll(tmpDir)
+
+	metadataStore, err := NewFSMetadataStore(tmpDir)
+	if err != nil {
+		t.Fatalf("could not create metadata store: %v", err)
+	}
+	blobSumService := NewBlobSumService(metadataStore)
+
+	testVectors := []struct {
+		diffID   layer.DiffID
+		blobsums []digest.Digest
+	}{
+		{
+			diffID: layer.DiffID("sha256:a3ed95caeb02ffe68cdd9fd84406680ae93d633cb16422d00e8a7c22955b46d4"),
+			blobsums: []digest.Digest{
+				digest.Digest("sha256:f0cd5ca10b07f35512fc2f1cbf9a6cefbdb5cba70ac6b0c9e5988f4497f71937"),
+			},
+		},
+		{
+			diffID: layer.DiffID("sha256:86e0e091d0da6bde2456dbb48306f3956bbeb2eae1b5b9a43045843f69fe4aaa"),
+			blobsums: []digest.Digest{
+				digest.Digest("sha256:f0cd5ca10b07f35512fc2f1cbf9a6cefbdb5cba70ac6b0c9e5988f4497f71937"),
+				digest.Digest("sha256:9e3447ca24cb96d86ebd5960cb34d1299b07e0a0e03801d90b9969a2c187dd6e"),
+			},
+		},
+		{
+			diffID: layer.DiffID("sha256:03f4658f8b782e12230c1783426bd3bacce651ce582a4ffb6fbbfa2079428ecb"),
+			blobsums: []digest.Digest{
+				digest.Digest("sha256:f0cd5ca10b07f35512fc2f1cbf9a6cefbdb5cba70ac6b0c9e5988f4497f71937"),
+				digest.Digest("sha256:9e3447ca24cb96d86ebd5960cb34d1299b07e0a0e03801d90b9969a2c187dd6e"),
+				digest.Digest("sha256:cbbf2f9a99b47fc460d422812b6a5adff7dfee951d8fa2e4a98caa0382cfbdbf"),
+				digest.Digest("sha256:8902a7ca89aabbb868835260912159026637634090dd8899eee969523252236e"),
+				digest.Digest("sha256:c84364306344ccc48532c52ff5209236273525231dddaaab53262322352883aa"),
+				digest.Digest("sha256:aa7583bbc87532a8352bbb72520a821b3623523523a8352523a52352aaa888fe"),
+			},
+		},
+	}
+
+	// Set some associations
+	for _, vec := range testVectors {
+		for _, blobsum := range vec.blobsums {
+			err := blobSumService.Add(vec.diffID, blobsum)
+			if err != nil {
+				t.Fatalf("error calling Set: %v", err)
+			}
+		}
+	}
+
+	// Check the correct values are read back
+	for _, vec := range testVectors {
+		blobsums, err := blobSumService.GetBlobSums(vec.diffID)
+		if err != nil {
+			t.Fatalf("error calling Get: %v", err)
+		}
+		expectedBlobsums := len(vec.blobsums)
+		if expectedBlobsums > 5 {
+			expectedBlobsums = 5
+		}
+		if !reflect.DeepEqual(blobsums, vec.blobsums[len(vec.blobsums)-expectedBlobsums:len(vec.blobsums)]) {
+			t.Fatal("Get returned incorrect layer ID")
+		}
+	}
+
+	// Test GetBlobSums on a nonexistent entry
+	_, err = blobSumService.GetBlobSums(layer.DiffID("sha256:82379823067823853223359023576437723560923756b03560378f4497753917"))
+	if err == nil {
+		t.Fatal("expected error looking up nonexistent entry")
+	}
+
+	// Test GetDiffID on a nonexistent entry
+	_, err = blobSumService.GetDiffID(digest.Digest("sha256:82379823067823853223359023576437723560923756b03560378f4497753917"))
+	if err == nil {
+		t.Fatal("expected error looking up nonexistent entry")
+	}
+
+	// Overwrite one of the entries and read it back
+	err = blobSumService.Add(testVectors[1].diffID, testVectors[0].blobsums[0])
+	if err != nil {
+		t.Fatalf("error calling Add: %v", err)
+	}
+	diffID, err := blobSumService.GetDiffID(testVectors[0].blobsums[0])
+	if err != nil {
+		t.Fatalf("error calling GetDiffID: %v", err)
+	}
+	if diffID != testVectors[1].diffID {
+		t.Fatal("GetDiffID returned incorrect diffID")
+	}
+}

+ 65 - 0
distribution/metadata/metadata.go

@@ -0,0 +1,65 @@
+package metadata
+
+import (
+	"io/ioutil"
+	"os"
+	"path/filepath"
+	"sync"
+)
+
+// Store implements a K/V store for mapping distribution-related IDs
+// to on-disk layer IDs and image IDs. The namespace identifies the type of
+// mapping (i.e. "v1ids" or "artifacts"). MetadataStore is goroutine-safe.
+type Store interface {
+	// Get retrieves data by namespace and key.
+	Get(namespace string, key string) ([]byte, error)
+	// Set writes data indexed by namespace and key.
+	Set(namespace, key string, value []byte) error
+}
+
+// FSMetadataStore uses the filesystem to associate metadata with layer and
+// image IDs.
+type FSMetadataStore struct {
+	sync.RWMutex
+	basePath string
+}
+
+// NewFSMetadataStore creates a new filesystem-based metadata store.
+func NewFSMetadataStore(basePath string) (*FSMetadataStore, error) {
+	if err := os.MkdirAll(basePath, 0700); err != nil {
+		return nil, err
+	}
+	return &FSMetadataStore{
+		basePath: basePath,
+	}, nil
+}
+
+func (store *FSMetadataStore) path(namespace, key string) string {
+	return filepath.Join(store.basePath, namespace, key)
+}
+
+// Get retrieves data by namespace and key. The data is read from a file named
+// after the key, stored in the namespace's directory.
+func (store *FSMetadataStore) Get(namespace string, key string) ([]byte, error) {
+	store.RLock()
+	defer store.RUnlock()
+
+	return ioutil.ReadFile(store.path(namespace, key))
+}
+
+// Set writes data indexed by namespace and key. The data is written to a file
+// named after the key, stored in the namespace's directory.
+func (store *FSMetadataStore) Set(namespace, key string, value []byte) error {
+	store.Lock()
+	defer store.Unlock()
+
+	path := store.path(namespace, key)
+	tempFilePath := path + ".tmp"
+	if err := os.MkdirAll(filepath.Dir(path), 0755); err != nil {
+		return err
+	}
+	if err := ioutil.WriteFile(tempFilePath, value, 0644); err != nil {
+		return err
+	}
+	return os.Rename(tempFilePath, path)
+}

+ 44 - 0
distribution/metadata/v1_id_service.go

@@ -0,0 +1,44 @@
+package metadata
+
+import (
+	"github.com/docker/docker/image/v1"
+	"github.com/docker/docker/layer"
+)
+
+// V1IDService maps v1 IDs to layers on disk.
+type V1IDService struct {
+	store Store
+}
+
+// NewV1IDService creates a new V1 ID mapping service.
+func NewV1IDService(store Store) *V1IDService {
+	return &V1IDService{
+		store: store,
+	}
+}
+
+// namespace returns the namespace used by this service.
+func (idserv *V1IDService) namespace() string {
+	return "v1id"
+}
+
+// Get finds a layer by its V1 ID.
+func (idserv *V1IDService) Get(v1ID, registry string) (layer.ChainID, error) {
+	if err := v1.ValidateID(v1ID); err != nil {
+		return layer.ChainID(""), err
+	}
+
+	idBytes, err := idserv.store.Get(idserv.namespace(), registry+","+v1ID)
+	if err != nil {
+		return layer.ChainID(""), err
+	}
+	return layer.ChainID(idBytes), nil
+}
+
+// Set associates an image with a V1 ID.
+func (idserv *V1IDService) Set(v1ID, registry string, id layer.ChainID) error {
+	if err := v1.ValidateID(v1ID); err != nil {
+		return err
+	}
+	return idserv.store.Set(idserv.namespace(), registry+","+v1ID, []byte(id))
+}

+ 83 - 0
distribution/metadata/v1_id_service_test.go

@@ -0,0 +1,83 @@
+package metadata
+
+import (
+	"io/ioutil"
+	"os"
+	"testing"
+
+	"github.com/docker/docker/layer"
+)
+
+func TestV1IDService(t *testing.T) {
+	tmpDir, err := ioutil.TempDir("", "v1-id-service-test")
+	if err != nil {
+		t.Fatalf("could not create temp dir: %v", err)
+	}
+	defer os.RemoveAll(tmpDir)
+
+	metadataStore, err := NewFSMetadataStore(tmpDir)
+	if err != nil {
+		t.Fatalf("could not create metadata store: %v", err)
+	}
+	v1IDService := NewV1IDService(metadataStore)
+
+	testVectors := []struct {
+		registry string
+		v1ID     string
+		layerID  layer.ChainID
+	}{
+		{
+			registry: "registry1",
+			v1ID:     "f0cd5ca10b07f35512fc2f1cbf9a6cefbdb5cba70ac6b0c9e5988f4497f71937",
+			layerID:  layer.ChainID("sha256:a3ed95caeb02ffe68cdd9fd84406680ae93d633cb16422d00e8a7c22955b46d4"),
+		},
+		{
+			registry: "registry2",
+			v1ID:     "9e3447ca24cb96d86ebd5960cb34d1299b07e0a0e03801d90b9969a2c187dd6e",
+			layerID:  layer.ChainID("sha256:86e0e091d0da6bde2456dbb48306f3956bbeb2eae1b5b9a43045843f69fe4aaa"),
+		},
+		{
+			registry: "registry1",
+			v1ID:     "9e3447ca24cb96d86ebd5960cb34d1299b07e0a0e03801d90b9969a2c187dd6e",
+			layerID:  layer.ChainID("sha256:03f4658f8b782e12230c1783426bd3bacce651ce582a4ffb6fbbfa2079428ecb"),
+		},
+	}
+
+	// Set some associations
+	for _, vec := range testVectors {
+		err := v1IDService.Set(vec.v1ID, vec.registry, vec.layerID)
+		if err != nil {
+			t.Fatalf("error calling Set: %v", err)
+		}
+	}
+
+	// Check the correct values are read back
+	for _, vec := range testVectors {
+		layerID, err := v1IDService.Get(vec.v1ID, vec.registry)
+		if err != nil {
+			t.Fatalf("error calling Get: %v", err)
+		}
+		if layerID != vec.layerID {
+			t.Fatal("Get returned incorrect layer ID")
+		}
+	}
+
+	// Test Get on a nonexistent entry
+	_, err = v1IDService.Get("82379823067823853223359023576437723560923756b03560378f4497753917", "registry1")
+	if err == nil {
+		t.Fatal("expected error looking up nonexistent entry")
+	}
+
+	// Overwrite one of the entries and read it back
+	err = v1IDService.Set(testVectors[0].v1ID, testVectors[0].registry, testVectors[1].layerID)
+	if err != nil {
+		t.Fatalf("error calling Set: %v", err)
+	}
+	layerID, err := v1IDService.Get(testVectors[0].v1ID, testVectors[0].registry)
+	if err != nil {
+		t.Fatalf("error calling Get: %v", err)
+	}
+	if layerID != testVectors[1].layerID {
+		t.Fatal("Get returned incorrect layer ID")
+	}
+}

+ 51 - 0
distribution/pool.go

@@ -0,0 +1,51 @@
+package distribution
+
+import (
+	"sync"
+
+	"github.com/docker/docker/pkg/broadcaster"
+)
+
+// A Pool manages concurrent pulls. It deduplicates in-progress downloads.
+type Pool struct {
+	sync.Mutex
+	pullingPool map[string]*broadcaster.Buffered
+}
+
+// NewPool creates a new Pool.
+func NewPool() *Pool {
+	return &Pool{
+		pullingPool: make(map[string]*broadcaster.Buffered),
+	}
+}
+
+// add checks if a pull is already running, and returns (broadcaster, true)
+// if a running operation is found. Otherwise, it creates a new one and returns
+// (broadcaster, false).
+func (pool *Pool) add(key string) (*broadcaster.Buffered, bool) {
+	pool.Lock()
+	defer pool.Unlock()
+
+	if p, exists := pool.pullingPool[key]; exists {
+		return p, true
+	}
+
+	broadcaster := broadcaster.NewBuffered()
+	pool.pullingPool[key] = broadcaster
+
+	return broadcaster, false
+}
+
+func (pool *Pool) removeWithError(key string, broadcasterResult error) error {
+	pool.Lock()
+	defer pool.Unlock()
+	if broadcaster, exists := pool.pullingPool[key]; exists {
+		broadcaster.CloseWithError(broadcasterResult)
+		delete(pool.pullingPool, key)
+	}
+	return nil
+}
+
+func (pool *Pool) remove(key string) error {
+	return pool.removeWithError(key, nil)
+}

+ 28 - 0
distribution/pool_test.go

@@ -0,0 +1,28 @@
+package distribution
+
+import (
+	"testing"
+)
+
+func TestPools(t *testing.T) {
+	p := NewPool()
+
+	if _, found := p.add("test1"); found {
+		t.Fatal("Expected pull test1 not to be in progress")
+	}
+	if _, found := p.add("test2"); found {
+		t.Fatal("Expected pull test2 not to be in progress")
+	}
+	if _, found := p.add("test1"); !found {
+		t.Fatalf("Expected pull test1 to be in progress`")
+	}
+	if err := p.remove("test2"); err != nil {
+		t.Fatal(err)
+	}
+	if err := p.remove("test2"); err != nil {
+		t.Fatal(err)
+	}
+	if err := p.remove("test1"); err != nil {
+		t.Fatal(err)
+	}
+}

+ 185 - 0
distribution/pull.go

@@ -0,0 +1,185 @@
+package distribution
+
+import (
+	"fmt"
+	"io"
+	"strings"
+
+	"github.com/Sirupsen/logrus"
+	"github.com/docker/distribution/reference"
+	"github.com/docker/docker/cliconfig"
+	"github.com/docker/docker/daemon/events"
+	"github.com/docker/docker/distribution/metadata"
+	"github.com/docker/docker/image"
+	"github.com/docker/docker/layer"
+	"github.com/docker/docker/pkg/streamformatter"
+	"github.com/docker/docker/registry"
+	"github.com/docker/docker/tag"
+)
+
+// ImagePullConfig stores pull configuration.
+type ImagePullConfig struct {
+	// MetaHeaders stores HTTP headers with metadata about the image
+	// (DockerHeaders with prefix X-Meta- in the request).
+	MetaHeaders map[string][]string
+	// AuthConfig holds authentication credentials for authenticating with
+	// the registry.
+	AuthConfig *cliconfig.AuthConfig
+	// OutStream is the output writer for showing the status of the pull
+	// operation.
+	OutStream io.Writer
+	// RegistryService is the registry service to use for TLS configuration
+	// and endpoint lookup.
+	RegistryService *registry.Service
+	// EventsService is the events service to use for logging.
+	EventsService *events.Events
+	// MetadataStore is the storage backend for distribution-specific
+	// metadata.
+	MetadataStore metadata.Store
+	// LayerStore manages layers.
+	LayerStore layer.Store
+	// ImageStore manages images.
+	ImageStore image.Store
+	// TagStore manages tags.
+	TagStore tag.Store
+	// Pool manages concurrent pulls.
+	Pool *Pool
+}
+
+// Puller is an interface that abstracts pulling for different API versions.
+type Puller interface {
+	// Pull tries to pull the image referenced by `tag`
+	// Pull returns an error if any, as well as a boolean that determines whether to retry Pull on the next configured endpoint.
+	//
+	Pull(ref reference.Named) (fallback bool, err error)
+}
+
+// newPuller returns a Puller interface that will pull from either a v1 or v2
+// registry. The endpoint argument contains a Version field that determines
+// whether a v1 or v2 puller will be created. The other parameters are passed
+// through to the underlying puller implementation for use during the actual
+// pull operation.
+func newPuller(endpoint registry.APIEndpoint, repoInfo *registry.RepositoryInfo, imagePullConfig *ImagePullConfig, sf *streamformatter.StreamFormatter) (Puller, error) {
+	switch endpoint.Version {
+	case registry.APIVersion2:
+		return &v2Puller{
+			blobSumService: metadata.NewBlobSumService(imagePullConfig.MetadataStore),
+			endpoint:       endpoint,
+			config:         imagePullConfig,
+			sf:             sf,
+			repoInfo:       repoInfo,
+		}, nil
+	case registry.APIVersion1:
+		return &v1Puller{
+			v1IDService: metadata.NewV1IDService(imagePullConfig.MetadataStore),
+			endpoint:    endpoint,
+			config:      imagePullConfig,
+			sf:          sf,
+			repoInfo:    repoInfo,
+		}, nil
+	}
+	return nil, fmt.Errorf("unknown version %d for registry %s", endpoint.Version, endpoint.URL)
+}
+
+// Pull initiates a pull operation. image is the repository name to pull, and
+// tag may be either empty, or indicate a specific tag to pull.
+func Pull(ref reference.Named, imagePullConfig *ImagePullConfig) error {
+	var sf = streamformatter.NewJSONStreamFormatter()
+
+	// Resolve the Repository name from fqn to RepositoryInfo
+	repoInfo, err := imagePullConfig.RegistryService.ResolveRepository(ref)
+	if err != nil {
+		return err
+	}
+
+	// makes sure name is not empty or `scratch`
+	if err := validateRepoName(repoInfo.LocalName.Name()); err != nil {
+		return err
+	}
+
+	endpoints, err := imagePullConfig.RegistryService.LookupPullEndpoints(repoInfo.CanonicalName)
+	if err != nil {
+		return err
+	}
+
+	logName := registry.NormalizeLocalReference(ref)
+
+	var (
+		// use a slice to append the error strings and return a joined string to caller
+		errors []string
+
+		// discardNoSupportErrors is used to track whether an endpoint encountered an error of type registry.ErrNoSupport
+		// By default it is false, which means that if a ErrNoSupport error is encountered, it will be saved in errors.
+		// As soon as another kind of error is encountered, discardNoSupportErrors is set to true, avoiding the saving of
+		// any subsequent ErrNoSupport errors in errors.
+		// It's needed for pull-by-digest on v1 endpoints: if there are only v1 endpoints configured, the error should be
+		// returned and displayed, but if there was a v2 endpoint which supports pull-by-digest, then the last relevant
+		// error is the ones from v2 endpoints not v1.
+		discardNoSupportErrors bool
+	)
+	for _, endpoint := range endpoints {
+		logrus.Debugf("Trying to pull %s from %s %s", repoInfo.LocalName, endpoint.URL, endpoint.Version)
+
+		puller, err := newPuller(endpoint, repoInfo, imagePullConfig, sf)
+		if err != nil {
+			errors = append(errors, err.Error())
+			continue
+		}
+		if fallback, err := puller.Pull(ref); err != nil {
+			if fallback {
+				if _, ok := err.(registry.ErrNoSupport); !ok {
+					// Because we found an error that's not ErrNoSupport, discard all subsequent ErrNoSupport errors.
+					discardNoSupportErrors = true
+					// append subsequent errors
+					errors = append(errors, err.Error())
+				} else if !discardNoSupportErrors {
+					// Save the ErrNoSupport error, because it's either the first error or all encountered errors
+					// were also ErrNoSupport errors.
+					// append subsequent errors
+					errors = append(errors, err.Error())
+				}
+				continue
+			}
+			errors = append(errors, err.Error())
+			logrus.Debugf("Not continuing with error: %v", fmt.Errorf(strings.Join(errors, "\n")))
+			if len(errors) > 0 {
+				return fmt.Errorf(strings.Join(errors, "\n"))
+			}
+		}
+
+		imagePullConfig.EventsService.Log("pull", logName.String(), "")
+		return nil
+	}
+
+	if len(errors) == 0 {
+		return fmt.Errorf("no endpoints found for %s", ref.String())
+	}
+
+	if len(errors) > 0 {
+		return fmt.Errorf(strings.Join(errors, "\n"))
+	}
+	return nil
+}
+
+// writeStatus writes a status message to out. If layersDownloaded is true, the
+// status message indicates that a newer image was downloaded. Otherwise, it
+// indicates that the image is up to date. requestedTag is the tag the message
+// will refer to.
+func writeStatus(requestedTag string, out io.Writer, sf *streamformatter.StreamFormatter, layersDownloaded bool) {
+	if layersDownloaded {
+		out.Write(sf.FormatStatus("", "Status: Downloaded newer image for %s", requestedTag))
+	} else {
+		out.Write(sf.FormatStatus("", "Status: Image is up to date for %s", requestedTag))
+	}
+}
+
+// validateRepoName validates the name of a repository.
+func validateRepoName(name string) error {
+	if name == "" {
+		return fmt.Errorf("Repository name can't be empty")
+	}
+	if name == "scratch" {
+		return fmt.Errorf("'scratch' is a reserved name")
+	}
+	return nil
+}

+ 454 - 0
distribution/pull_v1.go

@@ -0,0 +1,454 @@
+package distribution
+
+import (
+	"encoding/json"
+	"errors"
+	"fmt"
+	"io"
+	"net"
+	"net/url"
+	"strings"
+	"sync"
+	"time"
+
+	"github.com/Sirupsen/logrus"
+	"github.com/docker/distribution/reference"
+	"github.com/docker/distribution/registry/client/transport"
+	"github.com/docker/docker/distribution/metadata"
+	"github.com/docker/docker/image"
+	"github.com/docker/docker/image/v1"
+	"github.com/docker/docker/layer"
+	"github.com/docker/docker/pkg/archive"
+	"github.com/docker/docker/pkg/progressreader"
+	"github.com/docker/docker/pkg/streamformatter"
+	"github.com/docker/docker/pkg/stringid"
+	"github.com/docker/docker/registry"
+)
+
+type v1Puller struct {
+	v1IDService *metadata.V1IDService
+	endpoint    registry.APIEndpoint
+	config      *ImagePullConfig
+	sf          *streamformatter.StreamFormatter
+	repoInfo    *registry.RepositoryInfo
+	session     *registry.Session
+}
+
+func (p *v1Puller) Pull(ref reference.Named) (fallback bool, err error) {
+	if _, isDigested := ref.(reference.Digested); isDigested {
+		// Allowing fallback, because HTTPS v1 is before HTTP v2
+		return true, registry.ErrNoSupport{errors.New("Cannot pull by digest with v1 registry")}
+	}
+
+	tlsConfig, err := p.config.RegistryService.TLSConfig(p.repoInfo.Index.Name)
+	if err != nil {
+		return false, err
+	}
+	// Adds Docker-specific headers as well as user-specified headers (metaHeaders)
+	tr := transport.NewTransport(
+		// TODO(tiborvass): was ReceiveTimeout
+		registry.NewTransport(tlsConfig),
+		registry.DockerHeaders(p.config.MetaHeaders)...,
+	)
+	client := registry.HTTPClient(tr)
+	v1Endpoint, err := p.endpoint.ToV1Endpoint(p.config.MetaHeaders)
+	if err != nil {
+		logrus.Debugf("Could not get v1 endpoint: %v", err)
+		return true, err
+	}
+	p.session, err = registry.NewSession(client, p.config.AuthConfig, v1Endpoint)
+	if err != nil {
+		// TODO(dmcgowan): Check if should fallback
+		logrus.Debugf("Fallback from error: %s", err)
+		return true, err
+	}
+	if err := p.pullRepository(ref); err != nil {
+		// TODO(dmcgowan): Check if should fallback
+		return false, err
+	}
+	out := p.config.OutStream
+	out.Write(p.sf.FormatStatus("", "%s: this image was pulled from a legacy registry.  Important: This registry version will not be supported in future versions of docker.", p.repoInfo.CanonicalName.Name()))
+
+	return false, nil
+}
+
+func (p *v1Puller) pullRepository(ref reference.Named) error {
+	out := p.config.OutStream
+	out.Write(p.sf.FormatStatus("", "Pulling repository %s", p.repoInfo.CanonicalName.Name()))
+
+	repoData, err := p.session.GetRepositoryData(p.repoInfo.RemoteName)
+	if err != nil {
+		if strings.Contains(err.Error(), "HTTP code: 404") {
+			return fmt.Errorf("Error: image %s not found", p.repoInfo.RemoteName.Name())
+		}
+		// Unexpected HTTP error
+		return err
+	}
+
+	logrus.Debugf("Retrieving the tag list")
+	var tagsList map[string]string
+	tagged, isTagged := ref.(reference.Tagged)
+	if !isTagged {
+		tagsList, err = p.session.GetRemoteTags(repoData.Endpoints, p.repoInfo.RemoteName)
+	} else {
+		var tagID string
+		tagsList = make(map[string]string)
+		tagID, err = p.session.GetRemoteTag(repoData.Endpoints, p.repoInfo.RemoteName, tagged.Tag())
+		if err == registry.ErrRepoNotFound {
+			return fmt.Errorf("Tag %s not found in repository %s", tagged.Tag(), p.repoInfo.CanonicalName.Name())
+		}
+		tagsList[tagged.Tag()] = tagID
+	}
+	if err != nil {
+		logrus.Errorf("unable to get remote tags: %s", err)
+		return err
+	}
+
+	for tag, id := range tagsList {
+		repoData.ImgList[id] = &registry.ImgData{
+			ID:       id,
+			Tag:      tag,
+			Checksum: "",
+		}
+	}
+
+	errors := make(chan error)
+	layerDownloaded := make(chan struct{})
+
+	layersDownloaded := false
+	var wg sync.WaitGroup
+	for _, imgData := range repoData.ImgList {
+		if isTagged && imgData.Tag != tagged.Tag() {
+			continue
+		}
+
+		wg.Add(1)
+		go func(img *registry.ImgData) {
+			p.downloadImage(out, repoData, img, layerDownloaded, errors)
+			wg.Done()
+		}(imgData)
+	}
+
+	go func() {
+		wg.Wait()
+		close(errors)
+	}()
+
+	var lastError error
+selectLoop:
+	for {
+		select {
+		case err, ok := <-errors:
+			if !ok {
+				break selectLoop
+			}
+			lastError = err
+		case <-layerDownloaded:
+			layersDownloaded = true
+		}
+	}
+
+	if lastError != nil {
+		return lastError
+	}
+
+	localNameRef := p.repoInfo.LocalName
+	if isTagged {
+		localNameRef, err = reference.WithTag(localNameRef, tagged.Tag())
+		if err != nil {
+			localNameRef = p.repoInfo.LocalName
+		}
+	}
+	writeStatus(localNameRef.String(), out, p.sf, layersDownloaded)
+	return nil
+}
+
+func (p *v1Puller) downloadImage(out io.Writer, repoData *registry.RepositoryData, img *registry.ImgData, layerDownloaded chan struct{}, errors chan error) {
+	if img.Tag == "" {
+		logrus.Debugf("Image (id: %s) present in this repository but untagged, skipping", img.ID)
+		return
+	}
+
+	localNameRef, err := reference.WithTag(p.repoInfo.LocalName, img.Tag)
+	if err != nil {
+		retErr := fmt.Errorf("Image (id: %s) has invalid tag: %s", img.ID, img.Tag)
+		logrus.Debug(retErr.Error())
+		errors <- retErr
+	}
+
+	if err := v1.ValidateID(img.ID); err != nil {
+		errors <- err
+		return
+	}
+
+	out.Write(p.sf.FormatProgress(stringid.TruncateID(img.ID), fmt.Sprintf("Pulling image (%s) from %s", img.Tag, p.repoInfo.CanonicalName.Name()), nil))
+	success := false
+	var lastErr error
+	var isDownloaded bool
+	for _, ep := range p.repoInfo.Index.Mirrors {
+		ep += "v1/"
+		out.Write(p.sf.FormatProgress(stringid.TruncateID(img.ID), fmt.Sprintf("Pulling image (%s) from %s, mirror: %s", img.Tag, p.repoInfo.CanonicalName.Name(), ep), nil))
+		if isDownloaded, err = p.pullImage(out, img.ID, ep, localNameRef); err != nil {
+			// Don't report errors when pulling from mirrors.
+			logrus.Debugf("Error pulling image (%s) from %s, mirror: %s, %s", img.Tag, p.repoInfo.CanonicalName.Name(), ep, err)
+			continue
+		}
+		if isDownloaded {
+			layerDownloaded <- struct{}{}
+		}
+		success = true
+		break
+	}
+	if !success {
+		for _, ep := range repoData.Endpoints {
+			out.Write(p.sf.FormatProgress(stringid.TruncateID(img.ID), fmt.Sprintf("Pulling image (%s) from %s, endpoint: %s", img.Tag, p.repoInfo.CanonicalName.Name(), ep), nil))
+			if isDownloaded, err = p.pullImage(out, img.ID, ep, localNameRef); err != nil {
+				// It's not ideal that only the last error is returned, it would be better to concatenate the errors.
+				// As the error is also given to the output stream the user will see the error.
+				lastErr = err
+				out.Write(p.sf.FormatProgress(stringid.TruncateID(img.ID), fmt.Sprintf("Error pulling image (%s) from %s, endpoint: %s, %s", img.Tag, p.repoInfo.CanonicalName.Name(), ep, err), nil))
+				continue
+			}
+			if isDownloaded {
+				layerDownloaded <- struct{}{}
+			}
+			success = true
+			break
+		}
+	}
+	if !success {
+		err := fmt.Errorf("Error pulling image (%s) from %s, %v", img.Tag, p.repoInfo.CanonicalName.Name(), lastErr)
+		out.Write(p.sf.FormatProgress(stringid.TruncateID(img.ID), err.Error(), nil))
+		errors <- err
+		return
+	}
+	out.Write(p.sf.FormatProgress(stringid.TruncateID(img.ID), "Download complete", nil))
+}
+
+func (p *v1Puller) pullImage(out io.Writer, v1ID, endpoint string, localNameRef reference.Named) (layersDownloaded bool, err error) {
+	var history []string
+	history, err = p.session.GetRemoteHistory(v1ID, endpoint)
+	if err != nil {
+		return false, err
+	}
+	if len(history) < 1 {
+		return false, fmt.Errorf("empty history for image %s", v1ID)
+	}
+	out.Write(p.sf.FormatProgress(stringid.TruncateID(v1ID), "Pulling dependent layers", nil))
+	// FIXME: Try to stream the images?
+	// FIXME: Launch the getRemoteImage() in goroutines
+
+	var (
+		referencedLayers []layer.Layer
+		parentID         layer.ChainID
+		newHistory       []image.History
+		img              *image.V1Image
+		imgJSON          []byte
+		imgSize          int64
+	)
+
+	defer func() {
+		for _, l := range referencedLayers {
+			layer.ReleaseAndLog(p.config.LayerStore, l)
+		}
+	}()
+
+	layersDownloaded = false
+
+	// Iterate over layers from top-most to bottom-most, checking if any
+	// already exist on disk.
+	var i int
+	for i = 0; i != len(history); i++ {
+		v1LayerID := history[i]
+		// Do we have a mapping for this particular v1 ID on this
+		// registry?
+		if layerID, err := p.v1IDService.Get(v1LayerID, p.repoInfo.Index.Name); err == nil {
+			// Does the layer actually exist
+			if l, err := p.config.LayerStore.Get(layerID); err == nil {
+				for j := i; j >= 0; j-- {
+					logrus.Debugf("Layer already exists: %s", history[j])
+					out.Write(p.sf.FormatProgress(stringid.TruncateID(history[j]), "Already exists", nil))
+				}
+				referencedLayers = append(referencedLayers, l)
+				parentID = layerID
+				break
+			}
+		}
+	}
+
+	needsDownload := i
+
+	// Iterate over layers, in order from bottom-most to top-most. Download
+	// config for all layers, and download actual layer data if needed.
+	for i = len(history) - 1; i >= 0; i-- {
+		v1LayerID := history[i]
+		imgJSON, imgSize, err = p.downloadLayerConfig(out, v1LayerID, endpoint)
+		if err != nil {
+			return layersDownloaded, err
+		}
+
+		img = &image.V1Image{}
+		if err := json.Unmarshal(imgJSON, img); err != nil {
+			return layersDownloaded, err
+		}
+
+		if i < needsDownload {
+			l, err := p.downloadLayer(out, v1LayerID, endpoint, parentID, imgSize, &layersDownloaded)
+
+			// Note: This needs to be done even in the error case to avoid
+			// stale references to the layer.
+			if l != nil {
+				referencedLayers = append(referencedLayers, l)
+			}
+			if err != nil {
+				return layersDownloaded, err
+			}
+
+			parentID = l.ChainID()
+		}
+
+		// Create a new-style config from the legacy configs
+		h, err := v1.HistoryFromConfig(imgJSON, false)
+		if err != nil {
+			return layersDownloaded, err
+		}
+		newHistory = append(newHistory, h)
+	}
+
+	rootFS := image.NewRootFS()
+	l := referencedLayers[len(referencedLayers)-1]
+	for l != nil {
+		rootFS.DiffIDs = append([]layer.DiffID{l.DiffID()}, rootFS.DiffIDs...)
+		l = l.Parent()
+	}
+
+	config, err := v1.MakeConfigFromV1Config(imgJSON, rootFS, newHistory)
+	if err != nil {
+		return layersDownloaded, err
+	}
+
+	imageID, err := p.config.ImageStore.Create(config)
+	if err != nil {
+		return layersDownloaded, err
+	}
+
+	if err := p.config.TagStore.Add(localNameRef, imageID, true); err != nil {
+		return layersDownloaded, err
+	}
+
+	return layersDownloaded, nil
+}
+
+func (p *v1Puller) downloadLayerConfig(out io.Writer, v1LayerID, endpoint string) (imgJSON []byte, imgSize int64, err error) {
+	out.Write(p.sf.FormatProgress(stringid.TruncateID(v1LayerID), "Pulling metadata", nil))
+
+	retries := 5
+	for j := 1; j <= retries; j++ {
+		imgJSON, imgSize, err := p.session.GetRemoteImageJSON(v1LayerID, endpoint)
+		if err != nil && j == retries {
+			out.Write(p.sf.FormatProgress(stringid.TruncateID(v1LayerID), "Error pulling layer metadata", nil))
+			return nil, 0, err
+		} else if err != nil {
+			time.Sleep(time.Duration(j) * 500 * time.Millisecond)
+			continue
+		}
+
+		return imgJSON, imgSize, nil
+	}
+
+	// not reached
+	return nil, 0, nil
+}
+
+func (p *v1Puller) downloadLayer(out io.Writer, v1LayerID, endpoint string, parentID layer.ChainID, layerSize int64, layersDownloaded *bool) (l layer.Layer, err error) {
+	// ensure no two downloads of the same layer happen at the same time
+	poolKey := "layer:" + v1LayerID
+	broadcaster, found := p.config.Pool.add(poolKey)
+	broadcaster.Add(out)
+	if found {
+		logrus.Debugf("Image (id: %s) pull is already running, skipping", v1LayerID)
+		if err = broadcaster.Wait(); err != nil {
+			return nil, err
+		}
+		layerID, err := p.v1IDService.Get(v1LayerID, p.repoInfo.Index.Name)
+		if err != nil {
+			return nil, err
+		}
+		// Does the layer actually exist
+		l, err := p.config.LayerStore.Get(layerID)
+		if err != nil {
+			return nil, err
+		}
+		return l, nil
+	}
+
+	// This must use a closure so it captures the value of err when
+	// the function returns, not when the 'defer' is evaluated.
+	defer func() {
+		p.config.Pool.removeWithError(poolKey, err)
+	}()
+
+	retries := 5
+	for j := 1; j <= retries; j++ {
+		// Get the layer
+		status := "Pulling fs layer"
+		if j > 1 {
+			status = fmt.Sprintf("Pulling fs layer [retries: %d]", j)
+		}
+		broadcaster.Write(p.sf.FormatProgress(stringid.TruncateID(v1LayerID), status, nil))
+		layerReader, err := p.session.GetRemoteImageLayer(v1LayerID, endpoint, layerSize)
+		if uerr, ok := err.(*url.Error); ok {
+			err = uerr.Err
+		}
+		if terr, ok := err.(net.Error); ok && terr.Timeout() && j < retries {
+			time.Sleep(time.Duration(j) * 500 * time.Millisecond)
+			continue
+		} else if err != nil {
+			broadcaster.Write(p.sf.FormatProgress(stringid.TruncateID(v1LayerID), "Error pulling dependent layers", nil))
+			return nil, err
+		}
+		*layersDownloaded = true
+		defer layerReader.Close()
+
+		reader := progressreader.New(progressreader.Config{
+			In:        layerReader,
+			Out:       broadcaster,
+			Formatter: p.sf,
+			Size:      layerSize,
+			NewLines:  false,
+			ID:        stringid.TruncateID(v1LayerID),
+			Action:    "Downloading",
+		})
+
+		inflatedLayerData, err := archive.DecompressStream(reader)
+		if err != nil {
+			return nil, fmt.Errorf("could not get decompression stream: %v", err)
+		}
+
+		l, err := p.config.LayerStore.Register(inflatedLayerData, parentID)
+		if err != nil {
+			return nil, fmt.Errorf("failed to register layer: %v", err)
+		}
+		logrus.Debugf("layer %s registered successfully", l.DiffID())
+
+		if terr, ok := err.(net.Error); ok && terr.Timeout() && j < retries {
+			time.Sleep(time.Duration(j) * 500 * time.Millisecond)
+			continue
+		} else if err != nil {
+			broadcaster.Write(p.sf.FormatProgress(stringid.TruncateID(v1LayerID), "Error downloading dependent layers", nil))
+			return nil, err
+		}
+
+		// Cache mapping from this v1 ID to content-addressable layer ID
+		if err := p.v1IDService.Set(v1LayerID, p.repoInfo.Index.Name, l.ChainID()); err != nil {
+			return nil, err
+		}
+
+		broadcaster.Write(p.sf.FormatProgress(stringid.TruncateID(v1LayerID), "Download complete", nil))
+		broadcaster.Close()
+		return l, nil
+	}
+
+	// not reached
+	return nil, nil
+}

+ 512 - 0
distribution/pull_v2.go

@@ -0,0 +1,512 @@
+package distribution
+
+import (
+	"encoding/json"
+	"errors"
+	"fmt"
+	"io"
+	"io/ioutil"
+	"os"
+	"runtime"
+
+	"github.com/Sirupsen/logrus"
+	"github.com/docker/distribution"
+	"github.com/docker/distribution/digest"
+	"github.com/docker/distribution/manifest/schema1"
+	"github.com/docker/distribution/reference"
+	"github.com/docker/docker/distribution/metadata"
+	"github.com/docker/docker/image"
+	"github.com/docker/docker/image/v1"
+	"github.com/docker/docker/layer"
+	"github.com/docker/docker/pkg/archive"
+	"github.com/docker/docker/pkg/broadcaster"
+	"github.com/docker/docker/pkg/progressreader"
+	"github.com/docker/docker/pkg/streamformatter"
+	"github.com/docker/docker/pkg/stringid"
+	"github.com/docker/docker/registry"
+	"golang.org/x/net/context"
+)
+
+type v2Puller struct {
+	blobSumService *metadata.BlobSumService
+	endpoint       registry.APIEndpoint
+	config         *ImagePullConfig
+	sf             *streamformatter.StreamFormatter
+	repoInfo       *registry.RepositoryInfo
+	repo           distribution.Repository
+	sessionID      string
+}
+
+func (p *v2Puller) Pull(ref reference.Named) (fallback bool, err error) {
+	// TODO(tiborvass): was ReceiveTimeout
+	p.repo, err = NewV2Repository(p.repoInfo, p.endpoint, p.config.MetaHeaders, p.config.AuthConfig, "pull")
+	if err != nil {
+		logrus.Debugf("Error getting v2 registry: %v", err)
+		return true, err
+	}
+
+	p.sessionID = stringid.GenerateRandomID()
+
+	if err := p.pullV2Repository(ref); err != nil {
+		if registry.ContinueOnError(err) {
+			logrus.Debugf("Error trying v2 registry: %v", err)
+			return true, err
+		}
+		return false, err
+	}
+	return false, nil
+}
+
+func (p *v2Puller) pullV2Repository(ref reference.Named) (err error) {
+	var refs []reference.Named
+	taggedName := p.repoInfo.LocalName
+	if tagged, isTagged := ref.(reference.Tagged); isTagged {
+		taggedName, err = reference.WithTag(p.repoInfo.LocalName, tagged.Tag())
+		if err != nil {
+			return err
+		}
+		refs = []reference.Named{taggedName}
+	} else if digested, isDigested := ref.(reference.Digested); isDigested {
+		taggedName, err = reference.WithDigest(p.repoInfo.LocalName, digested.Digest())
+		if err != nil {
+			return err
+		}
+		refs = []reference.Named{taggedName}
+	} else {
+		manSvc, err := p.repo.Manifests(context.Background())
+		if err != nil {
+			return err
+		}
+
+		tags, err := manSvc.Tags()
+		if err != nil {
+			return err
+		}
+
+		// This probably becomes a lot nicer after the manifest
+		// refactor...
+		for _, tag := range tags {
+			tagRef, err := reference.WithTag(p.repoInfo.LocalName, tag)
+			if err != nil {
+				return err
+			}
+			refs = append(refs, tagRef)
+		}
+	}
+
+	var layersDownloaded bool
+	for _, pullRef := range refs {
+		// pulledNew is true if either new layers were downloaded OR if existing images were newly tagged
+		// TODO(tiborvass): should we change the name of `layersDownload`? What about message in WriteStatus?
+		pulledNew, err := p.pullV2Tag(p.config.OutStream, pullRef)
+		if err != nil {
+			return err
+		}
+		layersDownloaded = layersDownloaded || pulledNew
+	}
+
+	writeStatus(taggedName.String(), p.config.OutStream, p.sf, layersDownloaded)
+
+	return nil
+}
+
+// downloadInfo is used to pass information from download to extractor
+type downloadInfo struct {
+	tmpFile     *os.File
+	digest      digest.Digest
+	layer       distribution.ReadSeekCloser
+	size        int64
+	err         chan error
+	poolKey     string
+	broadcaster *broadcaster.Buffered
+}
+
+type errVerification struct{}
+
+func (errVerification) Error() string { return "verification failed" }
+
+func (p *v2Puller) download(di *downloadInfo) {
+	logrus.Debugf("pulling blob %q", di.digest)
+
+	blobs := p.repo.Blobs(context.Background())
+
+	desc, err := blobs.Stat(context.Background(), di.digest)
+	if err != nil {
+		logrus.Debugf("Error statting layer: %v", err)
+		di.err <- err
+		return
+	}
+	di.size = desc.Size
+
+	layerDownload, err := blobs.Open(context.Background(), di.digest)
+	if err != nil {
+		logrus.Debugf("Error fetching layer: %v", err)
+		di.err <- err
+		return
+	}
+	defer layerDownload.Close()
+
+	verifier, err := digest.NewDigestVerifier(di.digest)
+	if err != nil {
+		di.err <- err
+		return
+	}
+
+	digestStr := di.digest.String()
+
+	reader := progressreader.New(progressreader.Config{
+		In:        ioutil.NopCloser(io.TeeReader(layerDownload, verifier)),
+		Out:       di.broadcaster,
+		Formatter: p.sf,
+		Size:      di.size,
+		NewLines:  false,
+		ID:        stringid.TruncateID(digestStr),
+		Action:    "Downloading",
+	})
+	io.Copy(di.tmpFile, reader)
+
+	di.broadcaster.Write(p.sf.FormatProgress(stringid.TruncateID(digestStr), "Verifying Checksum", nil))
+
+	if !verifier.Verified() {
+		err = fmt.Errorf("filesystem layer verification failed for digest %s", di.digest)
+		logrus.Error(err)
+		di.err <- err
+		return
+	}
+
+	di.broadcaster.Write(p.sf.FormatProgress(stringid.TruncateID(digestStr), "Download complete", nil))
+
+	logrus.Debugf("Downloaded %s to tempfile %s", digestStr, di.tmpFile.Name())
+	di.layer = layerDownload
+
+	di.err <- nil
+}
+
+func (p *v2Puller) pullV2Tag(out io.Writer, ref reference.Named) (tagUpdated bool, err error) {
+	tagOrDigest := ""
+	if tagged, isTagged := ref.(reference.Tagged); isTagged {
+		tagOrDigest = tagged.Tag()
+	} else if digested, isDigested := ref.(reference.Digested); isDigested {
+		tagOrDigest = digested.Digest().String()
+	} else {
+		return false, fmt.Errorf("internal error: reference has neither a tag nor a digest: %s", ref.String())
+	}
+
+	logrus.Debugf("Pulling ref from V2 registry: %q", tagOrDigest)
+
+	manSvc, err := p.repo.Manifests(context.Background())
+	if err != nil {
+		return false, err
+	}
+
+	unverifiedManifest, err := manSvc.GetByTag(tagOrDigest)
+	if err != nil {
+		return false, err
+	}
+	if unverifiedManifest == nil {
+		return false, fmt.Errorf("image manifest does not exist for tag or digest %q", tagOrDigest)
+	}
+	var verifiedManifest *schema1.Manifest
+	verifiedManifest, err = verifyManifest(unverifiedManifest, ref)
+	if err != nil {
+		return false, err
+	}
+
+	rootFS := image.NewRootFS()
+
+	if err := detectBaseLayer(p.config.ImageStore, verifiedManifest, rootFS); err != nil {
+		return false, err
+	}
+
+	// remove duplicate layers and check parent chain validity
+	err = fixManifestLayers(verifiedManifest)
+	if err != nil {
+		return false, err
+	}
+
+	out.Write(p.sf.FormatStatus(tagOrDigest, "Pulling from %s", p.repo.Name()))
+
+	var downloads []*downloadInfo
+
+	defer func() {
+		for _, d := range downloads {
+			p.config.Pool.removeWithError(d.poolKey, err)
+			if d.tmpFile != nil {
+				d.tmpFile.Close()
+				if err := os.RemoveAll(d.tmpFile.Name()); err != nil {
+					logrus.Errorf("Failed to remove temp file: %s", d.tmpFile.Name())
+				}
+			}
+		}
+	}()
+
+	// Image history converted to the new format
+	var history []image.History
+
+	poolKey := "v2layer:"
+	notFoundLocally := false
+
+	// Note that the order of this loop is in the direction of bottom-most
+	// to top-most, so that the downloads slice gets ordered correctly.
+	for i := len(verifiedManifest.FSLayers) - 1; i >= 0; i-- {
+		blobSum := verifiedManifest.FSLayers[i].BlobSum
+		poolKey += blobSum.String()
+
+		var throwAway struct {
+			ThrowAway bool `json:"throwaway,omitempty"`
+		}
+		if err := json.Unmarshal([]byte(verifiedManifest.History[i].V1Compatibility), &throwAway); err != nil {
+			return false, err
+		}
+
+		h, err := v1.HistoryFromConfig([]byte(verifiedManifest.History[i].V1Compatibility), throwAway.ThrowAway)
+		if err != nil {
+			return false, err
+		}
+		history = append(history, h)
+
+		if throwAway.ThrowAway {
+			continue
+		}
+
+		// Do we have a layer on disk corresponding to the set of
+		// blobsums up to this point?
+		if !notFoundLocally {
+			notFoundLocally = true
+			diffID, err := p.blobSumService.GetDiffID(blobSum)
+			if err == nil {
+				rootFS.Append(diffID)
+				if l, err := p.config.LayerStore.Get(rootFS.ChainID()); err == nil {
+					notFoundLocally = false
+					logrus.Debugf("Layer already exists: %s", blobSum.String())
+					out.Write(p.sf.FormatProgress(stringid.TruncateID(blobSum.String()), "Already exists", nil))
+					defer layer.ReleaseAndLog(p.config.LayerStore, l)
+					continue
+				} else {
+					rootFS.DiffIDs = rootFS.DiffIDs[:len(rootFS.DiffIDs)-1]
+				}
+			}
+		}
+
+		out.Write(p.sf.FormatProgress(stringid.TruncateID(blobSum.String()), "Pulling fs layer", nil))
+
+		tmpFile, err := ioutil.TempFile("", "GetImageBlob")
+		if err != nil {
+			return false, err
+		}
+
+		d := &downloadInfo{
+			poolKey: poolKey,
+			digest:  blobSum,
+			tmpFile: tmpFile,
+			// TODO: seems like this chan buffer solved hanging problem in go1.5,
+			// this can indicate some deeper problem that somehow we never take
+			// error from channel in loop below
+			err: make(chan error, 1),
+		}
+
+		downloads = append(downloads, d)
+
+		broadcaster, found := p.config.Pool.add(d.poolKey)
+		broadcaster.Add(out)
+		d.broadcaster = broadcaster
+		if found {
+			d.err <- nil
+		} else {
+			go p.download(d)
+		}
+	}
+
+	for _, d := range downloads {
+		if err := <-d.err; err != nil {
+			return false, err
+		}
+
+		if d.layer == nil {
+			// Wait for a different pull to download and extract
+			// this layer.
+			err = d.broadcaster.Wait()
+			if err != nil {
+				return false, err
+			}
+
+			diffID, err := p.blobSumService.GetDiffID(d.digest)
+			if err != nil {
+				return false, err
+			}
+			rootFS.Append(diffID)
+
+			l, err := p.config.LayerStore.Get(rootFS.ChainID())
+			if err != nil {
+				return false, err
+			}
+
+			defer layer.ReleaseAndLog(p.config.LayerStore, l)
+
+			continue
+		}
+
+		d.tmpFile.Seek(0, 0)
+		reader := progressreader.New(progressreader.Config{
+			In:        d.tmpFile,
+			Out:       d.broadcaster,
+			Formatter: p.sf,
+			Size:      d.size,
+			NewLines:  false,
+			ID:        stringid.TruncateID(d.digest.String()),
+			Action:    "Extracting",
+		})
+
+		inflatedLayerData, err := archive.DecompressStream(reader)
+		if err != nil {
+			return false, fmt.Errorf("could not get decompression stream: %v", err)
+		}
+
+		l, err := p.config.LayerStore.Register(inflatedLayerData, rootFS.ChainID())
+		if err != nil {
+			return false, fmt.Errorf("failed to register layer: %v", err)
+		}
+		logrus.Debugf("layer %s registered successfully", l.DiffID())
+		rootFS.Append(l.DiffID())
+
+		// Cache mapping from this layer's DiffID to the blobsum
+		if err := p.blobSumService.Add(l.DiffID(), d.digest); err != nil {
+			return false, err
+		}
+
+		defer layer.ReleaseAndLog(p.config.LayerStore, l)
+
+		d.broadcaster.Write(p.sf.FormatProgress(stringid.TruncateID(d.digest.String()), "Pull complete", nil))
+		d.broadcaster.Close()
+		tagUpdated = true
+	}
+
+	config, err := v1.MakeConfigFromV1Config([]byte(verifiedManifest.History[0].V1Compatibility), rootFS, history)
+	if err != nil {
+		return false, err
+	}
+
+	imageID, err := p.config.ImageStore.Create(config)
+	if err != nil {
+		return false, err
+	}
+
+	manifestDigest, _, err := digestFromManifest(unverifiedManifest, p.repoInfo.LocalName.Name())
+	if err != nil {
+		return false, err
+	}
+
+	// Check for new tag if no layers downloaded
+	var oldTagImageID image.ID
+	if !tagUpdated {
+		oldTagImageID, err = p.config.TagStore.Get(ref)
+		if err != nil || oldTagImageID != imageID {
+			tagUpdated = true
+		}
+	}
+
+	if tagUpdated {
+		if err = p.config.TagStore.Add(ref, imageID, true); err != nil {
+			return false, err
+		}
+	}
+
+	if manifestDigest != "" {
+		out.Write(p.sf.FormatStatus("", "Digest: %s", manifestDigest))
+	}
+
+	return tagUpdated, nil
+}
+
+func verifyManifest(signedManifest *schema1.SignedManifest, ref reference.Reference) (m *schema1.Manifest, err error) {
+	// If pull by digest, then verify the manifest digest. NOTE: It is
+	// important to do this first, before any other content validation. If the
+	// digest cannot be verified, don't even bother with those other things.
+	if digested, isDigested := ref.(reference.Digested); isDigested {
+		verifier, err := digest.NewDigestVerifier(digested.Digest())
+		if err != nil {
+			return nil, err
+		}
+		payload, err := signedManifest.Payload()
+		if err != nil {
+			// If this failed, the signatures section was corrupted
+			// or missing. Treat the entire manifest as the payload.
+			payload = signedManifest.Raw
+		}
+		if _, err := verifier.Write(payload); err != nil {
+			return nil, err
+		}
+		if !verifier.Verified() {
+			err := fmt.Errorf("image verification failed for digest %s", digested.Digest())
+			logrus.Error(err)
+			return nil, err
+		}
+
+		var verifiedManifest schema1.Manifest
+		if err = json.Unmarshal(payload, &verifiedManifest); err != nil {
+			return nil, err
+		}
+		m = &verifiedManifest
+	} else {
+		m = &signedManifest.Manifest
+	}
+
+	if m.SchemaVersion != 1 {
+		return nil, fmt.Errorf("unsupported schema version %d for %q", m.SchemaVersion, ref.String())
+	}
+	if len(m.FSLayers) != len(m.History) {
+		return nil, fmt.Errorf("length of history not equal to number of layers for %q", ref.String())
+	}
+	if len(m.FSLayers) == 0 {
+		return nil, fmt.Errorf("no FSLayers in manifest for %q", ref.String())
+	}
+	return m, nil
+}
+
+// fixManifestLayers removes repeated layers from the manifest and checks the
+// correctness of the parent chain.
+func fixManifestLayers(m *schema1.Manifest) error {
+	imgs := make([]*image.V1Image, len(m.FSLayers))
+	for i := range m.FSLayers {
+		img := &image.V1Image{}
+
+		if err := json.Unmarshal([]byte(m.History[i].V1Compatibility), img); err != nil {
+			return err
+		}
+
+		imgs[i] = img
+		if err := v1.ValidateID(img.ID); err != nil {
+			return err
+		}
+	}
+
+	if imgs[len(imgs)-1].Parent != "" && runtime.GOOS != "windows" {
+		// Windows base layer can point to a base layer parent that is not in manifest.
+		return errors.New("Invalid parent ID in the base layer of the image.")
+	}
+
+	// check general duplicates to error instead of a deadlock
+	idmap := make(map[string]struct{})
+
+	var lastID string
+	for _, img := range imgs {
+		// skip IDs that appear after each other, we handle those later
+		if _, exists := idmap[img.ID]; img.ID != lastID && exists {
+			return fmt.Errorf("ID %+v appears multiple times in manifest", img.ID)
+		}
+		lastID = img.ID
+		idmap[lastID] = struct{}{}
+	}
+
+	// backwards loop so that we keep the remaining indexes after removing items
+	for i := len(imgs) - 2; i >= 0; i-- {
+		if imgs[i].ID == imgs[i+1].ID { // repeated ID. remove and continue
+			m.FSLayers = append(m.FSLayers[:i], m.FSLayers[i+1:]...)
+			m.History = append(m.History[:i], m.History[i+1:]...)
+		} else if imgs[i].Parent != imgs[i+1].ID {
+			return fmt.Errorf("Invalid parent ID. Expected %v, got %v.", imgs[i+1].ID, imgs[i].Parent)
+		}
+	}
+
+	return nil
+}

File diff suppressed because it is too large
+ 27 - 0
distribution/pull_v2_test.go


+ 12 - 0
distribution/pull_v2_unix.go

@@ -0,0 +1,12 @@
+// +build !windows
+
+package distribution
+
+import (
+	"github.com/docker/distribution/manifest/schema1"
+	"github.com/docker/docker/image"
+)
+
+func detectBaseLayer(is image.Store, m *schema1.Manifest, rootFS *image.RootFS) error {
+	return nil
+}

+ 29 - 0
distribution/pull_v2_windows.go

@@ -0,0 +1,29 @@
+// +build windows
+
+package distribution
+
+import (
+	"encoding/json"
+	"fmt"
+
+	"github.com/docker/distribution/manifest/schema1"
+	"github.com/docker/docker/image"
+)
+
+func detectBaseLayer(is image.Store, m *schema1.Manifest, rootFS *image.RootFS) error {
+	v1img := &image.V1Image{}
+	if err := json.Unmarshal([]byte(m.History[len(m.History)-1].V1Compatibility), v1img); err != nil {
+		return err
+	}
+	if v1img.Parent == "" {
+		return fmt.Errorf("Last layer %q does not have a base layer reference", v1img.ID)
+	}
+	// There must be an image that already references the baselayer.
+	for _, img := range is.Map() {
+		if img.RootFS.BaseLayerID() == v1img.Parent {
+			rootFS.BaseLayer = img.RootFS.BaseLayer
+			return nil
+		}
+	}
+	return fmt.Errorf("Invalid base layer %q", v1img.Parent)
+}

+ 179 - 0
distribution/push.go

@@ -0,0 +1,179 @@
+package distribution
+
+import (
+	"bufio"
+	"compress/gzip"
+	"fmt"
+	"io"
+
+	"github.com/Sirupsen/logrus"
+	"github.com/docker/distribution/digest"
+	"github.com/docker/distribution/reference"
+	"github.com/docker/docker/cliconfig"
+	"github.com/docker/docker/daemon/events"
+	"github.com/docker/docker/distribution/metadata"
+	"github.com/docker/docker/image"
+	"github.com/docker/docker/layer"
+	"github.com/docker/docker/pkg/streamformatter"
+	"github.com/docker/docker/registry"
+	"github.com/docker/docker/tag"
+	"github.com/docker/libtrust"
+)
+
+// ImagePushConfig stores push configuration.
+type ImagePushConfig struct {
+	// MetaHeaders store HTTP headers with metadata about the image
+	// (DockerHeaders with prefix X-Meta- in the request).
+	MetaHeaders map[string][]string
+	// AuthConfig holds authentication credentials for authenticating with
+	// the registry.
+	AuthConfig *cliconfig.AuthConfig
+	// OutStream is the output writer for showing the status of the push
+	// operation.
+	OutStream io.Writer
+	// RegistryService is the registry service to use for TLS configuration
+	// and endpoint lookup.
+	RegistryService *registry.Service
+	// EventsService is the events service to use for logging.
+	EventsService *events.Events
+	// MetadataStore is the storage backend for distribution-specific
+	// metadata.
+	MetadataStore metadata.Store
+	// LayerStore manges layers.
+	LayerStore layer.Store
+	// ImageStore manages images.
+	ImageStore image.Store
+	// TagStore manages tags.
+	TagStore tag.Store
+	// TrustKey is the private key for legacy signatures. This is typically
+	// an ephemeral key, since these signatures are no longer verified.
+	TrustKey libtrust.PrivateKey
+}
+
+// Pusher is an interface that abstracts pushing for different API versions.
+type Pusher interface {
+	// Push tries to push the image configured at the creation of Pusher.
+	// Push returns an error if any, as well as a boolean that determines whether to retry Push on the next configured endpoint.
+	//
+	// TODO(tiborvass): have Push() take a reference to repository + tag, so that the pusher itself is repository-agnostic.
+	Push() (fallback bool, err error)
+}
+
+const compressionBufSize = 32768
+
+// NewPusher creates a new Pusher interface that will push to either a v1 or v2
+// registry. The endpoint argument contains a Version field that determines
+// whether a v1 or v2 pusher will be created. The other parameters are passed
+// through to the underlying pusher implementation for use during the actual
+// push operation.
+func NewPusher(ref reference.Named, endpoint registry.APIEndpoint, repoInfo *registry.RepositoryInfo, imagePushConfig *ImagePushConfig, sf *streamformatter.StreamFormatter) (Pusher, error) {
+	switch endpoint.Version {
+	case registry.APIVersion2:
+		return &v2Pusher{
+			blobSumService: metadata.NewBlobSumService(imagePushConfig.MetadataStore),
+			ref:            ref,
+			endpoint:       endpoint,
+			repoInfo:       repoInfo,
+			config:         imagePushConfig,
+			sf:             sf,
+			layersPushed:   make(map[digest.Digest]bool),
+		}, nil
+	case registry.APIVersion1:
+		return &v1Pusher{
+			v1IDService: metadata.NewV1IDService(imagePushConfig.MetadataStore),
+			ref:         ref,
+			endpoint:    endpoint,
+			repoInfo:    repoInfo,
+			config:      imagePushConfig,
+			sf:          sf,
+		}, nil
+	}
+	return nil, fmt.Errorf("unknown version %d for registry %s", endpoint.Version, endpoint.URL)
+}
+
+// Push initiates a push operation on the repository named localName.
+// ref is the specific variant of the image to be pushed.
+// If no tag is provided, all tags will be pushed.
+func Push(ref reference.Named, imagePushConfig *ImagePushConfig) error {
+	// FIXME: Allow to interrupt current push when new push of same image is done.
+
+	var sf = streamformatter.NewJSONStreamFormatter()
+
+	// Resolve the Repository name from fqn to RepositoryInfo
+	repoInfo, err := imagePushConfig.RegistryService.ResolveRepository(ref)
+	if err != nil {
+		return err
+	}
+
+	endpoints, err := imagePushConfig.RegistryService.LookupPushEndpoints(repoInfo.CanonicalName)
+	if err != nil {
+		return err
+	}
+
+	imagePushConfig.OutStream.Write(sf.FormatStatus("", "The push refers to a repository [%s]", repoInfo.CanonicalName))
+
+	associations := imagePushConfig.TagStore.ReferencesByName(repoInfo.LocalName)
+	if len(associations) == 0 {
+		return fmt.Errorf("Repository does not exist: %s", repoInfo.LocalName)
+	}
+
+	var lastErr error
+	for _, endpoint := range endpoints {
+		logrus.Debugf("Trying to push %s to %s %s", repoInfo.CanonicalName, endpoint.URL, endpoint.Version)
+
+		pusher, err := NewPusher(ref, endpoint, repoInfo, imagePushConfig, sf)
+		if err != nil {
+			lastErr = err
+			continue
+		}
+		if fallback, err := pusher.Push(); err != nil {
+			if fallback {
+				lastErr = err
+				continue
+			}
+			logrus.Debugf("Not continuing with error: %v", err)
+			return err
+
+		}
+
+		imagePushConfig.EventsService.Log("push", repoInfo.LocalName.Name(), "")
+		return nil
+	}
+
+	if lastErr == nil {
+		lastErr = fmt.Errorf("no endpoints found for %s", repoInfo.CanonicalName)
+	}
+	return lastErr
+}
+
+// compress returns an io.ReadCloser which will supply a compressed version of
+// the provided Reader. The caller must close the ReadCloser after reading the
+// compressed data.
+//
+// Note that this function returns a reader instead of taking a writer as an
+// argument so that it can be used with httpBlobWriter's ReadFrom method.
+// Using httpBlobWriter's Write method would send a PATCH request for every
+// Write call.
+func compress(in io.Reader) io.ReadCloser {
+	pipeReader, pipeWriter := io.Pipe()
+	// Use a bufio.Writer to avoid excessive chunking in HTTP request.
+	bufWriter := bufio.NewWriterSize(pipeWriter, compressionBufSize)
+	compressor := gzip.NewWriter(bufWriter)
+
+	go func() {
+		_, err := io.Copy(compressor, in)
+		if err == nil {
+			err = compressor.Close()
+		}
+		if err == nil {
+			err = bufWriter.Flush()
+		}
+		if err != nil {
+			pipeWriter.CloseWithError(err)
+		} else {
+			pipeWriter.Close()
+		}
+	}()
+
+	return pipeReader
+}

+ 466 - 0
distribution/push_v1.go

@@ -0,0 +1,466 @@
+package distribution
+
+import (
+	"fmt"
+	"io"
+	"io/ioutil"
+	"sync"
+
+	"github.com/Sirupsen/logrus"
+	"github.com/docker/distribution/digest"
+	"github.com/docker/distribution/reference"
+	"github.com/docker/distribution/registry/client/transport"
+	"github.com/docker/docker/distribution/metadata"
+	"github.com/docker/docker/image"
+	"github.com/docker/docker/image/v1"
+	"github.com/docker/docker/layer"
+	"github.com/docker/docker/pkg/ioutils"
+	"github.com/docker/docker/pkg/progressreader"
+	"github.com/docker/docker/pkg/streamformatter"
+	"github.com/docker/docker/pkg/stringid"
+	"github.com/docker/docker/registry"
+)
+
+type v1Pusher struct {
+	v1IDService *metadata.V1IDService
+	endpoint    registry.APIEndpoint
+	ref         reference.Named
+	repoInfo    *registry.RepositoryInfo
+	config      *ImagePushConfig
+	sf          *streamformatter.StreamFormatter
+	session     *registry.Session
+
+	out io.Writer
+}
+
+func (p *v1Pusher) Push() (fallback bool, err error) {
+	tlsConfig, err := p.config.RegistryService.TLSConfig(p.repoInfo.Index.Name)
+	if err != nil {
+		return false, err
+	}
+	// Adds Docker-specific headers as well as user-specified headers (metaHeaders)
+	tr := transport.NewTransport(
+		// TODO(tiborvass): was NoTimeout
+		registry.NewTransport(tlsConfig),
+		registry.DockerHeaders(p.config.MetaHeaders)...,
+	)
+	client := registry.HTTPClient(tr)
+	v1Endpoint, err := p.endpoint.ToV1Endpoint(p.config.MetaHeaders)
+	if err != nil {
+		logrus.Debugf("Could not get v1 endpoint: %v", err)
+		return true, err
+	}
+	p.session, err = registry.NewSession(client, p.config.AuthConfig, v1Endpoint)
+	if err != nil {
+		// TODO(dmcgowan): Check if should fallback
+		return true, err
+	}
+	if err := p.pushRepository(); err != nil {
+		// TODO(dmcgowan): Check if should fallback
+		return false, err
+	}
+	return false, nil
+}
+
+// v1Image exposes the configuration, filesystem layer ID, and a v1 ID for an
+// image being pushed to a v1 registry.
+type v1Image interface {
+	Config() []byte
+	Layer() layer.Layer
+	V1ID() string
+}
+
+type v1ImageCommon struct {
+	layer  layer.Layer
+	config []byte
+	v1ID   string
+}
+
+func (common *v1ImageCommon) Config() []byte {
+	return common.config
+}
+
+func (common *v1ImageCommon) V1ID() string {
+	return common.v1ID
+}
+
+func (common *v1ImageCommon) Layer() layer.Layer {
+	return common.layer
+}
+
+// v1TopImage defines a runnable (top layer) image being pushed to a v1
+// registry.
+type v1TopImage struct {
+	v1ImageCommon
+	imageID image.ID
+}
+
+func newV1TopImage(imageID image.ID, img *image.Image, l layer.Layer, parent *v1DependencyImage) (*v1TopImage, error) {
+	v1ID := digest.Digest(imageID).Hex()
+	parentV1ID := ""
+	if parent != nil {
+		parentV1ID = parent.V1ID()
+	}
+
+	config, err := v1.MakeV1ConfigFromConfig(img, v1ID, parentV1ID, false)
+	if err != nil {
+		return nil, err
+	}
+
+	return &v1TopImage{
+		v1ImageCommon: v1ImageCommon{
+			v1ID:   v1ID,
+			config: config,
+			layer:  l,
+		},
+		imageID: imageID,
+	}, nil
+}
+
+// v1DependencyImage defines a dependency layer being pushed to a v1 registry.
+type v1DependencyImage struct {
+	v1ImageCommon
+}
+
+func newV1DependencyImage(l layer.Layer, parent *v1DependencyImage) (*v1DependencyImage, error) {
+	v1ID := digest.Digest(l.ChainID()).Hex()
+
+	config := ""
+	if parent != nil {
+		config = fmt.Sprintf(`{"id":"%s","parent":"%s"}`, v1ID, parent.V1ID())
+	} else {
+		config = fmt.Sprintf(`{"id":"%s"}`, v1ID)
+	}
+	return &v1DependencyImage{
+		v1ImageCommon: v1ImageCommon{
+			v1ID:   v1ID,
+			config: []byte(config),
+			layer:  l,
+		},
+	}, nil
+}
+
+// Retrieve the all the images to be uploaded in the correct order
+func (p *v1Pusher) getImageList() (imageList []v1Image, tagsByImage map[image.ID][]string, referencedLayers []layer.Layer, err error) {
+	tagsByImage = make(map[image.ID][]string)
+
+	// Ignore digest references
+	_, isDigested := p.ref.(reference.Digested)
+	if isDigested {
+		return
+	}
+
+	tagged, isTagged := p.ref.(reference.Tagged)
+	if isTagged {
+		// Push a specific tag
+		var imgID image.ID
+		imgID, err = p.config.TagStore.Get(p.ref)
+		if err != nil {
+			return
+		}
+
+		imageList, err = p.imageListForTag(imgID, nil, &referencedLayers)
+		if err != nil {
+			return
+		}
+
+		tagsByImage[imgID] = []string{tagged.Tag()}
+
+		return
+	}
+
+	imagesSeen := make(map[image.ID]struct{})
+	dependenciesSeen := make(map[layer.ChainID]*v1DependencyImage)
+
+	associations := p.config.TagStore.ReferencesByName(p.ref)
+	for _, association := range associations {
+		if tagged, isTagged = association.Ref.(reference.Tagged); !isTagged {
+			// Ignore digest references.
+			continue
+		}
+
+		tagsByImage[association.ImageID] = append(tagsByImage[association.ImageID], tagged.Tag())
+
+		if _, present := imagesSeen[association.ImageID]; present {
+			// Skip generating image list for already-seen image
+			continue
+		}
+		imagesSeen[association.ImageID] = struct{}{}
+
+		imageListForThisTag, err := p.imageListForTag(association.ImageID, dependenciesSeen, &referencedLayers)
+		if err != nil {
+			return nil, nil, nil, err
+		}
+
+		// append to main image list
+		imageList = append(imageList, imageListForThisTag...)
+	}
+	if len(imageList) == 0 {
+		return nil, nil, nil, fmt.Errorf("No images found for the requested repository / tag")
+	}
+	logrus.Debugf("Image list: %v", imageList)
+	logrus.Debugf("Tags by image: %v", tagsByImage)
+
+	return
+}
+
+func (p *v1Pusher) imageListForTag(imgID image.ID, dependenciesSeen map[layer.ChainID]*v1DependencyImage, referencedLayers *[]layer.Layer) (imageListForThisTag []v1Image, err error) {
+	img, err := p.config.ImageStore.Get(imgID)
+	if err != nil {
+		return nil, err
+	}
+
+	topLayerID := img.RootFS.ChainID()
+
+	var l layer.Layer
+	if topLayerID == "" {
+		l = layer.EmptyLayer
+	} else {
+		l, err = p.config.LayerStore.Get(topLayerID)
+		*referencedLayers = append(*referencedLayers, l)
+		if err != nil {
+			return nil, fmt.Errorf("failed to get top layer from image: %v", err)
+		}
+	}
+
+	dependencyImages, parent, err := generateDependencyImages(l.Parent(), dependenciesSeen)
+	if err != nil {
+		return nil, err
+	}
+
+	topImage, err := newV1TopImage(imgID, img, l, parent)
+	if err != nil {
+		return nil, err
+	}
+
+	imageListForThisTag = append(dependencyImages, topImage)
+
+	return
+}
+
+func generateDependencyImages(l layer.Layer, dependenciesSeen map[layer.ChainID]*v1DependencyImage) (imageListForThisTag []v1Image, parent *v1DependencyImage, err error) {
+	if l == nil {
+		return nil, nil, nil
+	}
+
+	imageListForThisTag, parent, err = generateDependencyImages(l.Parent(), dependenciesSeen)
+
+	if dependenciesSeen != nil {
+		if dependencyImage, present := dependenciesSeen[l.ChainID()]; present {
+			// This layer is already on the list, we can ignore it
+			// and all its parents.
+			return imageListForThisTag, dependencyImage, nil
+		}
+	}
+
+	dependencyImage, err := newV1DependencyImage(l, parent)
+	if err != nil {
+		return nil, nil, err
+	}
+	imageListForThisTag = append(imageListForThisTag, dependencyImage)
+
+	if dependenciesSeen != nil {
+		dependenciesSeen[l.ChainID()] = dependencyImage
+	}
+
+	return imageListForThisTag, dependencyImage, nil
+}
+
+// createImageIndex returns an index of an image's layer IDs and tags.
+func createImageIndex(images []v1Image, tags map[image.ID][]string) []*registry.ImgData {
+	var imageIndex []*registry.ImgData
+	for _, img := range images {
+		v1ID := img.V1ID()
+
+		if topImage, isTopImage := img.(*v1TopImage); isTopImage {
+			if tags, hasTags := tags[topImage.imageID]; hasTags {
+				// If an image has tags you must add an entry in the image index
+				// for each tag
+				for _, tag := range tags {
+					imageIndex = append(imageIndex, &registry.ImgData{
+						ID:  v1ID,
+						Tag: tag,
+					})
+				}
+				continue
+			}
+		}
+
+		// If the image does not have a tag it still needs to be sent to the
+		// registry with an empty tag so that it is associated with the repository
+		imageIndex = append(imageIndex, &registry.ImgData{
+			ID:  v1ID,
+			Tag: "",
+		})
+	}
+	return imageIndex
+}
+
+// lookupImageOnEndpoint checks the specified endpoint to see if an image exists
+// and if it is absent then it sends the image id to the channel to be pushed.
+func (p *v1Pusher) lookupImageOnEndpoint(wg *sync.WaitGroup, endpoint string, images chan v1Image, imagesToPush chan string) {
+	defer wg.Done()
+	for image := range images {
+		v1ID := image.V1ID()
+		if err := p.session.LookupRemoteImage(v1ID, endpoint); err != nil {
+			logrus.Errorf("Error in LookupRemoteImage: %s", err)
+			imagesToPush <- v1ID
+		} else {
+			p.out.Write(p.sf.FormatStatus("", "Image %s already pushed, skipping", stringid.TruncateID(v1ID)))
+		}
+	}
+}
+
+func (p *v1Pusher) pushImageToEndpoint(endpoint string, imageList []v1Image, tags map[image.ID][]string, repo *registry.RepositoryData) error {
+	workerCount := len(imageList)
+	// start a maximum of 5 workers to check if images exist on the specified endpoint.
+	if workerCount > 5 {
+		workerCount = 5
+	}
+	var (
+		wg           = &sync.WaitGroup{}
+		imageData    = make(chan v1Image, workerCount*2)
+		imagesToPush = make(chan string, workerCount*2)
+		pushes       = make(chan map[string]struct{}, 1)
+	)
+	for i := 0; i < workerCount; i++ {
+		wg.Add(1)
+		go p.lookupImageOnEndpoint(wg, endpoint, imageData, imagesToPush)
+	}
+	// start a go routine that consumes the images to push
+	go func() {
+		shouldPush := make(map[string]struct{})
+		for id := range imagesToPush {
+			shouldPush[id] = struct{}{}
+		}
+		pushes <- shouldPush
+	}()
+	for _, v1Image := range imageList {
+		imageData <- v1Image
+	}
+	// close the channel to notify the workers that there will be no more images to check.
+	close(imageData)
+	wg.Wait()
+	close(imagesToPush)
+	// wait for all the images that require pushes to be collected into a consumable map.
+	shouldPush := <-pushes
+	// finish by pushing any images and tags to the endpoint.  The order that the images are pushed
+	// is very important that is why we are still iterating over the ordered list of imageIDs.
+	for _, img := range imageList {
+		v1ID := img.V1ID()
+		if _, push := shouldPush[v1ID]; push {
+			if _, err := p.pushImage(img, endpoint); err != nil {
+				// FIXME: Continue on error?
+				return err
+			}
+		}
+		if topImage, isTopImage := img.(*v1TopImage); isTopImage {
+			for _, tag := range tags[topImage.imageID] {
+				p.out.Write(p.sf.FormatStatus("", "Pushing tag for rev [%s] on {%s}", stringid.TruncateID(v1ID), endpoint+"repositories/"+p.repoInfo.RemoteName.Name()+"/tags/"+tag))
+				if err := p.session.PushRegistryTag(p.repoInfo.RemoteName, v1ID, tag, endpoint); err != nil {
+					return err
+				}
+			}
+		}
+	}
+	return nil
+}
+
+// pushRepository pushes layers that do not already exist on the registry.
+func (p *v1Pusher) pushRepository() error {
+	p.out = ioutils.NewWriteFlusher(p.config.OutStream)
+	imgList, tags, referencedLayers, err := p.getImageList()
+	defer func() {
+		for _, l := range referencedLayers {
+			p.config.LayerStore.Release(l)
+		}
+	}()
+	if err != nil {
+		return err
+	}
+	p.out.Write(p.sf.FormatStatus("", "Sending image list"))
+
+	imageIndex := createImageIndex(imgList, tags)
+	for _, data := range imageIndex {
+		logrus.Debugf("Pushing ID: %s with Tag: %s", data.ID, data.Tag)
+	}
+
+	// Register all the images in a repository with the registry
+	// If an image is not in this list it will not be associated with the repository
+	repoData, err := p.session.PushImageJSONIndex(p.repoInfo.RemoteName, imageIndex, false, nil)
+	if err != nil {
+		return err
+	}
+	p.out.Write(p.sf.FormatStatus("", "Pushing repository %s", p.repoInfo.CanonicalName))
+	// push the repository to each of the endpoints only if it does not exist.
+	for _, endpoint := range repoData.Endpoints {
+		if err := p.pushImageToEndpoint(endpoint, imgList, tags, repoData); err != nil {
+			return err
+		}
+	}
+	_, err = p.session.PushImageJSONIndex(p.repoInfo.RemoteName, imageIndex, true, repoData.Endpoints)
+	return err
+}
+
+func (p *v1Pusher) pushImage(v1Image v1Image, ep string) (checksum string, err error) {
+	v1ID := v1Image.V1ID()
+
+	jsonRaw := v1Image.Config()
+	p.out.Write(p.sf.FormatProgress(stringid.TruncateID(v1ID), "Pushing", nil))
+
+	// General rule is to use ID for graph accesses and compatibilityID for
+	// calls to session.registry()
+	imgData := &registry.ImgData{
+		ID: v1ID,
+	}
+
+	// Send the json
+	if err := p.session.PushImageJSONRegistry(imgData, jsonRaw, ep); err != nil {
+		if err == registry.ErrAlreadyExists {
+			p.out.Write(p.sf.FormatProgress(stringid.TruncateID(v1ID), "Image already pushed, skipping", nil))
+			return "", nil
+		}
+		return "", err
+	}
+
+	l := v1Image.Layer()
+
+	arch, err := l.TarStream()
+	if err != nil {
+		return "", err
+	}
+
+	// don't care if this fails; best effort
+	size, _ := l.Size()
+
+	// Send the layer
+	logrus.Debugf("rendered layer for %s of [%d] size", v1ID, size)
+
+	reader := progressreader.New(progressreader.Config{
+		In:        ioutil.NopCloser(arch),
+		Out:       p.out,
+		Formatter: p.sf,
+		Size:      size,
+		NewLines:  false,
+		ID:        stringid.TruncateID(v1ID),
+		Action:    "Pushing",
+	})
+
+	checksum, checksumPayload, err := p.session.PushImageLayerRegistry(v1ID, reader, ep, jsonRaw)
+	if err != nil {
+		return "", err
+	}
+	imgData.Checksum = checksum
+	imgData.ChecksumPayload = checksumPayload
+	// Send the checksum
+	if err := p.session.PushImageChecksumRegistry(imgData, ep); err != nil {
+		return "", err
+	}
+
+	if err := p.v1IDService.Set(v1ID, p.repoInfo.Index.Name, l.ChainID()); err != nil {
+		logrus.Warnf("Could not set v1 ID mapping: %v", err)
+	}
+
+	p.out.Write(p.sf.FormatProgress(stringid.TruncateID(v1ID), "Image successfully pushed", nil))
+	return imgData.Checksum, nil
+}

+ 410 - 0
distribution/push_v2.go

@@ -0,0 +1,410 @@
+package distribution
+
+import (
+	"encoding/json"
+	"errors"
+	"fmt"
+	"io"
+	"io/ioutil"
+	"time"
+
+	"github.com/Sirupsen/logrus"
+	"github.com/docker/distribution"
+	"github.com/docker/distribution/digest"
+	"github.com/docker/distribution/manifest"
+	"github.com/docker/distribution/manifest/schema1"
+	"github.com/docker/distribution/reference"
+	"github.com/docker/docker/distribution/metadata"
+	"github.com/docker/docker/image"
+	"github.com/docker/docker/image/v1"
+	"github.com/docker/docker/layer"
+	"github.com/docker/docker/pkg/progressreader"
+	"github.com/docker/docker/pkg/streamformatter"
+	"github.com/docker/docker/pkg/stringid"
+	"github.com/docker/docker/registry"
+	"github.com/docker/docker/tag"
+	"golang.org/x/net/context"
+)
+
+type v2Pusher struct {
+	blobSumService *metadata.BlobSumService
+	ref            reference.Named
+	endpoint       registry.APIEndpoint
+	repoInfo       *registry.RepositoryInfo
+	config         *ImagePushConfig
+	sf             *streamformatter.StreamFormatter
+	repo           distribution.Repository
+
+	// layersPushed is the set of layers known to exist on the remote side.
+	// This avoids redundant queries when pushing multiple tags that
+	// involve the same layers.
+	layersPushed map[digest.Digest]bool
+}
+
+func (p *v2Pusher) Push() (fallback bool, err error) {
+	p.repo, err = NewV2Repository(p.repoInfo, p.endpoint, p.config.MetaHeaders, p.config.AuthConfig, "push", "pull")
+	if err != nil {
+		logrus.Debugf("Error getting v2 registry: %v", err)
+		return true, err
+	}
+
+	localName := p.repoInfo.LocalName.Name()
+
+	var associations []tag.Association
+	if _, isTagged := p.ref.(reference.Tagged); isTagged {
+		imageID, err := p.config.TagStore.Get(p.ref)
+		if err != nil {
+			return false, fmt.Errorf("tag does not exist: %s", p.ref.String())
+		}
+
+		associations = []tag.Association{
+			{
+				Ref:     p.ref,
+				ImageID: imageID,
+			},
+		}
+	} else {
+		// Pull all tags
+		associations = p.config.TagStore.ReferencesByName(p.ref)
+	}
+	if err != nil {
+		return false, fmt.Errorf("error getting tags for %s: %s", localName, err)
+	}
+	if len(associations) == 0 {
+		return false, fmt.Errorf("no tags to push for %s", localName)
+	}
+
+	for _, association := range associations {
+		if err := p.pushV2Tag(association); err != nil {
+			return false, err
+		}
+	}
+
+	return false, nil
+}
+
+func (p *v2Pusher) pushV2Tag(association tag.Association) error {
+	ref := association.Ref
+	logrus.Debugf("Pushing repository: %s", ref.String())
+
+	img, err := p.config.ImageStore.Get(association.ImageID)
+	if err != nil {
+		return fmt.Errorf("could not find image from tag %s: %v", ref.String(), err)
+	}
+
+	out := p.config.OutStream
+
+	var l layer.Layer
+
+	topLayerID := img.RootFS.ChainID()
+	if topLayerID == "" {
+		l = layer.EmptyLayer
+	} else {
+		l, err = p.config.LayerStore.Get(topLayerID)
+		if err != nil {
+			return fmt.Errorf("failed to get top layer from image: %v", err)
+		}
+		defer layer.ReleaseAndLog(p.config.LayerStore, l)
+	}
+
+	fsLayers := make(map[layer.DiffID]schema1.FSLayer)
+
+	// Push empty layer if necessary
+	for _, h := range img.History {
+		if h.EmptyLayer {
+			dgst, err := p.pushLayerIfNecessary(out, layer.EmptyLayer)
+			if err != nil {
+				return err
+			}
+			p.layersPushed[dgst] = true
+			fsLayers[layer.EmptyLayer.DiffID()] = schema1.FSLayer{BlobSum: dgst}
+			break
+		}
+	}
+
+	for i := 0; i < len(img.RootFS.DiffIDs); i++ {
+		dgst, err := p.pushLayerIfNecessary(out, l)
+		if err != nil {
+			return err
+		}
+
+		p.layersPushed[dgst] = true
+		fsLayers[l.DiffID()] = schema1.FSLayer{BlobSum: dgst}
+
+		l = l.Parent()
+	}
+
+	var tag string
+	if tagged, isTagged := ref.(reference.Tagged); isTagged {
+		tag = tagged.Tag()
+	}
+	m, err := CreateV2Manifest(p.repo.Name(), tag, img, fsLayers)
+	if err != nil {
+		return err
+	}
+
+	logrus.Infof("Signed manifest for %s using daemon's key: %s", ref.String(), p.config.TrustKey.KeyID())
+	signed, err := schema1.Sign(m, p.config.TrustKey)
+	if err != nil {
+		return err
+	}
+
+	manifestDigest, manifestSize, err := digestFromManifest(signed, p.repo.Name())
+	if err != nil {
+		return err
+	}
+	if manifestDigest != "" {
+		if tagged, isTagged := ref.(reference.Tagged); isTagged {
+			// NOTE: do not change this format without first changing the trust client
+			// code. This information is used to determine what was pushed and should be signed.
+			out.Write(p.sf.FormatStatus("", "%s: digest: %s size: %d", tagged.Tag(), manifestDigest, manifestSize))
+		}
+	}
+
+	manSvc, err := p.repo.Manifests(context.Background())
+	if err != nil {
+		return err
+	}
+	return manSvc.Put(signed)
+}
+
+func (p *v2Pusher) pushLayerIfNecessary(out io.Writer, l layer.Layer) (digest.Digest, error) {
+	logrus.Debugf("Pushing layer: %s", l.DiffID())
+
+	// Do we have any blobsums associated with this layer's DiffID?
+	possibleBlobsums, err := p.blobSumService.GetBlobSums(l.DiffID())
+	if err == nil {
+		dgst, exists, err := p.blobSumAlreadyExists(possibleBlobsums)
+		if err != nil {
+			out.Write(p.sf.FormatProgress(stringid.TruncateID(string(l.DiffID())), "Image push failed", nil))
+			return "", err
+		}
+		if exists {
+			out.Write(p.sf.FormatProgress(stringid.TruncateID(string(l.DiffID())), "Layer already exists", nil))
+			return dgst, nil
+		}
+	}
+
+	// if digest was empty or not saved, or if blob does not exist on the remote repository,
+	// then push the blob.
+	pushDigest, err := p.pushV2Layer(p.repo.Blobs(context.Background()), l)
+	if err != nil {
+		return "", err
+	}
+	// Cache mapping from this layer's DiffID to the blobsum
+	if err := p.blobSumService.Add(l.DiffID(), pushDigest); err != nil {
+		return "", err
+	}
+
+	return pushDigest, nil
+}
+
+// blobSumAlreadyExists checks if the registry already know about any of the
+// blobsums passed in the "blobsums" slice. If it finds one that the registry
+// knows about, it returns the known digest and "true".
+func (p *v2Pusher) blobSumAlreadyExists(blobsums []digest.Digest) (digest.Digest, bool, error) {
+	for _, dgst := range blobsums {
+		if p.layersPushed[dgst] {
+			// it is already known that the push is not needed and
+			// therefore doing a stat is unnecessary
+			return dgst, true, nil
+		}
+		_, err := p.repo.Blobs(context.Background()).Stat(context.Background(), dgst)
+		switch err {
+		case nil:
+			return dgst, true, nil
+		case distribution.ErrBlobUnknown:
+			// nop
+		default:
+			return "", false, err
+		}
+	}
+	return "", false, nil
+}
+
+// CreateV2Manifest creates a V2 manifest from an image config and set of
+// FSLayer digests.
+// FIXME: This should be moved to the distribution repo, since it will also
+// be useful for converting new manifests to the old format.
+func CreateV2Manifest(name, tag string, img *image.Image, fsLayers map[layer.DiffID]schema1.FSLayer) (*schema1.Manifest, error) {
+	if len(img.History) == 0 {
+		return nil, errors.New("empty history when trying to create V2 manifest")
+	}
+
+	// Generate IDs for each layer
+	// For non-top-level layers, create fake V1Compatibility strings that
+	// fit the format and don't collide with anything else, but don't
+	// result in runnable images on their own.
+	type v1Compatibility struct {
+		ID              string    `json:"id"`
+		Parent          string    `json:"parent,omitempty"`
+		Comment         string    `json:"comment,omitempty"`
+		Created         time.Time `json:"created"`
+		ContainerConfig struct {
+			Cmd []string
+		} `json:"container_config,omitempty"`
+		ThrowAway bool `json:"throwaway,omitempty"`
+	}
+
+	fsLayerList := make([]schema1.FSLayer, len(img.History))
+	history := make([]schema1.History, len(img.History))
+
+	parent := ""
+	layerCounter := 0
+	for i, h := range img.History {
+		if i == len(img.History)-1 {
+			break
+		}
+
+		var diffID layer.DiffID
+		if h.EmptyLayer {
+			diffID = layer.EmptyLayer.DiffID()
+		} else {
+			if len(img.RootFS.DiffIDs) <= layerCounter {
+				return nil, errors.New("too many non-empty layers in History section")
+			}
+			diffID = img.RootFS.DiffIDs[layerCounter]
+			layerCounter++
+		}
+
+		fsLayer, present := fsLayers[diffID]
+		if !present {
+			return nil, fmt.Errorf("missing layer in CreateV2Manifest: %s", diffID.String())
+		}
+		dgst, err := digest.FromBytes([]byte(fsLayer.BlobSum.Hex() + " " + parent))
+		if err != nil {
+			return nil, err
+		}
+		v1ID := dgst.Hex()
+
+		v1Compatibility := v1Compatibility{
+			ID:      v1ID,
+			Parent:  parent,
+			Comment: h.Comment,
+			Created: h.Created,
+		}
+		v1Compatibility.ContainerConfig.Cmd = []string{img.History[i].CreatedBy}
+		if h.EmptyLayer {
+			v1Compatibility.ThrowAway = true
+		}
+		jsonBytes, err := json.Marshal(&v1Compatibility)
+		if err != nil {
+			return nil, err
+		}
+
+		reversedIndex := len(img.History) - i - 1
+		history[reversedIndex].V1Compatibility = string(jsonBytes)
+		fsLayerList[reversedIndex] = fsLayer
+
+		parent = v1ID
+	}
+
+	latestHistory := img.History[len(img.History)-1]
+
+	var diffID layer.DiffID
+	if latestHistory.EmptyLayer {
+		diffID = layer.EmptyLayer.DiffID()
+	} else {
+		if len(img.RootFS.DiffIDs) <= layerCounter {
+			return nil, errors.New("too many non-empty layers in History section")
+		}
+		diffID = img.RootFS.DiffIDs[layerCounter]
+	}
+	fsLayer, present := fsLayers[diffID]
+	if !present {
+		return nil, fmt.Errorf("missing layer in CreateV2Manifest: %s", diffID.String())
+	}
+
+	dgst, err := digest.FromBytes([]byte(fsLayer.BlobSum.Hex() + " " + parent + " " + string(img.RawJSON())))
+	if err != nil {
+		return nil, err
+	}
+	fsLayerList[0] = fsLayer
+
+	// Top-level v1compatibility string should be a modified version of the
+	// image config.
+	transformedConfig, err := v1.MakeV1ConfigFromConfig(img, dgst.Hex(), parent, latestHistory.EmptyLayer)
+	if err != nil {
+		return nil, err
+	}
+
+	history[0].V1Compatibility = string(transformedConfig)
+
+	// windows-only baselayer setup
+	if err := setupBaseLayer(history, *img.RootFS); err != nil {
+		return nil, err
+	}
+
+	return &schema1.Manifest{
+		Versioned: manifest.Versioned{
+			SchemaVersion: 1,
+		},
+		Name:         name,
+		Tag:          tag,
+		Architecture: img.Architecture,
+		FSLayers:     fsLayerList,
+		History:      history,
+	}, nil
+}
+
+func rawJSON(value interface{}) *json.RawMessage {
+	jsonval, err := json.Marshal(value)
+	if err != nil {
+		return nil
+	}
+	return (*json.RawMessage)(&jsonval)
+}
+
+func (p *v2Pusher) pushV2Layer(bs distribution.BlobService, l layer.Layer) (digest.Digest, error) {
+	out := p.config.OutStream
+	displayID := stringid.TruncateID(string(l.DiffID()))
+
+	out.Write(p.sf.FormatProgress(displayID, "Preparing", nil))
+
+	arch, err := l.TarStream()
+	if err != nil {
+		return "", err
+	}
+
+	// Send the layer
+	layerUpload, err := bs.Create(context.Background())
+	if err != nil {
+		return "", err
+	}
+	defer layerUpload.Close()
+
+	// don't care if this fails; best effort
+	size, _ := l.DiffSize()
+
+	reader := progressreader.New(progressreader.Config{
+		In:        ioutil.NopCloser(arch), // we'll take care of close here.
+		Out:       out,
+		Formatter: p.sf,
+		Size:      size,
+		NewLines:  false,
+		ID:        displayID,
+		Action:    "Pushing",
+	})
+
+	compressedReader := compress(reader)
+
+	digester := digest.Canonical.New()
+	tee := io.TeeReader(compressedReader, digester.Hash())
+
+	out.Write(p.sf.FormatProgress(displayID, "Pushing", nil))
+	nn, err := layerUpload.ReadFrom(tee)
+	compressedReader.Close()
+	if err != nil {
+		return "", err
+	}
+
+	dgst := digester.Digest()
+	if _, err := layerUpload.Commit(context.Background(), distribution.Descriptor{Digest: dgst}); err != nil {
+		return "", err
+	}
+
+	logrus.Debugf("uploaded layer %s (%s), %d bytes", l.DiffID(), dgst, nn)
+	out.Write(p.sf.FormatProgress(displayID, "Pushed", nil))
+
+	return dgst, nil
+}

+ 176 - 0
distribution/push_v2_test.go

@@ -0,0 +1,176 @@
+package distribution
+
+import (
+	"reflect"
+	"testing"
+
+	"github.com/docker/distribution/digest"
+	"github.com/docker/distribution/manifest/schema1"
+	"github.com/docker/docker/image"
+	"github.com/docker/docker/layer"
+)
+
+func TestCreateV2Manifest(t *testing.T) {
+	imgJSON := `{
+    "architecture": "amd64",
+    "config": {
+        "AttachStderr": false,
+        "AttachStdin": false,
+        "AttachStdout": false,
+        "Cmd": [
+            "/bin/sh",
+            "-c",
+            "echo hi"
+        ],
+        "Domainname": "",
+        "Entrypoint": null,
+        "Env": [
+            "PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin",
+            "derived=true",
+            "asdf=true"
+        ],
+        "Hostname": "23304fc829f9",
+        "Image": "sha256:4ab15c48b859c2920dd5224f92aabcd39a52794c5b3cf088fb3bbb438756c246",
+        "Labels": {},
+        "OnBuild": [],
+        "OpenStdin": false,
+        "StdinOnce": false,
+        "Tty": false,
+        "User": "",
+        "Volumes": null,
+        "WorkingDir": ""
+    },
+    "container": "e91032eb0403a61bfe085ff5a5a48e3659e5a6deae9f4d678daa2ae399d5a001",
+    "container_config": {
+        "AttachStderr": false,
+        "AttachStdin": false,
+        "AttachStdout": false,
+        "Cmd": [
+            "/bin/sh",
+            "-c",
+            "#(nop) CMD [\"/bin/sh\" \"-c\" \"echo hi\"]"
+        ],
+        "Domainname": "",
+        "Entrypoint": null,
+        "Env": [
+            "PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin",
+            "derived=true",
+            "asdf=true"
+        ],
+        "Hostname": "23304fc829f9",
+        "Image": "sha256:4ab15c48b859c2920dd5224f92aabcd39a52794c5b3cf088fb3bbb438756c246",
+        "Labels": {},
+        "OnBuild": [],
+        "OpenStdin": false,
+        "StdinOnce": false,
+        "Tty": false,
+        "User": "",
+        "Volumes": null,
+        "WorkingDir": ""
+    },
+    "created": "2015-11-04T23:06:32.365666163Z",
+    "docker_version": "1.9.0-dev",
+    "history": [
+        {
+            "created": "2015-10-31T22:22:54.690851953Z",
+            "created_by": "/bin/sh -c #(nop) ADD file:a3bc1e842b69636f9df5256c49c5374fb4eef1e281fe3f282c65fb853ee171c5 in /"
+        },
+        {
+            "created": "2015-10-31T22:22:55.613815829Z",
+            "created_by": "/bin/sh -c #(nop) CMD [\"sh\"]"
+        },
+        {
+            "created": "2015-11-04T23:06:30.934316144Z",
+            "created_by": "/bin/sh -c #(nop) ENV derived=true",
+            "empty_layer": true
+        },
+        {
+            "created": "2015-11-04T23:06:31.192097572Z",
+            "created_by": "/bin/sh -c #(nop) ENV asdf=true",
+            "empty_layer": true
+        },
+        {
+            "created": "2015-11-04T23:06:32.083868454Z",
+            "created_by": "/bin/sh -c dd if=/dev/zero of=/file bs=1024 count=1024"
+        },
+        {
+            "created": "2015-11-04T23:06:32.365666163Z",
+            "created_by": "/bin/sh -c #(nop) CMD [\"/bin/sh\" \"-c\" \"echo hi\"]",
+            "empty_layer": true
+        }
+    ],
+    "os": "linux",
+    "rootfs": {
+        "diff_ids": [
+            "sha256:c6f988f4874bb0add23a778f753c65efe992244e148a1d2ec2a8b664fb66bbd1",
+            "sha256:5f70bf18a086007016e948b04aed3b82103a36bea41755b6cddfaf10ace3c6ef",
+            "sha256:13f53e08df5a220ab6d13c58b2bf83a59cbdc2e04d0a3f041ddf4b0ba4112d49"
+        ],
+        "type": "layers"
+    }
+}`
+
+	// To fill in rawJSON
+	img, err := image.NewFromJSON([]byte(imgJSON))
+	if err != nil {
+		t.Fatalf("json decoding failed: %v", err)
+	}
+
+	fsLayers := map[layer.DiffID]schema1.FSLayer{
+		layer.DiffID("sha256:c6f988f4874bb0add23a778f753c65efe992244e148a1d2ec2a8b664fb66bbd1"): {BlobSum: digest.Digest("sha256:a3ed95caeb02ffe68cdd9fd84406680ae93d633cb16422d00e8a7c22955b46d4")},
+		layer.DiffID("sha256:5f70bf18a086007016e948b04aed3b82103a36bea41755b6cddfaf10ace3c6ef"): {BlobSum: digest.Digest("sha256:86e0e091d0da6bde2456dbb48306f3956bbeb2eae1b5b9a43045843f69fe4aaa")},
+		layer.DiffID("sha256:13f53e08df5a220ab6d13c58b2bf83a59cbdc2e04d0a3f041ddf4b0ba4112d49"): {BlobSum: digest.Digest("sha256:b4ed95caeb02ffe68cdd9fd84406680ae93d633cb16422d00e8a7c22955b46d4")},
+	}
+
+	manifest, err := CreateV2Manifest("testrepo", "testtag", img, fsLayers)
+	if err != nil {
+		t.Fatalf("CreateV2Manifest returned error: %v", err)
+	}
+
+	if manifest.Versioned.SchemaVersion != 1 {
+		t.Fatal("SchemaVersion != 1")
+	}
+	if manifest.Name != "testrepo" {
+		t.Fatal("incorrect name in manifest")
+	}
+	if manifest.Tag != "testtag" {
+		t.Fatal("incorrect tag in manifest")
+	}
+	if manifest.Architecture != "amd64" {
+		t.Fatal("incorrect arch in manifest")
+	}
+
+	expectedFSLayers := []schema1.FSLayer{
+		{BlobSum: digest.Digest("sha256:86e0e091d0da6bde2456dbb48306f3956bbeb2eae1b5b9a43045843f69fe4aaa")},
+		{BlobSum: digest.Digest("sha256:b4ed95caeb02ffe68cdd9fd84406680ae93d633cb16422d00e8a7c22955b46d4")},
+		{BlobSum: digest.Digest("sha256:86e0e091d0da6bde2456dbb48306f3956bbeb2eae1b5b9a43045843f69fe4aaa")},
+		{BlobSum: digest.Digest("sha256:86e0e091d0da6bde2456dbb48306f3956bbeb2eae1b5b9a43045843f69fe4aaa")},
+		{BlobSum: digest.Digest("sha256:86e0e091d0da6bde2456dbb48306f3956bbeb2eae1b5b9a43045843f69fe4aaa")},
+		{BlobSum: digest.Digest("sha256:a3ed95caeb02ffe68cdd9fd84406680ae93d633cb16422d00e8a7c22955b46d4")},
+	}
+
+	if len(manifest.FSLayers) != len(expectedFSLayers) {
+		t.Fatalf("wrong number of FSLayers: %d", len(manifest.FSLayers))
+	}
+	if !reflect.DeepEqual(manifest.FSLayers, expectedFSLayers) {
+		t.Fatal("wrong FSLayers list")
+	}
+
+	expectedV1Compatibility := []string{
+		`{"architecture":"amd64","config":{"AttachStderr":false,"AttachStdin":false,"AttachStdout":false,"Cmd":["/bin/sh","-c","echo hi"],"Domainname":"","Entrypoint":null,"Env":["PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin","derived=true","asdf=true"],"Hostname":"23304fc829f9","Image":"sha256:4ab15c48b859c2920dd5224f92aabcd39a52794c5b3cf088fb3bbb438756c246","Labels":{},"OnBuild":[],"OpenStdin":false,"StdinOnce":false,"Tty":false,"User":"","Volumes":null,"WorkingDir":""},"container":"e91032eb0403a61bfe085ff5a5a48e3659e5a6deae9f4d678daa2ae399d5a001","container_config":{"AttachStderr":false,"AttachStdin":false,"AttachStdout":false,"Cmd":["/bin/sh","-c","#(nop) CMD [\"/bin/sh\" \"-c\" \"echo hi\"]"],"Domainname":"","Entrypoint":null,"Env":["PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin","derived=true","asdf=true"],"Hostname":"23304fc829f9","Image":"sha256:4ab15c48b859c2920dd5224f92aabcd39a52794c5b3cf088fb3bbb438756c246","Labels":{},"OnBuild":[],"OpenStdin":false,"StdinOnce":false,"Tty":false,"User":"","Volumes":null,"WorkingDir":""},"created":"2015-11-04T23:06:32.365666163Z","docker_version":"1.9.0-dev","id":"d728140d3fd23dfcac505954af0b2224b3579b177029eded62916579eb19ac64","os":"linux","parent":"0594e66a9830fa5ba73b66349eb221ea4beb6bac8d2148b90a0f371f8d67bcd5","throwaway":true}`,
+		`{"id":"0594e66a9830fa5ba73b66349eb221ea4beb6bac8d2148b90a0f371f8d67bcd5","parent":"39bc0dbed47060dd8952b048e73744ae471fe50354d2c267d308292c53b83ce1","created":"2015-11-04T23:06:32.083868454Z","container_config":{"Cmd":["/bin/sh -c dd if=/dev/zero of=/file bs=1024 count=1024"]}}`,
+		`{"id":"39bc0dbed47060dd8952b048e73744ae471fe50354d2c267d308292c53b83ce1","parent":"875d7f206c023dc979e1677567a01364074f82b61e220c9b83a4610170490381","created":"2015-11-04T23:06:31.192097572Z","container_config":{"Cmd":["/bin/sh -c #(nop) ENV asdf=true"]},"throwaway":true}`,
+		`{"id":"875d7f206c023dc979e1677567a01364074f82b61e220c9b83a4610170490381","parent":"9e3447ca24cb96d86ebd5960cb34d1299b07e0a0e03801d90b9969a2c187dd6e","created":"2015-11-04T23:06:30.934316144Z","container_config":{"Cmd":["/bin/sh -c #(nop) ENV derived=true"]},"throwaway":true}`,
+		`{"id":"9e3447ca24cb96d86ebd5960cb34d1299b07e0a0e03801d90b9969a2c187dd6e","parent":"3690474eb5b4b26fdfbd89c6e159e8cc376ca76ef48032a30fa6aafd56337880","created":"2015-10-31T22:22:55.613815829Z","container_config":{"Cmd":["/bin/sh -c #(nop) CMD [\"sh\"]"]}}`,
+		`{"id":"3690474eb5b4b26fdfbd89c6e159e8cc376ca76ef48032a30fa6aafd56337880","created":"2015-10-31T22:22:54.690851953Z","container_config":{"Cmd":["/bin/sh -c #(nop) ADD file:a3bc1e842b69636f9df5256c49c5374fb4eef1e281fe3f282c65fb853ee171c5 in /"]}}`,
+	}
+
+	if len(manifest.History) != len(expectedV1Compatibility) {
+		t.Fatalf("wrong number of history entries: %d", len(manifest.History))
+	}
+	for i := range expectedV1Compatibility {
+		if manifest.History[i].V1Compatibility != expectedV1Compatibility[i] {
+			t.Fatalf("wrong V1Compatibility %d. expected:\n%s\ngot:\n%s", i, expectedV1Compatibility[i], manifest.History[i].V1Compatibility)
+		}
+	}
+}

+ 12 - 0
distribution/push_v2_unix.go

@@ -0,0 +1,12 @@
+// +build !windows
+
+package distribution
+
+import (
+	"github.com/docker/distribution/manifest/schema1"
+	"github.com/docker/docker/image"
+)
+
+func setupBaseLayer(history []schema1.History, rootFS image.RootFS) error {
+	return nil
+}

+ 28 - 0
distribution/push_v2_windows.go

@@ -0,0 +1,28 @@
+// +build windows
+
+package distribution
+
+import (
+	"encoding/json"
+
+	"github.com/docker/distribution/manifest/schema1"
+	"github.com/docker/docker/image"
+)
+
+func setupBaseLayer(history []schema1.History, rootFS image.RootFS) error {
+	var v1Config map[string]*json.RawMessage
+	if err := json.Unmarshal([]byte(history[len(history)-1].V1Compatibility), &v1Config); err != nil {
+		return err
+	}
+	baseID, err := json.Marshal(rootFS.BaseLayerID())
+	if err != nil {
+		return err
+	}
+	v1Config["parent"] = (*json.RawMessage)(&baseID)
+	configJSON, err := json.Marshal(v1Config)
+	if err != nil {
+		return err
+	}
+	history[len(history)-1].V1Compatibility = string(configJSON)
+	return nil
+}

+ 115 - 0
distribution/registry.go

@@ -0,0 +1,115 @@
+package distribution
+
+import (
+	"errors"
+	"net"
+	"net/http"
+	"net/url"
+	"strings"
+	"time"
+
+	"github.com/Sirupsen/logrus"
+	"github.com/docker/distribution"
+	"github.com/docker/distribution/digest"
+	"github.com/docker/distribution/manifest/schema1"
+	"github.com/docker/distribution/registry/client"
+	"github.com/docker/distribution/registry/client/auth"
+	"github.com/docker/distribution/registry/client/transport"
+	"github.com/docker/docker/cliconfig"
+	"github.com/docker/docker/registry"
+	"golang.org/x/net/context"
+)
+
+type dumbCredentialStore struct {
+	auth *cliconfig.AuthConfig
+}
+
+func (dcs dumbCredentialStore) Basic(*url.URL) (string, string) {
+	return dcs.auth.Username, dcs.auth.Password
+}
+
+// NewV2Repository returns a repository (v2 only). It creates a HTTP transport
+// providing timeout settings and authentication support, and also verifies the
+// remote API version.
+func NewV2Repository(repoInfo *registry.RepositoryInfo, endpoint registry.APIEndpoint, metaHeaders http.Header, authConfig *cliconfig.AuthConfig, actions ...string) (distribution.Repository, error) {
+	ctx := context.Background()
+
+	repoName := repoInfo.CanonicalName
+	// If endpoint does not support CanonicalName, use the RemoteName instead
+	if endpoint.TrimHostname {
+		repoName = repoInfo.RemoteName
+	}
+
+	// TODO(dmcgowan): Call close idle connections when complete, use keep alive
+	base := &http.Transport{
+		Proxy: http.ProxyFromEnvironment,
+		Dial: (&net.Dialer{
+			Timeout:   30 * time.Second,
+			KeepAlive: 30 * time.Second,
+			DualStack: true,
+		}).Dial,
+		TLSHandshakeTimeout: 10 * time.Second,
+		TLSClientConfig:     endpoint.TLSConfig,
+		// TODO(dmcgowan): Call close idle connections when complete and use keep alive
+		DisableKeepAlives: true,
+	}
+
+	modifiers := registry.DockerHeaders(metaHeaders)
+	authTransport := transport.NewTransport(base, modifiers...)
+	pingClient := &http.Client{
+		Transport: authTransport,
+		Timeout:   5 * time.Second,
+	}
+	endpointStr := strings.TrimRight(endpoint.URL, "/") + "/v2/"
+	req, err := http.NewRequest("GET", endpointStr, nil)
+	if err != nil {
+		return nil, err
+	}
+	resp, err := pingClient.Do(req)
+	if err != nil {
+		return nil, err
+	}
+	defer resp.Body.Close()
+
+	versions := auth.APIVersions(resp, endpoint.VersionHeader)
+	if endpoint.VersionHeader != "" && len(endpoint.Versions) > 0 {
+		var foundVersion bool
+		for _, version := range endpoint.Versions {
+			for _, pingVersion := range versions {
+				if version == pingVersion {
+					foundVersion = true
+				}
+			}
+		}
+		if !foundVersion {
+			return nil, errors.New("endpoint does not support v2 API")
+		}
+	}
+
+	challengeManager := auth.NewSimpleChallengeManager()
+	if err := challengeManager.AddResponse(resp); err != nil {
+		return nil, err
+	}
+
+	creds := dumbCredentialStore{auth: authConfig}
+	tokenHandler := auth.NewTokenHandler(authTransport, creds, repoName.Name(), actions...)
+	basicHandler := auth.NewBasicHandler(creds)
+	modifiers = append(modifiers, auth.NewAuthorizer(challengeManager, tokenHandler, basicHandler))
+	tr := transport.NewTransport(base, modifiers...)
+
+	return client.NewRepository(ctx, repoName.Name(), endpoint.URL, tr)
+}
+
+func digestFromManifest(m *schema1.SignedManifest, localName string) (digest.Digest, int, error) {
+	payload, err := m.Payload()
+	if err != nil {
+		// If this failed, the signatures section was corrupted
+		// or missing. Treat the entire manifest as the payload.
+		payload = m.Raw
+	}
+	manifestDigest, err := digest.FromBytes(payload)
+	if err != nil {
+		logrus.Infof("Could not compute manifest digest for %s:%s : %v", localName, m.Tag, err)
+	}
+	return manifestDigest, len(payload), nil
+}

Some files were not shown because too many files changed in this diff