image_pull.go 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154
  1. package containerd
  2. import (
  3. "context"
  4. "io"
  5. "github.com/containerd/containerd"
  6. cerrdefs "github.com/containerd/containerd/errdefs"
  7. "github.com/containerd/containerd/images"
  8. "github.com/containerd/containerd/log"
  9. "github.com/containerd/containerd/pkg/snapshotters"
  10. "github.com/containerd/containerd/platforms"
  11. "github.com/distribution/reference"
  12. "github.com/docker/docker/api/types/events"
  13. "github.com/docker/docker/api/types/registry"
  14. "github.com/docker/docker/distribution"
  15. "github.com/docker/docker/errdefs"
  16. "github.com/docker/docker/pkg/progress"
  17. "github.com/docker/docker/pkg/streamformatter"
  18. ocispec "github.com/opencontainers/image-spec/specs-go/v1"
  19. )
  20. // PullImage initiates a pull operation. ref is the image to pull.
  21. func (i *ImageService) PullImage(ctx context.Context, ref reference.Named, platform *ocispec.Platform, metaHeaders map[string][]string, authConfig *registry.AuthConfig, outStream io.Writer) error {
  22. var opts []containerd.RemoteOpt
  23. if platform != nil {
  24. opts = append(opts, containerd.WithPlatform(platforms.Format(*platform)))
  25. }
  26. resolver, _ := i.newResolverFromAuthConfig(ctx, authConfig)
  27. opts = append(opts, containerd.WithResolver(resolver))
  28. old, err := i.resolveDescriptor(ctx, ref.String())
  29. if err != nil && !errdefs.IsNotFound(err) {
  30. return err
  31. }
  32. p := platforms.Default()
  33. if platform != nil {
  34. p = platforms.Only(*platform)
  35. }
  36. jobs := newJobs()
  37. h := images.HandlerFunc(func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
  38. if images.IsLayerType(desc.MediaType) {
  39. jobs.Add(desc)
  40. }
  41. return nil, nil
  42. })
  43. opts = append(opts, containerd.WithImageHandler(h))
  44. out := streamformatter.NewJSONProgressOutput(outStream, false)
  45. pp := pullProgress{store: i.client.ContentStore(), showExists: true}
  46. finishProgress := jobs.showProgress(ctx, out, pp)
  47. var outNewImg *containerd.Image
  48. defer func() {
  49. finishProgress()
  50. // Send final status message after the progress updater has finished.
  51. // Otherwise the layer/manifest progress messages may arrive AFTER the
  52. // status message have been sent, so they won't update the previous
  53. // progress leaving stale progress like:
  54. // 70f5ac315c5a: Downloading [> ] 0B/3.19kB
  55. // Digest: sha256:4f53e2564790c8e7856ec08e384732aa38dc43c52f02952483e3f003afbf23db
  56. // 70f5ac315c5a: Download complete
  57. // Status: Downloaded newer image for hello-world:latest
  58. // docker.io/library/hello-world:latest
  59. if outNewImg != nil {
  60. img := *outNewImg
  61. progress.Message(out, "", "Digest: "+img.Target().Digest.String())
  62. writeStatus(out, reference.FamiliarString(ref), old.Digest != img.Target().Digest)
  63. }
  64. }()
  65. var sentPullingFrom, sentSchema1Deprecation bool
  66. ah := images.HandlerFunc(func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
  67. if desc.MediaType == images.MediaTypeDockerSchema1Manifest && !sentSchema1Deprecation {
  68. progress.Message(out, "", distribution.DeprecatedSchema1ImageMessage(ref))
  69. sentSchema1Deprecation = true
  70. }
  71. if images.IsManifestType(desc.MediaType) {
  72. if !sentPullingFrom {
  73. var tagOrDigest string
  74. if tagged, ok := ref.(reference.Tagged); ok {
  75. tagOrDigest = tagged.Tag()
  76. } else {
  77. tagOrDigest = ref.String()
  78. }
  79. progress.Message(out, tagOrDigest, "Pulling from "+reference.Path(ref))
  80. sentPullingFrom = true
  81. }
  82. available, _, _, missing, err := images.Check(ctx, i.client.ContentStore(), desc, p)
  83. if err != nil {
  84. return nil, err
  85. }
  86. // If we already have all the contents pull shouldn't show any layer
  87. // download progress, not even a "Already present" message.
  88. if available && len(missing) == 0 {
  89. pp.hideLayers = true
  90. }
  91. }
  92. return nil, nil
  93. })
  94. opts = append(opts, containerd.WithImageHandler(ah))
  95. opts = append(opts, containerd.WithPullUnpack)
  96. // TODO(thaJeztah): we may have to pass the snapshotter to use if the pull is part of a "docker run" (container create -> pull image if missing). See https://github.com/moby/moby/issues/45273
  97. opts = append(opts, containerd.WithPullSnapshotter(i.snapshotter))
  98. // AppendInfoHandlerWrapper will annotate the image with basic information like manifest and layer digests as labels;
  99. // this information is used to enable remote snapshotters like nydus and stargz to query a registry.
  100. infoHandler := snapshotters.AppendInfoHandlerWrapper(ref.String())
  101. opts = append(opts, containerd.WithImageHandlerWrapper(infoHandler))
  102. // Allow pulling application/vnd.docker.distribution.manifest.v1+prettyjws images
  103. // by converting them to OCI manifests.
  104. opts = append(opts, containerd.WithSchema1Conversion)
  105. img, err := i.client.Pull(ctx, ref.String(), opts...)
  106. if err != nil {
  107. return err
  108. }
  109. logger := log.G(ctx).WithFields(log.Fields{
  110. "digest": img.Target().Digest,
  111. "remote": ref.String(),
  112. })
  113. logger.Info("image pulled")
  114. // The pull succeeded, so try to remove any dangling image we have for this target
  115. err = i.client.ImageService().Delete(context.Background(), danglingImageName(img.Target().Digest))
  116. if err != nil && !cerrdefs.IsNotFound(err) {
  117. // Image pull succeeded, but cleaning up the dangling image failed. Ignore the
  118. // error to not mark the pull as failed.
  119. logger.WithError(err).Warn("unexpected error while removing outdated dangling image reference")
  120. }
  121. i.LogImageEvent(reference.FamiliarString(ref), reference.FamiliarName(ref), events.ActionPull)
  122. outNewImg = &img
  123. return nil
  124. }
  125. // writeStatus writes a status message to out. If newerDownloaded is true, the
  126. // status message indicates that a newer image was downloaded. Otherwise, it
  127. // indicates that the image is up to date. requestedTag is the tag the message
  128. // will refer to.
  129. func writeStatus(out progress.Output, requestedTag string, newerDownloaded bool) {
  130. if newerDownloaded {
  131. progress.Message(out, "", "Status: Downloaded newer image for "+requestedTag)
  132. } else {
  133. progress.Message(out, "", "Status: Image is up to date for "+requestedTag)
  134. }
  135. }