|
@@ -274,27 +274,29 @@ func (pd *v2PushDescriptor) Upload(ctx context.Context, progressOutput progress.
|
|
// then push the blob.
|
|
// then push the blob.
|
|
bs := pd.repo.Blobs(ctx)
|
|
bs := pd.repo.Blobs(ctx)
|
|
|
|
|
|
- var mountFrom metadata.V2Metadata
|
|
|
|
-
|
|
|
|
- // Attempt to find another repository in the same registry to mount the layer from to avoid an unnecessary upload
|
|
|
|
- for _, metadata := range v2Metadata {
|
|
|
|
- sourceRepo, err := reference.ParseNamed(metadata.SourceRepository)
|
|
|
|
|
|
+ var layerUpload distribution.BlobWriter
|
|
|
|
+ mountAttemptsRemaining := 3
|
|
|
|
+
|
|
|
|
+ // Attempt to find another repository in the same registry to mount the layer
|
|
|
|
+ // from to avoid an unnecessary upload.
|
|
|
|
+ // Note: metadata is stored from oldest to newest, so we iterate through this
|
|
|
|
+ // slice in reverse to maximize our chances of the blob still existing in the
|
|
|
|
+ // remote repository.
|
|
|
|
+ for i := len(v2Metadata) - 1; i >= 0 && mountAttemptsRemaining > 0; i-- {
|
|
|
|
+ mountFrom := v2Metadata[i]
|
|
|
|
+
|
|
|
|
+ sourceRepo, err := reference.ParseNamed(mountFrom.SourceRepository)
|
|
if err != nil {
|
|
if err != nil {
|
|
continue
|
|
continue
|
|
}
|
|
}
|
|
- if pd.repoInfo.Hostname() == sourceRepo.Hostname() {
|
|
|
|
- logrus.Debugf("attempting to mount layer %s (%s) from %s", diffID, metadata.Digest, sourceRepo.FullName())
|
|
|
|
- mountFrom = metadata
|
|
|
|
- break
|
|
|
|
|
|
+ if pd.repoInfo.Hostname() != sourceRepo.Hostname() {
|
|
|
|
+ // don't mount blobs from another registry
|
|
|
|
+ continue
|
|
}
|
|
}
|
|
- }
|
|
|
|
-
|
|
|
|
- var createOpts []distribution.BlobCreateOption
|
|
|
|
|
|
|
|
- if mountFrom.SourceRepository != "" {
|
|
|
|
namedRef, err := reference.WithName(mountFrom.SourceRepository)
|
|
namedRef, err := reference.WithName(mountFrom.SourceRepository)
|
|
if err != nil {
|
|
if err != nil {
|
|
- return err
|
|
|
|
|
|
+ continue
|
|
}
|
|
}
|
|
|
|
|
|
// TODO (brianbland): We need to construct a reference where the Name is
|
|
// TODO (brianbland): We need to construct a reference where the Name is
|
|
@@ -302,45 +304,49 @@ func (pd *v2PushDescriptor) Upload(ctx context.Context, progressOutput progress.
|
|
// richer reference package
|
|
// richer reference package
|
|
remoteRef, err := distreference.WithName(namedRef.RemoteName())
|
|
remoteRef, err := distreference.WithName(namedRef.RemoteName())
|
|
if err != nil {
|
|
if err != nil {
|
|
- return err
|
|
|
|
|
|
+ continue
|
|
}
|
|
}
|
|
|
|
|
|
canonicalRef, err := distreference.WithDigest(remoteRef, mountFrom.Digest)
|
|
canonicalRef, err := distreference.WithDigest(remoteRef, mountFrom.Digest)
|
|
if err != nil {
|
|
if err != nil {
|
|
- return err
|
|
|
|
|
|
+ continue
|
|
}
|
|
}
|
|
|
|
|
|
- createOpts = append(createOpts, client.WithMountFrom(canonicalRef))
|
|
|
|
- }
|
|
|
|
|
|
+ logrus.Debugf("attempting to mount layer %s (%s) from %s", diffID, mountFrom.Digest, sourceRepo.FullName())
|
|
|
|
|
|
- // Send the layer
|
|
|
|
- layerUpload, err := bs.Create(ctx, createOpts...)
|
|
|
|
- switch err := err.(type) {
|
|
|
|
- case distribution.ErrBlobMounted:
|
|
|
|
- progress.Updatef(progressOutput, pd.ID(), "Mounted from %s", err.From.Name())
|
|
|
|
|
|
+ layerUpload, err = bs.Create(ctx, client.WithMountFrom(canonicalRef))
|
|
|
|
+ switch err := err.(type) {
|
|
|
|
+ case distribution.ErrBlobMounted:
|
|
|
|
+ progress.Updatef(progressOutput, pd.ID(), "Mounted from %s", err.From.Name())
|
|
|
|
|
|
- err.Descriptor.MediaType = schema2.MediaTypeLayer
|
|
|
|
|
|
+ err.Descriptor.MediaType = schema2.MediaTypeLayer
|
|
|
|
|
|
- pd.pushState.Lock()
|
|
|
|
- pd.pushState.confirmedV2 = true
|
|
|
|
- pd.pushState.remoteLayers[diffID] = err.Descriptor
|
|
|
|
- pd.pushState.Unlock()
|
|
|
|
|
|
+ pd.pushState.Lock()
|
|
|
|
+ pd.pushState.confirmedV2 = true
|
|
|
|
+ pd.pushState.remoteLayers[diffID] = err.Descriptor
|
|
|
|
+ pd.pushState.Unlock()
|
|
|
|
|
|
- // Cache mapping from this layer's DiffID to the blobsum
|
|
|
|
- if err := pd.v2MetadataService.Add(diffID, metadata.V2Metadata{Digest: mountFrom.Digest, SourceRepository: pd.repoInfo.FullName()}); err != nil {
|
|
|
|
- return xfer.DoNotRetry{Err: err}
|
|
|
|
|
|
+ // Cache mapping from this layer's DiffID to the blobsum
|
|
|
|
+ if err := pd.v2MetadataService.Add(diffID, metadata.V2Metadata{Digest: mountFrom.Digest, SourceRepository: pd.repoInfo.FullName()}); err != nil {
|
|
|
|
+ return xfer.DoNotRetry{Err: err}
|
|
|
|
+ }
|
|
|
|
+ return nil
|
|
|
|
+ case nil:
|
|
|
|
+ // blob upload session created successfully, so begin the upload
|
|
|
|
+ mountAttemptsRemaining = 0
|
|
|
|
+ default:
|
|
|
|
+ // unable to mount layer from this repository, so this source mapping is no longer valid
|
|
|
|
+ logrus.Debugf("unassociating layer %s (%s) with %s", diffID, mountFrom.Digest, mountFrom.SourceRepository)
|
|
|
|
+ pd.v2MetadataService.Remove(mountFrom)
|
|
|
|
+ mountAttemptsRemaining--
|
|
}
|
|
}
|
|
-
|
|
|
|
- return nil
|
|
|
|
- }
|
|
|
|
- if mountFrom.SourceRepository != "" {
|
|
|
|
- // unable to mount layer from this repository, so this source mapping is no longer valid
|
|
|
|
- logrus.Debugf("unassociating layer %s (%s) with %s", diffID, mountFrom.Digest, mountFrom.SourceRepository)
|
|
|
|
- pd.v2MetadataService.Remove(mountFrom)
|
|
|
|
}
|
|
}
|
|
|
|
|
|
- if err != nil {
|
|
|
|
- return retryOnError(err)
|
|
|
|
|
|
+ if layerUpload == nil {
|
|
|
|
+ layerUpload, err = bs.Create(ctx)
|
|
|
|
+ if err != nil {
|
|
|
|
+ return retryOnError(err)
|
|
|
|
+ }
|
|
}
|
|
}
|
|
defer layerUpload.Close()
|
|
defer layerUpload.Close()
|
|
|
|
|