Browse Source

Different number of retries for layers of different sizes

Classify blobs into three categories based on size.
Use a very limited number of mount attempts and no existence check for
small blobs. Use more attempts for bigger blobs.

Also remember blob associations during layer existence check.

Blob digests are now checked in the target repository from newest to
oldest. If the blob exists and the metadata entry does not, it will be
created. If the blob is not found, the metadata entry will be removed.

Signed-off-by: Michal Minář <miminar@redhat.com>
Michal Minář 8 years ago
parent
commit
81f7b1f1e5
2 changed files with 561 additions and 36 deletions
  1. 134 36
      distribution/push_v2.go
  2. 427 0
      distribution/push_v2_test.go

+ 134 - 36
distribution/push_v2.go

@@ -29,7 +29,10 @@ import (
 	"github.com/docker/docker/registry"
 	"github.com/docker/docker/registry"
 )
 )
 
 
-const maxRepositoryMountAttempts = 4
+const (
+	smallLayerMaximumSize  = 100 * (1 << 10) // 100KB
+	middleLayerMaximumSize = 10 * (1 << 20)  // 10MB
+)
 
 
 // PushResult contains the tag, manifest digest, and manifest size from the
 // PushResult contains the tag, manifest digest, and manifest size from the
 // push. It's used to signal this information to the trust code in the client
 // push. It's used to signal this information to the trust code in the client
