image_pull.go 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167
  1. package containerd
  2. import (
  3. "context"
  4. "fmt"
  5. "io"
  6. "strings"
  7. "github.com/containerd/containerd"
  8. cerrdefs "github.com/containerd/containerd/errdefs"
  9. "github.com/containerd/containerd/images"
  10. "github.com/containerd/containerd/pkg/snapshotters"
  11. "github.com/containerd/containerd/platforms"
  12. "github.com/containerd/containerd/remotes/docker"
  13. "github.com/containerd/log"
  14. "github.com/distribution/reference"
  15. "github.com/docker/docker/api/types/events"
  16. "github.com/docker/docker/api/types/registry"
  17. "github.com/docker/docker/distribution"
  18. "github.com/docker/docker/errdefs"
  19. "github.com/docker/docker/internal/compatcontext"
  20. "github.com/docker/docker/pkg/progress"
  21. "github.com/docker/docker/pkg/streamformatter"
  22. ocispec "github.com/opencontainers/image-spec/specs-go/v1"
  23. "github.com/pkg/errors"
  24. )
  25. // PullImage initiates a pull operation. ref is the image to pull.
  26. func (i *ImageService) PullImage(ctx context.Context, ref reference.Named, platform *ocispec.Platform, metaHeaders map[string][]string, authConfig *registry.AuthConfig, outStream io.Writer) error {
  27. var opts []containerd.RemoteOpt
  28. if platform != nil {
  29. opts = append(opts, containerd.WithPlatform(platforms.Format(*platform)))
  30. }
  31. resolver, _ := i.newResolverFromAuthConfig(ctx, authConfig)
  32. opts = append(opts, containerd.WithResolver(resolver))
  33. old, err := i.resolveDescriptor(ctx, ref.String())
  34. if err != nil && !errdefs.IsNotFound(err) {
  35. return err
  36. }
  37. p := platforms.Default()
  38. if platform != nil {
  39. p = platforms.Only(*platform)
  40. }
  41. jobs := newJobs()
  42. h := images.HandlerFunc(func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
  43. if images.IsLayerType(desc.MediaType) {
  44. jobs.Add(desc)
  45. }
  46. return nil, nil
  47. })
  48. opts = append(opts, containerd.WithImageHandler(h))
  49. out := streamformatter.NewJSONProgressOutput(outStream, false)
  50. pp := pullProgress{store: i.client.ContentStore(), showExists: true}
  51. finishProgress := jobs.showProgress(ctx, out, pp)
  52. var outNewImg *containerd.Image
  53. defer func() {
  54. finishProgress()
  55. // Send final status message after the progress updater has finished.
  56. // Otherwise the layer/manifest progress messages may arrive AFTER the
  57. // status message have been sent, so they won't update the previous
  58. // progress leaving stale progress like:
  59. // 70f5ac315c5a: Downloading [> ] 0B/3.19kB
  60. // Digest: sha256:4f53e2564790c8e7856ec08e384732aa38dc43c52f02952483e3f003afbf23db
  61. // 70f5ac315c5a: Download complete
  62. // Status: Downloaded newer image for hello-world:latest
  63. // docker.io/library/hello-world:latest
  64. if outNewImg != nil {
  65. img := *outNewImg
  66. progress.Message(out, "", "Digest: "+img.Target().Digest.String())
  67. writeStatus(out, reference.FamiliarString(ref), old.Digest != img.Target().Digest)
  68. }
  69. }()
  70. var sentPullingFrom, sentSchema1Deprecation bool
  71. ah := images.HandlerFunc(func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
  72. if desc.MediaType == images.MediaTypeDockerSchema1Manifest && !sentSchema1Deprecation {
  73. progress.Message(out, "", distribution.DeprecatedSchema1ImageMessage(ref))
  74. sentSchema1Deprecation = true
  75. }
  76. if images.IsManifestType(desc.MediaType) {
  77. if !sentPullingFrom {
  78. var tagOrDigest string
  79. if tagged, ok := ref.(reference.Tagged); ok {
  80. tagOrDigest = tagged.Tag()
  81. } else {
  82. tagOrDigest = ref.String()
  83. }
  84. progress.Message(out, tagOrDigest, "Pulling from "+reference.Path(ref))
  85. sentPullingFrom = true
  86. }
  87. available, _, _, missing, err := images.Check(ctx, i.client.ContentStore(), desc, p)
  88. if err != nil {
  89. return nil, err
  90. }
  91. // If we already have all the contents pull shouldn't show any layer
  92. // download progress, not even a "Already present" message.
  93. if available && len(missing) == 0 {
  94. pp.hideLayers = true
  95. }
  96. }
  97. return nil, nil
  98. })
  99. opts = append(opts, containerd.WithImageHandler(ah))
  100. opts = append(opts, containerd.WithPullUnpack)
  101. // TODO(thaJeztah): we may have to pass the snapshotter to use if the pull is part of a "docker run" (container create -> pull image if missing). See https://github.com/moby/moby/issues/45273
  102. opts = append(opts, containerd.WithPullSnapshotter(i.snapshotter))
  103. // AppendInfoHandlerWrapper will annotate the image with basic information like manifest and layer digests as labels;
  104. // this information is used to enable remote snapshotters like nydus and stargz to query a registry.
  105. infoHandler := snapshotters.AppendInfoHandlerWrapper(ref.String())
  106. opts = append(opts, containerd.WithImageHandlerWrapper(infoHandler))
  107. // Allow pulling application/vnd.docker.distribution.manifest.v1+prettyjws images
  108. // by converting them to OCI manifests.
  109. opts = append(opts, containerd.WithSchema1Conversion) //nolint:staticcheck // Ignore SA1019: containerd.WithSchema1Conversion is deprecated: use Schema 2 or OCI images.
  110. img, err := i.client.Pull(ctx, ref.String(), opts...)
  111. if err != nil {
  112. if errors.Is(err, docker.ErrInvalidAuthorization) {
  113. // Match error returned by containerd.
  114. // https://github.com/containerd/containerd/blob/v1.7.8/remotes/docker/authorizer.go#L189-L191
  115. if strings.Contains(err.Error(), "no basic auth credentials") {
  116. return err
  117. }
  118. return errdefs.NotFound(fmt.Errorf("pull access denied for %s, repository does not exist or may require 'docker login'", reference.FamiliarName(ref)))
  119. }
  120. return err
  121. }
  122. logger := log.G(ctx).WithFields(log.Fields{
  123. "digest": img.Target().Digest,
  124. "remote": ref.String(),
  125. })
  126. logger.Info("image pulled")
  127. // The pull succeeded, so try to remove any dangling image we have for this target
  128. err = i.client.ImageService().Delete(compatcontext.WithoutCancel(ctx), danglingImageName(img.Target().Digest))
  129. if err != nil && !cerrdefs.IsNotFound(err) {
  130. // Image pull succeeded, but cleaning up the dangling image failed. Ignore the
  131. // error to not mark the pull as failed.
  132. logger.WithError(err).Warn("unexpected error while removing outdated dangling image reference")
  133. }
  134. i.LogImageEvent(reference.FamiliarString(ref), reference.FamiliarName(ref), events.ActionPull)
  135. outNewImg = &img
  136. return nil
  137. }
  138. // writeStatus writes a status message to out. If newerDownloaded is true, the
  139. // status message indicates that a newer image was downloaded. Otherwise, it
  140. // indicates that the image is up to date. requestedTag is the tag the message
  141. // will refer to.
  142. func writeStatus(out progress.Output, requestedTag string, newerDownloaded bool) {
  143. if newerDownloaded {
  144. progress.Message(out, "", "Status: Downloaded newer image for "+requestedTag)
  145. } else {
  146. progress.Message(out, "", "Status: Image is up to date for "+requestedTag)
  147. }
  148. }