123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290 |
- package plugin
- import (
- "context"
- "io"
- "net/http"
- "time"
- "github.com/containerd/containerd/content"
- cerrdefs "github.com/containerd/containerd/errdefs"
- "github.com/containerd/containerd/images"
- "github.com/containerd/containerd/remotes"
- "github.com/containerd/containerd/remotes/docker"
- "github.com/containerd/log"
- "github.com/distribution/reference"
- "github.com/docker/docker/api/types/registry"
- progressutils "github.com/docker/docker/distribution/utils"
- "github.com/docker/docker/pkg/chrootarchive"
- "github.com/docker/docker/pkg/ioutils"
- "github.com/docker/docker/pkg/progress"
- "github.com/docker/docker/pkg/stringid"
- "github.com/opencontainers/go-digest"
- ocispec "github.com/opencontainers/image-spec/specs-go/v1"
- "github.com/pkg/errors"
- )
const (
	// mediaTypePluginConfig is the media type the handlers in this file use to
	// recognize a plugin config descriptor.
	mediaTypePluginConfig = "application/vnd.docker.plugin.v1+json"
)
- // setupProgressOutput sets up the passed in writer to stream progress.
- //
- // The passed in cancel function is used by the progress writer to signal callers that there
- // is an issue writing to the stream.
- //
- // The returned function is used to wait for the progress writer to be finished.
- // Call it to make sure the progress writer is done before returning from your function as needed.
- func setupProgressOutput(outStream io.Writer, cancel func()) (progress.Output, func()) {
- var out progress.Output
- f := func() {}
- if outStream != nil {
- ch := make(chan progress.Progress, 100)
- out = progress.ChanOutput(ch)
- ctx, retCancel := context.WithCancel(context.Background())
- go func() {
- progressutils.WriteDistributionProgress(cancel, outStream, ch)
- retCancel()
- }()
- f = func() {
- close(ch)
- <-ctx.Done()
- }
- } else {
- out = progress.DiscardOutput()
- }
- return out, f
- }
- // fetch the content related to the passed in reference into the blob store and appends the provided images.Handlers
- // There is no need to use remotes.FetchHandler since it already gets set
- func (pm *Manager) fetch(ctx context.Context, ref reference.Named, auth *registry.AuthConfig, out progress.Output, metaHeader http.Header, handlers ...images.Handler) (err error) {
- // We need to make sure we have a domain on the reference
- withDomain, err := reference.ParseNormalizedNamed(ref.String())
- if err != nil {
- return errors.Wrap(err, "error parsing plugin image reference")
- }
- // Make sure we can authenticate the request since the auth scope for plugin repos is different than a normal repo.
- ctx = docker.WithScope(ctx, scope(ref, false))
- // Make sure the fetch handler knows how to set a ref key for the plugin media type.
- // Without this the ref key is "unknown" and we see a nasty warning message in the logs
- ctx = remotes.WithMediaTypeKeyPrefix(ctx, mediaTypePluginConfig, "docker-plugin")
- resolver, err := pm.newResolver(ctx, nil, auth, metaHeader, false)
- if err != nil {
- return err
- }
- resolved, desc, err := resolver.Resolve(ctx, withDomain.String())
- if err != nil {
- // This is backwards compatible with older versions of the distribution registry.
- // The containerd client will add it's own accept header as a comma separated list of supported manifests.
- // This is perfectly fine, unless you are talking to an older registry which does not split the comma separated list,
- // so it is never able to match a media type and it falls back to schema1 (yuck) and fails because our manifest the
- // fallback does not support plugin configs...
- log.G(ctx).WithError(err).WithField("ref", withDomain).Debug("Error while resolving reference, falling back to backwards compatible accept header format")
- headers := http.Header{}
- headers.Add("Accept", images.MediaTypeDockerSchema2Manifest)
- headers.Add("Accept", images.MediaTypeDockerSchema2ManifestList)
- headers.Add("Accept", ocispec.MediaTypeImageManifest)
- headers.Add("Accept", ocispec.MediaTypeImageIndex)
- resolver, _ = pm.newResolver(ctx, nil, auth, headers, false)
- if resolver != nil {
- resolved, desc, err = resolver.Resolve(ctx, withDomain.String())
- if err != nil {
- log.G(ctx).WithError(err).WithField("ref", withDomain).Debug("Failed to resolve reference after falling back to backwards compatible accept header format")
- }
- }
- if err != nil {
- return errors.Wrap(err, "error resolving plugin reference")
- }
- }
- fetcher, err := resolver.Fetcher(ctx, resolved)
- if err != nil {
- return errors.Wrap(err, "error creating plugin image fetcher")
- }
- fp := withFetchProgress(pm.blobStore, out, ref)
- handlers = append([]images.Handler{fp, remotes.FetchHandler(pm.blobStore, fetcher)}, handlers...)
- return images.Dispatch(ctx, images.Handlers(handlers...), nil, desc)
- }
- // applyLayer makes an images.HandlerFunc which applies a fetched image rootfs layer to a directory.
- //
- // TODO(@cpuguy83) This gets run sequentially after layer pull (makes sense), however
- // if there are multiple layers to fetch we may end up extracting layers in the wrong
- // order.
- func applyLayer(cs content.Store, dir string, out progress.Output) images.HandlerFunc {
- return func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
- switch desc.MediaType {
- case
- ocispec.MediaTypeImageLayer,
- images.MediaTypeDockerSchema2Layer,
- ocispec.MediaTypeImageLayerGzip,
- images.MediaTypeDockerSchema2LayerGzip:
- default:
- return nil, nil
- }
- ra, err := cs.ReaderAt(ctx, desc)
- if err != nil {
- return nil, errors.Wrapf(err, "error getting content from content store for digest %s", desc.Digest)
- }
- id := stringid.TruncateID(desc.Digest.String())
- rc := ioutils.NewReadCloserWrapper(content.NewReader(ra), ra.Close)
- pr := progress.NewProgressReader(rc, out, desc.Size, id, "Extracting")
- defer pr.Close()
- if _, err := chrootarchive.ApplyLayer(dir, pr); err != nil {
- return nil, errors.Wrapf(err, "error applying layer for digest %s", desc.Digest)
- }
- progress.Update(out, id, "Complete")
- return nil, nil
- }
- }
- func childrenHandler(cs content.Store) images.HandlerFunc {
- ch := images.ChildrenHandler(cs)
- return func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
- switch desc.MediaType {
- case mediaTypePluginConfig:
- return nil, nil
- default:
- return ch(ctx, desc)
- }
- }
- }
- type fetchMeta struct {
- blobs []digest.Digest
- config digest.Digest
- manifest digest.Digest
- }
- func storeFetchMetadata(m *fetchMeta) images.HandlerFunc {
- return func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
- switch desc.MediaType {
- case
- images.MediaTypeDockerSchema2LayerForeignGzip,
- images.MediaTypeDockerSchema2Layer,
- ocispec.MediaTypeImageLayer,
- ocispec.MediaTypeImageLayerGzip:
- m.blobs = append(m.blobs, desc.Digest)
- case ocispec.MediaTypeImageManifest, images.MediaTypeDockerSchema2Manifest:
- m.manifest = desc.Digest
- case mediaTypePluginConfig:
- m.config = desc.Digest
- }
- return nil, nil
- }
- }
- func validateFetchedMetadata(md fetchMeta) error {
- if md.config == "" {
- return errors.New("fetched plugin image but plugin config is missing")
- }
- if md.manifest == "" {
- return errors.New("fetched plugin image but manifest is missing")
- }
- return nil
- }
- // withFetchProgress is a fetch handler which registers a descriptor with a progress
- func withFetchProgress(cs content.Store, out progress.Output, ref reference.Named) images.HandlerFunc {
- return func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
- switch desc.MediaType {
- case ocispec.MediaTypeImageManifest, images.MediaTypeDockerSchema2Manifest:
- tn := reference.TagNameOnly(ref)
- var tagOrDigest string
- if tagged, ok := tn.(reference.Tagged); ok {
- tagOrDigest = tagged.Tag()
- } else {
- tagOrDigest = tn.String()
- }
- progress.Messagef(out, tagOrDigest, "Pulling from %s", reference.FamiliarName(ref))
- progress.Messagef(out, "", "Digest: %s", desc.Digest.String())
- return nil, nil
- case
- images.MediaTypeDockerSchema2LayerGzip,
- images.MediaTypeDockerSchema2Layer,
- ocispec.MediaTypeImageLayer,
- ocispec.MediaTypeImageLayerGzip:
- default:
- return nil, nil
- }
- id := stringid.TruncateID(desc.Digest.String())
- if _, err := cs.Info(ctx, desc.Digest); err == nil {
- out.WriteProgress(progress.Progress{ID: id, Action: "Already exists", LastUpdate: true})
- return nil, nil
- }
- progress.Update(out, id, "Waiting")
- key := remotes.MakeRefKey(ctx, desc)
- go func() {
- timer := time.NewTimer(100 * time.Millisecond)
- if !timer.Stop() {
- <-timer.C
- }
- defer timer.Stop()
- var pulling bool
- var ctxErr error
- for {
- timer.Reset(100 * time.Millisecond)
- select {
- case <-ctx.Done():
- ctxErr = ctx.Err()
- // make sure we can still fetch from the content store
- // TODO: Might need to add some sort of timeout
- ctx = context.Background()
- case <-timer.C:
- }
- s, err := cs.Status(ctx, key)
- if err != nil {
- if !cerrdefs.IsNotFound(err) {
- log.G(ctx).WithError(err).WithField("layerDigest", desc.Digest.String()).Error("Error looking up status of plugin layer pull")
- progress.Update(out, id, err.Error())
- return
- }
- if _, err := cs.Info(ctx, desc.Digest); err == nil {
- progress.Update(out, id, "Download complete")
- return
- }
- if ctxErr != nil {
- progress.Update(out, id, ctxErr.Error())
- return
- }
- continue
- }
- if !pulling {
- progress.Update(out, id, "Pulling fs layer")
- pulling = true
- }
- if s.Offset == s.Total {
- out.WriteProgress(progress.Progress{ID: id, Action: "Download complete", Current: s.Offset, LastUpdate: true})
- return
- }
- out.WriteProgress(progress.Progress{ID: id, Action: "Downloading", Current: s.Offset, Total: s.Total})
- }
- }()
- return nil, nil
- }
- }
|