123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897 |
- package containerimage
- import (
- "context"
- "encoding/json"
- "fmt"
- "io"
- "path"
- "strings"
- "sync"
- "time"
- "github.com/containerd/containerd/content"
- cerrdefs "github.com/containerd/containerd/errdefs"
- "github.com/containerd/containerd/gc"
- "github.com/containerd/containerd/images"
- "github.com/containerd/containerd/leases"
- "github.com/containerd/containerd/platforms"
- cdreference "github.com/containerd/containerd/reference"
- ctdreference "github.com/containerd/containerd/reference"
- "github.com/containerd/containerd/remotes"
- "github.com/containerd/containerd/remotes/docker"
- "github.com/containerd/containerd/remotes/docker/schema1" //nolint:staticcheck // Ignore SA1019: "github.com/containerd/containerd/remotes/docker/schema1" is deprecated: use images formatted in Docker Image Manifest v2, Schema 2, or OCI Image Spec v1.
- "github.com/containerd/log"
- distreference "github.com/distribution/reference"
- dimages "github.com/docker/docker/daemon/images"
- "github.com/docker/docker/distribution/metadata"
- "github.com/docker/docker/distribution/xfer"
- "github.com/docker/docker/image"
- "github.com/docker/docker/layer"
- pkgprogress "github.com/docker/docker/pkg/progress"
- "github.com/docker/docker/reference"
- "github.com/moby/buildkit/cache"
- "github.com/moby/buildkit/client/llb"
- "github.com/moby/buildkit/session"
- "github.com/moby/buildkit/solver"
- "github.com/moby/buildkit/solver/pb"
- "github.com/moby/buildkit/source"
- srctypes "github.com/moby/buildkit/source/types"
- "github.com/moby/buildkit/sourcepolicy"
- policy "github.com/moby/buildkit/sourcepolicy/pb"
- spb "github.com/moby/buildkit/sourcepolicy/pb"
- "github.com/moby/buildkit/util/flightcontrol"
- "github.com/moby/buildkit/util/imageutil"
- "github.com/moby/buildkit/util/leaseutil"
- "github.com/moby/buildkit/util/progress"
- "github.com/moby/buildkit/util/resolver"
- "github.com/opencontainers/go-digest"
- "github.com/opencontainers/image-spec/identity"
- ocispec "github.com/opencontainers/image-spec/specs-go/v1"
- "github.com/pkg/errors"
- "golang.org/x/time/rate"
- )
- // SourceOpt is options for creating the image source
- type SourceOpt struct {
- ContentStore content.Store
- CacheAccessor cache.Accessor
- ReferenceStore reference.Store
- DownloadManager *xfer.LayerDownloadManager
- MetadataStore metadata.V2MetadataService
- ImageStore image.Store
- RegistryHosts docker.RegistryHosts
- LayerStore layer.Store
- LeaseManager leases.Manager
- GarbageCollect func(ctx context.Context) (gc.Stats, error)
- }
- // Source is the source implementation for accessing container images
- type Source struct {
- SourceOpt
- g flightcontrol.Group[*resolveRemoteResult]
- }
- // NewSource creates a new image source
- func NewSource(opt SourceOpt) (*Source, error) {
- return &Source{SourceOpt: opt}, nil
- }
- // ID returns image scheme identifier
- func (is *Source) ID() string {
- return srctypes.DockerImageScheme
- }
- func (is *Source) resolveLocal(refStr string) (*image.Image, error) {
- ref, err := distreference.ParseNormalizedNamed(refStr)
- if err != nil {
- return nil, err
- }
- dgst, err := is.ReferenceStore.Get(ref)
- if err != nil {
- return nil, err
- }
- img, err := is.ImageStore.Get(image.ID(dgst))
- if err != nil {
- return nil, err
- }
- return img, nil
- }
- type resolveRemoteResult struct {
- ref string
- dgst digest.Digest
- dt []byte
- }
- func (is *Source) resolveRemote(ctx context.Context, ref string, platform *ocispec.Platform, sm *session.Manager, g session.Group) (string, digest.Digest, []byte, error) {
- p := platforms.DefaultSpec()
- if platform != nil {
- p = *platform
- }
- // key is used to synchronize resolutions that can happen in parallel when doing multi-stage.
- key := "getconfig::" + ref + "::" + platforms.Format(p)
- res, err := is.g.Do(ctx, key, func(ctx context.Context) (*resolveRemoteResult, error) {
- res := resolver.DefaultPool.GetResolver(is.RegistryHosts, ref, "pull", sm, g)
- ref, dgst, dt, err := imageutil.Config(ctx, ref, res, is.ContentStore, is.LeaseManager, platform, []*policy.Policy{})
- if err != nil {
- return nil, err
- }
- return &resolveRemoteResult{ref: ref, dgst: dgst, dt: dt}, nil
- })
- if err != nil {
- return ref, "", nil, err
- }
- return res.ref, res.dgst, res.dt, nil
- }
- // ResolveImageConfig returns image config for an image
- func (is *Source) ResolveImageConfig(ctx context.Context, ref string, opt llb.ResolveImageConfigOpt, sm *session.Manager, g session.Group) (string, digest.Digest, []byte, error) {
- ref, err := applySourcePolicies(ctx, ref, opt.SourcePolicies)
- if err != nil {
- return "", "", nil, err
- }
- resolveMode, err := source.ParseImageResolveMode(opt.ResolveMode)
- if err != nil {
- return ref, "", nil, err
- }
- switch resolveMode {
- case source.ResolveModeForcePull:
- ref, dgst, dt, err := is.resolveRemote(ctx, ref, opt.Platform, sm, g)
- // TODO: pull should fallback to local in case of failure to allow offline behavior
- // the fallback doesn't work currently
- return ref, dgst, dt, err
- /*
- if err == nil {
- return dgst, dt, err
- }
- // fallback to local
- dt, err = is.resolveLocal(ref)
- return "", dt, err
- */
- case source.ResolveModeDefault:
- // default == prefer local, but in the future could be smarter
- fallthrough
- case source.ResolveModePreferLocal:
- img, err := is.resolveLocal(ref)
- if err == nil {
- if opt.Platform != nil && !platformMatches(img, opt.Platform) {
- log.G(ctx).WithField("ref", ref).Debugf("Requested build platform %s does not match local image platform %s, checking remote",
- path.Join(opt.Platform.OS, opt.Platform.Architecture, opt.Platform.Variant),
- path.Join(img.OS, img.Architecture, img.Variant),
- )
- } else {
- return ref, "", img.RawJSON(), err
- }
- }
- // fallback to remote
- return is.resolveRemote(ctx, ref, opt.Platform, sm, g)
- }
- // should never happen
- return ref, "", nil, fmt.Errorf("builder cannot resolve image %s: invalid mode %q", ref, opt.ResolveMode)
- }
- // Resolve returns access to pulling for an identifier
- func (is *Source) Resolve(ctx context.Context, id source.Identifier, sm *session.Manager, vtx solver.Vertex) (source.SourceInstance, error) {
- imageIdentifier, ok := id.(*source.ImageIdentifier)
- if !ok {
- return nil, errors.Errorf("invalid image identifier %v", id)
- }
- platform := platforms.DefaultSpec()
- if imageIdentifier.Platform != nil {
- platform = *imageIdentifier.Platform
- }
- p := &puller{
- src: imageIdentifier,
- is: is,
- // resolver: is.getResolver(is.RegistryHosts, imageIdentifier.Reference.String(), sm, g),
- platform: platform,
- sm: sm,
- }
- return p, nil
- }
- type puller struct {
- is *Source
- resolveLocalOnce sync.Once
- g flightcontrol.Group[struct{}]
- src *source.ImageIdentifier
- desc ocispec.Descriptor
- ref string
- config []byte
- platform ocispec.Platform
- sm *session.Manager
- }
- func (p *puller) resolver(g session.Group) remotes.Resolver {
- return resolver.DefaultPool.GetResolver(p.is.RegistryHosts, p.src.Reference.String(), "pull", p.sm, g)
- }
- func (p *puller) mainManifestKey(platform ocispec.Platform) (digest.Digest, error) {
- dt, err := json.Marshal(struct {
- Digest digest.Digest
- OS string
- Arch string
- Variant string `json:",omitempty"`
- }{
- Digest: p.desc.Digest,
- OS: platform.OS,
- Arch: platform.Architecture,
- Variant: platform.Variant,
- })
- if err != nil {
- return "", err
- }
- return digest.FromBytes(dt), nil
- }
- func (p *puller) resolveLocal() {
- p.resolveLocalOnce.Do(func() {
- dgst := p.src.Reference.Digest()
- if dgst != "" {
- info, err := p.is.ContentStore.Info(context.TODO(), dgst)
- if err == nil {
- p.ref = p.src.Reference.String()
- desc := ocispec.Descriptor{
- Size: info.Size,
- Digest: dgst,
- }
- ra, err := p.is.ContentStore.ReaderAt(context.TODO(), desc)
- if err == nil {
- mt, err := imageutil.DetectManifestMediaType(ra)
- if err == nil {
- desc.MediaType = mt
- p.desc = desc
- }
- }
- }
- }
- if p.src.ResolveMode == source.ResolveModeDefault || p.src.ResolveMode == source.ResolveModePreferLocal {
- ref := p.src.Reference.String()
- img, err := p.is.resolveLocal(ref)
- if err == nil {
- if !platformMatches(img, &p.platform) {
- log.G(context.TODO()).WithField("ref", ref).Debugf("Requested build platform %s does not match local image platform %s, not resolving",
- path.Join(p.platform.OS, p.platform.Architecture, p.platform.Variant),
- path.Join(img.OS, img.Architecture, img.Variant),
- )
- } else {
- p.config = img.RawJSON()
- }
- }
- }
- })
- }
- func (p *puller) resolve(ctx context.Context, g session.Group) error {
- _, err := p.g.Do(ctx, "", func(ctx context.Context) (_ struct{}, err error) {
- resolveProgressDone := oneOffProgress(ctx, "resolve "+p.src.Reference.String())
- defer func() {
- resolveProgressDone(err)
- }()
- ref, err := distreference.ParseNormalizedNamed(p.src.Reference.String())
- if err != nil {
- return struct{}{}, err
- }
- if p.desc.Digest == "" && p.config == nil {
- origRef, desc, err := p.resolver(g).Resolve(ctx, ref.String())
- if err != nil {
- return struct{}{}, err
- }
- p.desc = desc
- p.ref = origRef
- }
- // Schema 1 manifests cannot be resolved to an image config
- // since the conversion must take place after all the content
- // has been read.
- // It may be possible to have a mapping between schema 1 manifests
- // and the schema 2 manifests they are converted to.
- if p.config == nil && p.desc.MediaType != images.MediaTypeDockerSchema1Manifest {
- ref, err := distreference.WithDigest(ref, p.desc.Digest)
- if err != nil {
- return struct{}{}, err
- }
- newRef, _, dt, err := p.is.ResolveImageConfig(ctx, ref.String(), llb.ResolveImageConfigOpt{Platform: &p.platform, ResolveMode: p.src.ResolveMode.String()}, p.sm, g)
- if err != nil {
- return struct{}{}, err
- }
- p.ref = newRef
- p.config = dt
- }
- return struct{}{}, nil
- })
- return err
- }
- func (p *puller) CacheKey(ctx context.Context, g session.Group, index int) (string, string, solver.CacheOpts, bool, error) {
- p.resolveLocal()
- if p.desc.Digest != "" && index == 0 {
- dgst, err := p.mainManifestKey(p.platform)
- if err != nil {
- return "", "", nil, false, err
- }
- return dgst.String(), p.desc.Digest.String(), nil, false, nil
- }
- if p.config != nil {
- k := cacheKeyFromConfig(p.config).String()
- if k == "" {
- return digest.FromBytes(p.config).String(), digest.FromBytes(p.config).String(), nil, true, nil
- }
- return k, k, nil, true, nil
- }
- if err := p.resolve(ctx, g); err != nil {
- return "", "", nil, false, err
- }
- if p.desc.Digest != "" && index == 0 {
- dgst, err := p.mainManifestKey(p.platform)
- if err != nil {
- return "", "", nil, false, err
- }
- return dgst.String(), p.desc.Digest.String(), nil, false, nil
- }
- if len(p.config) == 0 && p.desc.MediaType != images.MediaTypeDockerSchema1Manifest {
- return "", "", nil, false, errors.Errorf("invalid empty config file resolved for %s", p.src.Reference.String())
- }
- k := cacheKeyFromConfig(p.config).String()
- if k == "" || p.desc.MediaType == images.MediaTypeDockerSchema1Manifest {
- dgst, err := p.mainManifestKey(p.platform)
- if err != nil {
- return "", "", nil, false, err
- }
- return dgst.String(), p.desc.Digest.String(), nil, true, nil
- }
- return k, k, nil, true, nil
- }
- func (p *puller) getRef(ctx context.Context, diffIDs []layer.DiffID, opts ...cache.RefOption) (cache.ImmutableRef, error) {
- var parent cache.ImmutableRef
- if len(diffIDs) > 1 {
- var err error
- parent, err = p.getRef(ctx, diffIDs[:len(diffIDs)-1], opts...)
- if err != nil {
- return nil, err
- }
- defer parent.Release(context.TODO())
- }
- return p.is.CacheAccessor.GetByBlob(ctx, ocispec.Descriptor{
- Annotations: map[string]string{
- "containerd.io/uncompressed": diffIDs[len(diffIDs)-1].String(),
- },
- }, parent, opts...)
- }
- func (p *puller) Snapshot(ctx context.Context, g session.Group) (cache.ImmutableRef, error) {
- p.resolveLocal()
- if len(p.config) == 0 {
- if err := p.resolve(ctx, g); err != nil {
- return nil, err
- }
- }
- if p.config != nil {
- img, err := p.is.ImageStore.Get(image.ID(digest.FromBytes(p.config)))
- if err == nil {
- if len(img.RootFS.DiffIDs) == 0 {
- return nil, nil
- }
- l, err := p.is.LayerStore.Get(img.RootFS.ChainID())
- if err == nil {
- layer.ReleaseAndLog(p.is.LayerStore, l)
- ref, err := p.getRef(ctx, img.RootFS.DiffIDs, cache.WithDescription(fmt.Sprintf("from local %s", p.ref)))
- if err != nil {
- return nil, err
- }
- return ref, nil
- }
- }
- }
- ongoing := newJobs(p.ref)
- ctx, done, err := leaseutil.WithLease(ctx, p.is.LeaseManager, leases.WithExpiration(5*time.Minute), leaseutil.MakeTemporary)
- if err != nil {
- return nil, err
- }
- defer func() {
- done(context.TODO())
- if p.is.GarbageCollect != nil {
- go p.is.GarbageCollect(context.TODO())
- }
- }()
- pctx, stopProgress := context.WithCancel(ctx)
- pw, _, ctx := progress.NewFromContext(ctx)
- defer pw.Close()
- progressDone := make(chan struct{})
- go func() {
- showProgress(pctx, ongoing, p.is.ContentStore, pw)
- close(progressDone)
- }()
- defer func() {
- <-progressDone
- }()
- fetcher, err := p.resolver(g).Fetcher(ctx, p.ref)
- if err != nil {
- stopProgress()
- return nil, err
- }
- platform := platforms.Only(p.platform)
- var nonLayers []digest.Digest
- var (
- schema1Converter *schema1.Converter
- handlers []images.Handler
- )
- if p.desc.MediaType == images.MediaTypeDockerSchema1Manifest {
- schema1Converter = schema1.NewConverter(p.is.ContentStore, fetcher)
- handlers = append(handlers, schema1Converter)
- // TODO: Optimize to do dispatch and integrate pulling with download manager,
- // leverage existing blob mapping and layer storage
- } else {
- // TODO: need a wrapper snapshot interface that combines content
- // and snapshots as 1) buildkit shouldn't have a dependency on contentstore
- // or 2) cachemanager should manage the contentstore
- handlers = append(handlers, images.HandlerFunc(func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
- switch desc.MediaType {
- case images.MediaTypeDockerSchema2Manifest, ocispec.MediaTypeImageManifest,
- images.MediaTypeDockerSchema2ManifestList, ocispec.MediaTypeImageIndex,
- images.MediaTypeDockerSchema2Config, ocispec.MediaTypeImageConfig:
- nonLayers = append(nonLayers, desc.Digest)
- default:
- return nil, images.ErrSkipDesc
- }
- ongoing.add(desc)
- return nil, nil
- }))
- // Get all the children for a descriptor
- childrenHandler := images.ChildrenHandler(p.is.ContentStore)
- // Filter the children by the platform
- childrenHandler = images.FilterPlatforms(childrenHandler, platform)
- // Limit manifests pulled to the best match in an index
- childrenHandler = images.LimitManifests(childrenHandler, platform, 1)
- handlers = append(handlers,
- remotes.FetchHandler(p.is.ContentStore, fetcher),
- childrenHandler,
- )
- }
- if err := images.Dispatch(ctx, images.Handlers(handlers...), nil, p.desc); err != nil {
- stopProgress()
- return nil, err
- }
- defer stopProgress()
- if schema1Converter != nil {
- p.desc, err = schema1Converter.Convert(ctx)
- if err != nil {
- return nil, err
- }
- }
- mfst, err := images.Manifest(ctx, p.is.ContentStore, p.desc, platform)
- if err != nil {
- return nil, err
- }
- config, err := images.Config(ctx, p.is.ContentStore, p.desc, platform)
- if err != nil {
- return nil, err
- }
- dt, err := content.ReadBlob(ctx, p.is.ContentStore, config)
- if err != nil {
- return nil, err
- }
- var img ocispec.Image
- if err := json.Unmarshal(dt, &img); err != nil {
- return nil, err
- }
- if len(mfst.Layers) != len(img.RootFS.DiffIDs) {
- return nil, errors.Errorf("invalid config for manifest")
- }
- pchan := make(chan pkgprogress.Progress, 10)
- defer close(pchan)
- go func() {
- m := map[string]struct {
- st time.Time
- limiter *rate.Limiter
- }{}
- for p := range pchan {
- if p.Action == "Extracting" {
- st, ok := m[p.ID]
- if !ok {
- st.st = time.Now()
- st.limiter = rate.NewLimiter(rate.Every(100*time.Millisecond), 1)
- m[p.ID] = st
- }
- var end *time.Time
- if p.LastUpdate || st.limiter.Allow() {
- if p.LastUpdate {
- tm := time.Now()
- end = &tm
- }
- _ = pw.Write("extracting "+p.ID, progress.Status{
- Action: "extract",
- Started: &st.st,
- Completed: end,
- })
- }
- }
- }
- }()
- if len(mfst.Layers) == 0 {
- return nil, nil
- }
- layers := make([]xfer.DownloadDescriptor, 0, len(mfst.Layers))
- for i, desc := range mfst.Layers {
- if err := desc.Digest.Validate(); err != nil {
- return nil, errors.Wrap(err, "layer digest could not be validated")
- }
- ongoing.add(desc)
- layers = append(layers, &layerDescriptor{
- desc: desc,
- diffID: layer.DiffID(img.RootFS.DiffIDs[i]),
- fetcher: fetcher,
- ref: p.src.Reference,
- is: p.is,
- })
- }
- defer func() {
- <-progressDone
- }()
- r := image.NewRootFS()
- rootFS, release, err := p.is.DownloadManager.Download(ctx, *r, layers, pkgprogress.ChanOutput(pchan))
- stopProgress()
- if err != nil {
- return nil, err
- }
- ref, err := p.getRef(ctx, rootFS.DiffIDs, cache.WithDescription(fmt.Sprintf("pulled from %s", p.ref)))
- release()
- if err != nil {
- return nil, err
- }
- // keep manifest blobs until ref is alive for cache
- for _, nl := range nonLayers {
- if err := p.is.LeaseManager.AddResource(ctx, leases.Lease{ID: ref.ID()}, leases.Resource{
- ID: nl.String(),
- Type: "content",
- }); err != nil {
- return nil, err
- }
- }
- // TODO: handle windows layers for cross platform builds
- if p.src.RecordType != "" && ref.GetRecordType() == "" {
- if err := ref.SetRecordType(p.src.RecordType); err != nil {
- ref.Release(context.TODO())
- return nil, err
- }
- }
- return ref, nil
- }
- // Fetch(ctx context.Context, desc ocispec.Descriptor) (io.ReadCloser, error)
- type layerDescriptor struct {
- is *Source
- fetcher remotes.Fetcher
- desc ocispec.Descriptor
- diffID layer.DiffID
- ref ctdreference.Spec
- }
- func (ld *layerDescriptor) Key() string {
- return "v2:" + ld.desc.Digest.String()
- }
- func (ld *layerDescriptor) ID() string {
- return ld.desc.Digest.String()
- }
- func (ld *layerDescriptor) DiffID() (layer.DiffID, error) {
- return ld.diffID, nil
- }
- func (ld *layerDescriptor) Download(ctx context.Context, progressOutput pkgprogress.Output) (io.ReadCloser, int64, error) {
- rc, err := ld.fetcher.Fetch(ctx, ld.desc)
- if err != nil {
- return nil, 0, err
- }
- defer rc.Close()
- refKey := remotes.MakeRefKey(ctx, ld.desc)
- ld.is.ContentStore.Abort(ctx, refKey)
- if err := content.WriteBlob(ctx, ld.is.ContentStore, refKey, rc, ld.desc); err != nil {
- ld.is.ContentStore.Abort(ctx, refKey)
- return nil, 0, err
- }
- ra, err := ld.is.ContentStore.ReaderAt(ctx, ld.desc)
- if err != nil {
- return nil, 0, err
- }
- return io.NopCloser(content.NewReader(ra)), ld.desc.Size, nil
- }
- func (ld *layerDescriptor) Close() {
- // ld.is.ContentStore.Delete(context.TODO(), ld.desc.Digest))
- }
- func (ld *layerDescriptor) Registered(diffID layer.DiffID) {
- // Cache mapping from this layer's DiffID to the blobsum
- ld.is.MetadataStore.Add(diffID, metadata.V2Metadata{Digest: ld.desc.Digest, SourceRepository: ld.ref.Locator})
- }
- func showProgress(ctx context.Context, ongoing *jobs, cs content.Store, pw progress.Writer) {
- var (
- ticker = time.NewTicker(100 * time.Millisecond)
- statuses = map[string]statusInfo{}
- done bool
- )
- defer ticker.Stop()
- for {
- select {
- case <-ticker.C:
- case <-ctx.Done():
- done = true
- }
- resolved := "resolved"
- if !ongoing.isResolved() {
- resolved = "resolving"
- }
- statuses[ongoing.name] = statusInfo{
- Ref: ongoing.name,
- Status: resolved,
- }
- actives := make(map[string]statusInfo)
- if !done {
- active, err := cs.ListStatuses(ctx)
- if err != nil {
- // log.G(ctx).WithError(err).Error("active check failed")
- continue
- }
- // update status of active entries!
- for _, active := range active {
- actives[active.Ref] = statusInfo{
- Ref: active.Ref,
- Status: "downloading",
- Offset: active.Offset,
- Total: active.Total,
- StartedAt: active.StartedAt,
- UpdatedAt: active.UpdatedAt,
- }
- }
- }
- // now, update the items in jobs that are not in active
- for _, j := range ongoing.jobs() {
- refKey := remotes.MakeRefKey(ctx, j.Descriptor)
- if a, ok := actives[refKey]; ok {
- started := j.started
- _ = pw.Write(j.Digest.String(), progress.Status{
- Action: a.Status,
- Total: int(a.Total),
- Current: int(a.Offset),
- Started: &started,
- })
- continue
- }
- if !j.done {
- info, err := cs.Info(context.TODO(), j.Digest)
- if err != nil {
- if cerrdefs.IsNotFound(err) {
- // _ = pw.Write(j.Digest.String(), progress.Status{
- // Action: "waiting",
- // })
- continue
- }
- } else {
- j.done = true
- }
- if done || j.done {
- started := j.started
- createdAt := info.CreatedAt
- _ = pw.Write(j.Digest.String(), progress.Status{
- Action: "done",
- Current: int(info.Size),
- Total: int(info.Size),
- Completed: &createdAt,
- Started: &started,
- })
- }
- }
- }
- if done {
- return
- }
- }
- }
- // jobs provides a way of identifying the download keys for a particular task
- // encountering during the pull walk.
- //
- // This is very minimal and will probably be replaced with something more
- // featured.
- type jobs struct {
- name string
- added map[digest.Digest]*job
- mu sync.Mutex
- resolved bool
- }
- type job struct {
- ocispec.Descriptor
- done bool
- started time.Time
- }
- func newJobs(name string) *jobs {
- return &jobs{
- name: name,
- added: make(map[digest.Digest]*job),
- }
- }
- func (j *jobs) add(desc ocispec.Descriptor) {
- j.mu.Lock()
- defer j.mu.Unlock()
- if _, ok := j.added[desc.Digest]; ok {
- return
- }
- j.added[desc.Digest] = &job{
- Descriptor: desc,
- started: time.Now(),
- }
- }
- func (j *jobs) jobs() []*job {
- j.mu.Lock()
- defer j.mu.Unlock()
- descs := make([]*job, 0, len(j.added))
- for _, j := range j.added {
- descs = append(descs, j)
- }
- return descs
- }
- func (j *jobs) isResolved() bool {
- j.mu.Lock()
- defer j.mu.Unlock()
- return j.resolved
- }
- type statusInfo struct {
- Ref string
- Status string
- Offset int64
- Total int64
- StartedAt time.Time
- UpdatedAt time.Time
- }
- func oneOffProgress(ctx context.Context, id string) func(err error) error {
- pw, _, _ := progress.NewFromContext(ctx)
- now := time.Now()
- st := progress.Status{
- Started: &now,
- }
- _ = pw.Write(id, st)
- return func(err error) error {
- // TODO: set error on status
- now := time.Now()
- st.Completed = &now
- _ = pw.Write(id, st)
- _ = pw.Close()
- return err
- }
- }
- // cacheKeyFromConfig returns a stable digest from image config. If image config
- // is a known oci image we will use chainID of layers.
- func cacheKeyFromConfig(dt []byte) digest.Digest {
- var img ocispec.Image
- err := json.Unmarshal(dt, &img)
- if err != nil {
- log.G(context.TODO()).WithError(err).Errorf("failed to unmarshal image config for cache key %v", err)
- return digest.FromBytes(dt)
- }
- if img.RootFS.Type != "layers" || len(img.RootFS.DiffIDs) == 0 {
- return ""
- }
- return identity.ChainID(img.RootFS.DiffIDs)
- }
- func platformMatches(img *image.Image, p *ocispec.Platform) bool {
- return dimages.OnlyPlatformWithFallback(*p).Match(ocispec.Platform{
- Architecture: img.Architecture,
- OS: img.OS,
- OSVersion: img.OSVersion,
- OSFeatures: img.OSFeatures,
- Variant: img.Variant,
- })
- }
- func applySourcePolicies(ctx context.Context, str string, spls []*spb.Policy) (string, error) {
- ref, err := cdreference.Parse(str)
- if err != nil {
- return "", errors.WithStack(err)
- }
- op := &pb.Op{
- Op: &pb.Op_Source{
- Source: &pb.SourceOp{
- Identifier: srctypes.DockerImageScheme + "://" + ref.String(),
- },
- },
- }
- mut, err := sourcepolicy.NewEngine(spls).Evaluate(ctx, op)
- if err != nil {
- return "", errors.Wrap(err, "could not resolve image due to policy")
- }
- if mut {
- var (
- t string
- ok bool
- )
- t, newRef, ok := strings.Cut(op.GetSource().GetIdentifier(), "://")
- if !ok {
- return "", errors.Errorf("could not parse ref: %s", op.GetSource().GetIdentifier())
- }
- if ok && t != srctypes.DockerImageScheme {
- return "", &imageutil.ResolveToNonImageError{Ref: str, Updated: newRef}
- }
- ref, err = cdreference.Parse(newRef)
- if err != nil {
- return "", errors.WithStack(err)
- }
- }
- return ref.String(), nil
- }
|