worker.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505
  1. package worker
  2. import (
  3. "context"
  4. "fmt"
  5. "io"
  6. nethttp "net/http"
  7. "runtime"
  8. "strings"
  9. "time"
  10. "github.com/containerd/containerd/content"
  11. "github.com/containerd/containerd/images"
  12. "github.com/containerd/containerd/platforms"
  13. "github.com/containerd/containerd/rootfs"
  14. "github.com/docker/docker/builder/builder-next/adapters/containerimage"
  15. "github.com/docker/docker/distribution"
  16. distmetadata "github.com/docker/docker/distribution/metadata"
  17. "github.com/docker/docker/distribution/xfer"
  18. "github.com/docker/docker/image"
  19. "github.com/docker/docker/layer"
  20. pkgprogress "github.com/docker/docker/pkg/progress"
  21. "github.com/moby/buildkit/cache"
  22. "github.com/moby/buildkit/cache/metadata"
  23. "github.com/moby/buildkit/client"
  24. "github.com/moby/buildkit/client/llb"
  25. "github.com/moby/buildkit/executor"
  26. "github.com/moby/buildkit/exporter"
  27. localexporter "github.com/moby/buildkit/exporter/local"
  28. tarexporter "github.com/moby/buildkit/exporter/tar"
  29. "github.com/moby/buildkit/frontend"
  30. "github.com/moby/buildkit/session"
  31. "github.com/moby/buildkit/snapshot"
  32. "github.com/moby/buildkit/solver"
  33. "github.com/moby/buildkit/solver/llbsolver/mounts"
  34. "github.com/moby/buildkit/solver/llbsolver/ops"
  35. "github.com/moby/buildkit/solver/pb"
  36. "github.com/moby/buildkit/source"
  37. "github.com/moby/buildkit/source/git"
  38. "github.com/moby/buildkit/source/http"
  39. "github.com/moby/buildkit/source/local"
  40. "github.com/moby/buildkit/util/archutil"
  41. "github.com/moby/buildkit/util/compression"
  42. "github.com/moby/buildkit/util/contentutil"
  43. "github.com/moby/buildkit/util/progress"
  44. digest "github.com/opencontainers/go-digest"
  45. ocispec "github.com/opencontainers/image-spec/specs-go/v1"
  46. "github.com/pkg/errors"
  47. "github.com/sirupsen/logrus"
  48. bolt "go.etcd.io/bbolt"
  49. "golang.org/x/sync/semaphore"
  50. )
  51. const labelCreatedAt = "buildkit/createdat"
  52. // LayerAccess provides access to a moby layer from a snapshot
  53. type LayerAccess interface {
  54. GetDiffIDs(ctx context.Context, key string) ([]layer.DiffID, error)
  55. EnsureLayer(ctx context.Context, key string) ([]layer.DiffID, error)
  56. }
  57. // Opt defines a structure for creating a worker.
  58. type Opt struct {
  59. ID string
  60. Labels map[string]string
  61. GCPolicy []client.PruneInfo
  62. MetadataStore *metadata.Store
  63. Executor executor.Executor
  64. Snapshotter snapshot.Snapshotter
  65. ContentStore content.Store
  66. CacheManager cache.Manager
  67. ImageSource *containerimage.Source
  68. DownloadManager distribution.RootFSDownloadManager
  69. V2MetadataService distmetadata.V2MetadataService
  70. Transport nethttp.RoundTripper
  71. Exporter exporter.Exporter
  72. Layers LayerAccess
  73. Platforms []ocispec.Platform
  74. }
  75. // Worker is a local worker instance with dedicated snapshotter, cache, and so on.
  76. // TODO: s/Worker/OpWorker/g ?
  77. type Worker struct {
  78. Opt
  79. SourceManager *source.Manager
  80. }
  81. // NewWorker instantiates a local worker
  82. func NewWorker(opt Opt) (*Worker, error) {
  83. sm, err := source.NewManager()
  84. if err != nil {
  85. return nil, err
  86. }
  87. cm := opt.CacheManager
  88. sm.Register(opt.ImageSource)
  89. gs, err := git.NewSource(git.Opt{
  90. CacheAccessor: cm,
  91. MetadataStore: opt.MetadataStore,
  92. })
  93. if err == nil {
  94. sm.Register(gs)
  95. } else {
  96. logrus.Warnf("Could not register builder git source: %s", err)
  97. }
  98. hs, err := http.NewSource(http.Opt{
  99. CacheAccessor: cm,
  100. MetadataStore: opt.MetadataStore,
  101. Transport: opt.Transport,
  102. })
  103. if err == nil {
  104. sm.Register(hs)
  105. } else {
  106. logrus.Warnf("Could not register builder http source: %s", err)
  107. }
  108. ss, err := local.NewSource(local.Opt{
  109. CacheAccessor: cm,
  110. MetadataStore: opt.MetadataStore,
  111. })
  112. if err == nil {
  113. sm.Register(ss)
  114. } else {
  115. logrus.Warnf("Could not register builder local source: %s", err)
  116. }
  117. return &Worker{
  118. Opt: opt,
  119. SourceManager: sm,
  120. }, nil
  121. }
  122. // ID returns worker ID
  123. func (w *Worker) ID() string {
  124. return w.Opt.ID
  125. }
  126. // Labels returns map of all worker labels
  127. func (w *Worker) Labels() map[string]string {
  128. return w.Opt.Labels
  129. }
  130. // Platforms returns one or more platforms supported by the image.
  131. func (w *Worker) Platforms(noCache bool) []ocispec.Platform {
  132. if noCache {
  133. pm := make(map[string]struct{}, len(w.Opt.Platforms))
  134. for _, p := range w.Opt.Platforms {
  135. pm[platforms.Format(p)] = struct{}{}
  136. }
  137. for _, p := range archutil.SupportedPlatforms(noCache) {
  138. if _, ok := pm[p]; !ok {
  139. pp, _ := platforms.Parse(p)
  140. w.Opt.Platforms = append(w.Opt.Platforms, pp)
  141. }
  142. }
  143. }
  144. if len(w.Opt.Platforms) == 0 {
  145. return []ocispec.Platform{platforms.DefaultSpec()}
  146. }
  147. return w.Opt.Platforms
  148. }
  149. // GCPolicy returns automatic GC Policy
  150. func (w *Worker) GCPolicy() []client.PruneInfo {
  151. return w.Opt.GCPolicy
  152. }
  153. // ContentStore returns content store
  154. func (w *Worker) ContentStore() content.Store {
  155. return w.Opt.ContentStore
  156. }
  157. // MetadataStore returns the metadata store
  158. func (w *Worker) MetadataStore() *metadata.Store {
  159. return w.Opt.MetadataStore
  160. }
  161. // LoadRef loads a reference by ID
  162. func (w *Worker) LoadRef(ctx context.Context, id string, hidden bool) (cache.ImmutableRef, error) {
  163. var opts []cache.RefOption
  164. if hidden {
  165. opts = append(opts, cache.NoUpdateLastUsed)
  166. }
  167. return w.CacheManager().Get(ctx, id, opts...)
  168. }
  169. // ResolveOp converts a LLB vertex into a LLB operation
  170. func (w *Worker) ResolveOp(v solver.Vertex, s frontend.FrontendLLBBridge, sm *session.Manager) (solver.Op, error) {
  171. if baseOp, ok := v.Sys().(*pb.Op); ok {
  172. // TODO do we need to pass a value here? Where should it come from? https://github.com/moby/buildkit/commit/b3cf7c43cfefdfd7a945002c0e76b54e346ab6cf
  173. var parallelism *semaphore.Weighted
  174. switch op := baseOp.Op.(type) {
  175. case *pb.Op_Source:
  176. return ops.NewSourceOp(v, op, baseOp.Platform, w.SourceManager, parallelism, sm, w)
  177. case *pb.Op_Exec:
  178. return ops.NewExecOp(v, op, baseOp.Platform, w.CacheManager(), parallelism, sm, w.Opt.MetadataStore, w.Executor(), w)
  179. case *pb.Op_File:
  180. return ops.NewFileOp(v, op, w.CacheManager(), parallelism, w.Opt.MetadataStore, w)
  181. case *pb.Op_Build:
  182. return ops.NewBuildOp(v, op, s, w)
  183. }
  184. }
  185. return nil, errors.Errorf("could not resolve %v", v)
  186. }
  187. // ResolveImageConfig returns image config for an image
  188. func (w *Worker) ResolveImageConfig(ctx context.Context, ref string, opt llb.ResolveImageConfigOpt, sm *session.Manager, g session.Group) (digest.Digest, []byte, error) {
  189. return w.ImageSource.ResolveImageConfig(ctx, ref, opt, sm, g)
  190. }
  191. // DiskUsage returns disk usage report
  192. func (w *Worker) DiskUsage(ctx context.Context, opt client.DiskUsageInfo) ([]*client.UsageInfo, error) {
  193. return w.CacheManager().DiskUsage(ctx, opt)
  194. }
  195. // Prune deletes reclaimable build cache
  196. func (w *Worker) Prune(ctx context.Context, ch chan client.UsageInfo, info ...client.PruneInfo) error {
  197. return w.CacheManager().Prune(ctx, ch, info...)
  198. }
  199. // Exporter returns exporter by name
  200. func (w *Worker) Exporter(name string, sm *session.Manager) (exporter.Exporter, error) {
  201. switch name {
  202. case "moby":
  203. return w.Opt.Exporter, nil
  204. case client.ExporterLocal:
  205. return localexporter.New(localexporter.Opt{
  206. SessionManager: sm,
  207. })
  208. case client.ExporterTar:
  209. return tarexporter.New(tarexporter.Opt{
  210. SessionManager: sm,
  211. })
  212. default:
  213. return nil, errors.Errorf("exporter %q could not be found", name)
  214. }
  215. }
  216. // GetRemote returns a remote snapshot reference for a local one
  217. func (w *Worker) GetRemote(ctx context.Context, ref cache.ImmutableRef, createIfNeeded bool, _ compression.Type, _ session.Group) (*solver.Remote, error) {
  218. var diffIDs []layer.DiffID
  219. var err error
  220. if !createIfNeeded {
  221. diffIDs, err = w.Layers.GetDiffIDs(ctx, ref.ID())
  222. if err != nil {
  223. return nil, err
  224. }
  225. } else {
  226. if err := ref.Finalize(ctx, true); err != nil {
  227. return nil, err
  228. }
  229. diffIDs, err = w.Layers.EnsureLayer(ctx, ref.ID())
  230. if err != nil {
  231. return nil, err
  232. }
  233. }
  234. descriptors := make([]ocispec.Descriptor, len(diffIDs))
  235. for i, dgst := range diffIDs {
  236. descriptors[i] = ocispec.Descriptor{
  237. MediaType: images.MediaTypeDockerSchema2Layer,
  238. Digest: digest.Digest(dgst),
  239. Size: -1,
  240. }
  241. }
  242. return &solver.Remote{
  243. Descriptors: descriptors,
  244. Provider: &emptyProvider{},
  245. }, nil
  246. }
  247. // PruneCacheMounts removes the current cache snapshots for specified IDs
  248. func (w *Worker) PruneCacheMounts(ctx context.Context, ids []string) error {
  249. mu := mounts.CacheMountsLocker()
  250. mu.Lock()
  251. defer mu.Unlock()
  252. for _, id := range ids {
  253. id = "cache-dir:" + id
  254. sis, err := w.Opt.MetadataStore.Search(id)
  255. if err != nil {
  256. return err
  257. }
  258. for _, si := range sis {
  259. for _, k := range si.Indexes() {
  260. if k == id || strings.HasPrefix(k, id+":") {
  261. if siCached := w.CacheManager().Metadata(si.ID()); siCached != nil {
  262. si = siCached
  263. }
  264. if err := cache.CachePolicyDefault(si); err != nil {
  265. return err
  266. }
  267. si.Queue(func(b *bolt.Bucket) error {
  268. return si.SetValue(b, k, nil)
  269. })
  270. if err := si.Commit(); err != nil {
  271. return err
  272. }
  273. // if ref is unused try to clean it up right away by releasing it
  274. if mref, err := w.CacheManager().GetMutable(ctx, si.ID()); err == nil {
  275. go mref.Release(context.TODO())
  276. }
  277. break
  278. }
  279. }
  280. }
  281. }
  282. mounts.ClearActiveCacheMounts()
  283. return nil
  284. }
  285. func (w *Worker) getRef(ctx context.Context, diffIDs []layer.DiffID, opts ...cache.RefOption) (cache.ImmutableRef, error) {
  286. var parent cache.ImmutableRef
  287. if len(diffIDs) > 1 {
  288. var err error
  289. parent, err = w.getRef(ctx, diffIDs[:len(diffIDs)-1], opts...)
  290. if err != nil {
  291. return nil, err
  292. }
  293. defer parent.Release(context.TODO())
  294. }
  295. return w.CacheManager().GetByBlob(context.TODO(), ocispec.Descriptor{
  296. Annotations: map[string]string{
  297. "containerd.io/uncompressed": diffIDs[len(diffIDs)-1].String(),
  298. },
  299. }, parent, opts...)
  300. }
  301. // FromRemote converts a remote snapshot reference to a local one
  302. func (w *Worker) FromRemote(ctx context.Context, remote *solver.Remote) (cache.ImmutableRef, error) {
  303. rootfs, err := getLayers(ctx, remote.Descriptors)
  304. if err != nil {
  305. return nil, err
  306. }
  307. layers := make([]xfer.DownloadDescriptor, 0, len(rootfs))
  308. for _, l := range rootfs {
  309. // ongoing.add(desc)
  310. layers = append(layers, &layerDescriptor{
  311. desc: l.Blob,
  312. diffID: layer.DiffID(l.Diff.Digest),
  313. provider: remote.Provider,
  314. w: w,
  315. pctx: ctx,
  316. })
  317. }
  318. defer func() {
  319. for _, l := range rootfs {
  320. w.ContentStore().Delete(context.TODO(), l.Blob.Digest)
  321. }
  322. }()
  323. r := image.NewRootFS()
  324. rootFS, release, err := w.DownloadManager.Download(ctx, *r, runtime.GOOS, layers, &discardProgress{})
  325. if err != nil {
  326. return nil, err
  327. }
  328. defer release()
  329. if len(rootFS.DiffIDs) != len(layers) {
  330. return nil, errors.Errorf("invalid layer count mismatch %d vs %d", len(rootFS.DiffIDs), len(layers))
  331. }
  332. for i := range rootFS.DiffIDs {
  333. tm := time.Now()
  334. if tmstr, ok := remote.Descriptors[i].Annotations[labelCreatedAt]; ok {
  335. if err := (&tm).UnmarshalText([]byte(tmstr)); err != nil {
  336. return nil, err
  337. }
  338. }
  339. descr := fmt.Sprintf("imported %s", remote.Descriptors[i].Digest)
  340. if v, ok := remote.Descriptors[i].Annotations["buildkit/description"]; ok {
  341. descr = v
  342. }
  343. ref, err := w.getRef(ctx, rootFS.DiffIDs[:i+1], cache.WithDescription(descr), cache.WithCreationTime(tm))
  344. if err != nil {
  345. return nil, err
  346. }
  347. if i == len(remote.Descriptors)-1 {
  348. return ref, nil
  349. }
  350. defer ref.Release(context.TODO())
  351. }
  352. return nil, errors.Errorf("unreachable")
  353. }
  354. // Executor returns executor.Executor for running processes
  355. func (w *Worker) Executor() executor.Executor {
  356. return w.Opt.Executor
  357. }
  358. // CacheManager returns cache.Manager for accessing local storage
  359. func (w *Worker) CacheManager() cache.Manager {
  360. return w.Opt.CacheManager
  361. }
  362. type discardProgress struct{}
  363. func (*discardProgress) WriteProgress(_ pkgprogress.Progress) error {
  364. return nil
  365. }
  366. // Fetch(ctx context.Context, desc ocispec.Descriptor) (io.ReadCloser, error)
  367. type layerDescriptor struct {
  368. provider content.Provider
  369. desc ocispec.Descriptor
  370. diffID layer.DiffID
  371. // ref ctdreference.Spec
  372. w *Worker
  373. pctx context.Context
  374. }
  375. func (ld *layerDescriptor) Key() string {
  376. return "v2:" + ld.desc.Digest.String()
  377. }
  378. func (ld *layerDescriptor) ID() string {
  379. return ld.desc.Digest.String()
  380. }
  381. func (ld *layerDescriptor) DiffID() (layer.DiffID, error) {
  382. return ld.diffID, nil
  383. }
  384. func (ld *layerDescriptor) Download(ctx context.Context, progressOutput pkgprogress.Output) (io.ReadCloser, int64, error) {
  385. done := oneOffProgress(ld.pctx, fmt.Sprintf("pulling %s", ld.desc.Digest))
  386. // TODO should this write output to progressOutput? Or use something similar to loggerFromContext()? see https://github.com/moby/buildkit/commit/aa29e7729464f3c2a773e27795e584023c751cb8
  387. discardLogs := func(_ []byte) {}
  388. if err := contentutil.Copy(ctx, ld.w.ContentStore(), ld.provider, ld.desc, discardLogs); err != nil {
  389. return nil, 0, done(err)
  390. }
  391. _ = done(nil)
  392. ra, err := ld.w.ContentStore().ReaderAt(ctx, ld.desc)
  393. if err != nil {
  394. return nil, 0, err
  395. }
  396. return io.NopCloser(content.NewReader(ra)), ld.desc.Size, nil
  397. }
  398. func (ld *layerDescriptor) Close() {
  399. // ld.is.ContentStore().Delete(context.TODO(), ld.desc.Digest)
  400. }
  401. func (ld *layerDescriptor) Registered(diffID layer.DiffID) {
  402. // Cache mapping from this layer's DiffID to the blobsum
  403. ld.w.V2MetadataService.Add(diffID, distmetadata.V2Metadata{Digest: ld.desc.Digest})
  404. }
  405. func getLayers(ctx context.Context, descs []ocispec.Descriptor) ([]rootfs.Layer, error) {
  406. layers := make([]rootfs.Layer, len(descs))
  407. for i, desc := range descs {
  408. diffIDStr := desc.Annotations["containerd.io/uncompressed"]
  409. if diffIDStr == "" {
  410. return nil, errors.Errorf("%s missing uncompressed digest", desc.Digest)
  411. }
  412. diffID, err := digest.Parse(diffIDStr)
  413. if err != nil {
  414. return nil, err
  415. }
  416. layers[i].Diff = ocispec.Descriptor{
  417. MediaType: ocispec.MediaTypeImageLayer,
  418. Digest: diffID,
  419. }
  420. layers[i].Blob = ocispec.Descriptor{
  421. MediaType: desc.MediaType,
  422. Digest: desc.Digest,
  423. Size: desc.Size,
  424. }
  425. }
  426. return layers, nil
  427. }
  428. func oneOffProgress(ctx context.Context, id string) func(err error) error {
  429. pw, _, _ := progress.FromContext(ctx)
  430. now := time.Now()
  431. st := progress.Status{
  432. Started: &now,
  433. }
  434. _ = pw.Write(id, st)
  435. return func(err error) error {
  436. // TODO: set error on status
  437. now := time.Now()
  438. st.Completed = &now
  439. _ = pw.Write(id, st)
  440. _ = pw.Close()
  441. return err
  442. }
  443. }
  444. type emptyProvider struct {
  445. }
  446. func (p *emptyProvider) ReaderAt(ctx context.Context, dec ocispec.Descriptor) (content.ReaderAt, error) {
  447. return nil, errors.Errorf("ReaderAt not implemented for empty provider")
  448. }