pull.go

package containerimage

import (
	"context"
	"encoding/json"
	"fmt"
	"io"
	"path"
	"strings"
	"sync"
	"time"

	"github.com/containerd/containerd/content"
	cerrdefs "github.com/containerd/containerd/errdefs"
	"github.com/containerd/containerd/gc"
	"github.com/containerd/containerd/images"
	"github.com/containerd/containerd/leases"
	"github.com/containerd/containerd/platforms"
	cdreference "github.com/containerd/containerd/reference"
	ctdreference "github.com/containerd/containerd/reference"
	"github.com/containerd/containerd/remotes"
	"github.com/containerd/containerd/remotes/docker"
	"github.com/containerd/containerd/remotes/docker/schema1" //nolint:staticcheck // Ignore SA1019: "github.com/containerd/containerd/remotes/docker/schema1" is deprecated: use images formatted in Docker Image Manifest v2, Schema 2, or OCI Image Spec v1.
	"github.com/containerd/log"
	distreference "github.com/distribution/reference"
	dimages "github.com/docker/docker/daemon/images"
	"github.com/docker/docker/distribution/metadata"
	"github.com/docker/docker/distribution/xfer"
	"github.com/docker/docker/image"
	"github.com/docker/docker/layer"
	pkgprogress "github.com/docker/docker/pkg/progress"
	"github.com/docker/docker/reference"
	"github.com/moby/buildkit/cache"
	"github.com/moby/buildkit/client/llb"
	"github.com/moby/buildkit/session"
	"github.com/moby/buildkit/solver"
	"github.com/moby/buildkit/solver/pb"
	"github.com/moby/buildkit/source"
	srctypes "github.com/moby/buildkit/source/types"
	"github.com/moby/buildkit/sourcepolicy"
	policy "github.com/moby/buildkit/sourcepolicy/pb"
	spb "github.com/moby/buildkit/sourcepolicy/pb"
	"github.com/moby/buildkit/util/flightcontrol"
	"github.com/moby/buildkit/util/imageutil"
	"github.com/moby/buildkit/util/leaseutil"
	"github.com/moby/buildkit/util/progress"
	"github.com/moby/buildkit/util/resolver"
	"github.com/opencontainers/go-digest"
	"github.com/opencontainers/image-spec/identity"
	ocispec "github.com/opencontainers/image-spec/specs-go/v1"
	"github.com/pkg/errors"
	"golang.org/x/time/rate"
)

// SourceOpt holds the options for creating the image source
type SourceOpt struct {
	ContentStore    content.Store
	CacheAccessor   cache.Accessor
	ReferenceStore  reference.Store
	DownloadManager *xfer.LayerDownloadManager
	MetadataStore   metadata.V2MetadataService
	ImageStore      image.Store
	RegistryHosts   docker.RegistryHosts
	LayerStore      layer.Store
	LeaseManager    leases.Manager
	GarbageCollect  func(ctx context.Context) (gc.Stats, error)
}

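// A rough sketch of constructing the source. The variable names used for the
// field values (contentStore, layerStore, ...) are placeholders standing in
// for whatever stores the caller already has, not identifiers defined here:
//
//	src, err := NewSource(SourceOpt{
//		ContentStore:    contentStore,
//		CacheAccessor:   cacheAccessor,
//		ReferenceStore:  referenceStore,
//		DownloadManager: downloadManager,
//		MetadataStore:   metadataService,
//		ImageStore:      imageStore,
//		RegistryHosts:   registryHosts,
//		LayerStore:      layerStore,
//		LeaseManager:    leaseManager,
//	})
//	if err != nil {
//		return err
//	}
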
// Source is the source implementation for accessing container images
type Source struct {
	SourceOpt
	g flightcontrol.Group[*resolveRemoteResult]
}

// NewSource creates a new image source
func NewSource(opt SourceOpt) (*Source, error) {
	return &Source{SourceOpt: opt}, nil
}

// ID returns image scheme identifier
func (is *Source) ID() string {
	return srctypes.DockerImageScheme
}

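// resolveLocal looks up refStr in the daemon's reference store and returns the
// matching image from the local image store, without contacting a registry.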
func (is *Source) resolveLocal(refStr string) (*image.Image, error) {
	ref, err := distreference.ParseNormalizedNamed(refStr)
	if err != nil {
		return nil, err
	}
	dgst, err := is.ReferenceStore.Get(ref)
	if err != nil {
		return nil, err
	}
	img, err := is.ImageStore.Get(image.ID(dgst))
	if err != nil {
		return nil, err
	}
	return img, nil
}

type resolveRemoteResult struct {
	ref  string
	dgst digest.Digest
	dt   []byte
}

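// resolveRemote resolves the image config for ref against the configured
// registry hosts. Concurrent resolutions of the same ref/platform pair are
// deduplicated through the flightcontrol group on Source.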
func (is *Source) resolveRemote(ctx context.Context, ref string, platform *ocispec.Platform, sm *session.Manager, g session.Group) (string, digest.Digest, []byte, error) {
	p := platforms.DefaultSpec()
	if platform != nil {
		p = *platform
	}
	// key is used to synchronize resolutions that can happen in parallel when doing multi-stage.
	key := "getconfig::" + ref + "::" + platforms.Format(p)
	res, err := is.g.Do(ctx, key, func(ctx context.Context) (*resolveRemoteResult, error) {
		res := resolver.DefaultPool.GetResolver(is.RegistryHosts, ref, "pull", sm, g)
		ref, dgst, dt, err := imageutil.Config(ctx, ref, res, is.ContentStore, is.LeaseManager, platform, []*policy.Policy{})
		if err != nil {
			return nil, err
		}
		return &resolveRemoteResult{ref: ref, dgst: dgst, dt: dt}, nil
	})
	if err != nil {
		return ref, "", nil, err
	}
	return res.ref, res.dgst, res.dt, nil
}

// ResolveImageConfig returns image config for an image
func (is *Source) ResolveImageConfig(ctx context.Context, ref string, opt llb.ResolveImageConfigOpt, sm *session.Manager, g session.Group) (string, digest.Digest, []byte, error) {
	ref, err := applySourcePolicies(ctx, ref, opt.SourcePolicies)
	if err != nil {
		return "", "", nil, err
	}
	resolveMode, err := source.ParseImageResolveMode(opt.ResolveMode)
	if err != nil {
		return ref, "", nil, err
	}
	switch resolveMode {
	case source.ResolveModeForcePull:
		ref, dgst, dt, err := is.resolveRemote(ctx, ref, opt.Platform, sm, g)
		// TODO: pull should fallback to local in case of failure to allow offline behavior
		// the fallback doesn't work currently
		return ref, dgst, dt, err
		/*
			if err == nil {
				return dgst, dt, err
			}
			// fallback to local
			dt, err = is.resolveLocal(ref)
			return "", dt, err
		*/

	case source.ResolveModeDefault:
		// default == prefer local, but in the future could be smarter
		fallthrough
	case source.ResolveModePreferLocal:
		img, err := is.resolveLocal(ref)
		if err == nil {
			if opt.Platform != nil && !platformMatches(img, opt.Platform) {
				log.G(ctx).WithField("ref", ref).Debugf("Requested build platform %s does not match local image platform %s, checking remote",
					path.Join(opt.Platform.OS, opt.Platform.Architecture, opt.Platform.Variant),
					path.Join(img.OS, img.Architecture, img.Variant),
				)
			} else {
				return ref, "", img.RawJSON(), err
			}
		}
		// fallback to remote
		return is.resolveRemote(ctx, ref, opt.Platform, sm, g)
	}
	// should never happen
	return ref, "", nil, fmt.Errorf("builder cannot resolve image %s: invalid mode %q", ref, opt.ResolveMode)
}

// Resolve returns access to pulling for an identifier
func (is *Source) Resolve(ctx context.Context, id source.Identifier, sm *session.Manager, vtx solver.Vertex) (source.SourceInstance, error) {
	imageIdentifier, ok := id.(*source.ImageIdentifier)
	if !ok {
		return nil, errors.Errorf("invalid image identifier %v", id)
	}

	platform := platforms.DefaultSpec()
	if imageIdentifier.Platform != nil {
		platform = *imageIdentifier.Platform
	}

	p := &puller{
		src: imageIdentifier,
		is:  is,
		// resolver: is.getResolver(is.RegistryHosts, imageIdentifier.Reference.String(), sm, g),
		platform: platform,
		sm:       sm,
	}
	return p, nil
}

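// puller tracks the state of a single image pull: the identifier being
// resolved, the resolved descriptor and config, and the session manager used
// to reach the registry.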
type puller struct {
	is               *Source
	resolveLocalOnce sync.Once
	g                flightcontrol.Group[struct{}]
	src              *source.ImageIdentifier
	desc             ocispec.Descriptor
	ref              string
	config           []byte
	platform         ocispec.Platform
	sm               *session.Manager
}

func (p *puller) resolver(g session.Group) remotes.Resolver {
	return resolver.DefaultPool.GetResolver(p.is.RegistryHosts, p.src.Reference.String(), "pull", p.sm, g)
}

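// mainManifestKey derives a stable digest from the resolved manifest digest
// and the requested platform (OS, architecture, variant).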
func (p *puller) mainManifestKey(platform ocispec.Platform) (digest.Digest, error) {
	dt, err := json.Marshal(struct {
		Digest  digest.Digest
		OS      string
		Arch    string
		Variant string `json:",omitempty"`
	}{
		Digest:  p.desc.Digest,
		OS:      platform.OS,
		Arch:    platform.Architecture,
		Variant: platform.Variant,
	})
	if err != nil {
		return "", err
	}
	return digest.FromBytes(dt), nil
}

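// resolveLocal checks, at most once, whether the reference can be satisfied
// from local state: a digest already present in the content store, or (for
// the default/prefer-local resolve modes) a matching image in the local
// image store.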
func (p *puller) resolveLocal() {
	p.resolveLocalOnce.Do(func() {
		dgst := p.src.Reference.Digest()
		if dgst != "" {
			info, err := p.is.ContentStore.Info(context.TODO(), dgst)
			if err == nil {
				p.ref = p.src.Reference.String()
				desc := ocispec.Descriptor{
					Size:   info.Size,
					Digest: dgst,
				}
				ra, err := p.is.ContentStore.ReaderAt(context.TODO(), desc)
				if err == nil {
					mt, err := imageutil.DetectManifestMediaType(ra)
					if err == nil {
						desc.MediaType = mt
						p.desc = desc
					}
				}
			}
		}

		if p.src.ResolveMode == source.ResolveModeDefault || p.src.ResolveMode == source.ResolveModePreferLocal {
			ref := p.src.Reference.String()
			img, err := p.is.resolveLocal(ref)
			if err == nil {
				if !platformMatches(img, &p.platform) {
					log.G(context.TODO()).WithField("ref", ref).Debugf("Requested build platform %s does not match local image platform %s, not resolving",
						path.Join(p.platform.OS, p.platform.Architecture, p.platform.Variant),
						path.Join(img.OS, img.Architecture, img.Variant),
					)
				} else {
					p.config = img.RawJSON()
				}
			}
		}
	})
}

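// resolve performs the remote resolution step: it resolves the reference to a
// descriptor if one is not already known and, except for schema 1 manifests,
// fetches the image config. Concurrent callers share a single resolution via
// the flightcontrol group.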
func (p *puller) resolve(ctx context.Context, g session.Group) error {
	_, err := p.g.Do(ctx, "", func(ctx context.Context) (_ struct{}, err error) {
		resolveProgressDone := oneOffProgress(ctx, "resolve "+p.src.Reference.String())
		defer func() {
			resolveProgressDone(err)
		}()

		ref, err := distreference.ParseNormalizedNamed(p.src.Reference.String())
		if err != nil {
			return struct{}{}, err
		}

		if p.desc.Digest == "" && p.config == nil {
			origRef, desc, err := p.resolver(g).Resolve(ctx, ref.String())
			if err != nil {
				return struct{}{}, err
			}

			p.desc = desc
			p.ref = origRef
		}

		// Schema 1 manifests cannot be resolved to an image config
		// since the conversion must take place after all the content
		// has been read.
		// It may be possible to have a mapping between schema 1 manifests
		// and the schema 2 manifests they are converted to.
		if p.config == nil && p.desc.MediaType != images.MediaTypeDockerSchema1Manifest {
			ref, err := distreference.WithDigest(ref, p.desc.Digest)
			if err != nil {
				return struct{}{}, err
			}
			newRef, _, dt, err := p.is.ResolveImageConfig(ctx, ref.String(), llb.ResolveImageConfigOpt{Platform: &p.platform, ResolveMode: p.src.ResolveMode.String()}, p.sm, g)
			if err != nil {
				return struct{}{}, err
			}

			p.ref = newRef
			p.config = dt
		}
		return struct{}{}, nil
	})
	return err
}

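// CacheKey returns the cache key for this pull. For index 0 it keys on the
// manifest digest plus the requested platform; otherwise it keys on the image
// config (the chain ID of its layers when available), resolving remotely only
// when the local information is insufficient.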
func (p *puller) CacheKey(ctx context.Context, g session.Group, index int) (string, string, solver.CacheOpts, bool, error) {
	p.resolveLocal()

	if p.desc.Digest != "" && index == 0 {
		dgst, err := p.mainManifestKey(p.platform)
		if err != nil {
			return "", "", nil, false, err
		}
		return dgst.String(), p.desc.Digest.String(), nil, false, nil
	}

	if p.config != nil {
		k := cacheKeyFromConfig(p.config).String()
		if k == "" {
			return digest.FromBytes(p.config).String(), digest.FromBytes(p.config).String(), nil, true, nil
		}
		return k, k, nil, true, nil
	}

	if err := p.resolve(ctx, g); err != nil {
		return "", "", nil, false, err
	}

	if p.desc.Digest != "" && index == 0 {
		dgst, err := p.mainManifestKey(p.platform)
		if err != nil {
			return "", "", nil, false, err
		}
		return dgst.String(), p.desc.Digest.String(), nil, false, nil
	}

	if len(p.config) == 0 && p.desc.MediaType != images.MediaTypeDockerSchema1Manifest {
		return "", "", nil, false, errors.Errorf("invalid empty config file resolved for %s", p.src.Reference.String())
	}

	k := cacheKeyFromConfig(p.config).String()
	if k == "" || p.desc.MediaType == images.MediaTypeDockerSchema1Manifest {
		dgst, err := p.mainManifestKey(p.platform)
		if err != nil {
			return "", "", nil, false, err
		}
		return dgst.String(), p.desc.Digest.String(), nil, true, nil
	}

	return k, k, nil, true, nil
}

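// getRef converts a chain of layer diff IDs into an immutable cache ref,
// recursively materializing the parent chain so that each blob is registered
// against its parent in the cache manager.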
func (p *puller) getRef(ctx context.Context, diffIDs []layer.DiffID, opts ...cache.RefOption) (cache.ImmutableRef, error) {
	var parent cache.ImmutableRef
	if len(diffIDs) > 1 {
		var err error
		parent, err = p.getRef(ctx, diffIDs[:len(diffIDs)-1], opts...)
		if err != nil {
			return nil, err
		}
		defer parent.Release(context.TODO())
	}
	return p.is.CacheAccessor.GetByBlob(ctx, ocispec.Descriptor{
		Annotations: map[string]string{
			"containerd.io/uncompressed": diffIDs[len(diffIDs)-1].String(),
		},
	}, parent, opts...)
}

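// Snapshot returns the rootfs for the resolved image. It first tries to reuse
// layers already present in the local image and layer stores; otherwise it
// fetches the manifest and config under a temporary lease, downloads the
// layers through the download manager, and registers the resulting chain with
// the cache, keeping the manifest blobs leased to the returned ref.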
func (p *puller) Snapshot(ctx context.Context, g session.Group) (cache.ImmutableRef, error) {
	p.resolveLocal()
	if len(p.config) == 0 {
		if err := p.resolve(ctx, g); err != nil {
			return nil, err
		}
	}

	if p.config != nil {
		img, err := p.is.ImageStore.Get(image.ID(digest.FromBytes(p.config)))
		if err == nil {
			if len(img.RootFS.DiffIDs) == 0 {
				return nil, nil
			}
			l, err := p.is.LayerStore.Get(img.RootFS.ChainID())
			if err == nil {
				layer.ReleaseAndLog(p.is.LayerStore, l)
				ref, err := p.getRef(ctx, img.RootFS.DiffIDs, cache.WithDescription(fmt.Sprintf("from local %s", p.ref)))
				if err != nil {
					return nil, err
				}
				return ref, nil
			}
		}
	}

	ongoing := newJobs(p.ref)

	ctx, done, err := leaseutil.WithLease(ctx, p.is.LeaseManager, leases.WithExpiration(5*time.Minute), leaseutil.MakeTemporary)
	if err != nil {
		return nil, err
	}
	defer func() {
		done(context.TODO())
		if p.is.GarbageCollect != nil {
			go p.is.GarbageCollect(context.TODO())
		}
	}()

	pctx, stopProgress := context.WithCancel(ctx)

	pw, _, ctx := progress.NewFromContext(ctx)
	defer pw.Close()

	progressDone := make(chan struct{})
	go func() {
		showProgress(pctx, ongoing, p.is.ContentStore, pw)
		close(progressDone)
	}()
	defer func() {
		<-progressDone
	}()

	fetcher, err := p.resolver(g).Fetcher(ctx, p.ref)
	if err != nil {
		stopProgress()
		return nil, err
	}

	platform := platforms.Only(p.platform)

	var nonLayers []digest.Digest

	var (
		schema1Converter *schema1.Converter
		handlers         []images.Handler
	)
	if p.desc.MediaType == images.MediaTypeDockerSchema1Manifest {
		schema1Converter = schema1.NewConverter(p.is.ContentStore, fetcher)
		handlers = append(handlers, schema1Converter)

		// TODO: Optimize to do dispatch and integrate pulling with download manager,
		// leverage existing blob mapping and layer storage
	} else {
		// TODO: need a wrapper snapshot interface that combines content
		// and snapshots as 1) buildkit shouldn't have a dependency on contentstore
		// or 2) cachemanager should manage the contentstore
		handlers = append(handlers, images.HandlerFunc(func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
			switch desc.MediaType {
			case images.MediaTypeDockerSchema2Manifest, ocispec.MediaTypeImageManifest,
				images.MediaTypeDockerSchema2ManifestList, ocispec.MediaTypeImageIndex,
				images.MediaTypeDockerSchema2Config, ocispec.MediaTypeImageConfig:
				nonLayers = append(nonLayers, desc.Digest)
			default:
				return nil, images.ErrSkipDesc
			}
			ongoing.add(desc)
			return nil, nil
		}))

		// Get all the children for a descriptor
		childrenHandler := images.ChildrenHandler(p.is.ContentStore)
		// Filter the children by the platform
		childrenHandler = images.FilterPlatforms(childrenHandler, platform)
		// Limit manifests pulled to the best match in an index
		childrenHandler = images.LimitManifests(childrenHandler, platform, 1)

		handlers = append(handlers,
			remotes.FetchHandler(p.is.ContentStore, fetcher),
			childrenHandler,
		)
	}

	if err := images.Dispatch(ctx, images.Handlers(handlers...), nil, p.desc); err != nil {
		stopProgress()
		return nil, err
	}
	defer stopProgress()

	if schema1Converter != nil {
		p.desc, err = schema1Converter.Convert(ctx)
		if err != nil {
			return nil, err
		}
	}

	mfst, err := images.Manifest(ctx, p.is.ContentStore, p.desc, platform)
	if err != nil {
		return nil, err
	}

	config, err := images.Config(ctx, p.is.ContentStore, p.desc, platform)
	if err != nil {
		return nil, err
	}

	dt, err := content.ReadBlob(ctx, p.is.ContentStore, config)
	if err != nil {
		return nil, err
	}

	var img ocispec.Image
	if err := json.Unmarshal(dt, &img); err != nil {
		return nil, err
	}

	if len(mfst.Layers) != len(img.RootFS.DiffIDs) {
		return nil, errors.Errorf("invalid config for manifest")
	}

	pchan := make(chan pkgprogress.Progress, 10)
	defer close(pchan)

	go func() {
		m := map[string]struct {
			st      time.Time
			limiter *rate.Limiter
		}{}
		for p := range pchan {
			if p.Action == "Extracting" {
				st, ok := m[p.ID]
				if !ok {
					st.st = time.Now()
					st.limiter = rate.NewLimiter(rate.Every(100*time.Millisecond), 1)
					m[p.ID] = st
				}
				var end *time.Time
				if p.LastUpdate || st.limiter.Allow() {
					if p.LastUpdate {
						tm := time.Now()
						end = &tm
					}
					_ = pw.Write("extracting "+p.ID, progress.Status{
						Action:    "extract",
						Started:   &st.st,
						Completed: end,
					})
				}
			}
		}
	}()

	if len(mfst.Layers) == 0 {
		return nil, nil
	}

	layers := make([]xfer.DownloadDescriptor, 0, len(mfst.Layers))
	for i, desc := range mfst.Layers {
		if err := desc.Digest.Validate(); err != nil {
			return nil, errors.Wrap(err, "layer digest could not be validated")
		}
		ongoing.add(desc)
		layers = append(layers, &layerDescriptor{
			desc:    desc,
			diffID:  layer.DiffID(img.RootFS.DiffIDs[i]),
			fetcher: fetcher,
			ref:     p.src.Reference,
			is:      p.is,
		})
	}

	defer func() {
		<-progressDone
	}()

	r := image.NewRootFS()
	rootFS, release, err := p.is.DownloadManager.Download(ctx, *r, layers, pkgprogress.ChanOutput(pchan))
	stopProgress()
	if err != nil {
		return nil, err
	}

	ref, err := p.getRef(ctx, rootFS.DiffIDs, cache.WithDescription(fmt.Sprintf("pulled from %s", p.ref)))
	release()
	if err != nil {
		return nil, err
	}

	// keep manifest blobs alive for as long as the ref is alive in the cache
	for _, nl := range nonLayers {
		if err := p.is.LeaseManager.AddResource(ctx, leases.Lease{ID: ref.ID()}, leases.Resource{
			ID:   nl.String(),
			Type: "content",
		}); err != nil {
			return nil, err
		}
	}

	// TODO: handle windows layers for cross platform builds

	if p.src.RecordType != "" && ref.GetRecordType() == "" {
		if err := ref.SetRecordType(p.src.RecordType); err != nil {
			ref.Release(context.TODO())
			return nil, err
		}
	}

	return ref, nil
}

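// layerDescriptor adapts a manifest layer to the daemon's
// xfer.DownloadDescriptor interface: blobs are fetched through the remotes
// fetcher and staged in the content store before being handed to the
// download manager.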
// Fetch(ctx context.Context, desc ocispec.Descriptor) (io.ReadCloser, error)
type layerDescriptor struct {
	is      *Source
	fetcher remotes.Fetcher
	desc    ocispec.Descriptor
	diffID  layer.DiffID
	ref     ctdreference.Spec
}

func (ld *layerDescriptor) Key() string {
	return "v2:" + ld.desc.Digest.String()
}

func (ld *layerDescriptor) ID() string {
	return ld.desc.Digest.String()
}

func (ld *layerDescriptor) DiffID() (layer.DiffID, error) {
	return ld.diffID, nil
}

func (ld *layerDescriptor) Download(ctx context.Context, progressOutput pkgprogress.Output) (io.ReadCloser, int64, error) {
	rc, err := ld.fetcher.Fetch(ctx, ld.desc)
	if err != nil {
		return nil, 0, err
	}
	defer rc.Close()

	refKey := remotes.MakeRefKey(ctx, ld.desc)

	ld.is.ContentStore.Abort(ctx, refKey)

	if err := content.WriteBlob(ctx, ld.is.ContentStore, refKey, rc, ld.desc); err != nil {
		ld.is.ContentStore.Abort(ctx, refKey)
		return nil, 0, err
	}

	ra, err := ld.is.ContentStore.ReaderAt(ctx, ld.desc)
	if err != nil {
		return nil, 0, err
	}

	return io.NopCloser(content.NewReader(ra)), ld.desc.Size, nil
}

func (ld *layerDescriptor) Close() {
	// ld.is.ContentStore.Delete(context.TODO(), ld.desc.Digest))
}

func (ld *layerDescriptor) Registered(diffID layer.DiffID) {
	// Cache mapping from this layer's DiffID to the blobsum
	ld.is.MetadataStore.Add(diffID, metadata.V2Metadata{Digest: ld.desc.Digest, SourceRepository: ld.ref.Locator})
}

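// showProgress periodically publishes resolve/download status for the ongoing
// jobs to the progress writer until the context is cancelled.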
func showProgress(ctx context.Context, ongoing *jobs, cs content.Store, pw progress.Writer) {
	var (
		ticker   = time.NewTicker(100 * time.Millisecond)
		statuses = map[string]statusInfo{}
		done     bool
	)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
		case <-ctx.Done():
			done = true
		}

		resolved := "resolved"
		if !ongoing.isResolved() {
			resolved = "resolving"
		}
		statuses[ongoing.name] = statusInfo{
			Ref:    ongoing.name,
			Status: resolved,
		}

		actives := make(map[string]statusInfo)

		if !done {
			active, err := cs.ListStatuses(ctx)
			if err != nil {
				// log.G(ctx).WithError(err).Error("active check failed")
				continue
			}
			// update status of active entries!
			for _, active := range active {
				actives[active.Ref] = statusInfo{
					Ref:       active.Ref,
					Status:    "downloading",
					Offset:    active.Offset,
					Total:     active.Total,
					StartedAt: active.StartedAt,
					UpdatedAt: active.UpdatedAt,
				}
			}
		}

		// now, update the items in jobs that are not in active
		for _, j := range ongoing.jobs() {
			refKey := remotes.MakeRefKey(ctx, j.Descriptor)
			if a, ok := actives[refKey]; ok {
				started := j.started
				_ = pw.Write(j.Digest.String(), progress.Status{
					Action:  a.Status,
					Total:   int(a.Total),
					Current: int(a.Offset),
					Started: &started,
				})
				continue
			}

			if !j.done {
				info, err := cs.Info(context.TODO(), j.Digest)
				if err != nil {
					if cerrdefs.IsNotFound(err) {
						// _ = pw.Write(j.Digest.String(), progress.Status{
						// 	Action: "waiting",
						// })
						continue
					}
				} else {
					j.done = true
				}

				if done || j.done {
					started := j.started
					createdAt := info.CreatedAt
					_ = pw.Write(j.Digest.String(), progress.Status{
						Action:    "done",
						Current:   int(info.Size),
						Total:     int(info.Size),
						Completed: &createdAt,
						Started:   &started,
					})
				}
			}
		}
		if done {
			return
		}
	}
}

// jobs provides a way of identifying the download keys for a particular task
// encountered during the pull walk.
//
// This is very minimal and will probably be replaced with something more
// featured.
type jobs struct {
	name     string
	added    map[digest.Digest]*job
	mu       sync.Mutex
	resolved bool
}

type job struct {
	ocispec.Descriptor
	done    bool
	started time.Time
}

func newJobs(name string) *jobs {
	return &jobs{
		name:  name,
		added: make(map[digest.Digest]*job),
	}
}

func (j *jobs) add(desc ocispec.Descriptor) {
	j.mu.Lock()
	defer j.mu.Unlock()

	if _, ok := j.added[desc.Digest]; ok {
		return
	}
	j.added[desc.Digest] = &job{
		Descriptor: desc,
		started:    time.Now(),
	}
}

func (j *jobs) jobs() []*job {
	j.mu.Lock()
	defer j.mu.Unlock()

	descs := make([]*job, 0, len(j.added))
	for _, j := range j.added {
		descs = append(descs, j)
	}
	return descs
}

func (j *jobs) isResolved() bool {
	j.mu.Lock()
	defer j.mu.Unlock()
	return j.resolved
}

type statusInfo struct {
	Ref       string
	Status    string
	Offset    int64
	Total     int64
	StartedAt time.Time
	UpdatedAt time.Time
}

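// oneOffProgress emits a single "started" status for id and returns a function
// that marks it completed, passing through the error it is given.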
func oneOffProgress(ctx context.Context, id string) func(err error) error {
	pw, _, _ := progress.NewFromContext(ctx)
	now := time.Now()
	st := progress.Status{
		Started: &now,
	}
	_ = pw.Write(id, st)
	return func(err error) error {
		// TODO: set error on status
		now := time.Now()
		st.Completed = &now
		_ = pw.Write(id, st)
		_ = pw.Close()
		return err
	}
}

// cacheKeyFromConfig returns a stable digest from an image config. If the
// config is a known OCI image, the chain ID of its layers is used.
func cacheKeyFromConfig(dt []byte) digest.Digest {
	var img ocispec.Image
	err := json.Unmarshal(dt, &img)
	if err != nil {
		log.G(context.TODO()).WithError(err).Errorf("failed to unmarshal image config for cache key %v", err)
		return digest.FromBytes(dt)
	}
	if img.RootFS.Type != "layers" || len(img.RootFS.DiffIDs) == 0 {
		return ""
	}
	return identity.ChainID(img.RootFS.DiffIDs)
}

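// platformMatches reports whether the local image satisfies the requested
// platform, using the daemon's platform matcher and its fallback rules.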
func platformMatches(img *image.Image, p *ocispec.Platform) bool {
	return dimages.OnlyPlatformWithFallback(*p).Match(ocispec.Platform{
		Architecture: img.Architecture,
		OS:           img.OS,
		OSVersion:    img.OSVersion,
		OSFeatures:   img.OSFeatures,
		Variant:      img.Variant,
	})
}

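// applySourcePolicies evaluates the given source policies against the image
// reference and returns the (possibly rewritten) reference. If a policy
// rewrites the source to a non-image scheme, a ResolveToNonImageError is
// returned so the caller can handle the redirect.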
func applySourcePolicies(ctx context.Context, str string, spls []*spb.Policy) (string, error) {
	ref, err := cdreference.Parse(str)
	if err != nil {
		return "", errors.WithStack(err)
	}
	op := &pb.Op{
		Op: &pb.Op_Source{
			Source: &pb.SourceOp{
				Identifier: srctypes.DockerImageScheme + "://" + ref.String(),
			},
		},
	}

	mut, err := sourcepolicy.NewEngine(spls).Evaluate(ctx, op)
	if err != nil {
		return "", errors.Wrap(err, "could not resolve image due to policy")
	}

	if mut {
		var (
			t  string
			ok bool
		)
		t, newRef, ok := strings.Cut(op.GetSource().GetIdentifier(), "://")
		if !ok {
			return "", errors.Errorf("could not parse ref: %s", op.GetSource().GetIdentifier())
		}
		if ok && t != srctypes.DockerImageScheme {
			return "", &imageutil.ResolveToNonImageError{Ref: str, Updated: newRef}
		}
		ref, err = cdreference.Parse(newRef)
		if err != nil {
			return "", errors.WithStack(err)
		}
	}
	return ref.String(), nil
}