image_push.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318
  1. package containerd
  2. import (
  3. "context"
  4. "fmt"
  5. "io"
  6. "strings"
  7. "sync"
  8. "github.com/containerd/containerd/content"
  9. cerrdefs "github.com/containerd/containerd/errdefs"
  10. "github.com/containerd/containerd/images"
  11. containerdimages "github.com/containerd/containerd/images"
  12. containerdlabels "github.com/containerd/containerd/labels"
  13. "github.com/containerd/containerd/platforms"
  14. "github.com/containerd/containerd/remotes"
  15. "github.com/containerd/containerd/remotes/docker"
  16. "github.com/containerd/log"
  17. "github.com/distribution/reference"
  18. "github.com/docker/docker/api/types/events"
  19. "github.com/docker/docker/api/types/registry"
  20. "github.com/docker/docker/errdefs"
  21. "github.com/docker/docker/internal/compatcontext"
  22. "github.com/docker/docker/pkg/progress"
  23. "github.com/docker/docker/pkg/streamformatter"
  24. "github.com/opencontainers/go-digest"
  25. ocispec "github.com/opencontainers/image-spec/specs-go/v1"
  26. "github.com/pkg/errors"
  27. "golang.org/x/sync/semaphore"
  28. )
  29. // PushImage initiates a push operation of the image pointed to by sourceRef.
  30. // If reference is untagged, all tags from the reference repository are pushed.
  31. // Image manifest (or index) is pushed as is, which will probably fail if you
  32. // don't have all content referenced by the index.
  33. // Cross-repo mounts will be attempted for non-existing blobs.
  34. //
  35. // It will also add distribution source labels to the pushed content
  36. // pointing to the new target repository. This will allow subsequent pushes
  37. // to perform cross-repo mounts of the shared content when pushing to a different
  38. // repository on the same registry.
  39. func (i *ImageService) PushImage(ctx context.Context, sourceRef reference.Named, metaHeaders map[string][]string, authConfig *registry.AuthConfig, outStream io.Writer) (retErr error) {
  40. out := streamformatter.NewJSONProgressOutput(outStream, false)
  41. progress.Messagef(out, "", "The push refers to repository [%s]", sourceRef.Name())
  42. if _, tagged := sourceRef.(reference.Tagged); !tagged {
  43. if _, digested := sourceRef.(reference.Digested); !digested {
  44. // Image is not tagged nor digested, that means all tags push was requested.
  45. // Find all images with the same repository.
  46. imgs, err := i.getAllImagesWithRepository(ctx, sourceRef)
  47. if err != nil {
  48. return err
  49. }
  50. if len(imgs) == 0 {
  51. return fmt.Errorf("An image does not exist locally with the tag: %s", reference.FamiliarName(sourceRef))
  52. }
  53. for _, img := range imgs {
  54. named, err := reference.ParseNamed(img.Name)
  55. if err != nil {
  56. // This shouldn't happen, but log a warning just in case.
  57. log.G(ctx).WithFields(log.Fields{
  58. "image": img.Name,
  59. "sourceRef": sourceRef,
  60. }).Warn("refusing to push an invalid tag")
  61. continue
  62. }
  63. if err := i.pushRef(ctx, named, metaHeaders, authConfig, out); err != nil {
  64. return err
  65. }
  66. }
  67. return nil
  68. }
  69. }
  70. return i.pushRef(ctx, sourceRef, metaHeaders, authConfig, out)
  71. }
  72. func (i *ImageService) pushRef(ctx context.Context, targetRef reference.Named, metaHeaders map[string][]string, authConfig *registry.AuthConfig, out progress.Output) (retErr error) {
  73. leasedCtx, release, err := i.client.WithLease(ctx)
  74. if err != nil {
  75. return err
  76. }
  77. defer func() {
  78. if err := release(compatcontext.WithoutCancel(leasedCtx)); err != nil {
  79. log.G(ctx).WithField("image", targetRef).WithError(err).Warn("failed to release lease created for push")
  80. }
  81. }()
  82. img, err := i.client.ImageService().Get(ctx, targetRef.String())
  83. if err != nil {
  84. if cerrdefs.IsNotFound(err) {
  85. return errdefs.NotFound(fmt.Errorf("tag does not exist: %s", reference.FamiliarString(targetRef)))
  86. }
  87. return errdefs.NotFound(err)
  88. }
  89. target := img.Target
  90. store := i.client.ContentStore()
  91. resolver, tracker := i.newResolverFromAuthConfig(ctx, authConfig, targetRef)
  92. pp := pushProgress{Tracker: tracker}
  93. jobsQueue := newJobs()
  94. finishProgress := jobsQueue.showProgress(ctx, out, combinedProgress([]progressUpdater{
  95. &pp,
  96. pullProgress{showExists: false, store: store},
  97. }))
  98. defer func() {
  99. finishProgress()
  100. if retErr == nil {
  101. if tagged, ok := targetRef.(reference.Tagged); ok {
  102. progress.Messagef(out, "", "%s: digest: %s size: %d", tagged.Tag(), target.Digest, img.Target.Size)
  103. }
  104. }
  105. }()
  106. var limiter *semaphore.Weighted = nil // TODO: Respect max concurrent downloads/uploads
  107. mountableBlobs, err := findMissingMountable(ctx, store, jobsQueue, target, targetRef, limiter)
  108. if err != nil {
  109. return err
  110. }
  111. // Create a store which fakes the local existence of possibly mountable blobs.
  112. // Otherwise they can't be pushed at all.
  113. realStore := store
  114. wrapped := wrapWithFakeMountableBlobs(store, mountableBlobs)
  115. store = wrapped
  116. pusher, err := resolver.Pusher(ctx, targetRef.String())
  117. if err != nil {
  118. return err
  119. }
  120. addLayerJobs := containerdimages.HandlerFunc(
  121. func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
  122. switch {
  123. case containerdimages.IsIndexType(desc.MediaType),
  124. containerdimages.IsManifestType(desc.MediaType),
  125. containerdimages.IsConfigType(desc.MediaType):
  126. default:
  127. jobsQueue.Add(desc)
  128. }
  129. return nil, nil
  130. },
  131. )
  132. handlerWrapper := func(h images.Handler) images.Handler {
  133. return containerdimages.Handlers(addLayerJobs, h)
  134. }
  135. err = remotes.PushContent(ctx, pusher, target, store, limiter, platforms.All, handlerWrapper)
  136. if err != nil {
  137. if containerdimages.IsIndexType(target.MediaType) && cerrdefs.IsNotFound(err) {
  138. return errdefs.NotFound(fmt.Errorf(
  139. "missing content: %w\n"+
  140. "Note: You're trying to push a manifest list/index which "+
  141. "references multiple platform specific manifests, but not all of them are available locally "+
  142. "or available to the remote repository.\n"+
  143. "Make sure you have all the referenced content and try again.",
  144. err))
  145. }
  146. return err
  147. }
  148. appendDistributionSourceLabel(ctx, realStore, targetRef, target)
  149. i.LogImageEvent(reference.FamiliarString(targetRef), reference.FamiliarName(targetRef), events.ActionPush)
  150. return nil
  151. }
  152. func appendDistributionSourceLabel(ctx context.Context, realStore content.Store, targetRef reference.Named, target ocispec.Descriptor) {
  153. appendSource, err := docker.AppendDistributionSourceLabel(realStore, targetRef.String())
  154. if err != nil {
  155. // This shouldn't happen at this point because the reference would have to be invalid
  156. // and if it was, then it would error out earlier.
  157. log.G(ctx).WithError(err).Warn("failed to create an handler that appends distribution source label to pushed content")
  158. return
  159. }
  160. handler := presentChildrenHandler(realStore, appendSource)
  161. if err := containerdimages.Dispatch(ctx, handler, nil, target); err != nil {
  162. // Shouldn't happen, but even if it would fail, then make it only a warning
  163. // because it doesn't affect the pushed data.
  164. log.G(ctx).WithError(err).Warn("failed to append distribution source labels to pushed content")
  165. }
  166. }
  167. // findMissingMountable will walk the target descriptor recursively and return
  168. // missing contents with their distribution source which could potentially
  169. // be cross-repo mounted.
  170. func findMissingMountable(ctx context.Context, store content.Store, queue *jobs,
  171. target ocispec.Descriptor, targetRef reference.Named, limiter *semaphore.Weighted,
  172. ) (map[digest.Digest]distributionSource, error) {
  173. mountableBlobs := map[digest.Digest]distributionSource{}
  174. var mutex sync.Mutex
  175. sources, err := getDigestSources(ctx, store, target.Digest)
  176. if err != nil {
  177. if !errdefs.IsNotFound(err) {
  178. return nil, err
  179. }
  180. log.G(ctx).WithField("target", target).Debug("distribution source label not found")
  181. return mountableBlobs, nil
  182. }
  183. handler := func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
  184. _, err := store.Info(ctx, desc.Digest)
  185. if err != nil {
  186. if !cerrdefs.IsNotFound(err) {
  187. return nil, errdefs.System(errors.Wrapf(err, "failed to get metadata of content %s", desc.Digest.String()))
  188. }
  189. for _, source := range sources {
  190. if canBeMounted(desc.MediaType, targetRef, source) {
  191. mutex.Lock()
  192. mountableBlobs[desc.Digest] = source
  193. mutex.Unlock()
  194. queue.Add(desc)
  195. break
  196. }
  197. }
  198. return nil, nil
  199. }
  200. return containerdimages.Children(ctx, store, desc)
  201. }
  202. err = containerdimages.Dispatch(ctx, containerdimages.HandlerFunc(handler), limiter, target)
  203. if err != nil {
  204. return nil, err
  205. }
  206. return mountableBlobs, nil
  207. }
  208. func getDigestSources(ctx context.Context, store content.Manager, digest digest.Digest) ([]distributionSource, error) {
  209. info, err := store.Info(ctx, digest)
  210. if err != nil {
  211. if cerrdefs.IsNotFound(err) {
  212. return nil, errdefs.NotFound(err)
  213. }
  214. return nil, errdefs.System(err)
  215. }
  216. sources := extractDistributionSources(info.Labels)
  217. if sources == nil {
  218. return nil, errdefs.NotFound(fmt.Errorf("label %q is not attached to %s", containerdlabels.LabelDistributionSource, digest.String()))
  219. }
  220. return sources, nil
  221. }
  222. func extractDistributionSources(labels map[string]string) []distributionSource {
  223. var sources []distributionSource
  224. // Check if this blob has a distributionSource label
  225. // if yes, read it as source
  226. for k, v := range labels {
  227. if reg := strings.TrimPrefix(k, containerdlabels.LabelDistributionSource); reg != k {
  228. for _, repo := range strings.Split(v, ",") {
  229. ref, err := reference.ParseNamed(reg + "/" + repo)
  230. if err != nil {
  231. continue
  232. }
  233. sources = append(sources, distributionSource{
  234. registryRef: ref,
  235. })
  236. }
  237. }
  238. }
  239. return sources
  240. }
  241. type distributionSource struct {
  242. registryRef reference.Named
  243. }
  244. // ToAnnotation returns key and value
  245. func (source distributionSource) ToAnnotation() (string, string) {
  246. domain := reference.Domain(source.registryRef)
  247. v := reference.Path(source.registryRef)
  248. return containerdlabels.LabelDistributionSource + domain, v
  249. }
  250. func (source distributionSource) GetReference(dgst digest.Digest) (reference.Named, error) {
  251. return reference.WithDigest(source.registryRef, dgst)
  252. }
  253. // canBeMounted returns if the content with given media type can be cross-repo
  254. // mounted when pushing it to a remote reference ref.
  255. func canBeMounted(mediaType string, targetRef reference.Named, source distributionSource) bool {
  256. if containerdimages.IsManifestType(mediaType) {
  257. return false
  258. }
  259. if containerdimages.IsIndexType(mediaType) {
  260. return false
  261. }
  262. reg := reference.Domain(targetRef)
  263. // Remove :port suffix from domain
  264. // containerd distribution source label doesn't store port
  265. if portIdx := strings.LastIndex(reg, ":"); portIdx != -1 {
  266. reg = reg[:portIdx]
  267. }
  268. // If the source registry is the same as the one we are pushing to
  269. // then the cross-repo mount will work.
  270. return reg == reference.Domain(source.registryRef)
  271. }