fetch_linux.go 9.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285
  1. package plugin
  2. import (
  3. "context"
  4. "io"
  5. "net/http"
  6. "time"
  7. "github.com/containerd/containerd/content"
  8. cerrdefs "github.com/containerd/containerd/errdefs"
  9. "github.com/containerd/containerd/images"
  10. "github.com/containerd/containerd/log"
  11. "github.com/containerd/containerd/remotes"
  12. "github.com/containerd/containerd/remotes/docker"
  13. "github.com/distribution/reference"
  14. "github.com/docker/docker/api/types/registry"
  15. progressutils "github.com/docker/docker/distribution/utils"
  16. "github.com/docker/docker/pkg/chrootarchive"
  17. "github.com/docker/docker/pkg/ioutils"
  18. "github.com/docker/docker/pkg/progress"
  19. "github.com/docker/docker/pkg/stringid"
  20. "github.com/opencontainers/go-digest"
  21. ocispec "github.com/opencontainers/image-spec/specs-go/v1"
  22. "github.com/pkg/errors"
  23. )
  24. const mediaTypePluginConfig = "application/vnd.docker.plugin.v1+json"
  25. // setupProgressOutput sets up the passed in writer to stream progress.
  26. //
  27. // The passed in cancel function is used by the progress writer to signal callers that there
  28. // is an issue writing to the stream.
  29. //
  30. // The returned function is used to wait for the progress writer to be finished.
  31. // Call it to make sure the progress writer is done before returning from your function as needed.
  32. func setupProgressOutput(outStream io.Writer, cancel func()) (progress.Output, func()) {
  33. var out progress.Output
  34. f := func() {}
  35. if outStream != nil {
  36. ch := make(chan progress.Progress, 100)
  37. out = progress.ChanOutput(ch)
  38. ctx, retCancel := context.WithCancel(context.Background())
  39. go func() {
  40. progressutils.WriteDistributionProgress(cancel, outStream, ch)
  41. retCancel()
  42. }()
  43. f = func() {
  44. close(ch)
  45. <-ctx.Done()
  46. }
  47. } else {
  48. out = progress.DiscardOutput()
  49. }
  50. return out, f
  51. }
  52. // fetch the content related to the passed in reference into the blob store and appends the provided images.Handlers
  53. // There is no need to use remotes.FetchHandler since it already gets set
  54. func (pm *Manager) fetch(ctx context.Context, ref reference.Named, auth *registry.AuthConfig, out progress.Output, metaHeader http.Header, handlers ...images.Handler) (err error) {
  55. // We need to make sure we have a domain on the reference
  56. withDomain, err := reference.ParseNormalizedNamed(ref.String())
  57. if err != nil {
  58. return errors.Wrap(err, "error parsing plugin image reference")
  59. }
  60. // Make sure we can authenticate the request since the auth scope for plugin repos is different than a normal repo.
  61. ctx = docker.WithScope(ctx, scope(ref, false))
  62. // Make sure the fetch handler knows how to set a ref key for the plugin media type.
  63. // Without this the ref key is "unknown" and we see a nasty warning message in the logs
  64. ctx = remotes.WithMediaTypeKeyPrefix(ctx, mediaTypePluginConfig, "docker-plugin")
  65. resolver, err := pm.newResolver(ctx, nil, auth, metaHeader, false)
  66. if err != nil {
  67. return err
  68. }
  69. resolved, desc, err := resolver.Resolve(ctx, withDomain.String())
  70. if err != nil {
  71. // This is backwards compatible with older versions of the distribution registry.
  72. // The containerd client will add it's own accept header as a comma separated list of supported manifests.
  73. // This is perfectly fine, unless you are talking to an older registry which does not split the comma separated list,
  74. // so it is never able to match a media type and it falls back to schema1 (yuck) and fails because our manifest the
  75. // fallback does not support plugin configs...
  76. log.G(ctx).WithError(err).WithField("ref", withDomain).Debug("Error while resolving reference, falling back to backwards compatible accept header format")
  77. headers := http.Header{}
  78. headers.Add("Accept", images.MediaTypeDockerSchema2Manifest)
  79. headers.Add("Accept", images.MediaTypeDockerSchema2ManifestList)
  80. headers.Add("Accept", ocispec.MediaTypeImageManifest)
  81. headers.Add("Accept", ocispec.MediaTypeImageIndex)
  82. resolver, _ = pm.newResolver(ctx, nil, auth, headers, false)
  83. if resolver != nil {
  84. resolved, desc, err = resolver.Resolve(ctx, withDomain.String())
  85. if err != nil {
  86. log.G(ctx).WithError(err).WithField("ref", withDomain).Debug("Failed to resolve reference after falling back to backwards compatible accept header format")
  87. }
  88. }
  89. if err != nil {
  90. return errors.Wrap(err, "error resolving plugin reference")
  91. }
  92. }
  93. fetcher, err := resolver.Fetcher(ctx, resolved)
  94. if err != nil {
  95. return errors.Wrap(err, "error creating plugin image fetcher")
  96. }
  97. fp := withFetchProgress(pm.blobStore, out, ref)
  98. handlers = append([]images.Handler{fp, remotes.FetchHandler(pm.blobStore, fetcher)}, handlers...)
  99. return images.Dispatch(ctx, images.Handlers(handlers...), nil, desc)
  100. }
  101. // applyLayer makes an images.HandlerFunc which applies a fetched image rootfs layer to a directory.
  102. //
  103. // TODO(@cpuguy83) This gets run sequentially after layer pull (makes sense), however
  104. // if there are multiple layers to fetch we may end up extracting layers in the wrong
  105. // order.
  106. func applyLayer(cs content.Store, dir string, out progress.Output) images.HandlerFunc {
  107. return func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
  108. switch desc.MediaType {
  109. case
  110. ocispec.MediaTypeImageLayer,
  111. images.MediaTypeDockerSchema2Layer,
  112. ocispec.MediaTypeImageLayerGzip,
  113. images.MediaTypeDockerSchema2LayerGzip:
  114. default:
  115. return nil, nil
  116. }
  117. ra, err := cs.ReaderAt(ctx, desc)
  118. if err != nil {
  119. return nil, errors.Wrapf(err, "error getting content from content store for digest %s", desc.Digest)
  120. }
  121. id := stringid.TruncateID(desc.Digest.String())
  122. rc := ioutils.NewReadCloserWrapper(content.NewReader(ra), ra.Close)
  123. pr := progress.NewProgressReader(rc, out, desc.Size, id, "Extracting")
  124. defer pr.Close()
  125. if _, err := chrootarchive.ApplyLayer(dir, pr); err != nil {
  126. return nil, errors.Wrapf(err, "error applying layer for digest %s", desc.Digest)
  127. }
  128. progress.Update(out, id, "Complete")
  129. return nil, nil
  130. }
  131. }
  132. func childrenHandler(cs content.Store) images.HandlerFunc {
  133. ch := images.ChildrenHandler(cs)
  134. return func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
  135. switch desc.MediaType {
  136. case mediaTypePluginConfig:
  137. return nil, nil
  138. default:
  139. return ch(ctx, desc)
  140. }
  141. }
  142. }
  143. type fetchMeta struct {
  144. blobs []digest.Digest
  145. config digest.Digest
  146. manifest digest.Digest
  147. }
  148. func storeFetchMetadata(m *fetchMeta) images.HandlerFunc {
  149. return func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
  150. switch desc.MediaType {
  151. case
  152. images.MediaTypeDockerSchema2LayerForeignGzip,
  153. images.MediaTypeDockerSchema2Layer,
  154. ocispec.MediaTypeImageLayer,
  155. ocispec.MediaTypeImageLayerGzip:
  156. m.blobs = append(m.blobs, desc.Digest)
  157. case ocispec.MediaTypeImageManifest, images.MediaTypeDockerSchema2Manifest:
  158. m.manifest = desc.Digest
  159. case mediaTypePluginConfig:
  160. m.config = desc.Digest
  161. }
  162. return nil, nil
  163. }
  164. }
  165. func validateFetchedMetadata(md fetchMeta) error {
  166. if md.config == "" {
  167. return errors.New("fetched plugin image but plugin config is missing")
  168. }
  169. if md.manifest == "" {
  170. return errors.New("fetched plugin image but manifest is missing")
  171. }
  172. return nil
  173. }
  174. // withFetchProgress is a fetch handler which registers a descriptor with a progress
  175. func withFetchProgress(cs content.Store, out progress.Output, ref reference.Named) images.HandlerFunc {
  176. return func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
  177. switch desc.MediaType {
  178. case ocispec.MediaTypeImageManifest, images.MediaTypeDockerSchema2Manifest:
  179. tn := reference.TagNameOnly(ref)
  180. tagged := tn.(reference.Tagged)
  181. progress.Messagef(out, tagged.Tag(), "Pulling from %s", reference.FamiliarName(ref))
  182. progress.Messagef(out, "", "Digest: %s", desc.Digest.String())
  183. return nil, nil
  184. case
  185. images.MediaTypeDockerSchema2LayerGzip,
  186. images.MediaTypeDockerSchema2Layer,
  187. ocispec.MediaTypeImageLayer,
  188. ocispec.MediaTypeImageLayerGzip:
  189. default:
  190. return nil, nil
  191. }
  192. id := stringid.TruncateID(desc.Digest.String())
  193. if _, err := cs.Info(ctx, desc.Digest); err == nil {
  194. out.WriteProgress(progress.Progress{ID: id, Action: "Already exists", LastUpdate: true})
  195. return nil, nil
  196. }
  197. progress.Update(out, id, "Waiting")
  198. key := remotes.MakeRefKey(ctx, desc)
  199. go func() {
  200. timer := time.NewTimer(100 * time.Millisecond)
  201. if !timer.Stop() {
  202. <-timer.C
  203. }
  204. defer timer.Stop()
  205. var pulling bool
  206. var ctxErr error
  207. for {
  208. timer.Reset(100 * time.Millisecond)
  209. select {
  210. case <-ctx.Done():
  211. ctxErr = ctx.Err()
  212. // make sure we can still fetch from the content store
  213. // TODO: Might need to add some sort of timeout
  214. ctx = context.Background()
  215. case <-timer.C:
  216. }
  217. s, err := cs.Status(ctx, key)
  218. if err != nil {
  219. if !cerrdefs.IsNotFound(err) {
  220. log.G(ctx).WithError(err).WithField("layerDigest", desc.Digest.String()).Error("Error looking up status of plugin layer pull")
  221. progress.Update(out, id, err.Error())
  222. return
  223. }
  224. if _, err := cs.Info(ctx, desc.Digest); err == nil {
  225. progress.Update(out, id, "Download complete")
  226. return
  227. }
  228. if ctxErr != nil {
  229. progress.Update(out, id, ctxErr.Error())
  230. return
  231. }
  232. continue
  233. }
  234. if !pulling {
  235. progress.Update(out, id, "Pulling fs layer")
  236. pulling = true
  237. }
  238. if s.Offset == s.Total {
  239. out.WriteProgress(progress.Progress{ID: id, Action: "Download complete", Current: s.Offset, LastUpdate: true})
  240. return
  241. }
  242. out.WriteProgress(progress.Progress{ID: id, Action: "Downloading", Current: s.Offset, Total: s.Total})
  243. }
  244. }()
  245. return nil, nil
  246. }
  247. }