@@ -158,6 +161,7 @@ func (p *v2Pusher) pushV2Tag(ctx context.Context, ref reference.NamedTagged, ima
 	for i := 0; i < len(img.RootFS.DiffIDs); i++ {
 	for i := 0; i < len(img.RootFS.DiffIDs); i++ {
 		descriptor := descriptorTemplate
 		descriptor := descriptorTemplate
 		descriptor.layer = l
 		descriptor.layer = l
+		descriptor.checkedDigests = make(map[digest.Digest]struct{})
 		descriptors = append(descriptors, &descriptor)
 		descriptors = append(descriptors, &descriptor)
 
 
 		l = l.Parent()
 		l = l.Parent()
@@ -250,6 +254,8 @@ type v2PushDescriptor struct {
 	repo              distribution.Repository
 	repo              distribution.Repository
 	pushState         *pushState
 	pushState         *pushState
 	remoteDescriptor  distribution.Descriptor
 	remoteDescriptor  distribution.Descriptor
+	// a set of digests whose presence has been checked in a target repository
+	checkedDigests map[digest.Digest]struct{}
 }
 }
 
 
 func (pd *v2PushDescriptor) Key() string {
 func (pd *v2PushDescriptor) Key() string {
@@ -284,25 +290,18 @@ func (pd *v2PushDescriptor) Upload(ctx context.Context, progressOutput progress.
 	}
 	}
 	pd.pushState.Unlock()
 	pd.pushState.Unlock()
 
 
+	maxMountAttempts, maxExistenceChecks, checkOtherRepositories := getMaxMountAndExistenceCheckAttempts(pd.layer)
+
 	// Do we have any metadata associated with this layer's DiffID?
 	// Do we have any metadata associated with this layer's DiffID?
 	v2Metadata, err := pd.v2MetadataService.GetMetadata(diffID)
 	v2Metadata, err := pd.v2MetadataService.GetMetadata(diffID)
 	if err == nil {
 	if err == nil {
-		descriptor, exists, err := layerAlreadyExists(ctx, v2Metadata, pd.repoInfo, pd.repo, pd.pushState)
-		if err != nil {
-			progress.Update(progressOutput, pd.ID(), "Image push failed")
-			return distribution.Descriptor{}, retryOnError(err)
-		}
-		if exists {
-			progress.Update(progressOutput, pd.ID(), "Layer already exists")
-			pd.pushState.Lock()
-			pd.pushState.remoteLayers[diffID] = descriptor
-			pd.pushState.Unlock()
-			return descriptor, nil
+		// check for blob existence in the target repository if we have a mapping with it
+		descriptor, exists, err := pd.layerAlreadyExists(ctx, progressOutput, diffID, false, 1, v2Metadata)
+		if exists || err != nil {
+			return descriptor, err
 		}
 		}
 	}
 	}
 
 
-	logrus.Debugf("Pushing layer: %s", diffID)
-
 	// if digest was empty or not saved, or if blob does not exist on the remote repository,
 	// if digest was empty or not saved, or if blob does not exist on the remote repository,
 	// then push the blob.
 	// then push the blob.
 	bs := pd.repo.Blobs(ctx)
 	bs := pd.repo.Blobs(ctx)
@@ -310,7 +309,7 @@ func (pd *v2PushDescriptor) Upload(ctx context.Context, progressOutput progress.
 	var layerUpload distribution.BlobWriter
 	var layerUpload distribution.BlobWriter
 
 
 	// Attempt to find another repository in the same registry to mount the layer from to avoid an unnecessary upload
 	// Attempt to find another repository in the same registry to mount the layer from to avoid an unnecessary upload
-	candidates := getRepositoryMountCandidates(pd.repoInfo, pd.hmacKey, maxRepositoryMountAttempts, v2Metadata)
+	candidates := getRepositoryMountCandidates(pd.repoInfo, pd.hmacKey, maxMountAttempts, v2Metadata)
 	for _, mountCandidate := range candidates {
 	for _, mountCandidate := range candidates {
 		logrus.Debugf("attempting to mount layer %s (%s) from %s", diffID, mountCandidate.Digest, mountCandidate.SourceRepository)
 		logrus.Debugf("attempting to mount layer %s (%s) from %s", diffID, mountCandidate.Digest, mountCandidate.SourceRepository)
 		createOpts := []distribution.BlobCreateOption{}
 		createOpts := []distribution.BlobCreateOption{}
@@ -386,6 +385,15 @@ func (pd *v2PushDescriptor) Upload(ctx context.Context, progressOutput progress.
 		}
 		}
 	}
 	}
 
 
+	if maxExistenceChecks-len(pd.checkedDigests) > 0 {
+		// do additional layer existence checks with other known digests if any
+		descriptor, exists, err := pd.layerAlreadyExists(ctx, progressOutput, diffID, checkOtherRepositories, maxExistenceChecks-len(pd.checkedDigests), v2Metadata)
+		if exists || err != nil {
+			return descriptor, err
+		}
+	}
+
+	logrus.Debugf("Pushing layer: %s", diffID)
 	if layerUpload == nil {
 	if layerUpload == nil {
 		layerUpload, err = bs.Create(ctx)
 		layerUpload, err = bs.Create(ctx)
 		if err != nil {
 		if err != nil {
@@ -400,12 +408,6 @@ func (pd *v2PushDescriptor) Upload(ctx context.Context, progressOutput progress.
 		return desc, err
 		return desc, err
 	}
 	}
 
 
-	pd.pushState.Lock()
-	// If Commit succeeded, that's an indication that the remote registry speaks the v2 protocol.
-	pd.pushState.confirmedV2 = true
-	pd.pushState.remoteLayers[diffID] = desc
-	pd.pushState.Unlock()
-
 	return desc, nil
 	return desc, nil
 }
 }
 
 
@@ -463,34 +465,130 @@ func (pd *v2PushDescriptor) uploadUsingSession(
 		return distribution.Descriptor{}, xfer.DoNotRetry{Err: err}
 		return distribution.Descriptor{}, xfer.DoNotRetry{Err: err}
 	}
 	}
 
 
-	return distribution.Descriptor{
+	desc := distribution.Descriptor{
 		Digest:    pushDigest,
 		Digest:    pushDigest,
 		MediaType: schema2.MediaTypeLayer,
 		MediaType: schema2.MediaTypeLayer,
 		Size:      nn,
 		Size:      nn,
-	}, nil
+	}
+
+	pd.pushState.Lock()
+	// If Commit succeeded, that's an indication that the remote registry speaks the v2 protocol.
+	pd.pushState.confirmedV2 = true
+	pd.pushState.remoteLayers[diffID] = desc
+	pd.pushState.Unlock()
+
+	return desc, nil
 }
 }
 
 
-// layerAlreadyExists checks if the registry already know about any of the
-// metadata passed in the "metadata" slice. If it finds one that the registry
-// knows about, it returns the known digest and "true".
-func layerAlreadyExists(ctx context.Context, metadata []metadata.V2Metadata, repoInfo reference.Named, repo distribution.Repository, pushState *pushState) (distribution.Descriptor, bool, error) {
-	for _, meta := range metadata {
-		// Only check blobsums that are known to this repository or have an unknown source
-		if meta.SourceRepository != "" && meta.SourceRepository != repoInfo.FullName() {
+// layerAlreadyExists checks if the registry already knows about any of the metadata passed in the "metadata"
+// slice. If it finds one that the registry knows about, it returns the known digest and "true". If
+// "checkOtherRepositories" is true, stat will be performed also with digests mapped to any other repository
+// (not just the target one).
+func (pd *v2PushDescriptor) layerAlreadyExists(
+	ctx context.Context,
+	progressOutput progress.Output,
+	diffID layer.DiffID,
+	checkOtherRepositories bool,
+	maxExistenceCheckAttempts int,
+	v2Metadata []metadata.V2Metadata,
+) (desc distribution.Descriptor, exists bool, err error) {
+	// filter the metadata
+	candidates := []metadata.V2Metadata{}
+	for _, meta := range v2Metadata {
+		if len(meta.SourceRepository) > 0 && !checkOtherRepositories && meta.SourceRepository != pd.repoInfo.FullName() {
+			continue
+		}
+		candidates = append(candidates, meta)
+	}
+	// sort the candidates by similarity
+	sortV2MetadataByLikenessAndAge(pd.repoInfo, pd.hmacKey, candidates)
+
+	digestToMetadata := make(map[digest.Digest]*metadata.V2Metadata)
+	// an array of unique blob digests ordered from the best mount candidates to worst
+	layerDigests := []digest.Digest{}
+	for i := 0; i < len(candidates); i++ {
+		if len(layerDigests) >= maxExistenceCheckAttempts {
+			break
+		}
+		meta := &candidates[i]
+		if _, exists := digestToMetadata[meta.Digest]; exists {
+			// keep reference just to the first mapping (the best mount candidate)
+			continue
+		}
+		if _, exists := pd.checkedDigests[meta.Digest]; exists {
+			// existence of this digest has already been tested
 			continue
 			continue
 		}
 		}
-		descriptor, err := repo.Blobs(ctx).Stat(ctx, meta.Digest)
+		digestToMetadata[meta.Digest] = meta
+		layerDigests = append(layerDigests, meta.Digest)
+	}
+
+	for _, dgst := range layerDigests {
+		meta := digestToMetadata[dgst]
+		logrus.Debugf("Checking for presence of layer %s (%s) in %s", diffID, dgst, pd.repoInfo.FullName())
+		desc, err = pd.repo.Blobs(ctx).Stat(ctx, dgst)
+		pd.checkedDigests[meta.Digest] = struct{}{}
 		switch err {
 		switch err {
 		case nil:
 		case nil:
-			descriptor.MediaType = schema2.MediaTypeLayer
-			return descriptor, true, nil
+			if m, ok := digestToMetadata[desc.Digest]; !ok || m.SourceRepository != pd.repoInfo.FullName() || !metadata.CheckV2MetadataHMAC(m, pd.hmacKey) {
+				// cache mapping from this layer's DiffID to the blobsum
+				if err := pd.v2MetadataService.TagAndAdd(diffID, pd.hmacKey, metadata.V2Metadata{
+					Digest:           desc.Digest,
+					SourceRepository: pd.repoInfo.FullName(),
+				}); err != nil {
+					return distribution.Descriptor{}, false, xfer.DoNotRetry{Err: err}
+				}
+			}
+			desc.MediaType = schema2.MediaTypeLayer
+			exists = true
+			break
 		case distribution.ErrBlobUnknown:
 		case distribution.ErrBlobUnknown:
-			// nop
+			if meta.SourceRepository == pd.repoInfo.FullName() {
+				// remove the mapping to the target repository
+				pd.v2MetadataService.Remove(*meta)
+			}
 		default:
 		default:
-			return distribution.Descriptor{}, false, err
+			progress.Update(progressOutput, pd.ID(), "Image push failed")
+			return desc, false, retryOnError(err)
 		}
 		}
 	}
 	}
-	return distribution.Descriptor{}, false, nil
+
+	if exists {
+		progress.Update(progressOutput, pd.ID(), "Layer already exists")
+		pd.pushState.Lock()
+		pd.pushState.remoteLayers[diffID] = desc
+		pd.pushState.Unlock()
+	}
+
+	return desc, exists, nil
+}
+
+// getMaxMountAndExistenceCheckAttempts returns a maximum number of cross repository mount attempts from
+// source repositories of target registry, maximum number of layer existence checks performed on the target
+// repository and whether the check shall be done also with digests mapped to different repositories. The
+// decision is based on layer size. The smaller the layer, the fewer attempts shall be made because the cost
+// of upload does not outweigh the latency.
+func getMaxMountAndExistenceCheckAttempts(layer layer.Layer) (maxMountAttempts, maxExistenceCheckAttempts int, checkOtherRepositories bool) {
+	size, err := layer.DiffSize()
+	switch {
+	// big blob
+	case size > middleLayerMaximumSize:
+		// 1st attempt to mount the blob few times
+		// 2nd few existence checks with digests associated to any repository
+		// then fallback to upload
+		return 4, 3, true
+
+	// middle sized blobs; if we could not get the size, assume we deal with middle sized blob
+	case size > smallLayerMaximumSize, err != nil:
+		// 1st attempt to mount blobs of average size few times
+		// 2nd try at most 1 existence check if there's an existing mapping to the target repository
+		// then fallback to upload
+		return 3, 1, false
+
+	// small blobs, do a minimum number of checks
+	default:
+		return 1, 1, false
+	}
 }
 }
 
 
 // getRepositoryMountCandidates returns an array of v2 metadata items belonging to the given registry. The
 // getRepositoryMountCandidates returns an array of v2 metadata items belonging to the given registry. The

+ 427 - 0
distribution/push_v2_test.go

@@ -1,11 +1,18 @@
 package distribution
 package distribution
 
 
 import (
 import (
+	"net/http"
 	"reflect"
 	"reflect"
 	"testing"
 	"testing"
 
 
+	"github.com/docker/distribution"
+	"github.com/docker/distribution/context"
 	"github.com/docker/distribution/digest"
 	"github.com/docker/distribution/digest"
+	"github.com/docker/distribution/manifest/schema2"
+	distreference "github.com/docker/distribution/reference"
 	"github.com/docker/docker/distribution/metadata"
 	"github.com/docker/docker/distribution/metadata"
+	"github.com/docker/docker/layer"
+	"github.com/docker/docker/pkg/progress"
 	"github.com/docker/docker/reference"
 	"github.com/docker/docker/reference"
 )
 )
 
 
@@ -136,6 +143,315 @@ func TestGetRepositoryMountCandidates(t *testing.T) {
 	}
 	}
 }
 }
 
 
