// image_pull.go (6.3 KB)

  1. package containerd
  2. import (
  3. "context"
  4. "fmt"
  5. "io"
  6. "github.com/containerd/containerd"
  7. cerrdefs "github.com/containerd/containerd/errdefs"
  8. "github.com/containerd/containerd/images"
  9. "github.com/containerd/containerd/pkg/snapshotters"
  10. "github.com/containerd/containerd/platforms"
  11. "github.com/containerd/containerd/remotes/docker"
  12. "github.com/containerd/log"
  13. "github.com/distribution/reference"
  14. "github.com/docker/docker/api/types/events"
  15. "github.com/docker/docker/api/types/registry"
  16. "github.com/docker/docker/distribution"
  17. "github.com/docker/docker/errdefs"
  18. "github.com/docker/docker/internal/compatcontext"
  19. "github.com/docker/docker/pkg/progress"
  20. "github.com/docker/docker/pkg/streamformatter"
  21. ocispec "github.com/opencontainers/image-spec/specs-go/v1"
  22. "github.com/pkg/errors"
  23. )
  24. // PullImage initiates a pull operation. ref is the image to pull.
  25. func (i *ImageService) PullImage(ctx context.Context, ref reference.Named, platform *ocispec.Platform, metaHeaders map[string][]string, authConfig *registry.AuthConfig, outStream io.Writer) error {
  26. var opts []containerd.RemoteOpt
  27. if platform != nil {
  28. opts = append(opts, containerd.WithPlatform(platforms.Format(*platform)))
  29. }
  30. resolver, _ := i.newResolverFromAuthConfig(ctx, authConfig)
  31. opts = append(opts, containerd.WithResolver(resolver))
  32. old, err := i.resolveDescriptor(ctx, ref.String())
  33. if err != nil && !errdefs.IsNotFound(err) {
  34. return err
  35. }
  36. p := platforms.Default()
  37. if platform != nil {
  38. p = platforms.Only(*platform)
  39. }
  40. jobs := newJobs()
  41. h := images.HandlerFunc(func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
  42. if images.IsLayerType(desc.MediaType) {
  43. jobs.Add(desc)
  44. }
  45. return nil, nil
  46. })
  47. opts = append(opts, containerd.WithImageHandler(h))
  48. out := streamformatter.NewJSONProgressOutput(outStream, false)
  49. pp := pullProgress{store: i.client.ContentStore(), showExists: true}
  50. finishProgress := jobs.showProgress(ctx, out, pp)
  51. var outNewImg *containerd.Image
  52. defer func() {
  53. finishProgress()
  54. // Send final status message after the progress updater has finished.
  55. // Otherwise the layer/manifest progress messages may arrive AFTER the
  56. // status message have been sent, so they won't update the previous
  57. // progress leaving stale progress like:
  58. // 70f5ac315c5a: Downloading [> ] 0B/3.19kB
  59. // Digest: sha256:4f53e2564790c8e7856ec08e384732aa38dc43c52f02952483e3f003afbf23db
  60. // 70f5ac315c5a: Download complete
  61. // Status: Downloaded newer image for hello-world:latest
  62. // docker.io/library/hello-world:latest
  63. if outNewImg != nil {
  64. img := *outNewImg
  65. progress.Message(out, "", "Digest: "+img.Target().Digest.String())
  66. writeStatus(out, reference.FamiliarString(ref), old.Digest != img.Target().Digest)
  67. }
  68. }()
  69. var sentPullingFrom, sentSchema1Deprecation bool
  70. ah := images.HandlerFunc(func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
  71. if desc.MediaType == images.MediaTypeDockerSchema1Manifest && !sentSchema1Deprecation {
  72. progress.Message(out, "", distribution.DeprecatedSchema1ImageMessage(ref))
  73. sentSchema1Deprecation = true
  74. }
  75. if images.IsManifestType(desc.MediaType) {
  76. if !sentPullingFrom {
  77. var tagOrDigest string
  78. if tagged, ok := ref.(reference.Tagged); ok {
  79. tagOrDigest = tagged.Tag()
  80. } else {
  81. tagOrDigest = ref.String()
  82. }
  83. progress.Message(out, tagOrDigest, "Pulling from "+reference.Path(ref))
  84. sentPullingFrom = true
  85. }
  86. available, _, _, missing, err := images.Check(ctx, i.client.ContentStore(), desc, p)
  87. if err != nil {
  88. return nil, err
  89. }
  90. // If we already have all the contents pull shouldn't show any layer
  91. // download progress, not even a "Already present" message.
  92. if available && len(missing) == 0 {
  93. pp.hideLayers = true
  94. }
  95. }
  96. return nil, nil
  97. })
  98. opts = append(opts, containerd.WithImageHandler(ah))
  99. opts = append(opts, containerd.WithPullUnpack)
  100. // TODO(thaJeztah): we may have to pass the snapshotter to use if the pull is part of a "docker run" (container create -> pull image if missing). See https://github.com/moby/moby/issues/45273
  101. opts = append(opts, containerd.WithPullSnapshotter(i.snapshotter))
  102. // AppendInfoHandlerWrapper will annotate the image with basic information like manifest and layer digests as labels;
  103. // this information is used to enable remote snapshotters like nydus and stargz to query a registry.
  104. infoHandler := snapshotters.AppendInfoHandlerWrapper(ref.String())
  105. opts = append(opts, containerd.WithImageHandlerWrapper(infoHandler))
  106. // Allow pulling application/vnd.docker.distribution.manifest.v1+prettyjws images
  107. // by converting them to OCI manifests.
  108. opts = append(opts, containerd.WithSchema1Conversion) //nolint:staticcheck // Ignore SA1019: containerd.WithSchema1Conversion is deprecated: use Schema 2 or OCI images.
  109. img, err := i.client.Pull(ctx, ref.String(), opts...)
  110. if err != nil {
  111. if errors.Is(err, docker.ErrInvalidAuthorization) {
  112. return errdefs.NotFound(fmt.Errorf("pull access denied for %s, repository does not exist or may require 'docker login'", reference.FamiliarName(ref)))
  113. }
  114. return err
  115. }
  116. logger := log.G(ctx).WithFields(log.Fields{
  117. "digest": img.Target().Digest,
  118. "remote": ref.String(),
  119. })
  120. logger.Info("image pulled")
  121. // The pull succeeded, so try to remove any dangling image we have for this target
  122. err = i.client.ImageService().Delete(compatcontext.WithoutCancel(ctx), danglingImageName(img.Target().Digest))
  123. if err != nil && !cerrdefs.IsNotFound(err) {
  124. // Image pull succeeded, but cleaning up the dangling image failed. Ignore the
  125. // error to not mark the pull as failed.
  126. logger.WithError(err).Warn("unexpected error while removing outdated dangling image reference")
  127. }
  128. i.LogImageEvent(reference.FamiliarString(ref), reference.FamiliarName(ref), events.ActionPull)
  129. outNewImg = &img
  130. return nil
  131. }
  132. // writeStatus writes a status message to out. If newerDownloaded is true, the
  133. // status message indicates that a newer image was downloaded. Otherwise, it
  134. // indicates that the image is up to date. requestedTag is the tag the message
  135. // will refer to.
  136. func writeStatus(out progress.Output, requestedTag string, newerDownloaded bool) {
  137. if newerDownloaded {
  138. progress.Message(out, "", "Status: Downloaded newer image for "+requestedTag)
  139. } else {
  140. progress.Message(out, "", "Status: Image is up to date for "+requestedTag)
  141. }
  142. }