image_pull.go 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135
  1. package images // import "github.com/docker/docker/daemon/images"
  2. import (
  3. "context"
  4. "io"
  5. "time"
  6. "github.com/containerd/containerd/leases"
  7. "github.com/containerd/containerd/namespaces"
  8. "github.com/containerd/log"
  9. "github.com/distribution/reference"
  10. imagetypes "github.com/docker/docker/api/types/image"
  11. "github.com/docker/docker/api/types/registry"
  12. "github.com/docker/docker/distribution"
  13. progressutils "github.com/docker/docker/distribution/utils"
  14. "github.com/docker/docker/errdefs"
  15. "github.com/docker/docker/pkg/progress"
  16. "github.com/docker/docker/pkg/streamformatter"
  17. ocispec "github.com/opencontainers/image-spec/specs-go/v1"
  18. "github.com/pkg/errors"
  19. )
  20. // PullImage initiates a pull operation. image is the repository name to pull, and
  21. // tag may be either empty, or indicate a specific tag to pull.
  22. func (i *ImageService) PullImage(ctx context.Context, ref reference.Named, platform *ocispec.Platform, metaHeaders map[string][]string, authConfig *registry.AuthConfig, outStream io.Writer) error {
  23. start := time.Now()
  24. err := i.pullImageWithReference(ctx, ref, platform, metaHeaders, authConfig, outStream)
  25. imageActions.WithValues("pull").UpdateSince(start)
  26. if err != nil {
  27. return err
  28. }
  29. if platform != nil {
  30. // If --platform was specified, check that the image we pulled matches
  31. // the expected platform. This check is for situations where the image
  32. // is a single-arch image, in which case (for backward compatibility),
  33. // we allow the image to have a non-matching architecture. The code
  34. // below checks for this situation, and returns a warning to the client,
  35. // as well as logging it to the daemon logs.
  36. img, err := i.GetImage(ctx, ref.String(), imagetypes.GetImageOpts{Platform: platform})
  37. // Note that this is a special case where GetImage returns both an image
  38. // and an error: https://github.com/docker/docker/blob/v20.10.7/daemon/images/image.go#L175-L183
  39. if errdefs.IsNotFound(err) && img != nil {
  40. po := streamformatter.NewJSONProgressOutput(outStream, false)
  41. progress.Messagef(po, "", `WARNING: %s`, err.Error())
  42. log.G(ctx).WithError(err).WithField("image", reference.FamiliarName(ref)).Warn("ignoring platform mismatch on single-arch image")
  43. } else if err != nil {
  44. return err
  45. }
  46. }
  47. return nil
  48. }
  49. func (i *ImageService) pullImageWithReference(ctx context.Context, ref reference.Named, platform *ocispec.Platform, metaHeaders map[string][]string, authConfig *registry.AuthConfig, outStream io.Writer) error {
  50. // Include a buffer so that slow client connections don't affect
  51. // transfer performance.
  52. progressChan := make(chan progress.Progress, 100)
  53. writesDone := make(chan struct{})
  54. ctx, cancelFunc := context.WithCancel(ctx)
  55. go func() {
  56. progressutils.WriteDistributionProgress(cancelFunc, outStream, progressChan)
  57. close(writesDone)
  58. }()
  59. ctx = namespaces.WithNamespace(ctx, i.contentNamespace)
  60. // Take out a temporary lease for everything that gets persisted to the content store.
  61. // Before the lease is cancelled, any content we want to keep should have it's own lease applied.
  62. ctx, done, err := tempLease(ctx, i.leases)
  63. if err != nil {
  64. return err
  65. }
  66. defer done(ctx)
  67. cs := &contentStoreForPull{
  68. ContentStore: i.content,
  69. leases: i.leases,
  70. }
  71. imageStore := &imageStoreForPull{
  72. ImageConfigStore: distribution.NewImageConfigStoreFromStore(i.imageStore),
  73. ingested: cs,
  74. leases: i.leases,
  75. }
  76. imagePullConfig := &distribution.ImagePullConfig{
  77. Config: distribution.Config{
  78. MetaHeaders: metaHeaders,
  79. AuthConfig: authConfig,
  80. ProgressOutput: progress.ChanOutput(progressChan),
  81. RegistryService: i.registryService,
  82. ImageEventLogger: i.LogImageEvent,
  83. MetadataStore: i.distributionMetadataStore,
  84. ImageStore: imageStore,
  85. ReferenceStore: i.referenceStore,
  86. },
  87. DownloadManager: i.downloadManager,
  88. Platform: platform,
  89. }
  90. err = distribution.Pull(ctx, ref, imagePullConfig, cs)
  91. close(progressChan)
  92. <-writesDone
  93. return err
  94. }
  95. func tempLease(ctx context.Context, mgr leases.Manager) (context.Context, func(context.Context) error, error) {
  96. nop := func(context.Context) error { return nil }
  97. _, ok := leases.FromContext(ctx)
  98. if ok {
  99. return ctx, nop, nil
  100. }
  101. // Use an expiration that ensures the lease is cleaned up at some point if there is a crash, SIGKILL, etc.
  102. opts := []leases.Opt{
  103. leases.WithRandomID(),
  104. leases.WithExpiration(24 * time.Hour),
  105. leases.WithLabels(map[string]string{
  106. "moby.lease/temporary": time.Now().UTC().Format(time.RFC3339Nano),
  107. }),
  108. }
  109. l, err := mgr.Create(ctx, opts...)
  110. if err != nil {
  111. return ctx, nop, errors.Wrap(err, "error creating temporary lease")
  112. }
  113. ctx = leases.WithLease(ctx, l.ID)
  114. return ctx, func(ctx context.Context) error {
  115. return mgr.Delete(ctx, l)
  116. }, nil
  117. }