package plugin import ( "context" "io" "net/http" "time" "github.com/containerd/containerd/content" c8derrdefs "github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/images" "github.com/containerd/containerd/remotes" "github.com/containerd/containerd/remotes/docker" "github.com/docker/distribution/reference" "github.com/docker/docker/api/types" progressutils "github.com/docker/docker/distribution/utils" "github.com/docker/docker/pkg/chrootarchive" "github.com/docker/docker/pkg/ioutils" "github.com/docker/docker/pkg/progress" "github.com/docker/docker/pkg/stringid" "github.com/opencontainers/go-digest" specs "github.com/opencontainers/image-spec/specs-go/v1" "github.com/pkg/errors" "github.com/sirupsen/logrus" ) const mediaTypePluginConfig = "application/vnd.docker.plugin.v1+json" // setupProgressOutput sets up the passed in writer to stream progress. // // The passed in cancel function is used by the progress writer to signal callers that there // is an issue writing to the stream. // // The returned function is used to wait for the progress writer to be finished. // Call it to make sure the progress writer is done before returning from your function as needed. func setupProgressOutput(outStream io.Writer, cancel func()) (progress.Output, func()) { var out progress.Output f := func() {} if outStream != nil { ch := make(chan progress.Progress, 100) out = progress.ChanOutput(ch) ctx, retCancel := context.WithCancel(context.Background()) go func() { progressutils.WriteDistributionProgress(cancel, outStream, ch) retCancel() }() f = func() { close(ch) <-ctx.Done() } } else { out = progress.DiscardOutput() } return out, f } // fetch the content related to the passed in reference into the blob store and appends the provided images.Handlers // There is no need to use remotes.FetchHandler since it already gets set func (pm *Manager) fetch(ctx context.Context, ref reference.Named, auth *types.AuthConfig, out progress.Output, metaHeader http.Header, handlers ...images.Handler) (err error) { // We need to make sure we have a domain on the reference withDomain, err := reference.ParseNormalizedNamed(ref.String()) if err != nil { return errors.Wrap(err, "error parsing plugin image reference") } // Make sure we can authenticate the request since the auth scope for plugin repos is different than a normal repo. ctx = docker.WithScope(ctx, scope(ref, false)) // Make sure the fetch handler knows how to set a ref key for the plugin media type. // Without this the ref key is "unknown" and we see a nasty warning message in the logs ctx = remotes.WithMediaTypeKeyPrefix(ctx, mediaTypePluginConfig, "docker-plugin") resolver, err := pm.newResolver(ctx, nil, auth, metaHeader, false) if err != nil { return err } resolved, desc, err := resolver.Resolve(ctx, withDomain.String()) if err != nil { // This is backwards compatible with older versions of the distribution registry. // The containerd client will add it's own accept header as a comma separated list of supported manifests. // This is perfectly fine, unless you are talking to an older registry which does not split the comma separated list, // so it is never able to match a media type and it falls back to schema1 (yuck) and fails because our manifest the // fallback does not support plugin configs... logrus.WithError(err).WithField("ref", withDomain).Debug("Error while resolving reference, falling back to backwards compatible accept header format") headers := http.Header{} headers.Add("Accept", images.MediaTypeDockerSchema2Manifest) headers.Add("Accept", images.MediaTypeDockerSchema2ManifestList) headers.Add("Accept", specs.MediaTypeImageManifest) headers.Add("Accept", specs.MediaTypeImageIndex) resolver, _ = pm.newResolver(ctx, nil, auth, headers, false) if resolver != nil { resolved, desc, err = resolver.Resolve(ctx, withDomain.String()) if err != nil { logrus.WithError(err).WithField("ref", withDomain).Debug("Failed to resolve reference after falling back to backwards compatible accept header format") } } if err != nil { return errors.Wrap(err, "error resolving plugin reference") } } fetcher, err := resolver.Fetcher(ctx, resolved) if err != nil { return errors.Wrap(err, "error creating plugin image fetcher") } fp := withFetchProgress(pm.blobStore, out, ref) handlers = append([]images.Handler{fp, remotes.FetchHandler(pm.blobStore, fetcher)}, handlers...) return images.Dispatch(ctx, images.Handlers(handlers...), nil, desc) } // applyLayer makes an images.HandlerFunc which applies a fetched image rootfs layer to a directory. // // TODO(@cpuguy83) This gets run sequentially after layer pull (makes sense), however // if there are multiple layers to fetch we may end up extracting layers in the wrong // order. func applyLayer(cs content.Store, dir string, out progress.Output) images.HandlerFunc { return func(ctx context.Context, desc specs.Descriptor) ([]specs.Descriptor, error) { switch desc.MediaType { case specs.MediaTypeImageLayer, images.MediaTypeDockerSchema2Layer, specs.MediaTypeImageLayerGzip, images.MediaTypeDockerSchema2LayerGzip: default: return nil, nil } ra, err := cs.ReaderAt(ctx, desc) if err != nil { return nil, errors.Wrapf(err, "error getting content from content store for digest %s", desc.Digest) } id := stringid.TruncateID(desc.Digest.String()) rc := ioutils.NewReadCloserWrapper(content.NewReader(ra), ra.Close) pr := progress.NewProgressReader(rc, out, desc.Size, id, "Extracting") defer pr.Close() if _, err := chrootarchive.ApplyLayer(dir, pr); err != nil { return nil, errors.Wrapf(err, "error applying layer for digest %s", desc.Digest) } progress.Update(out, id, "Complete") return nil, nil } } func childrenHandler(cs content.Store) images.HandlerFunc { ch := images.ChildrenHandler(cs) return func(ctx context.Context, desc specs.Descriptor) ([]specs.Descriptor, error) { switch desc.MediaType { case mediaTypePluginConfig: return nil, nil default: return ch(ctx, desc) } } } type fetchMeta struct { blobs []digest.Digest config digest.Digest manifest digest.Digest } func storeFetchMetadata(m *fetchMeta) images.HandlerFunc { return func(ctx context.Context, desc specs.Descriptor) ([]specs.Descriptor, error) { switch desc.MediaType { case images.MediaTypeDockerSchema2LayerForeignGzip, images.MediaTypeDockerSchema2Layer, specs.MediaTypeImageLayer, specs.MediaTypeImageLayerGzip: m.blobs = append(m.blobs, desc.Digest) case specs.MediaTypeImageManifest, images.MediaTypeDockerSchema2Manifest: m.manifest = desc.Digest case mediaTypePluginConfig: m.config = desc.Digest } return nil, nil } } func validateFetchedMetadata(md fetchMeta) error { if md.config == "" { return errors.New("fetched plugin image but plugin config is missing") } if md.manifest == "" { return errors.New("fetched plugin image but manifest is missing") } return nil } // withFetchProgress is a fetch handler which registers a descriptor with a progress func withFetchProgress(cs content.Store, out progress.Output, ref reference.Named) images.HandlerFunc { return func(ctx context.Context, desc specs.Descriptor) ([]specs.Descriptor, error) { switch desc.MediaType { case specs.MediaTypeImageManifest, images.MediaTypeDockerSchema2Manifest: tn := reference.TagNameOnly(ref) var tagOrDigest string if tagged, ok := tn.(reference.Tagged); ok { tagOrDigest = tagged.Tag() } else { tagOrDigest = tn.String() } progress.Messagef(out, tagOrDigest, "Pulling from %s", reference.FamiliarName(ref)) progress.Messagef(out, "", "Digest: %s", desc.Digest.String()) return nil, nil case images.MediaTypeDockerSchema2LayerGzip, images.MediaTypeDockerSchema2Layer, specs.MediaTypeImageLayer, specs.MediaTypeImageLayerGzip: default: return nil, nil } id := stringid.TruncateID(desc.Digest.String()) if _, err := cs.Info(ctx, desc.Digest); err == nil { out.WriteProgress(progress.Progress{ID: id, Action: "Already exists", LastUpdate: true}) return nil, nil } progress.Update(out, id, "Waiting") key := remotes.MakeRefKey(ctx, desc) go func() { timer := time.NewTimer(100 * time.Millisecond) if !timer.Stop() { <-timer.C } defer timer.Stop() var pulling bool var ctxErr error for { timer.Reset(100 * time.Millisecond) select { case <-ctx.Done(): ctxErr = ctx.Err() // make sure we can still fetch from the content store // TODO: Might need to add some sort of timeout ctx = context.Background() case <-timer.C: } s, err := cs.Status(ctx, key) if err != nil { if !c8derrdefs.IsNotFound(err) { logrus.WithError(err).WithField("layerDigest", desc.Digest.String()).Error("Error looking up status of plugin layer pull") progress.Update(out, id, err.Error()) return } if _, err := cs.Info(ctx, desc.Digest); err == nil { progress.Update(out, id, "Download complete") return } if ctxErr != nil { progress.Update(out, id, ctxErr.Error()) return } continue } if !pulling { progress.Update(out, id, "Pulling fs layer") pulling = true } if s.Offset == s.Total { out.WriteProgress(progress.Progress{ID: id, Action: "Download complete", Current: s.Offset, LastUpdate: true}) return } out.WriteProgress(progress.Progress{ID: id, Action: "Downloading", Current: s.Offset, Total: s.Total}) } }() return nil, nil } }