|
@@ -11,6 +11,7 @@ import (
|
|
|
"github.com/docker/distribution/digest"
|
|
|
"github.com/docker/distribution/manifest/schema1"
|
|
|
"github.com/docker/distribution/manifest/schema2"
|
|
|
+ distreference "github.com/docker/distribution/reference"
|
|
|
"github.com/docker/distribution/registry/client"
|
|
|
"github.com/docker/docker/distribution/metadata"
|
|
|
"github.com/docker/docker/distribution/xfer"
|
|
@@ -34,12 +35,12 @@ type PushResult struct {
|
|
|
}
|
|
|
|
|
|
type v2Pusher struct {
|
|
|
- blobSumService *metadata.BlobSumService
|
|
|
- ref reference.Named
|
|
|
- endpoint registry.APIEndpoint
|
|
|
- repoInfo *registry.RepositoryInfo
|
|
|
- config *ImagePushConfig
|
|
|
- repo distribution.Repository
|
|
|
+ v2MetadataService *metadata.V2MetadataService
|
|
|
+ ref reference.Named
|
|
|
+ endpoint registry.APIEndpoint
|
|
|
+ repoInfo *registry.RepositoryInfo
|
|
|
+ config *ImagePushConfig
|
|
|
+ repo distribution.Repository
|
|
|
|
|
|
// pushState is state built by the Download functions.
|
|
|
pushState pushState
|
|
@@ -130,10 +131,10 @@ func (p *v2Pusher) pushV2Tag(ctx context.Context, ref reference.NamedTagged, ima
|
|
|
var descriptors []xfer.UploadDescriptor
|
|
|
|
|
|
descriptorTemplate := v2PushDescriptor{
|
|
|
- blobSumService: p.blobSumService,
|
|
|
- repoInfo: p.repoInfo,
|
|
|
- repo: p.repo,
|
|
|
- pushState: &p.pushState,
|
|
|
+ v2MetadataService: p.v2MetadataService,
|
|
|
+ repoInfo: p.repoInfo,
|
|
|
+ repo: p.repo,
|
|
|
+ pushState: &p.pushState,
|
|
|
}
|
|
|
|
|
|
// Loop bounds condition is to avoid pushing the base layer on Windows.
|
|
@@ -210,11 +211,11 @@ func manifestFromBuilder(ctx context.Context, builder distribution.ManifestBuild
|
|
|
}
|
|
|
|
|
|
type v2PushDescriptor struct {
|
|
|
- layer layer.Layer
|
|
|
- blobSumService *metadata.BlobSumService
|
|
|
- repoInfo reference.Named
|
|
|
- repo distribution.Repository
|
|
|
- pushState *pushState
|
|
|
+ layer layer.Layer
|
|
|
+ v2MetadataService *metadata.V2MetadataService
|
|
|
+ repoInfo reference.Named
|
|
|
+ repo distribution.Repository
|
|
|
+ pushState *pushState
|
|
|
}
|
|
|
|
|
|
func (pd *v2PushDescriptor) Key() string {
|
|
@@ -242,10 +243,10 @@ func (pd *v2PushDescriptor) Upload(ctx context.Context, progressOutput progress.
|
|
|
}
|
|
|
pd.pushState.Unlock()
|
|
|
|
|
|
- // Do we have any blobsums associated with this layer's DiffID?
|
|
|
- possibleBlobsums, err := pd.blobSumService.GetBlobSums(diffID)
|
|
|
+ // Do we have any metadata associated with this layer's DiffID?
|
|
|
+ v2Metadata, err := pd.v2MetadataService.GetMetadata(diffID)
|
|
|
if err == nil {
|
|
|
- descriptor, exists, err := blobSumAlreadyExists(ctx, possibleBlobsums, pd.repoInfo, pd.repo, pd.pushState)
|
|
|
+ descriptor, exists, err := layerAlreadyExists(ctx, v2Metadata, pd.repoInfo, pd.repo, pd.pushState)
|
|
|
if err != nil {
|
|
|
progress.Update(progressOutput, pd.ID(), "Image push failed")
|
|
|
return retryOnError(err)
|
|
@@ -265,39 +266,69 @@ func (pd *v2PushDescriptor) Upload(ctx context.Context, progressOutput progress.
|
|
|
// then push the blob.
|
|
|
bs := pd.repo.Blobs(ctx)
|
|
|
|
|
|
+ var mountFrom metadata.V2Metadata
|
|
|
+
|
|
|
// Attempt to find another repository in the same registry to mount the layer from to avoid an unnecessary upload
|
|
|
- for _, blobsum := range possibleBlobsums {
|
|
|
- sourceRepo, err := reference.ParseNamed(blobsum.SourceRepository)
|
|
|
+ for _, metadata := range v2Metadata {
|
|
|
+ sourceRepo, err := reference.ParseNamed(metadata.SourceRepository)
|
|
|
if err != nil {
|
|
|
continue
|
|
|
}
|
|
|
if pd.repoInfo.Hostname() == sourceRepo.Hostname() {
|
|
|
- logrus.Debugf("attempting to mount layer %s (%s) from %s", diffID, blobsum.Digest, sourceRepo.FullName())
|
|
|
+ logrus.Debugf("attempting to mount layer %s (%s) from %s", diffID, metadata.Digest, sourceRepo.FullName())
|
|
|
+ mountFrom = metadata
|
|
|
+ break
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
- desc, err := bs.Mount(ctx, sourceRepo.RemoteName(), blobsum.Digest)
|
|
|
- if err == nil {
|
|
|
- progress.Updatef(progressOutput, pd.ID(), "Mounted from %s", sourceRepo.RemoteName())
|
|
|
+ var createOpts []distribution.BlobCreateOption
|
|
|
|
|
|
- pd.pushState.Lock()
|
|
|
- pd.pushState.confirmedV2 = true
|
|
|
- pd.pushState.remoteLayers[diffID] = desc
|
|
|
- pd.pushState.Unlock()
|
|
|
+ if mountFrom.SourceRepository != "" {
|
|
|
+ namedRef, err := reference.WithName(mountFrom.SourceRepository)
|
|
|
+ if err != nil {
|
|
|
+ return err
|
|
|
+ }
|
|
|
|
|
|
- // Cache mapping from this layer's DiffID to the blobsum
|
|
|
- if err := pd.blobSumService.Add(diffID, metadata.BlobSum{Digest: blobsum.Digest, SourceRepository: pd.repoInfo.FullName()}); err != nil {
|
|
|
- return xfer.DoNotRetry{Err: err}
|
|
|
- }
|
|
|
+ // TODO (brianbland): We need to construct a reference where the Name is
|
|
|
+ // only the full remote name, so clean this up when distribution has a
|
|
|
+ // richer reference package
|
|
|
+ remoteRef, err := distreference.WithName(namedRef.RemoteName())
|
|
|
+ if err != nil {
|
|
|
+ return err
|
|
|
+ }
|
|
|
|
|
|
- return nil
|
|
|
- }
|
|
|
- // Unable to mount layer from this repository, so this source mapping is no longer valid
|
|
|
- logrus.Debugf("unassociating layer %s (%s) with %s", diffID, blobsum.Digest, sourceRepo.FullName())
|
|
|
- pd.blobSumService.Remove(blobsum)
|
|
|
+ canonicalRef, err := distreference.WithDigest(remoteRef, mountFrom.Digest)
|
|
|
+ if err != nil {
|
|
|
+ return err
|
|
|
}
|
|
|
+
|
|
|
+ createOpts = append(createOpts, client.WithMountFrom(canonicalRef))
|
|
|
}
|
|
|
|
|
|
// Send the layer
|
|
|
- layerUpload, err := bs.Create(ctx)
|
|
|
+ layerUpload, err := bs.Create(ctx, createOpts...)
|
|
|
+ switch err := err.(type) {
|
|
|
+ case distribution.ErrBlobMounted:
|
|
|
+ progress.Updatef(progressOutput, pd.ID(), "Mounted from %s", err.From.Name())
|
|
|
+
|
|
|
+ pd.pushState.Lock()
|
|
|
+ pd.pushState.confirmedV2 = true
|
|
|
+ pd.pushState.remoteLayers[diffID] = err.Descriptor
|
|
|
+ pd.pushState.Unlock()
|
|
|
+
|
|
|
+ // Cache mapping from this layer's DiffID to the blobsum
|
|
|
+ if err := pd.v2MetadataService.Add(diffID, metadata.V2Metadata{Digest: mountFrom.Digest, SourceRepository: pd.repoInfo.FullName()}); err != nil {
|
|
|
+ return xfer.DoNotRetry{Err: err}
|
|
|
+ }
|
|
|
+
|
|
|
+ return nil
|
|
|
+ }
|
|
|
+ if mountFrom.SourceRepository != "" {
|
|
|
+ // unable to mount layer from this repository, so this source mapping is no longer valid
|
|
|
+ logrus.Debugf("unassociating layer %s (%s) with %s", diffID, mountFrom.Digest, mountFrom.SourceRepository)
|
|
|
+ pd.v2MetadataService.Remove(mountFrom)
|
|
|
+ }
|
|
|
+
|
|
|
if err != nil {
|
|
|
return retryOnError(err)
|
|
|
}
|
|
@@ -333,7 +364,7 @@ func (pd *v2PushDescriptor) Upload(ctx context.Context, progressOutput progress.
|
|
|
progress.Update(progressOutput, pd.ID(), "Pushed")
|
|
|
|
|
|
// Cache mapping from this layer's DiffID to the blobsum
|
|
|
- if err := pd.blobSumService.Add(diffID, metadata.BlobSum{Digest: pushDigest, SourceRepository: pd.repoInfo.FullName()}); err != nil {
|
|
|
+ if err := pd.v2MetadataService.Add(diffID, metadata.V2Metadata{Digest: pushDigest, SourceRepository: pd.repoInfo.FullName()}); err != nil {
|
|
|
return xfer.DoNotRetry{Err: err}
|
|
|
}
|
|
|
|
|
@@ -362,16 +393,16 @@ func (pd *v2PushDescriptor) Descriptor() distribution.Descriptor {
|
|
|
return pd.pushState.remoteLayers[pd.DiffID()]
|
|
|
}
|
|
|
|
|
|
-// blobSumAlreadyExists checks if the registry already know about any of the
|
|
|
-// blobsums passed in the "blobsums" slice. If it finds one that the registry
|
|
|
+// layerAlreadyExists checks if the registry already know about any of the
|
|
|
+// metadata passed in the "metadata" slice. If it finds one that the registry
|
|
|
// knows about, it returns the known digest and "true".
|
|
|
-func blobSumAlreadyExists(ctx context.Context, blobsums []metadata.BlobSum, repoInfo reference.Named, repo distribution.Repository, pushState *pushState) (distribution.Descriptor, bool, error) {
|
|
|
- for _, blobSum := range blobsums {
|
|
|
+func layerAlreadyExists(ctx context.Context, metadata []metadata.V2Metadata, repoInfo reference.Named, repo distribution.Repository, pushState *pushState) (distribution.Descriptor, bool, error) {
|
|
|
+ for _, meta := range metadata {
|
|
|
// Only check blobsums that are known to this repository or have an unknown source
|
|
|
- if blobSum.SourceRepository != "" && blobSum.SourceRepository != repoInfo.FullName() {
|
|
|
+ if meta.SourceRepository != "" && meta.SourceRepository != repoInfo.FullName() {
|
|
|
continue
|
|
|
}
|
|
|
- descriptor, err := repo.Blobs(ctx).Stat(ctx, blobSum.Digest)
|
|
|
+ descriptor, err := repo.Blobs(ctx).Stat(ctx, meta.Digest)
|
|
|
switch err {
|
|
|
case nil:
|
|
|
descriptor.MediaType = schema2.MediaTypeLayer
|