diff --git a/daemon/containerd/image_push.go b/daemon/containerd/image_push.go index cf2f377458..ae3814761d 100644 --- a/daemon/containerd/image_push.go +++ b/daemon/containerd/image_push.go @@ -5,6 +5,7 @@ import ( "fmt" "io" "strings" + "sync" "github.com/containerd/containerd/content" cerrdefs "github.com/containerd/containerd/errdefs" @@ -44,7 +45,13 @@ func (i *ImageService) PushImage(ctx context.Context, targetRef reference.Named, if err != nil { return err } - defer release(leasedCtx) + defer func() { + err := release(leasedCtx) + if err != nil && !cerrdefs.IsNotFound(err) { + logrus.WithField("image", targetRef).WithError(err).Error("failed to delete lease created for push") + } + }() + out := streamformatter.NewJSONProgressOutput(outStream, false) img, err := i.client.ImageService().Get(ctx, targetRef.String()) @@ -57,8 +64,8 @@ func (i *ImageService) PushImage(ctx context.Context, targetRef reference.Named, resolver, tracker := i.newResolverFromAuthConfig(authConfig) progress := pushProgress{Tracker: tracker} - jobs := newJobs() - finishProgress := jobs.showProgress(ctx, out, combinedProgress([]progressUpdater{ + jobsQueue := newJobs() + finishProgress := jobsQueue.showProgress(ctx, out, combinedProgress([]progressUpdater{ &progress, pullProgress{ShowExists: false, Store: store}, })) @@ -66,7 +73,7 @@ func (i *ImageService) PushImage(ctx context.Context, targetRef reference.Named, var limiter *semaphore.Weighted = nil // TODO: Respect max concurrent downloads/uploads - mountableBlobs, err := i.findMissingMountable(ctx, store, jobs, target, targetRef, limiter) + mountableBlobs, err := findMissingMountable(ctx, store, jobsQueue, target, targetRef, limiter) if err != nil { return err } @@ -92,24 +99,17 @@ func (i *ImageService) PushImage(ctx context.Context, targetRef reference.Named, return nil, err } for _, c := range children { - jobs.Add(c) + jobsQueue.Add(c) } - jobs.Add(desc) + jobsQueue.Add(desc) return nil, nil }, ) - appendSource, err := docker.AppendDistributionSourceLabel(realStore, targetRef.String()) - if err != nil { - // This shouldn't happen at this point because the reference would have to be invalid - // and if it was, then it would error out earlier. - return errdefs.Unknown(errors.Wrap(err, "failed to create an handler that appends distribution source label to pushed content")) - } - handlerWrapper := func(h images.Handler) images.Handler { - return containerdimages.Handlers(addChildrenToJobs, h, appendSource) + return containerdimages.Handlers(addChildrenToJobs, h) } err = remotes.PushContent(ctx, pusher, target, store, limiter, platforms.All, handlerWrapper) @@ -125,19 +125,34 @@ func (i *ImageService) PushImage(ctx context.Context, targetRef reference.Named, err)) } } + } else { + appendSource, err := docker.AppendDistributionSourceLabel(realStore, targetRef.String()) + if err != nil { + // This shouldn't happen at this point because the reference would have to be invalid + // and if it was, then it would error out earlier. + return errdefs.Unknown(errors.Wrap(err, "failed to create an handler that appends distribution source label to pushed content")) + } + + if err := containerdimages.Dispatch(ctx, appendSource, nil, target); err != nil { + // Shouldn't happen, but even if it would fail, then make it only a warning + // because it doesn't affect the pushed data. + logrus.WithError(err).Warn("failed to append distribution source labels to pushed content") + } } + return err } // findMissingMountable will walk the target descriptor recursively and return // missing contents with their distribution source which could potentially // be cross-repo mounted. -func (i *ImageService) findMissingMountable(ctx context.Context, store content.Store, jobs *jobs, +func findMissingMountable(ctx context.Context, store content.Store, queue *jobs, target ocispec.Descriptor, targetRef reference.Named, limiter *semaphore.Weighted, ) (map[digest.Digest]distributionSource, error) { mountableBlobs := map[digest.Digest]distributionSource{} - sources, err := getDigestSources(ctx, store, target.Digest) + var mutex sync.Mutex + sources, err := getDigestSources(ctx, store, target.Digest) if err != nil { if !errdefs.IsNotFound(err) { return nil, err @@ -154,9 +169,11 @@ func (i *ImageService) findMissingMountable(ctx context.Context, store content.S } for _, source := range sources { - if canBeMounted(desc.MediaType, targetRef, i.registryService.IsInsecureRegistry, source) { + if canBeMounted(desc.MediaType, targetRef, source) { + mutex.Lock() mountableBlobs[desc.Digest] = source - jobs.Add(desc) + mutex.Unlock() + queue.Add(desc) break } } @@ -202,14 +219,16 @@ func extractDistributionSources(labels map[string]string) []distributionSource { // if yes, read it as source for k, v := range labels { if reg := strings.TrimPrefix(k, labelDistributionSource); reg != k { - ref, err := reference.ParseNamed(reg + "/" + v) - if err != nil { - continue - } + for _, repo := range strings.Split(v, ",") { + ref, err := reference.ParseNamed(reg + "/" + repo) + if err != nil { + continue + } - sources = append(sources, distributionSource{ - registryRef: ref, - }) + sources = append(sources, distributionSource{ + registryRef: ref, + }) + } } } @@ -233,7 +252,7 @@ func (source distributionSource) GetReference(dgst digest.Digest) (reference.Nam // canBeMounted returns if the content with given media type can be cross-repo // mounted when pushing it to a remote reference ref. -func canBeMounted(mediaType string, targetRef reference.Named, isInsecureFunc func(string) bool, source distributionSource) bool { +func canBeMounted(mediaType string, targetRef reference.Named, source distributionSource) bool { if containerdimages.IsManifestType(mediaType) { return false } @@ -242,11 +261,10 @@ func canBeMounted(mediaType string, targetRef reference.Named, isInsecureFunc fu } reg := reference.Domain(targetRef) - - // Cross-repo mount doesn't seem to work with insecure registries. - isInsecure := isInsecureFunc(reg) - if isInsecure { - return false + // Remove :port suffix from domain + // containerd distribution source label doesn't store port + if portIdx := strings.LastIndex(reg, ":"); portIdx != -1 { + reg = reg[:portIdx] } // If the source registry is the same as the one we are pushing to