123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273 |
- package containerd
- import (
- "context"
- "fmt"
- "io"
- "strings"
- "sync"
- "github.com/containerd/containerd/content"
- cerrdefs "github.com/containerd/containerd/errdefs"
- "github.com/containerd/containerd/images"
- containerdimages "github.com/containerd/containerd/images"
- "github.com/containerd/containerd/log"
- "github.com/containerd/containerd/platforms"
- "github.com/containerd/containerd/remotes"
- "github.com/containerd/containerd/remotes/docker"
- "github.com/docker/distribution/reference"
- "github.com/docker/docker/api/types/registry"
- "github.com/docker/docker/errdefs"
- "github.com/docker/docker/pkg/streamformatter"
- "github.com/opencontainers/go-digest"
- ocispec "github.com/opencontainers/image-spec/specs-go/v1"
- "github.com/pkg/errors"
- "golang.org/x/sync/semaphore"
- )
- // PushImage initiates a push operation of the image pointed to by targetRef.
- // Image manifest (or index) is pushed as is, which will probably fail if you
- // don't have all content referenced by the index.
- // Cross-repo mounts will be attempted for non-existing blobs.
- //
- // It will also add distribution source labels to the pushed content
- // pointing to the new target repository. This will allow subsequent pushes
- // to perform cross-repo mounts of the shared content when pushing to a different
- // repository on the same registry.
- func (i *ImageService) PushImage(ctx context.Context, targetRef reference.Named, metaHeaders map[string][]string, authConfig *registry.AuthConfig, outStream io.Writer) error {
- if _, tagged := targetRef.(reference.Tagged); !tagged {
- if _, digested := targetRef.(reference.Digested); !digested {
- return errdefs.NotImplemented(errors.New("push all tags is not implemented"))
- }
- }
- leasedCtx, release, err := i.client.WithLease(ctx)
- if err != nil {
- return err
- }
- defer func() {
- err := release(leasedCtx)
- if err != nil && !cerrdefs.IsNotFound(err) {
- log.G(ctx).WithField("image", targetRef).WithError(err).Error("failed to delete lease created for push")
- }
- }()
- out := streamformatter.NewJSONProgressOutput(outStream, false)
- img, err := i.client.ImageService().Get(ctx, targetRef.String())
- if err != nil {
- return errdefs.NotFound(err)
- }
- target := img.Target
- store := i.client.ContentStore()
- resolver, tracker := i.newResolverFromAuthConfig(ctx, authConfig)
- progress := pushProgress{Tracker: tracker}
- jobsQueue := newJobs()
- finishProgress := jobsQueue.showProgress(ctx, out, combinedProgress([]progressUpdater{
- &progress,
- pullProgress{ShowExists: false, Store: store},
- }))
- defer finishProgress()
- var limiter *semaphore.Weighted = nil // TODO: Respect max concurrent downloads/uploads
- mountableBlobs, err := findMissingMountable(ctx, store, jobsQueue, target, targetRef, limiter)
- if err != nil {
- return err
- }
- for dgst := range mountableBlobs {
- progress.addMountable(dgst)
- }
- // Create a store which fakes the local existence of possibly mountable blobs.
- // Otherwise they can't be pushed at all.
- realStore := store
- wrapped := wrapWithFakeMountableBlobs(store, mountableBlobs)
- store = wrapped
- pusher, err := resolver.Pusher(ctx, targetRef.String())
- if err != nil {
- return err
- }
- addChildrenToJobs := containerdimages.HandlerFunc(
- func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
- children, err := containerdimages.Children(ctx, store, desc)
- if err != nil {
- return nil, err
- }
- for _, c := range children {
- jobsQueue.Add(c)
- }
- jobsQueue.Add(desc)
- return nil, nil
- },
- )
- handlerWrapper := func(h images.Handler) images.Handler {
- return containerdimages.Handlers(addChildrenToJobs, h)
- }
- err = remotes.PushContent(ctx, pusher, target, store, limiter, platforms.All, handlerWrapper)
- if err != nil {
- if containerdimages.IsIndexType(target.MediaType) {
- if cerrdefs.IsNotFound(err) {
- err = errdefs.NotFound(fmt.Errorf(
- "missing content: %w\n"+
- "Note: You're trying to push a manifest list/index which "+
- "references multiple platform specific manifests, but not all of them are available locally "+
- "or available to the remote repository.\n"+
- "Make sure you have all the referenced content and try again.",
- err))
- }
- }
- } else {
- appendSource, err := docker.AppendDistributionSourceLabel(realStore, targetRef.String())
- if err != nil {
- // This shouldn't happen at this point because the reference would have to be invalid
- // and if it was, then it would error out earlier.
- return errdefs.Unknown(errors.Wrap(err, "failed to create an handler that appends distribution source label to pushed content"))
- }
- if err := containerdimages.Dispatch(ctx, appendSource, nil, target); err != nil {
- // Shouldn't happen, but even if it would fail, then make it only a warning
- // because it doesn't affect the pushed data.
- log.G(ctx).WithError(err).Warn("failed to append distribution source labels to pushed content")
- }
- }
- return err
- }
- // findMissingMountable will walk the target descriptor recursively and return
- // missing contents with their distribution source which could potentially
- // be cross-repo mounted.
- func findMissingMountable(ctx context.Context, store content.Store, queue *jobs,
- target ocispec.Descriptor, targetRef reference.Named, limiter *semaphore.Weighted,
- ) (map[digest.Digest]distributionSource, error) {
- mountableBlobs := map[digest.Digest]distributionSource{}
- var mutex sync.Mutex
- sources, err := getDigestSources(ctx, store, target.Digest)
- if err != nil {
- if !errdefs.IsNotFound(err) {
- return nil, err
- }
- log.G(ctx).WithField("target", target).Debug("distribution source label not found")
- return mountableBlobs, nil
- }
- handler := func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
- _, err := store.Info(ctx, desc.Digest)
- if err != nil {
- if !cerrdefs.IsNotFound(err) {
- return nil, errdefs.System(errors.Wrapf(err, "failed to get metadata of content %s", desc.Digest.String()))
- }
- for _, source := range sources {
- if canBeMounted(desc.MediaType, targetRef, source) {
- mutex.Lock()
- mountableBlobs[desc.Digest] = source
- mutex.Unlock()
- queue.Add(desc)
- break
- }
- }
- return nil, nil
- }
- return containerdimages.Children(ctx, store, desc)
- }
- err = containerdimages.Dispatch(ctx, containerdimages.HandlerFunc(handler), limiter, target)
- if err != nil {
- return nil, err
- }
- return mountableBlobs, nil
- }
- func getDigestSources(ctx context.Context, store content.Manager, digest digest.Digest) ([]distributionSource, error) {
- info, err := store.Info(ctx, digest)
- if err != nil {
- if cerrdefs.IsNotFound(err) {
- return nil, errdefs.NotFound(err)
- }
- return nil, errdefs.System(err)
- }
- sources := extractDistributionSources(info.Labels)
- if sources == nil {
- return nil, errdefs.NotFound(fmt.Errorf("label %q is not attached to %s", labelDistributionSource, digest.String()))
- }
- return sources, nil
- }
- // TODO(vvoland): Remove and use containerd const in containerd 1.7+
- // https://github.com/containerd/containerd/pull/8224
- const labelDistributionSource = "containerd.io/distribution.source."
- func extractDistributionSources(labels map[string]string) []distributionSource {
- var sources []distributionSource
- // Check if this blob has a distributionSource label
- // if yes, read it as source
- for k, v := range labels {
- if reg := strings.TrimPrefix(k, labelDistributionSource); reg != k {
- for _, repo := range strings.Split(v, ",") {
- ref, err := reference.ParseNamed(reg + "/" + repo)
- if err != nil {
- continue
- }
- sources = append(sources, distributionSource{
- registryRef: ref,
- })
- }
- }
- }
- return sources
- }
- type distributionSource struct {
- registryRef reference.Named
- }
- // ToAnnotation returns key and value
- func (source distributionSource) ToAnnotation() (string, string) {
- domain := reference.Domain(source.registryRef)
- v := reference.Path(source.registryRef)
- return labelDistributionSource + domain, v
- }
- func (source distributionSource) GetReference(dgst digest.Digest) (reference.Named, error) {
- return reference.WithDigest(source.registryRef, dgst)
- }
- // canBeMounted returns if the content with given media type can be cross-repo
- // mounted when pushing it to a remote reference ref.
- func canBeMounted(mediaType string, targetRef reference.Named, source distributionSource) bool {
- if containerdimages.IsManifestType(mediaType) {
- return false
- }
- if containerdimages.IsIndexType(mediaType) {
- return false
- }
- reg := reference.Domain(targetRef)
- // Remove :port suffix from domain
- // containerd distribution source label doesn't store port
- if portIdx := strings.LastIndex(reg, ":"); portIdx != -1 {
- reg = reg[:portIdx]
- }
- // If the source registry is the same as the one we are pushing to
- // then the cross-repo mount will work.
- return reg == reference.Domain(source.registryRef)
- }
|