image_push.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317
  1. package containerd
  2. import (
  3. "context"
  4. "fmt"
  5. "io"
  6. "regexp"
  7. "strconv"
  8. "strings"
  9. "sync"
  10. "github.com/containerd/containerd/content"
  11. cerrdefs "github.com/containerd/containerd/errdefs"
  12. "github.com/containerd/containerd/images"
  13. containerdimages "github.com/containerd/containerd/images"
  14. "github.com/containerd/containerd/log"
  15. "github.com/containerd/containerd/platforms"
  16. "github.com/containerd/containerd/remotes"
  17. "github.com/containerd/containerd/remotes/docker"
  18. "github.com/distribution/reference"
  19. "github.com/docker/docker/api/types/events"
  20. "github.com/docker/docker/api/types/registry"
  21. "github.com/docker/docker/errdefs"
  22. "github.com/docker/docker/pkg/progress"
  23. "github.com/docker/docker/pkg/streamformatter"
  24. "github.com/opencontainers/go-digest"
  25. ocispec "github.com/opencontainers/image-spec/specs-go/v1"
  26. "github.com/pkg/errors"
  27. "golang.org/x/sync/semaphore"
  28. )
  29. // PushImage initiates a push operation of the image pointed to by sourceRef.
  30. // If reference is untagged, all tags from the reference repository are pushed.
  31. // Image manifest (or index) is pushed as is, which will probably fail if you
  32. // don't have all content referenced by the index.
  33. // Cross-repo mounts will be attempted for non-existing blobs.
  34. //
  35. // It will also add distribution source labels to the pushed content
  36. // pointing to the new target repository. This will allow subsequent pushes
  37. // to perform cross-repo mounts of the shared content when pushing to a different
  38. // repository on the same registry.
  39. func (i *ImageService) PushImage(ctx context.Context, sourceRef reference.Named, metaHeaders map[string][]string, authConfig *registry.AuthConfig, outStream io.Writer) (retErr error) {
  40. out := streamformatter.NewJSONProgressOutput(outStream, false)
  41. progress.Messagef(out, "", "The push refers to repository [%s]", sourceRef.Name())
  42. if _, tagged := sourceRef.(reference.Tagged); !tagged {
  43. if _, digested := sourceRef.(reference.Digested); !digested {
  44. // Image is not tagged nor digested, that means all tags push was requested.
  45. // Find all images with the same repository.
  46. nameFilter := "^" + regexp.QuoteMeta(sourceRef.Name()) + ":" + reference.TagRegexp.String() + "$"
  47. imgs, err := i.client.ImageService().List(ctx, "name~="+strconv.Quote(nameFilter))
  48. if err != nil {
  49. return err
  50. }
  51. for _, img := range imgs {
  52. named, err := reference.ParseNamed(img.Name)
  53. if err != nil {
  54. // This shouldn't happen, but log a warning just in case.
  55. log.G(ctx).WithFields(log.Fields{
  56. "image": img.Name,
  57. "sourceRef": sourceRef,
  58. }).Warn("refusing to push an invalid tag")
  59. continue
  60. }
  61. if err := i.pushRef(ctx, named, metaHeaders, authConfig, out); err != nil {
  62. return err
  63. }
  64. }
  65. return nil
  66. }
  67. }
  68. return i.pushRef(ctx, sourceRef, metaHeaders, authConfig, out)
  69. }
  70. func (i *ImageService) pushRef(ctx context.Context, targetRef reference.Named, metaHeaders map[string][]string, authConfig *registry.AuthConfig, out progress.Output) (retErr error) {
  71. leasedCtx, release, err := i.client.WithLease(ctx)
  72. if err != nil {
  73. return err
  74. }
  75. defer func() {
  76. if err := release(leasedCtx); err != nil {
  77. log.G(ctx).WithField("image", targetRef).WithError(err).Warn("failed to release lease created for push")
  78. }
  79. }()
  80. img, err := i.client.ImageService().Get(ctx, targetRef.String())
  81. if err != nil {
  82. return errdefs.NotFound(err)
  83. }
  84. target := img.Target
  85. store := i.client.ContentStore()
  86. resolver, tracker := i.newResolverFromAuthConfig(ctx, authConfig)
  87. pp := pushProgress{Tracker: tracker}
  88. jobsQueue := newJobs()
  89. finishProgress := jobsQueue.showProgress(ctx, out, combinedProgress([]progressUpdater{
  90. &pp,
  91. pullProgress{showExists: false, store: store},
  92. }))
  93. defer func() {
  94. finishProgress()
  95. if retErr == nil {
  96. if tagged, ok := targetRef.(reference.Tagged); ok {
  97. progress.Messagef(out, "", "%s: digest: %s size: %d", tagged.Tag(), target.Digest, img.Target.Size)
  98. }
  99. }
  100. }()
  101. var limiter *semaphore.Weighted = nil // TODO: Respect max concurrent downloads/uploads
  102. mountableBlobs, err := findMissingMountable(ctx, store, jobsQueue, target, targetRef, limiter)
  103. if err != nil {
  104. return err
  105. }
  106. for dgst := range mountableBlobs {
  107. pp.addMountable(dgst)
  108. }
  109. // Create a store which fakes the local existence of possibly mountable blobs.
  110. // Otherwise they can't be pushed at all.
  111. realStore := store
  112. wrapped := wrapWithFakeMountableBlobs(store, mountableBlobs)
  113. store = wrapped
  114. pusher, err := resolver.Pusher(ctx, targetRef.String())
  115. if err != nil {
  116. return err
  117. }
  118. addChildrenToJobs := containerdimages.HandlerFunc(
  119. func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
  120. children, err := containerdimages.Children(ctx, store, desc)
  121. if err != nil {
  122. return nil, err
  123. }
  124. for _, c := range children {
  125. jobsQueue.Add(c)
  126. }
  127. jobsQueue.Add(desc)
  128. return nil, nil
  129. },
  130. )
  131. handlerWrapper := func(h images.Handler) images.Handler {
  132. return containerdimages.Handlers(addChildrenToJobs, h)
  133. }
  134. err = remotes.PushContent(ctx, pusher, target, store, limiter, platforms.All, handlerWrapper)
  135. if err != nil {
  136. if containerdimages.IsIndexType(target.MediaType) {
  137. if cerrdefs.IsNotFound(err) {
  138. err = errdefs.NotFound(fmt.Errorf(
  139. "missing content: %w\n"+
  140. "Note: You're trying to push a manifest list/index which "+
  141. "references multiple platform specific manifests, but not all of them are available locally "+
  142. "or available to the remote repository.\n"+
  143. "Make sure you have all the referenced content and try again.",
  144. err))
  145. }
  146. }
  147. } else {
  148. appendSource, err := docker.AppendDistributionSourceLabel(realStore, targetRef.String())
  149. if err != nil {
  150. // This shouldn't happen at this point because the reference would have to be invalid
  151. // and if it was, then it would error out earlier.
  152. return errdefs.Unknown(errors.Wrap(err, "failed to create an handler that appends distribution source label to pushed content"))
  153. }
  154. if err := containerdimages.Dispatch(ctx, appendSource, nil, target); err != nil {
  155. // Shouldn't happen, but even if it would fail, then make it only a warning
  156. // because it doesn't affect the pushed data.
  157. log.G(ctx).WithError(err).Warn("failed to append distribution source labels to pushed content")
  158. }
  159. }
  160. if err == nil {
  161. i.LogImageEvent(reference.FamiliarString(targetRef), reference.FamiliarName(targetRef), events.ActionPush)
  162. }
  163. return err
  164. }
  165. // findMissingMountable will walk the target descriptor recursively and return
  166. // missing contents with their distribution source which could potentially
  167. // be cross-repo mounted.
  168. func findMissingMountable(ctx context.Context, store content.Store, queue *jobs,
  169. target ocispec.Descriptor, targetRef reference.Named, limiter *semaphore.Weighted,
  170. ) (map[digest.Digest]distributionSource, error) {
  171. mountableBlobs := map[digest.Digest]distributionSource{}
  172. var mutex sync.Mutex
  173. sources, err := getDigestSources(ctx, store, target.Digest)
  174. if err != nil {
  175. if !errdefs.IsNotFound(err) {
  176. return nil, err
  177. }
  178. log.G(ctx).WithField("target", target).Debug("distribution source label not found")
  179. return mountableBlobs, nil
  180. }
  181. handler := func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
  182. _, err := store.Info(ctx, desc.Digest)
  183. if err != nil {
  184. if !cerrdefs.IsNotFound(err) {
  185. return nil, errdefs.System(errors.Wrapf(err, "failed to get metadata of content %s", desc.Digest.String()))
  186. }
  187. for _, source := range sources {
  188. if canBeMounted(desc.MediaType, targetRef, source) {
  189. mutex.Lock()
  190. mountableBlobs[desc.Digest] = source
  191. mutex.Unlock()
  192. queue.Add(desc)
  193. break
  194. }
  195. }
  196. return nil, nil
  197. }
  198. return containerdimages.Children(ctx, store, desc)
  199. }
  200. err = containerdimages.Dispatch(ctx, containerdimages.HandlerFunc(handler), limiter, target)
  201. if err != nil {
  202. return nil, err
  203. }
  204. return mountableBlobs, nil
  205. }
  206. func getDigestSources(ctx context.Context, store content.Manager, digest digest.Digest) ([]distributionSource, error) {
  207. info, err := store.Info(ctx, digest)
  208. if err != nil {
  209. if cerrdefs.IsNotFound(err) {
  210. return nil, errdefs.NotFound(err)
  211. }
  212. return nil, errdefs.System(err)
  213. }
  214. sources := extractDistributionSources(info.Labels)
  215. if sources == nil {
  216. return nil, errdefs.NotFound(fmt.Errorf("label %q is not attached to %s", labelDistributionSource, digest.String()))
  217. }
  218. return sources, nil
  219. }
