image_pull.go 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201
  1. package containerd
  2. import (
  3. "context"
  4. "fmt"
  5. "io"
  6. "strings"
  7. "github.com/containerd/containerd"
  8. cerrdefs "github.com/containerd/containerd/errdefs"
  9. "github.com/containerd/containerd/images"
  10. "github.com/containerd/containerd/pkg/snapshotters"
  11. "github.com/containerd/containerd/platforms"
  12. "github.com/containerd/containerd/remotes/docker"
  13. "github.com/containerd/log"
  14. "github.com/distribution/reference"
  15. "github.com/docker/docker/api/types/events"
  16. registrytypes "github.com/docker/docker/api/types/registry"
  17. "github.com/docker/docker/distribution"
  18. "github.com/docker/docker/errdefs"
  19. "github.com/docker/docker/internal/compatcontext"
  20. "github.com/docker/docker/pkg/progress"
  21. "github.com/docker/docker/pkg/streamformatter"
  22. ocispec "github.com/opencontainers/image-spec/specs-go/v1"
  23. "github.com/pkg/errors"
  24. )
  25. // PullImage initiates a pull operation. baseRef is the image to pull.
  26. // If reference is not tagged, all tags are pulled.
  27. func (i *ImageService) PullImage(ctx context.Context, baseRef reference.Named, platform *ocispec.Platform, metaHeaders map[string][]string, authConfig *registrytypes.AuthConfig, outStream io.Writer) error {
  28. out := streamformatter.NewJSONProgressOutput(outStream, false)
  29. if !reference.IsNameOnly(baseRef) {
  30. return i.pullTag(ctx, baseRef, platform, metaHeaders, authConfig, out)
  31. }
  32. tags, err := distribution.Tags(ctx, baseRef, &distribution.Config{
  33. RegistryService: i.registryService,
  34. MetaHeaders: metaHeaders,
  35. AuthConfig: authConfig,
  36. })
  37. if err != nil {
  38. return err
  39. }
  40. for _, tag := range tags {
  41. ref, err := reference.WithTag(baseRef, tag)
  42. if err != nil {
  43. log.G(ctx).WithFields(log.Fields{
  44. "tag": tag,
  45. "baseRef": baseRef,
  46. }).Warn("invalid tag, won't pull")
  47. continue
  48. }
  49. if err := i.pullTag(ctx, ref, platform, metaHeaders, authConfig, out); err != nil {
  50. return fmt.Errorf("error pulling %s: %w", ref, err)
  51. }
  52. }
  53. return nil
  54. }
  55. func (i *ImageService) pullTag(ctx context.Context, ref reference.Named, platform *ocispec.Platform, metaHeaders map[string][]string, authConfig *registrytypes.AuthConfig, out progress.Output) error {
  56. var opts []containerd.RemoteOpt
  57. if platform != nil {
  58. opts = append(opts, containerd.WithPlatform(platforms.Format(*platform)))
  59. }
  60. resolver, _ := i.newResolverFromAuthConfig(ctx, authConfig, ref)
  61. opts = append(opts, containerd.WithResolver(resolver))
  62. old, err := i.resolveDescriptor(ctx, ref.String())
  63. if err != nil && !errdefs.IsNotFound(err) {
  64. return err
  65. }
  66. p := platforms.Default()
  67. if platform != nil {
  68. p = platforms.Only(*platform)
  69. }
  70. jobs := newJobs()
  71. h := images.HandlerFunc(func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
  72. if images.IsLayerType(desc.MediaType) {
  73. jobs.Add(desc)
  74. }
  75. return nil, nil
  76. })
  77. opts = append(opts, containerd.WithImageHandler(h))
  78. pp := pullProgress{store: i.client.ContentStore(), showExists: true}
  79. finishProgress := jobs.showProgress(ctx, out, pp)
  80. var outNewImg *containerd.Image
  81. defer func() {
  82. finishProgress()
  83. // Send final status message after the progress updater has finished.
  84. // Otherwise the layer/manifest progress messages may arrive AFTER the
  85. // status message have been sent, so they won't update the previous
  86. // progress leaving stale progress like:
  87. // 70f5ac315c5a: Downloading [> ] 0B/3.19kB
  88. // Digest: sha256:4f53e2564790c8e7856ec08e384732aa38dc43c52f02952483e3f003afbf23db
  89. // 70f5ac315c5a: Download complete
  90. // Status: Downloaded newer image for hello-world:latest
  91. // docker.io/library/hello-world:latest
  92. if outNewImg != nil {
  93. img := *outNewImg
  94. progress.Message(out, "", "Digest: "+img.Target().Digest.String())
  95. writeStatus(out, reference.FamiliarString(ref), old.Digest != img.Target().Digest)
  96. }
  97. }()
  98. var sentPullingFrom, sentSchema1Deprecation bool
  99. ah := images.HandlerFunc(func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
  100. if desc.MediaType == images.MediaTypeDockerSchema1Manifest && !sentSchema1Deprecation {
  101. progress.Message(out, "", distribution.DeprecatedSchema1ImageMessage(ref))
  102. sentSchema1Deprecation = true
  103. }
  104. if images.IsManifestType(desc.MediaType) {
  105. if !sentPullingFrom {
  106. var tagOrDigest string
  107. if tagged, ok := ref.(reference.Tagged); ok {
  108. tagOrDigest = tagged.Tag()
  109. } else {
  110. tagOrDigest = ref.String()
  111. }
  112. progress.Message(out, tagOrDigest, "Pulling from "+reference.Path(ref))
  113. sentPullingFrom = true
  114. }
  115. available, _, _, missing, err := images.Check(ctx, i.client.ContentStore(), desc, p)
  116. if err != nil {
  117. return nil, err
  118. }
  119. // If we already have all the contents pull shouldn't show any layer
  120. // download progress, not even a "Already present" message.
  121. if available && len(missing) == 0 {
  122. pp.hideLayers = true
  123. }
  124. }
  125. return nil, nil
  126. })
  127. opts = append(opts, containerd.WithImageHandler(ah))
  128. opts = append(opts, containerd.WithPullUnpack)
  129. // TODO(thaJeztah): we may have to pass the snapshotter to use if the pull is part of a "docker run" (container create -> pull image if missing). See https://github.com/moby/moby/issues/45273
  130. opts = append(opts, containerd.WithPullSnapshotter(i.snapshotter))
  131. // AppendInfoHandlerWrapper will annotate the image with basic information like manifest and layer digests as labels;
  132. // this information is used to enable remote snapshotters like nydus and stargz to query a registry.
  133. infoHandler := snapshotters.AppendInfoHandlerWrapper(ref.String())
  134. opts = append(opts, containerd.WithImageHandlerWrapper(infoHandler))
  135. // Allow pulling application/vnd.docker.distribution.manifest.v1+prettyjws images
  136. // by converting them to OCI manifests.
  137. opts = append(opts, containerd.WithSchema1Conversion) //nolint:staticcheck // Ignore SA1019: containerd.WithSchema1Conversion is deprecated: use Schema 2 or OCI images.
  138. img, err := i.client.Pull(ctx, ref.String(), opts...)
  139. if err != nil {
  140. if errors.Is(err, docker.ErrInvalidAuthorization) {
  141. // Match error returned by containerd.
  142. // https://github.com/containerd/containerd/blob/v1.7.8/remotes/docker/authorizer.go#L189-L191
  143. if strings.Contains(err.Error(), "no basic auth credentials") {
  144. return err
  145. }
  146. return errdefs.NotFound(fmt.Errorf("pull access denied for %s, repository does not exist or may require 'docker login'", reference.FamiliarName(ref)))
  147. }
  148. return err
  149. }
  150. logger := log.G(ctx).WithFields(log.Fields{
  151. "digest": img.Target().Digest,
  152. "remote": ref.String(),
  153. })
  154. logger.Info("image pulled")
  155. // The pull succeeded, so try to remove any dangling image we have for this target
  156. err = i.client.ImageService().Delete(compatcontext.WithoutCancel(ctx), danglingImageName(img.Target().Digest))
  157. if err != nil && !cerrdefs.IsNotFound(err) {
  158. // Image pull succeeded, but cleaning up the dangling image failed. Ignore the
  159. // error to not mark the pull as failed.
  160. logger.WithError(err).Warn("unexpected error while removing outdated dangling image reference")
  161. }
  162. i.LogImageEvent(reference.FamiliarString(ref), reference.FamiliarName(ref), events.ActionPull)
  163. outNewImg = &img
  164. return nil
  165. }
  166. // writeStatus writes a status message to out. If newerDownloaded is true, the
  167. // status message indicates that a newer image was downloaded. Otherwise, it
  168. // indicates that the image is up to date. requestedTag is the tag the message
  169. // will refer to.
  170. func writeStatus(out progress.Output, requestedTag string, newerDownloaded bool) {
  171. if newerDownloaded {
  172. progress.Message(out, "", "Status: Downloaded newer image for "+requestedTag)
  173. } else {
  174. progress.Message(out, "", "Status: Image is up to date for "+requestedTag)
  175. }
  176. }