2020-02-11 00:31:04 +00:00
package plugin
import (
"context"
"io"
"net/http"
"time"
"github.com/containerd/containerd/content"
2023-04-08 17:30:33 +00:00
cerrdefs "github.com/containerd/containerd/errdefs"
2020-02-11 00:31:04 +00:00
"github.com/containerd/containerd/images"
"github.com/containerd/containerd/remotes"
"github.com/containerd/containerd/remotes/docker"
2023-09-13 15:41:45 +00:00
"github.com/containerd/log"
2023-08-30 16:31:46 +00:00
"github.com/distribution/reference"
2022-03-03 09:31:58 +00:00
"github.com/docker/docker/api/types/registry"
2020-02-11 00:31:04 +00:00
progressutils "github.com/docker/docker/distribution/utils"
"github.com/docker/docker/pkg/chrootarchive"
"github.com/docker/docker/pkg/ioutils"
"github.com/docker/docker/pkg/progress"
"github.com/docker/docker/pkg/stringid"
2022-03-04 13:49:42 +00:00
"github.com/opencontainers/go-digest"
2023-05-08 09:57:52 +00:00
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
2020-02-11 00:31:04 +00:00
"github.com/pkg/errors"
)
const mediaTypePluginConfig = "application/vnd.docker.plugin.v1+json"
// setupProgressOutput sets up the passed in writer to stream progress.
//
// The passed in cancel function is used by the progress writer to signal callers that there
// is an issue writing to the stream.
//
// The returned function is used to wait for the progress writer to be finished.
// Call it to make sure the progress writer is done before returning from your function as needed.
func setupProgressOutput ( outStream io . Writer , cancel func ( ) ) ( progress . Output , func ( ) ) {
var out progress . Output
f := func ( ) { }
if outStream != nil {
ch := make ( chan progress . Progress , 100 )
out = progress . ChanOutput ( ch )
ctx , retCancel := context . WithCancel ( context . Background ( ) )
go func ( ) {
progressutils . WriteDistributionProgress ( cancel , outStream , ch )
retCancel ( )
} ( )
f = func ( ) {
close ( ch )
<- ctx . Done ( )
}
} else {
out = progress . DiscardOutput ( )
}
return out , f
}
// fetch the content related to the passed in reference into the blob store and appends the provided images.Handlers
// There is no need to use remotes.FetchHandler since it already gets set
2022-03-03 09:31:58 +00:00
func ( pm * Manager ) fetch ( ctx context . Context , ref reference . Named , auth * registry . AuthConfig , out progress . Output , metaHeader http . Header , handlers ... images . Handler ) ( err error ) {
2020-02-11 00:31:04 +00:00
// We need to make sure we have a domain on the reference
withDomain , err := reference . ParseNormalizedNamed ( ref . String ( ) )
if err != nil {
return errors . Wrap ( err , "error parsing plugin image reference" )
}
// Make sure we can authenticate the request since the auth scope for plugin repos is different than a normal repo.
ctx = docker . WithScope ( ctx , scope ( ref , false ) )
// Make sure the fetch handler knows how to set a ref key for the plugin media type.
// Without this the ref key is "unknown" and we see a nasty warning message in the logs
ctx = remotes . WithMediaTypeKeyPrefix ( ctx , mediaTypePluginConfig , "docker-plugin" )
resolver , err := pm . newResolver ( ctx , nil , auth , metaHeader , false )
if err != nil {
return err
}
resolved , desc , err := resolver . Resolve ( ctx , withDomain . String ( ) )
if err != nil {
// This is backwards compatible with older versions of the distribution registry.
// The containerd client will add it's own accept header as a comma separated list of supported manifests.
// This is perfectly fine, unless you are talking to an older registry which does not split the comma separated list,
// so it is never able to match a media type and it falls back to schema1 (yuck) and fails because our manifest the
// fallback does not support plugin configs...
2023-06-23 00:33:17 +00:00
log . G ( ctx ) . WithError ( err ) . WithField ( "ref" , withDomain ) . Debug ( "Error while resolving reference, falling back to backwards compatible accept header format" )
2020-02-11 00:31:04 +00:00
headers := http . Header { }
headers . Add ( "Accept" , images . MediaTypeDockerSchema2Manifest )
headers . Add ( "Accept" , images . MediaTypeDockerSchema2ManifestList )
2023-05-08 09:57:52 +00:00
headers . Add ( "Accept" , ocispec . MediaTypeImageManifest )
headers . Add ( "Accept" , ocispec . MediaTypeImageIndex )
2020-02-11 00:31:04 +00:00
resolver , _ = pm . newResolver ( ctx , nil , auth , headers , false )
if resolver != nil {
resolved , desc , err = resolver . Resolve ( ctx , withDomain . String ( ) )
if err != nil {
2023-06-23 00:33:17 +00:00
log . G ( ctx ) . WithError ( err ) . WithField ( "ref" , withDomain ) . Debug ( "Failed to resolve reference after falling back to backwards compatible accept header format" )
2020-02-11 00:31:04 +00:00
}
}
if err != nil {
return errors . Wrap ( err , "error resolving plugin reference" )
}
}
fetcher , err := resolver . Fetcher ( ctx , resolved )
if err != nil {
return errors . Wrap ( err , "error creating plugin image fetcher" )
}
fp := withFetchProgress ( pm . blobStore , out , ref )
handlers = append ( [ ] images . Handler { fp , remotes . FetchHandler ( pm . blobStore , fetcher ) } , handlers ... )
2021-05-31 09:32:33 +00:00
return images . Dispatch ( ctx , images . Handlers ( handlers ... ) , nil , desc )
2020-02-11 00:31:04 +00:00
}
// applyLayer makes an images.HandlerFunc which applies a fetched image rootfs layer to a directory.
//
// TODO(@cpuguy83) This gets run sequentially after layer pull (makes sense), however
// if there are multiple layers to fetch we may end up extracting layers in the wrong
// order.
func applyLayer ( cs content . Store , dir string , out progress . Output ) images . HandlerFunc {
2023-05-08 09:57:52 +00:00
return func ( ctx context . Context , desc ocispec . Descriptor ) ( [ ] ocispec . Descriptor , error ) {
2020-02-11 00:31:04 +00:00
switch desc . MediaType {
case
2023-05-08 09:57:52 +00:00
ocispec . MediaTypeImageLayer ,
2020-02-11 00:31:04 +00:00
images . MediaTypeDockerSchema2Layer ,
2023-05-08 09:57:52 +00:00
ocispec . MediaTypeImageLayerGzip ,
2020-02-11 00:31:04 +00:00
images . MediaTypeDockerSchema2LayerGzip :
default :
return nil , nil
}
ra , err := cs . ReaderAt ( ctx , desc )
if err != nil {
return nil , errors . Wrapf ( err , "error getting content from content store for digest %s" , desc . Digest )
}
id := stringid . TruncateID ( desc . Digest . String ( ) )
rc := ioutils . NewReadCloserWrapper ( content . NewReader ( ra ) , ra . Close )
pr := progress . NewProgressReader ( rc , out , desc . Size , id , "Extracting" )
defer pr . Close ( )
if _ , err := chrootarchive . ApplyLayer ( dir , pr ) ; err != nil {
return nil , errors . Wrapf ( err , "error applying layer for digest %s" , desc . Digest )
}
progress . Update ( out , id , "Complete" )
return nil , nil
}
}
func childrenHandler ( cs content . Store ) images . HandlerFunc {
ch := images . ChildrenHandler ( cs )
2023-05-08 09:57:52 +00:00
return func ( ctx context . Context , desc ocispec . Descriptor ) ( [ ] ocispec . Descriptor , error ) {
2020-02-11 00:31:04 +00:00
switch desc . MediaType {
case mediaTypePluginConfig :
return nil , nil
default :
return ch ( ctx , desc )
}
}
}
type fetchMeta struct {
blobs [ ] digest . Digest
config digest . Digest
manifest digest . Digest
}
func storeFetchMetadata ( m * fetchMeta ) images . HandlerFunc {
2023-05-08 09:57:52 +00:00
return func ( ctx context . Context , desc ocispec . Descriptor ) ( [ ] ocispec . Descriptor , error ) {
2020-02-11 00:31:04 +00:00
switch desc . MediaType {
case
images . MediaTypeDockerSchema2LayerForeignGzip ,
images . MediaTypeDockerSchema2Layer ,
2023-05-08 09:57:52 +00:00
ocispec . MediaTypeImageLayer ,
ocispec . MediaTypeImageLayerGzip :
2020-02-11 00:31:04 +00:00
m . blobs = append ( m . blobs , desc . Digest )
2023-05-08 09:57:52 +00:00
case ocispec . MediaTypeImageManifest , images . MediaTypeDockerSchema2Manifest :
2020-02-11 00:31:04 +00:00
m . manifest = desc . Digest
case mediaTypePluginConfig :
m . config = desc . Digest
}
return nil , nil
}
}
func validateFetchedMetadata ( md fetchMeta ) error {
if md . config == "" {
return errors . New ( "fetched plugin image but plugin config is missing" )
}
if md . manifest == "" {
return errors . New ( "fetched plugin image but manifest is missing" )
}
return nil
}
// withFetchProgress is a fetch handler which registers a descriptor with a progress
func withFetchProgress ( cs content . Store , out progress . Output , ref reference . Named ) images . HandlerFunc {
2023-05-08 09:57:52 +00:00
return func ( ctx context . Context , desc ocispec . Descriptor ) ( [ ] ocispec . Descriptor , error ) {
2020-02-11 00:31:04 +00:00
switch desc . MediaType {
2023-05-08 09:57:52 +00:00
case ocispec . MediaTypeImageManifest , images . MediaTypeDockerSchema2Manifest :
2020-02-11 00:31:04 +00:00
tn := reference . TagNameOnly ( ref )
2024-02-01 23:25:57 +00:00
var tagOrDigest string
if tagged , ok := tn . ( reference . Tagged ) ; ok {
tagOrDigest = tagged . Tag ( )
} else {
tagOrDigest = tn . String ( )
}
progress . Messagef ( out , tagOrDigest , "Pulling from %s" , reference . FamiliarName ( ref ) )
2020-02-11 00:31:04 +00:00
progress . Messagef ( out , "" , "Digest: %s" , desc . Digest . String ( ) )
return nil , nil
case
images . MediaTypeDockerSchema2LayerGzip ,
images . MediaTypeDockerSchema2Layer ,
2023-05-08 09:57:52 +00:00
ocispec . MediaTypeImageLayer ,
ocispec . MediaTypeImageLayerGzip :
2020-02-11 00:31:04 +00:00
default :
return nil , nil
}
id := stringid . TruncateID ( desc . Digest . String ( ) )
if _ , err := cs . Info ( ctx , desc . Digest ) ; err == nil {
out . WriteProgress ( progress . Progress { ID : id , Action : "Already exists" , LastUpdate : true } )
return nil , nil
}
progress . Update ( out , id , "Waiting" )
key := remotes . MakeRefKey ( ctx , desc )
go func ( ) {
timer := time . NewTimer ( 100 * time . Millisecond )
if ! timer . Stop ( ) {
<- timer . C
}
defer timer . Stop ( )
var pulling bool
var ctxErr error
for {
timer . Reset ( 100 * time . Millisecond )
select {
case <- ctx . Done ( ) :
ctxErr = ctx . Err ( )
// make sure we can still fetch from the content store
// TODO: Might need to add some sort of timeout
ctx = context . Background ( )
case <- timer . C :
}
s , err := cs . Status ( ctx , key )
if err != nil {
2023-04-08 17:30:33 +00:00
if ! cerrdefs . IsNotFound ( err ) {
2023-06-23 00:33:17 +00:00
log . G ( ctx ) . WithError ( err ) . WithField ( "layerDigest" , desc . Digest . String ( ) ) . Error ( "Error looking up status of plugin layer pull" )
2020-02-11 00:31:04 +00:00
progress . Update ( out , id , err . Error ( ) )
return
}
if _ , err := cs . Info ( ctx , desc . Digest ) ; err == nil {
progress . Update ( out , id , "Download complete" )
return
}
if ctxErr != nil {
progress . Update ( out , id , ctxErr . Error ( ) )
return
}
continue
}
if ! pulling {
progress . Update ( out , id , "Pulling fs layer" )
pulling = true
}
if s . Offset == s . Total {
out . WriteProgress ( progress . Progress { ID : id , Action : "Download complete" , Current : s . Offset , LastUpdate : true } )
return
}
out . WriteProgress ( progress . Progress { ID : id , Action : "Downloading" , Current : s . Offset , Total : s . Total } )
}
} ( )
return nil , nil
}
}