// labelDistributionSource is the key prefix of the content labels that record
// distribution sources. The registry domain is appended to it to form the
// full label key.
//
// TODO(vvoland): Remove and use containerd const in containerd 1.7+
// https://github.com/containerd/containerd/pull/8224
const labelDistributionSource = "containerd.io/distribution.source."
  223. func extractDistributionSources(labels map[string]string) []distributionSource {
  224. var sources []distributionSource
  225. // Check if this blob has a distributionSource label
  226. // if yes, read it as source
  227. for k, v := range labels {
  228. if reg := strings.TrimPrefix(k, labelDistributionSource); reg != k {
  229. for _, repo := range strings.Split(v, ",") {
  230. ref, err := reference.ParseNamed(reg + "/" + repo)
  231. if err != nil {
  232. continue
  233. }
  234. sources = append(sources, distributionSource{
  235. registryRef: ref,
  236. })
  237. }
  238. }
  239. }
  240. return sources
  241. }
// distributionSource identifies a repository that content is associated with
// through a distribution source label.
type distributionSource struct {
	// registryRef is the named reference parsed from "<registry>/<repository>".
	registryRef reference.Named
}
  245. // ToAnnotation returns key and value
  246. func (source distributionSource) ToAnnotation() (string, string) {
  247. domain := reference.Domain(source.registryRef)
  248. v := reference.Path(source.registryRef)
  249. return labelDistributionSource + domain, v
  250. }
  251. func (source distributionSource) GetReference(dgst digest.Digest) (reference.Named, error) {
  252. return reference.WithDigest(source.registryRef, dgst)
  253. }
  254. // canBeMounted returns if the content with given media type can be cross-repo
  255. // mounted when pushing it to a remote reference ref.
  256. func canBeMounted(mediaType string, targetRef reference.Named, source distributionSource) bool {
  257. if containerdimages.IsManifestType(mediaType) {
  258. return false
  259. }
  260. if containerdimages.IsIndexType(mediaType) {
  261. return false
  262. }
  263. reg := reference.Domain(targetRef)
  264. // Remove :port suffix from domain
  265. // containerd distribution source label doesn't store port
  266. if portIdx := strings.LastIndex(reg, ":"); portIdx != -1 {
  267. reg = reg[:portIdx]
  268. }
  269. // If the source registry is the same as the one we are pushing to
  270. // then the cross-repo mount will work.
  271. return reg == reference.Domain(source.registryRef)
  272. }