image_push.go 8.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273
  1. package containerd
  2. import (
  3. "context"
  4. "fmt"
  5. "io"
  6. "strings"
  7. "sync"
  8. "github.com/containerd/containerd/content"
  9. cerrdefs "github.com/containerd/containerd/errdefs"
  10. "github.com/containerd/containerd/images"
  11. containerdimages "github.com/containerd/containerd/images"
  12. "github.com/containerd/containerd/log"
  13. "github.com/containerd/containerd/platforms"
  14. "github.com/containerd/containerd/remotes"
  15. "github.com/containerd/containerd/remotes/docker"
  16. "github.com/docker/distribution/reference"
  17. "github.com/docker/docker/api/types/registry"
  18. "github.com/docker/docker/errdefs"
  19. "github.com/docker/docker/pkg/streamformatter"
  20. "github.com/opencontainers/go-digest"
  21. ocispec "github.com/opencontainers/image-spec/specs-go/v1"
  22. "github.com/pkg/errors"
  23. "golang.org/x/sync/semaphore"
  24. )
  25. // PushImage initiates a push operation of the image pointed to by targetRef.
  26. // Image manifest (or index) is pushed as is, which will probably fail if you
  27. // don't have all content referenced by the index.
  28. // Cross-repo mounts will be attempted for non-existing blobs.
  29. //
  30. // It will also add distribution source labels to the pushed content
  31. // pointing to the new target repository. This will allow subsequent pushes
  32. // to perform cross-repo mounts of the shared content when pushing to a different
  33. // repository on the same registry.
  34. func (i *ImageService) PushImage(ctx context.Context, targetRef reference.Named, metaHeaders map[string][]string, authConfig *registry.AuthConfig, outStream io.Writer) error {
  35. if _, tagged := targetRef.(reference.Tagged); !tagged {
  36. if _, digested := targetRef.(reference.Digested); !digested {
  37. return errdefs.NotImplemented(errors.New("push all tags is not implemented"))
  38. }
  39. }
  40. leasedCtx, release, err := i.client.WithLease(ctx)
  41. if err != nil {
  42. return err
  43. }
  44. defer func() {
  45. err := release(leasedCtx)
  46. if err != nil && !cerrdefs.IsNotFound(err) {
  47. log.G(ctx).WithField("image", targetRef).WithError(err).Error("failed to delete lease created for push")
  48. }
  49. }()
  50. out := streamformatter.NewJSONProgressOutput(outStream, false)
  51. img, err := i.client.ImageService().Get(ctx, targetRef.String())
  52. if err != nil {
  53. return errdefs.NotFound(err)
  54. }
  55. target := img.Target
  56. store := i.client.ContentStore()
  57. resolver, tracker := i.newResolverFromAuthConfig(ctx, authConfig)
  58. progress := pushProgress{Tracker: tracker}
  59. jobsQueue := newJobs()
  60. finishProgress := jobsQueue.showProgress(ctx, out, combinedProgress([]progressUpdater{
  61. &progress,
  62. pullProgress{ShowExists: false, Store: store},
  63. }))
  64. defer finishProgress()
  65. var limiter *semaphore.Weighted = nil // TODO: Respect max concurrent downloads/uploads
  66. mountableBlobs, err := findMissingMountable(ctx, store, jobsQueue, target, targetRef, limiter)
  67. if err != nil {
  68. return err
  69. }
  70. for dgst := range mountableBlobs {
  71. progress.addMountable(dgst)
  72. }
  73. // Create a store which fakes the local existence of possibly mountable blobs.
  74. // Otherwise they can't be pushed at all.
  75. realStore := store
  76. wrapped := wrapWithFakeMountableBlobs(store, mountableBlobs)
  77. store = wrapped
  78. pusher, err := resolver.Pusher(ctx, targetRef.String())
  79. if err != nil {
  80. return err
  81. }
  82. addChildrenToJobs := containerdimages.HandlerFunc(
  83. func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
  84. children, err := containerdimages.Children(ctx, store, desc)
  85. if err != nil {
  86. return nil, err
  87. }
  88. for _, c := range children {
  89. jobsQueue.Add(c)
  90. }
  91. jobsQueue.Add(desc)
  92. return nil, nil
  93. },
  94. )
  95. handlerWrapper := func(h images.Handler) images.Handler {
  96. return containerdimages.Handlers(addChildrenToJobs, h)
  97. }
  98. err = remotes.PushContent(ctx, pusher, target, store, limiter, platforms.All, handlerWrapper)
  99. if err != nil {
  100. if containerdimages.IsIndexType(target.MediaType) {
  101. if cerrdefs.IsNotFound(err) {
  102. err = errdefs.NotFound(fmt.Errorf(
  103. "missing content: %w\n"+
  104. "Note: You're trying to push a manifest list/index which "+
  105. "references multiple platform specific manifests, but not all of them are available locally "+
  106. "or available to the remote repository.\n"+
  107. "Make sure you have all the referenced content and try again.",
  108. err))
  109. }
  110. }
  111. } else {
  112. appendSource, err := docker.AppendDistributionSourceLabel(realStore, targetRef.String())
  113. if err != nil {
  114. // This shouldn't happen at this point because the reference would have to be invalid
  115. // and if it was, then it would error out earlier.
  116. return errdefs.Unknown(errors.Wrap(err, "failed to create an handler that appends distribution source label to pushed content"))
  117. }
  118. if err := containerdimages.Dispatch(ctx, appendSource, nil, target); err != nil {
  119. // Shouldn't happen, but even if it would fail, then make it only a warning
  120. // because it doesn't affect the pushed data.
  121. log.G(ctx).WithError(err).Warn("failed to append distribution source labels to pushed content")
  122. }
  123. }
  124. return err
  125. }
  126. // findMissingMountable will walk the target descriptor recursively and return
  127. // missing contents with their distribution source which could potentially
  128. // be cross-repo mounted.
  129. func findMissingMountable(ctx context.Context, store content.Store, queue *jobs,
  130. target ocispec.Descriptor, targetRef reference.Named, limiter *semaphore.Weighted,
  131. ) (map[digest.Digest]distributionSource, error) {
  132. mountableBlobs := map[digest.Digest]distributionSource{}
  133. var mutex sync.Mutex
  134. sources, err := getDigestSources(ctx, store, target.Digest)
  135. if err != nil {
  136. if !errdefs.IsNotFound(err) {
  137. return nil, err
  138. }
  139. log.G(ctx).WithField("target", target).Debug("distribution source label not found")
  140. return mountableBlobs, nil
  141. }
  142. handler := func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
  143. _, err := store.Info(ctx, desc.Digest)
  144. if err != nil {
  145. if !cerrdefs.IsNotFound(err) {
  146. return nil, errdefs.System(errors.Wrapf(err, "failed to get metadata of content %s", desc.Digest.String()))
  147. }
  148. for _, source := range sources {
  149. if canBeMounted(desc.MediaType, targetRef, source) {
  150. mutex.Lock()
  151. mountableBlobs[desc.Digest] = source
  152. mutex.Unlock()
  153. queue.Add(desc)
  154. break
  155. }
  156. }
  157. return nil, nil
  158. }
  159. return containerdimages.Children(ctx, store, desc)
  160. }
  161. err = containerdimages.Dispatch(ctx, containerdimages.HandlerFunc(handler), limiter, target)
  162. if err != nil {
  163. return nil, err
  164. }
  165. return mountableBlobs, nil
  166. }
  167. func getDigestSources(ctx context.Context, store content.Manager, digest digest.Digest) ([]distributionSource, error) {
  168. info, err := store.Info(ctx, digest)
  169. if err != nil {
  170. if cerrdefs.IsNotFound(err) {
  171. return nil, errdefs.NotFound(err)
  172. }
  173. return nil, errdefs.System(err)
  174. }
  175. sources := extractDistributionSources(info.Labels)
  176. if sources == nil {
  177. return nil, errdefs.NotFound(fmt.Errorf("label %q is not attached to %s", labelDistributionSource, digest.String()))
  178. }
  179. return sources, nil
  180. }