+func TestLayerAlreadyExists(t *testing.T) {
+	for _, tc := range []struct {
+		name                   string
+		metadata               []metadata.V2Metadata
+		targetRepo             string
+		hmacKey                string
+		maxExistenceChecks     int
+		checkOtherRepositories bool
+		remoteBlobs            map[digest.Digest]distribution.Descriptor
+		remoteErrors           map[digest.Digest]error
+		expectedDescriptor     distribution.Descriptor
+		expectedExists         bool
+		expectedError          error
+		expectedRequests       []string
+		expectedAdditions      []metadata.V2Metadata
+		expectedRemovals       []metadata.V2Metadata
+	}{
+		{
+			name:                   "empty metadata",
+			targetRepo:             "busybox",
+			maxExistenceChecks:     3,
+			checkOtherRepositories: true,
+		},
+		{
+			name:               "single not existent metadata",
+			targetRepo:         "busybox",
+			metadata:           []metadata.V2Metadata{{Digest: digest.Digest("pear"), SourceRepository: "docker.io/library/busybox"}},
+			maxExistenceChecks: 3,
+			expectedRequests:   []string{"pear"},
+			expectedRemovals:   []metadata.V2Metadata{{Digest: digest.Digest("pear"), SourceRepository: "docker.io/library/busybox"}},
+		},
+		{
+			name:               "access denied",
+			targetRepo:         "busybox",
+			maxExistenceChecks: 1,
+			metadata:           []metadata.V2Metadata{{Digest: digest.Digest("apple"), SourceRepository: "docker.io/library/busybox"}},
+			remoteErrors:       map[digest.Digest]error{digest.Digest("apple"): distribution.ErrAccessDenied},
+			expectedError:      distribution.ErrAccessDenied,
+			expectedRequests:   []string{"apple"},
+		},
+		{
+			name:               "not matching reposies",
+			targetRepo:         "busybox",
+			maxExistenceChecks: 3,
+			metadata: []metadata.V2Metadata{
+				{Digest: digest.Digest("apple"), SourceRepository: "docker.io/library/hello-world"},
+				{Digest: digest.Digest("orange"), SourceRepository: "docker.io/library/busybox/subapp"},
+				{Digest: digest.Digest("pear"), SourceRepository: "docker.io/busybox"},
+				{Digest: digest.Digest("plum"), SourceRepository: "busybox"},
+				{Digest: digest.Digest("banana"), SourceRepository: "127.0.0.1/busybox"},
+			},
+		},
+		{
+			name:                   "check other repositories",
+			targetRepo:             "busybox",
+			maxExistenceChecks:     10,
+			checkOtherRepositories: true,
+			metadata: []metadata.V2Metadata{
+				{Digest: digest.Digest("apple"), SourceRepository: "docker.io/library/hello-world"},
+				{Digest: digest.Digest("orange"), SourceRepository: "docker.io/library/busybox/subapp"},
+				{Digest: digest.Digest("pear"), SourceRepository: "docker.io/busybox"},
+				{Digest: digest.Digest("plum"), SourceRepository: "busybox"},
+				{Digest: digest.Digest("banana"), SourceRepository: "127.0.0.1/busybox"},
+			},
+			expectedRequests: []string{"plum", "pear", "apple", "orange", "banana"},
+		},
+		{
+			name:               "find existing blob",
+			targetRepo:         "busybox",
+			metadata:           []metadata.V2Metadata{{Digest: digest.Digest("apple"), SourceRepository: "docker.io/library/busybox"}},
+			maxExistenceChecks: 3,
+			remoteBlobs:        map[digest.Digest]distribution.Descriptor{digest.Digest("apple"): {Digest: digest.Digest("apple")}},
+			expectedDescriptor: distribution.Descriptor{Digest: digest.Digest("apple"), MediaType: schema2.MediaTypeLayer},
+			expectedExists:     true,
+			expectedRequests:   []string{"apple"},
+		},
+		{
+			name:               "find existing blob with different hmac",
+			targetRepo:         "busybox",
+			metadata:           []metadata.V2Metadata{{SourceRepository: "docker.io/library/busybox", Digest: digest.Digest("apple"), HMAC: "dummyhmac"}},
+			maxExistenceChecks: 3,
+			remoteBlobs:        map[digest.Digest]distribution.Descriptor{digest.Digest("apple"): {Digest: digest.Digest("apple")}},
+			expectedDescriptor: distribution.Descriptor{Digest: digest.Digest("apple"), MediaType: schema2.MediaTypeLayer},
+			expectedExists:     true,
+			expectedRequests:   []string{"apple"},
+			expectedAdditions:  []metadata.V2Metadata{{Digest: digest.Digest("apple"), SourceRepository: "docker.io/library/busybox"}},
+		},
+		{
+			name:               "overwrite media types",
+			targetRepo:         "busybox",
+			metadata:           []metadata.V2Metadata{{Digest: digest.Digest("apple"), SourceRepository: "docker.io/library/busybox"}},
+			hmacKey:            "key",
+			maxExistenceChecks: 3,
+			remoteBlobs:        map[digest.Digest]distribution.Descriptor{digest.Digest("apple"): {Digest: digest.Digest("apple"), MediaType: "custom-media-type"}},
+			expectedDescriptor: distribution.Descriptor{Digest: digest.Digest("apple"), MediaType: schema2.MediaTypeLayer},
+			expectedExists:     true,
+			expectedRequests:   []string{"apple"},
+			expectedAdditions:  []metadata.V2Metadata{taggedMetadata("key", "apple", "docker.io/library/busybox")},
+		},
+		{
+			name:       "find existing blob among many",
+			targetRepo: "127.0.0.1/myapp",
+			hmacKey:    "key",
+			metadata: []metadata.V2Metadata{
+				taggedMetadata("someotherkey", "pear", "127.0.0.1/myapp"),
+				taggedMetadata("key", "apple", "127.0.0.1/myapp"),
+				taggedMetadata("", "plum", "127.0.0.1/myapp"),
+			},
+			maxExistenceChecks: 3,
+			remoteBlobs:        map[digest.Digest]distribution.Descriptor{digest.Digest("pear"): {Digest: digest.Digest("pear")}},
+			expectedDescriptor: distribution.Descriptor{Digest: digest.Digest("pear"), MediaType: schema2.MediaTypeLayer},
+			expectedExists:     true,
+			expectedRequests:   []string{"apple", "plum", "pear"},
+			expectedAdditions:  []metadata.V2Metadata{taggedMetadata("key", "pear", "127.0.0.1/myapp")},
+			expectedRemovals: []metadata.V2Metadata{
+				taggedMetadata("key", "apple", "127.0.0.1/myapp"),
+				{Digest: digest.Digest("plum"), SourceRepository: "127.0.0.1/myapp"},
+			},
+		},
+		{
+			name:       "reach maximum existence checks",
+			targetRepo: "user/app",
+			metadata: []metadata.V2Metadata{
+				{Digest: digest.Digest("pear"), SourceRepository: "docker.io/user/app"},
+				{Digest: digest.Digest("apple"), SourceRepository: "docker.io/user/app"},
+				{Digest: digest.Digest("plum"), SourceRepository: "docker.io/user/app"},
+				{Digest: digest.Digest("banana"), SourceRepository: "docker.io/user/app"},
+			},
+			maxExistenceChecks: 3,
+			remoteBlobs:        map[digest.Digest]distribution.Descriptor{digest.Digest("pear"): {Digest: digest.Digest("pear")}},
+			expectedExists:     false,
+			expectedRequests:   []string{"banana", "plum", "apple"},
+			expectedRemovals: []metadata.V2Metadata{
+				{Digest: digest.Digest("banana"), SourceRepository: "docker.io/user/app"},
+				{Digest: digest.Digest("plum"), SourceRepository: "docker.io/user/app"},
+				{Digest: digest.Digest("apple"), SourceRepository: "docker.io/user/app"},
+			},
+		},
+		{
+			name:       "zero allowed existence checks",
+			targetRepo: "user/app",
+			metadata: []metadata.V2Metadata{
+				{Digest: digest.Digest("pear"), SourceRepository: "docker.io/user/app"},
+				{Digest: digest.Digest("apple"), SourceRepository: "docker.io/user/app"},
+				{Digest: digest.Digest("plum"), SourceRepository: "docker.io/user/app"},
+				{Digest: digest.Digest("banana"), SourceRepository: "docker.io/user/app"},
+			},
+			maxExistenceChecks: 0,
+			remoteBlobs:        map[digest.Digest]distribution.Descriptor{digest.Digest("pear"): {Digest: digest.Digest("pear")}},
+		},
+		{
+			name:       "stat single digest just once",
+			targetRepo: "busybox",
+			metadata: []metadata.V2Metadata{
+				taggedMetadata("key1", "pear", "docker.io/library/busybox"),
+				taggedMetadata("key2", "apple", "docker.io/library/busybox"),
+				taggedMetadata("key3", "apple", "docker.io/library/busybox"),
+			},
+			maxExistenceChecks: 3,
+			remoteBlobs:        map[digest.Digest]distribution.Descriptor{digest.Digest("pear"): {Digest: digest.Digest("pear")}},
+			expectedDescriptor: distribution.Descriptor{Digest: digest.Digest("pear"), MediaType: schema2.MediaTypeLayer},
+			expectedExists:     true,
+			expectedRequests:   []string{"apple", "pear"},
+			expectedAdditions:  []metadata.V2Metadata{{Digest: digest.Digest("pear"), SourceRepository: "docker.io/library/busybox"}},
+			expectedRemovals:   []metadata.V2Metadata{taggedMetadata("key3", "apple", "docker.io/library/busybox")},
+		},
+		{
+			name:       "stop on first error",
+			targetRepo: "user/app",
+			hmacKey:    "key",
+			metadata: []metadata.V2Metadata{
+				taggedMetadata("key", "banana", "docker.io/user/app"),
+				taggedMetadata("key", "orange", "docker.io/user/app"),
+				taggedMetadata("key", "plum", "docker.io/user/app"),
+			},
+			maxExistenceChecks: 3,
+			remoteErrors:       map[digest.Digest]error{"orange": distribution.ErrAccessDenied},
+			remoteBlobs:        map[digest.Digest]distribution.Descriptor{digest.Digest("apple"): {}},
+			expectedError:      distribution.ErrAccessDenied,
+			expectedRequests:   []string{"plum", "orange"},
+			expectedRemovals:   []metadata.V2Metadata{taggedMetadata("key", "plum", "docker.io/user/app")},
+		},
+		{
+			name:       "remove outdated metadata",
+			targetRepo: "docker.io/user/app",
+			metadata: []metadata.V2Metadata{
+				{Digest: digest.Digest("plum"), SourceRepository: "docker.io/library/busybox"},
+				{Digest: digest.Digest("orange"), SourceRepository: "docker.io/user/app"},
+			},
+			maxExistenceChecks: 3,
+			remoteErrors:       map[digest.Digest]error{"orange": distribution.ErrBlobUnknown},
+			remoteBlobs:        map[digest.Digest]distribution.Descriptor{digest.Digest("plum"): {}},
+			expectedExists:     false,
+			expectedRequests:   []string{"orange"},
+			expectedRemovals:   []metadata.V2Metadata{{Digest: digest.Digest("orange"), SourceRepository: "docker.io/user/app"}},
+		},
+		{
+			name:       "missing SourceRepository",
+			targetRepo: "busybox",
+			metadata: []metadata.V2Metadata{
+				{Digest: digest.Digest("1")},
+				{Digest: digest.Digest("3")},
+				{Digest: digest.Digest("2")},
+			},
+			maxExistenceChecks: 3,
+			expectedExists:     false,
+			expectedRequests:   []string{"2", "3", "1"},
+		},
+
+		{
+			name:       "with and without SourceRepository",
+			targetRepo: "busybox",
+			metadata: []metadata.V2Metadata{
+				{Digest: digest.Digest("1")},
+				{Digest: digest.Digest("2"), SourceRepository: "docker.io/library/busybox"},
+				{Digest: digest.Digest("3")},
+			},
+			remoteBlobs:        map[digest.Digest]distribution.Descriptor{digest.Digest("1"): {Digest: digest.Digest("1")}},
+			maxExistenceChecks: 3,
+			expectedDescriptor: distribution.Descriptor{Digest: digest.Digest("1"), MediaType: schema2.MediaTypeLayer},
+			expectedExists:     true,
+			expectedRequests:   []string{"2", "3", "1"},
+			expectedAdditions:  []metadata.V2Metadata{{Digest: digest.Digest("1"), SourceRepository: "docker.io/library/busybox"}},
+			expectedRemovals: []metadata.V2Metadata{
+				{Digest: digest.Digest("2"), SourceRepository: "docker.io/library/busybox"},
+			},
+		},
+	} {
+		repoInfo, err := reference.ParseNamed(tc.targetRepo)
+		if err != nil {
+			t.Fatalf("[%s] failed to parse reference name: %v", tc.name, err)
+		}
+		repo := &mockRepo{
+			t:        t,
+			errors:   tc.remoteErrors,
+			blobs:    tc.remoteBlobs,
+			requests: []string{},
+		}
+		ctx := context.Background()
+		ms := &mockV2MetadataService{}
+		pd := &v2PushDescriptor{
+			hmacKey:           []byte(tc.hmacKey),
+			repoInfo:          repoInfo,
+			layer:             layer.EmptyLayer,
+			repo:              repo,
+			v2MetadataService: ms,
+			pushState:         &pushState{remoteLayers: make(map[layer.DiffID]distribution.Descriptor)},
+			checkedDigests:    make(map[digest.Digest]struct{}),
+		}
+
+		desc, exists, err := pd.layerAlreadyExists(ctx, &progressSink{t}, layer.EmptyLayer.DiffID(), tc.checkOtherRepositories, tc.maxExistenceChecks, tc.metadata)
+
+		if !reflect.DeepEqual(desc, tc.expectedDescriptor) {
+			t.Errorf("[%s] got unexpected descriptor: %#+v != %#+v", tc.name, desc, tc.expectedDescriptor)
+		}
+		if exists != tc.expectedExists {
+			t.Errorf("[%s] got unexpected exists: %t != %t", tc.name, exists, tc.expectedExists)
+		}
+		if !reflect.DeepEqual(err, tc.expectedError) {
+			t.Errorf("[%s] got unexpected error: %#+v != %#+v", tc.name, err, tc.expectedError)
+		}
+
+		if len(repo.requests) != len(tc.expectedRequests) {
+			t.Errorf("[%s] got unexpected number of requests: %d != %d", tc.name, len(repo.requests), len(tc.expectedRequests))
+		}
+		for i := 0; i < len(repo.requests) && i < len(tc.expectedRequests); i++ {
+			if repo.requests[i] != tc.expectedRequests[i] {
+				t.Errorf("[%s] request %d does not match expected: %q != %q", tc.name, i, repo.requests[i], tc.expectedRequests[i])
+			}
+		}
+		for i := len(repo.requests); i < len(tc.expectedRequests); i++ {
+			t.Errorf("[%s] missing expected request at position %d (%q)", tc.name, i, tc.expectedRequests[i])
+		}
+		for i := len(tc.expectedRequests); i < len(repo.requests); i++ {
+			t.Errorf("[%s] got unexpected request at position %d (%q)", tc.name, i, repo.requests[i])
+		}
+
+		if len(ms.added) != len(tc.expectedAdditions) {
+			t.Errorf("[%s] got unexpected number of additions: %d != %d", tc.name, len(ms.added), len(tc.expectedAdditions))
+		}
+		for i := 0; i < len(ms.added) && i < len(tc.expectedAdditions); i++ {
+			if ms.added[i] != tc.expectedAdditions[i] {
+				t.Errorf("[%s] added metadata at %d does not match expected: %q != %q", tc.name, i, ms.added[i], tc.expectedAdditions[i])
+			}
+		}
+		for i := len(ms.added); i < len(tc.expectedAdditions); i++ {
+			t.Errorf("[%s] missing expected addition at position %d (%q)", tc.name, i, tc.expectedAdditions[i])
+		}
+		for i := len(tc.expectedAdditions); i < len(ms.added); i++ {
+			t.Errorf("[%s] unexpected metadata addition at position %d (%q)", tc.name, i, ms.added[i])
+		}
+
+		if len(ms.removed) != len(tc.expectedRemovals) {
+			t.Errorf("[%s] got unexpected number of removals: %d != %d", tc.name, len(ms.removed), len(tc.expectedRemovals))
+		}
+		for i := 0; i < len(ms.removed) && i < len(tc.expectedRemovals); i++ {
+			if ms.removed[i] != tc.expectedRemovals[i] {
+				t.Errorf("[%s] removed metadata at %d does not match expected: %q != %q", tc.name, i, ms.removed[i], tc.expectedRemovals[i])
+			}
+		}
+		for i := len(ms.removed); i < len(tc.expectedRemovals); i++ {
+			t.Errorf("[%s] missing expected removal at position %d (%q)", tc.name, i, tc.expectedRemovals[i])
+		}
+		for i := len(tc.expectedRemovals); i < len(ms.removed); i++ {
+			t.Errorf("[%s] removed unexpected metadata at position %d (%q)", tc.name, i, ms.removed[i])
+		}
+	}
+}
+
 func taggedMetadata(key string, dgst string, sourceRepo string) metadata.V2Metadata {
 func taggedMetadata(key string, dgst string, sourceRepo string) metadata.V2Metadata {
 	meta := metadata.V2Metadata{
 	meta := metadata.V2Metadata{
 		Digest:           digest.Digest(dgst),
 		Digest:           digest.Digest(dgst),
@@ -145,3 +461,114 @@ func taggedMetadata(key string, dgst string, sourceRepo string) metadata.V2Metad
 	meta.HMAC = metadata.ComputeV2MetadataHMAC([]byte(key), &meta)
 	meta.HMAC = metadata.ComputeV2MetadataHMAC([]byte(key), &meta)
 	return meta
 	return meta
 }
 }
+
+type mockRepo struct {
+	t        *testing.T
+	errors   map[digest.Digest]error
+	blobs    map[digest.Digest]distribution.Descriptor
+	requests []string
+}
+
+var _ distribution.Repository = &mockRepo{}
+
+func (m *mockRepo) Named() distreference.Named {
+	m.t.Fatalf("Named() not implemented")
+	return nil
+}
+func (m *mockRepo) Manifests(ctc context.Context, options ...distribution.ManifestServiceOption) (distribution.ManifestService, error) {
+	m.t.Fatalf("Manifests() not implemented")
+	return nil, nil
+}
+func (m *mockRepo) Tags(ctc context.Context) distribution.TagService {
+	m.t.Fatalf("Tags() not implemented")
+	return nil
+}
+func (m *mockRepo) Blobs(ctx context.Context) distribution.BlobStore {
+	return &mockBlobStore{
+		repo: m,
+	}
+}
+
+type mockBlobStore struct {
+	repo *mockRepo
+}
+
+var _ distribution.BlobStore = &mockBlobStore{}
+
+func (m *mockBlobStore) Stat(ctx context.Context, dgst digest.Digest) (distribution.Descriptor, error) {
+	m.repo.requests = append(m.repo.requests, dgst.String())
+	if err, exists := m.repo.errors[dgst]; exists {
+		return distribution.Descriptor{}, err
+	}
+	if desc, exists := m.repo.blobs[dgst]; exists {
+		return desc, nil
+	}
+	return distribution.Descriptor{}, distribution.ErrBlobUnknown
+}
+func (m *mockBlobStore) Get(ctx context.Context, dgst digest.Digest) ([]byte, error) {
+	m.repo.t.Fatal("Get() not implemented")
+	return nil, nil
+}
+
+func (m *mockBlobStore) Open(ctx context.Context, dgst digest.Digest) (distribution.ReadSeekCloser, error) {
+	m.repo.t.Fatal("Open() not implemented")
+	return nil, nil
+}
+
+func (m *mockBlobStore) Put(ctx context.Context, mediaType string, p []byte) (distribution.Descriptor, error) {
+	m.repo.t.Fatal("Put() not implemented")
+	return distribution.Descriptor{}, nil
+}
+
+func (m *mockBlobStore) Create(ctx context.Context, options ...distribution.BlobCreateOption) (distribution.BlobWriter, error) {
+	m.repo.t.Fatal("Create() not implemented")
+	return nil, nil
+}
+func (m *mockBlobStore) Resume(ctx context.Context, id string) (distribution.BlobWriter, error) {
+	m.repo.t.Fatal("Resume() not implemented")
+	return nil, nil
+}
+func (m *mockBlobStore) Delete(ctx context.Context, dgst digest.Digest) error {
+	m.repo.t.Fatal("Delete() not implemented")
+	return nil
+}
+func (m *mockBlobStore) ServeBlob(ctx context.Context, w http.ResponseWriter, r *http.Request, dgst digest.Digest) error {
+	m.repo.t.Fatalf("ServeBlob() not implemented")
+	return nil
+}
+
+type mockV2MetadataService struct {
+	added   []metadata.V2Metadata
+	removed []metadata.V2Metadata
+}
+
+var _ metadata.V2MetadataService = &mockV2MetadataService{}
+
+func (*mockV2MetadataService) GetMetadata(diffID layer.DiffID) ([]metadata.V2Metadata, error) {
+	return nil, nil
+}
+func (*mockV2MetadataService) GetDiffID(dgst digest.Digest) (layer.DiffID, error) {
+	return "", nil
+}
+func (m *mockV2MetadataService) Add(diffID layer.DiffID, metadata metadata.V2Metadata) error {
+	m.added = append(m.added, metadata)
+	return nil
+}
+func (m *mockV2MetadataService) TagAndAdd(diffID layer.DiffID, hmacKey []byte, meta metadata.V2Metadata) error {
+	meta.HMAC = metadata.ComputeV2MetadataHMAC(hmacKey, &meta)
+	m.Add(diffID, meta)
+	return nil
+}
+func (m *mockV2MetadataService) Remove(metadata metadata.V2Metadata) error {
+	m.removed = append(m.removed, metadata)
+	return nil
+}
+
+type progressSink struct {
+	t *testing.T
+}
+
+func (s *progressSink) WriteProgress(p progress.Progress) error {
+	s.t.Logf("progress update: %#+v", p)
+	return nil
+}