Merge pull request #45249 from vvoland/c8d-push-upstream-2

c8d/push: Follow up fixes
This commit is contained in:
Sebastiaan van Stijn 2023-03-31 22:37:59 +02:00 committed by GitHub
commit 348f83670a
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -5,6 +5,7 @@ import (
"fmt" "fmt"
"io" "io"
"strings" "strings"
"sync"
"github.com/containerd/containerd/content" "github.com/containerd/containerd/content"
cerrdefs "github.com/containerd/containerd/errdefs" cerrdefs "github.com/containerd/containerd/errdefs"
@ -44,7 +45,13 @@ func (i *ImageService) PushImage(ctx context.Context, targetRef reference.Named,
if err != nil { if err != nil {
return err return err
} }
defer release(leasedCtx) defer func() {
err := release(leasedCtx)
if err != nil && !cerrdefs.IsNotFound(err) {
logrus.WithField("image", targetRef).WithError(err).Error("failed to delete lease created for push")
}
}()
out := streamformatter.NewJSONProgressOutput(outStream, false) out := streamformatter.NewJSONProgressOutput(outStream, false)
img, err := i.client.ImageService().Get(ctx, targetRef.String()) img, err := i.client.ImageService().Get(ctx, targetRef.String())
@ -57,8 +64,8 @@ func (i *ImageService) PushImage(ctx context.Context, targetRef reference.Named,
resolver, tracker := i.newResolverFromAuthConfig(authConfig) resolver, tracker := i.newResolverFromAuthConfig(authConfig)
progress := pushProgress{Tracker: tracker} progress := pushProgress{Tracker: tracker}
jobs := newJobs() jobsQueue := newJobs()
finishProgress := jobs.showProgress(ctx, out, combinedProgress([]progressUpdater{ finishProgress := jobsQueue.showProgress(ctx, out, combinedProgress([]progressUpdater{
&progress, &progress,
pullProgress{ShowExists: false, Store: store}, pullProgress{ShowExists: false, Store: store},
})) }))
@ -66,7 +73,7 @@ func (i *ImageService) PushImage(ctx context.Context, targetRef reference.Named,
var limiter *semaphore.Weighted = nil // TODO: Respect max concurrent downloads/uploads var limiter *semaphore.Weighted = nil // TODO: Respect max concurrent downloads/uploads
mountableBlobs, err := i.findMissingMountable(ctx, store, jobs, target, targetRef, limiter) mountableBlobs, err := findMissingMountable(ctx, store, jobsQueue, target, targetRef, limiter)
if err != nil { if err != nil {
return err return err
} }
@ -92,24 +99,17 @@ func (i *ImageService) PushImage(ctx context.Context, targetRef reference.Named,
return nil, err return nil, err
} }
for _, c := range children { for _, c := range children {
jobs.Add(c) jobsQueue.Add(c)
} }
jobs.Add(desc) jobsQueue.Add(desc)
return nil, nil return nil, nil
}, },
) )
appendSource, err := docker.AppendDistributionSourceLabel(realStore, targetRef.String())
if err != nil {
// This shouldn't happen at this point because the reference would have to be invalid
// and if it was, then it would error out earlier.
return errdefs.Unknown(errors.Wrap(err, "failed to create an handler that appends distribution source label to pushed content"))
}
handlerWrapper := func(h images.Handler) images.Handler { handlerWrapper := func(h images.Handler) images.Handler {
return containerdimages.Handlers(addChildrenToJobs, h, appendSource) return containerdimages.Handlers(addChildrenToJobs, h)
} }
err = remotes.PushContent(ctx, pusher, target, store, limiter, platforms.All, handlerWrapper) err = remotes.PushContent(ctx, pusher, target, store, limiter, platforms.All, handlerWrapper)
@ -125,19 +125,34 @@ func (i *ImageService) PushImage(ctx context.Context, targetRef reference.Named,
err)) err))
} }
} }
} else {
appendSource, err := docker.AppendDistributionSourceLabel(realStore, targetRef.String())
if err != nil {
// This shouldn't happen at this point because the reference would have to be invalid
// and if it was, then it would error out earlier.
return errdefs.Unknown(errors.Wrap(err, "failed to create an handler that appends distribution source label to pushed content"))
}
if err := containerdimages.Dispatch(ctx, appendSource, nil, target); err != nil {
// Shouldn't happen, but even if it would fail, then make it only a warning
// because it doesn't affect the pushed data.
logrus.WithError(err).Warn("failed to append distribution source labels to pushed content")
}
} }
return err return err
} }
// findMissingMountable will walk the target descriptor recursively and return // findMissingMountable will walk the target descriptor recursively and return
// missing contents with their distribution source which could potentially // missing contents with their distribution source which could potentially
// be cross-repo mounted. // be cross-repo mounted.
func (i *ImageService) findMissingMountable(ctx context.Context, store content.Store, jobs *jobs, func findMissingMountable(ctx context.Context, store content.Store, queue *jobs,
target ocispec.Descriptor, targetRef reference.Named, limiter *semaphore.Weighted, target ocispec.Descriptor, targetRef reference.Named, limiter *semaphore.Weighted,
) (map[digest.Digest]distributionSource, error) { ) (map[digest.Digest]distributionSource, error) {
mountableBlobs := map[digest.Digest]distributionSource{} mountableBlobs := map[digest.Digest]distributionSource{}
sources, err := getDigestSources(ctx, store, target.Digest) var mutex sync.Mutex
sources, err := getDigestSources(ctx, store, target.Digest)
if err != nil { if err != nil {
if !errdefs.IsNotFound(err) { if !errdefs.IsNotFound(err) {
return nil, err return nil, err
@ -154,9 +169,11 @@ func (i *ImageService) findMissingMountable(ctx context.Context, store content.S
} }
for _, source := range sources { for _, source := range sources {
if canBeMounted(desc.MediaType, targetRef, i.registryService.IsInsecureRegistry, source) { if canBeMounted(desc.MediaType, targetRef, source) {
mutex.Lock()
mountableBlobs[desc.Digest] = source mountableBlobs[desc.Digest] = source
jobs.Add(desc) mutex.Unlock()
queue.Add(desc)
break break
} }
} }
@ -202,14 +219,16 @@ func extractDistributionSources(labels map[string]string) []distributionSource {
// if yes, read it as source // if yes, read it as source
for k, v := range labels { for k, v := range labels {
if reg := strings.TrimPrefix(k, labelDistributionSource); reg != k { if reg := strings.TrimPrefix(k, labelDistributionSource); reg != k {
ref, err := reference.ParseNamed(reg + "/" + v) for _, repo := range strings.Split(v, ",") {
if err != nil { ref, err := reference.ParseNamed(reg + "/" + repo)
continue if err != nil {
} continue
}
sources = append(sources, distributionSource{ sources = append(sources, distributionSource{
registryRef: ref, registryRef: ref,
}) })
}
} }
} }
@ -233,7 +252,7 @@ func (source distributionSource) GetReference(dgst digest.Digest) (reference.Nam
// canBeMounted returns if the content with given media type can be cross-repo // canBeMounted returns if the content with given media type can be cross-repo
// mounted when pushing it to a remote reference ref. // mounted when pushing it to a remote reference ref.
func canBeMounted(mediaType string, targetRef reference.Named, isInsecureFunc func(string) bool, source distributionSource) bool { func canBeMounted(mediaType string, targetRef reference.Named, source distributionSource) bool {
if containerdimages.IsManifestType(mediaType) { if containerdimages.IsManifestType(mediaType) {
return false return false
} }
@ -242,11 +261,10 @@ func canBeMounted(mediaType string, targetRef reference.Named, isInsecureFunc fu
} }
reg := reference.Domain(targetRef) reg := reference.Domain(targetRef)
// Remove :port suffix from domain
// Cross-repo mount doesn't seem to work with insecure registries. // containerd distribution source label doesn't store port
isInsecure := isInsecureFunc(reg) if portIdx := strings.LastIndex(reg, ":"); portIdx != -1 {
if isInsecure { reg = reg[:portIdx]
return false
} }
// If the source registry is the same as the one we are pushing to // If the source registry is the same as the one we are pushing to