// labelDistributionSource is the prefix shared by the content label keys that
// record distribution sources. The registry domain is appended to it to form
// the full key, and the label value is read as a comma-separated list of
// repositories on that registry.
//
// TODO(vvoland): Remove and use containerd const in containerd 1.7+
// https://github.com/containerd/containerd/pull/8224
const labelDistributionSource = "containerd.io/distribution.source."
  184. func extractDistributionSources(labels map[string]string) []distributionSource {
  185. var sources []distributionSource
  186. // Check if this blob has a distributionSource label
  187. // if yes, read it as source
  188. for k, v := range labels {
  189. if reg := strings.TrimPrefix(k, labelDistributionSource); reg != k {
  190. for _, repo := range strings.Split(v, ",") {
  191. ref, err := reference.ParseNamed(reg + "/" + repo)
  192. if err != nil {
  193. continue
  194. }
  195. sources = append(sources, distributionSource{
  196. registryRef: ref,
  197. })
  198. }
  199. }
  200. }
  201. return sources
  202. }
// distributionSource describes a repository on a registry that content is
// associated with through a distribution source label.
type distributionSource struct {
	// registryRef is the named reference of the source repository; its domain
	// is the registry and its path is the repository.
	registryRef reference.Named
}
  206. // ToAnnotation returns key and value
  207. func (source distributionSource) ToAnnotation() (string, string) {
  208. domain := reference.Domain(source.registryRef)
  209. v := reference.Path(source.registryRef)
  210. return labelDistributionSource + domain, v
  211. }
  212. func (source distributionSource) GetReference(dgst digest.Digest) (reference.Named, error) {
  213. return reference.WithDigest(source.registryRef, dgst)
  214. }
  215. // canBeMounted returns if the content with given media type can be cross-repo
  216. // mounted when pushing it to a remote reference ref.
  217. func canBeMounted(mediaType string, targetRef reference.Named, source distributionSource) bool {
  218. if containerdimages.IsManifestType(mediaType) {
  219. return false
  220. }
  221. if containerdimages.IsIndexType(mediaType) {
  222. return false
  223. }
  224. reg := reference.Domain(targetRef)
  225. // Remove :port suffix from domain
  226. // containerd distribution source label doesn't store port
  227. if portIdx := strings.LastIndex(reg, ":"); portIdx != -1 {
  228. reg = reg[:portIdx]
  229. }
  230. // If the source registry is the same as the one we are pushing to
  231. // then the cross-repo mount will work.
  232. return reg == reference.Domain(source.registryRef)
  233. }