image_pull.go 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212
  1. package containerd
  2. import (
  3. "context"
  4. "fmt"
  5. "io"
  6. "os"
  7. "strings"
  8. "github.com/containerd/containerd"
  9. cerrdefs "github.com/containerd/containerd/errdefs"
  10. "github.com/containerd/containerd/images"
  11. "github.com/containerd/containerd/pkg/snapshotters"
  12. "github.com/containerd/containerd/platforms"
  13. "github.com/containerd/containerd/remotes/docker"
  14. "github.com/containerd/log"
  15. "github.com/distribution/reference"
  16. "github.com/docker/docker/api/types/events"
  17. registrytypes "github.com/docker/docker/api/types/registry"
  18. "github.com/docker/docker/distribution"
  19. "github.com/docker/docker/errdefs"
  20. "github.com/docker/docker/internal/compatcontext"
  21. "github.com/docker/docker/pkg/progress"
  22. "github.com/docker/docker/pkg/streamformatter"
  23. "github.com/docker/docker/pkg/stringid"
  24. ocispec "github.com/opencontainers/image-spec/specs-go/v1"
  25. "github.com/pkg/errors"
  26. )
  27. // PullImage initiates a pull operation. baseRef is the image to pull.
  28. // If reference is not tagged, all tags are pulled.
  29. func (i *ImageService) PullImage(ctx context.Context, baseRef reference.Named, platform *ocispec.Platform, metaHeaders map[string][]string, authConfig *registrytypes.AuthConfig, outStream io.Writer) error {
  30. out := streamformatter.NewJSONProgressOutput(outStream, false)
  31. if !reference.IsNameOnly(baseRef) {
  32. return i.pullTag(ctx, baseRef, platform, metaHeaders, authConfig, out)
  33. }
  34. tags, err := distribution.Tags(ctx, baseRef, &distribution.Config{
  35. RegistryService: i.registryService,
  36. MetaHeaders: metaHeaders,
  37. AuthConfig: authConfig,
  38. })
  39. if err != nil {
  40. return err
  41. }
  42. for _, tag := range tags {
  43. ref, err := reference.WithTag(baseRef, tag)
  44. if err != nil {
  45. log.G(ctx).WithFields(log.Fields{
  46. "tag": tag,
  47. "baseRef": baseRef,
  48. }).Warn("invalid tag, won't pull")
  49. continue
  50. }
  51. if err := i.pullTag(ctx, ref, platform, metaHeaders, authConfig, out); err != nil {
  52. return fmt.Errorf("error pulling %s: %w", ref, err)
  53. }
  54. }
  55. return nil
  56. }
  57. func (i *ImageService) pullTag(ctx context.Context, ref reference.Named, platform *ocispec.Platform, metaHeaders map[string][]string, authConfig *registrytypes.AuthConfig, out progress.Output) error {
  58. var opts []containerd.RemoteOpt
  59. if platform != nil {
  60. opts = append(opts, containerd.WithPlatform(platforms.Format(*platform)))
  61. }
  62. resolver, _ := i.newResolverFromAuthConfig(ctx, authConfig, ref)
  63. opts = append(opts, containerd.WithResolver(resolver))
  64. old, err := i.resolveDescriptor(ctx, ref.String())
  65. if err != nil && !errdefs.IsNotFound(err) {
  66. return err
  67. }
  68. p := platforms.Default()
  69. if platform != nil {
  70. p = platforms.Only(*platform)
  71. }
  72. jobs := newJobs()
  73. h := images.HandlerFunc(func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
  74. if images.IsLayerType(desc.MediaType) {
  75. jobs.Add(desc)
  76. }
  77. return nil, nil
  78. })
  79. opts = append(opts, containerd.WithImageHandler(h))
  80. pp := pullProgress{store: i.content, showExists: true}
  81. finishProgress := jobs.showProgress(ctx, out, pp)
  82. var outNewImg *containerd.Image
  83. defer func() {
  84. finishProgress()
  85. // Send final status message after the progress updater has finished.
  86. // Otherwise the layer/manifest progress messages may arrive AFTER the
  87. // status message have been sent, so they won't update the previous
  88. // progress leaving stale progress like:
  89. // 70f5ac315c5a: Downloading [> ] 0B/3.19kB
  90. // Digest: sha256:4f53e2564790c8e7856ec08e384732aa38dc43c52f02952483e3f003afbf23db
  91. // 70f5ac315c5a: Download complete
  92. // Status: Downloaded newer image for hello-world:latest
  93. // docker.io/library/hello-world:latest
  94. if outNewImg != nil {
  95. img := *outNewImg
  96. progress.Message(out, "", "Digest: "+img.Target().Digest.String())
  97. writeStatus(out, reference.FamiliarString(ref), old.Digest != img.Target().Digest)
  98. }
  99. }()
  100. var sentPullingFrom, sentSchema1Deprecation bool
  101. ah := images.HandlerFunc(func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
  102. if desc.MediaType == images.MediaTypeDockerSchema1Manifest && !sentSchema1Deprecation {
  103. err := distribution.DeprecatedSchema1ImageError(ref)
  104. if os.Getenv("DOCKER_ENABLE_DEPRECATED_PULL_SCHEMA_1_IMAGE") == "" {
  105. log.G(context.TODO()).Warn(err.Error())
  106. return nil, err
  107. }
  108. progress.Message(out, "", err.Error())
  109. sentSchema1Deprecation = true
  110. }
  111. if images.IsLayerType(desc.MediaType) {
  112. id := stringid.TruncateID(desc.Digest.String())
  113. progress.Update(out, id, "Pulling fs layer")
  114. }
  115. if images.IsManifestType(desc.MediaType) {
  116. if !sentPullingFrom {
  117. var tagOrDigest string
  118. if tagged, ok := ref.(reference.Tagged); ok {
  119. tagOrDigest = tagged.Tag()
  120. } else {
  121. tagOrDigest = ref.String()
  122. }
  123. progress.Message(out, tagOrDigest, "Pulling from "+reference.Path(ref))
  124. sentPullingFrom = true
  125. }
  126. available, _, _, missing, err := images.Check(ctx, i.content, desc, p)
  127. if err != nil {
  128. return nil, err
  129. }
  130. // If we already have all the contents pull shouldn't show any layer
  131. // download progress, not even a "Already present" message.
  132. if available && len(missing) == 0 {
  133. pp.hideLayers = true
  134. }
  135. }
  136. return nil, nil
  137. })
  138. opts = append(opts, containerd.WithImageHandler(ah))
  139. opts = append(opts, containerd.WithPullUnpack)
  140. // TODO(thaJeztah): we may have to pass the snapshotter to use if the pull is part of a "docker run" (container create -> pull image if missing). See https://github.com/moby/moby/issues/45273
  141. opts = append(opts, containerd.WithPullSnapshotter(i.snapshotter))
  142. // AppendInfoHandlerWrapper will annotate the image with basic information like manifest and layer digests as labels;
  143. // this information is used to enable remote snapshotters like nydus and stargz to query a registry.
  144. infoHandler := snapshotters.AppendInfoHandlerWrapper(ref.String())
  145. opts = append(opts, containerd.WithImageHandlerWrapper(infoHandler))
  146. // Allow pulling application/vnd.docker.distribution.manifest.v1+prettyjws images
  147. // by converting them to OCI manifests.
  148. opts = append(opts, containerd.WithSchema1Conversion) //nolint:staticcheck // Ignore SA1019: containerd.WithSchema1Conversion is deprecated: use Schema 2 or OCI images.
  149. img, err := i.client.Pull(ctx, ref.String(), opts...)
  150. if err != nil {
  151. if errors.Is(err, docker.ErrInvalidAuthorization) {
  152. // Match error returned by containerd.
  153. // https://github.com/containerd/containerd/blob/v1.7.8/remotes/docker/authorizer.go#L189-L191
  154. if strings.Contains(err.Error(), "no basic auth credentials") {
  155. return err
  156. }
  157. return errdefs.NotFound(fmt.Errorf("pull access denied for %s, repository does not exist or may require 'docker login'", reference.FamiliarName(ref)))
  158. }
  159. return err
  160. }
  161. logger := log.G(ctx).WithFields(log.Fields{
  162. "digest": img.Target().Digest,
  163. "remote": ref.String(),
  164. })
  165. logger.Info("image pulled")
  166. // The pull succeeded, so try to remove any dangling image we have for this target
  167. err = i.images.Delete(compatcontext.WithoutCancel(ctx), danglingImageName(img.Target().Digest))
  168. if err != nil && !cerrdefs.IsNotFound(err) {
  169. // Image pull succeeded, but cleaning up the dangling image failed. Ignore the
  170. // error to not mark the pull as failed.
  171. logger.WithError(err).Warn("unexpected error while removing outdated dangling image reference")
  172. }
  173. i.LogImageEvent(reference.FamiliarString(ref), reference.FamiliarName(ref), events.ActionPull)
  174. outNewImg = &img
  175. return nil
  176. }
  177. // writeStatus writes a status message to out. If newerDownloaded is true, the
  178. // status message indicates that a newer image was downloaded. Otherwise, it
  179. // indicates that the image is up to date. requestedTag is the tag the message
  180. // will refer to.
  181. func writeStatus(out progress.Output, requestedTag string, newerDownloaded bool) {
  182. if newerDownloaded {
  183. progress.Message(out, "", "Status: Downloaded newer image for "+requestedTag)
  184. } else {
  185. progress.Message(out, "", "Status: Image is up to date for "+requestedTag)
  186. }
  187. }