worker.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528
  1. package worker
  2. import (
  3. "context"
  4. "fmt"
  5. "io"
  6. nethttp "net/http"
  7. "time"
  8. "github.com/containerd/containerd/content"
  9. "github.com/containerd/containerd/images"
  10. "github.com/containerd/containerd/platforms"
  11. "github.com/containerd/containerd/rootfs"
  12. "github.com/containerd/log"
  13. "github.com/docker/docker/builder/builder-next/adapters/containerimage"
  14. mobyexporter "github.com/docker/docker/builder/builder-next/exporter"
  15. distmetadata "github.com/docker/docker/distribution/metadata"
  16. "github.com/docker/docker/distribution/xfer"
  17. "github.com/docker/docker/image"
  18. "github.com/docker/docker/internal/mod"
  19. "github.com/docker/docker/layer"
  20. pkgprogress "github.com/docker/docker/pkg/progress"
  21. "github.com/moby/buildkit/cache"
  22. cacheconfig "github.com/moby/buildkit/cache/config"
  23. "github.com/moby/buildkit/client"
  24. "github.com/moby/buildkit/client/llb"
  25. "github.com/moby/buildkit/executor"
  26. "github.com/moby/buildkit/exporter"
  27. localexporter "github.com/moby/buildkit/exporter/local"
  28. tarexporter "github.com/moby/buildkit/exporter/tar"
  29. "github.com/moby/buildkit/frontend"
  30. "github.com/moby/buildkit/session"
  31. "github.com/moby/buildkit/snapshot"
  32. containerdsnapshot "github.com/moby/buildkit/snapshot/containerd"
  33. "github.com/moby/buildkit/solver"
  34. "github.com/moby/buildkit/solver/llbsolver/mounts"
  35. "github.com/moby/buildkit/solver/llbsolver/ops"
  36. "github.com/moby/buildkit/solver/pb"
  37. "github.com/moby/buildkit/source"
  38. "github.com/moby/buildkit/source/git"
  39. "github.com/moby/buildkit/source/http"
  40. "github.com/moby/buildkit/source/local"
  41. "github.com/moby/buildkit/util/archutil"
  42. "github.com/moby/buildkit/util/contentutil"
  43. "github.com/moby/buildkit/util/leaseutil"
  44. "github.com/moby/buildkit/util/progress"
  45. "github.com/moby/buildkit/version"
  46. "github.com/opencontainers/go-digest"
  47. ocispec "github.com/opencontainers/image-spec/specs-go/v1"
  48. "github.com/pkg/errors"
  49. "golang.org/x/sync/semaphore"
  50. )
  51. func init() {
  52. if v := mod.Version("github.com/moby/buildkit"); v != "" {
  53. version.Version = v
  54. }
  55. }
// labelCreatedAt is the descriptor annotation key that FromRemote consults to
// recover the creation time of an imported layer. The value is decoded with
// time.Time.UnmarshalText.
const labelCreatedAt = "buildkit/createdat"
// LayerAccess provides access to a moby layer from a snapshot
type LayerAccess interface {
	// GetDiffIDs returns the diff IDs for the given key. GetRemotes calls it
	// with a cache ref ID when it was asked not to create anything.
	// NOTE(review): presumably this fails when no layer exists yet for the
	// key — confirm against the implementation.
	GetDiffIDs(ctx context.Context, key string) ([]layer.DiffID, error)
	// EnsureLayer returns the diff IDs for the given key. GetRemotes calls it
	// with a cache ref ID after finalizing and extracting the ref.
	// NOTE(review): presumably this creates the layer if it is missing —
	// confirm against the implementation.
	EnsureLayer(ctx context.Context, key string) ([]layer.DiffID, error)
}
// Opt defines a structure for creating a worker.
type Opt struct {
	// ID is the value reported by Worker.ID.
	ID string
	// Labels is the value reported by Worker.Labels.
	Labels map[string]string
	// GCPolicy is the value reported by Worker.GCPolicy.
	GCPolicy []client.PruneInfo
	// Executor is handed to exec ops and reported by Worker.Executor.
	Executor executor.Executor
	// Snapshotter is not referenced by the code in this file.
	// NOTE(review): presumably the snapshotter backing the cache — confirm.
	Snapshotter snapshot.Snapshotter
	// ContentStore receives layer blobs copied during layerDescriptor.Download
	// and is reported by Worker.ContentStore.
	ContentStore *containerdsnapshot.Store
	// CacheManager is used as the cache accessor for the git, http and local
	// sources and is reported by Worker.CacheManager.
	CacheManager cache.Manager
	// LeaseManager is the value reported by Worker.LeaseManager.
	LeaseManager *leaseutil.Manager
	// ImageSource is registered with the source manager and serves
	// Worker.ResolveImageConfig.
	ImageSource *containerimage.Source
	// DownloadManager performs the layer download in Worker.FromRemote.
	DownloadManager *xfer.LayerDownloadManager
	// V2MetadataService records the diff ID to blob digest mapping when a
	// downloaded layer is registered.
	V2MetadataService distmetadata.V2MetadataService
	// Transport is passed to the http source.
	Transport nethttp.RoundTripper
	// Exporter is returned by Worker.Exporter for the moby exporter name.
	Exporter exporter.Exporter
	// Layers resolves cache ref IDs to diff IDs in Worker.GetRemotes.
	Layers LayerAccess
	// Platforms is the configured platform list; Worker.Platforms may append
	// to it.
	Platforms []ocispec.Platform
}
// Worker is a local worker instance with dedicated snapshotter, cache, and so on.
// TODO: s/Worker/OpWorker/g ?
type Worker struct {
	// Opt is embedded, so its fields are reachable directly on the worker
	// unless a method of the same name shadows them (ID, Labels, Platforms, ...).
	Opt
	// SourceManager holds the sources registered by NewWorker and is passed
	// to source ops in ResolveOp.
	SourceManager *source.Manager
}
// Compile-time check that *Worker provides GetRemotes with exactly this
// signature; the build breaks if the method drifts.
var _ interface {
	GetRemotes(context.Context, cache.ImmutableRef, bool, cacheconfig.RefConfig, bool, session.Group) ([]*solver.Remote, error)
} = &Worker{}
  89. // NewWorker instantiates a local worker
  90. func NewWorker(opt Opt) (*Worker, error) {
  91. sm, err := source.NewManager()
  92. if err != nil {
  93. return nil, err
  94. }
  95. cm := opt.CacheManager
  96. sm.Register(opt.ImageSource)
  97. gs, err := git.NewSource(git.Opt{
  98. CacheAccessor: cm,
  99. })
  100. if err == nil {
  101. sm.Register(gs)
  102. } else {
  103. log.G(context.TODO()).Warnf("Could not register builder git source: %s", err)
  104. }
  105. hs, err := http.NewSource(http.Opt{
  106. CacheAccessor: cm,
  107. Transport: opt.Transport,
  108. })
  109. if err == nil {
  110. sm.Register(hs)
  111. } else {
  112. log.G(context.TODO()).Warnf("Could not register builder http source: %s", err)
  113. }
  114. ss, err := local.NewSource(local.Opt{
  115. CacheAccessor: cm,
  116. })
  117. if err == nil {
  118. sm.Register(ss)
  119. } else {
  120. log.G(context.TODO()).Warnf("Could not register builder local source: %s", err)
  121. }
  122. return &Worker{
  123. Opt: opt,
  124. SourceManager: sm,
  125. }, nil
  126. }
  127. // ID returns worker ID
  128. func (w *Worker) ID() string {
  129. return w.Opt.ID
  130. }
  131. // Labels returns map of all worker labels
  132. func (w *Worker) Labels() map[string]string {
  133. return w.Opt.Labels
  134. }
  135. // Platforms returns one or more platforms supported by the image.
  136. func (w *Worker) Platforms(noCache bool) []ocispec.Platform {
  137. if noCache {
  138. pm := make(map[string]struct{}, len(w.Opt.Platforms))
  139. for _, p := range w.Opt.Platforms {
  140. pm[platforms.Format(p)] = struct{}{}
  141. }
  142. for _, p := range archutil.SupportedPlatforms(noCache) {
  143. if _, ok := pm[platforms.Format(p)]; !ok {
  144. w.Opt.Platforms = append(w.Opt.Platforms, p)
  145. }
  146. }
  147. }
  148. if len(w.Opt.Platforms) == 0 {
  149. return []ocispec.Platform{platforms.DefaultSpec()}
  150. }
  151. return w.Opt.Platforms
  152. }
  153. // GCPolicy returns automatic GC Policy
  154. func (w *Worker) GCPolicy() []client.PruneInfo {
  155. return w.Opt.GCPolicy
  156. }
  157. // BuildkitVersion returns BuildKit version
  158. func (w *Worker) BuildkitVersion() client.BuildkitVersion {
  159. return client.BuildkitVersion{
  160. Package: version.Package,
  161. Version: version.Version + "-moby",
  162. Revision: version.Revision,
  163. }
  164. }
  165. // Close closes the worker and releases all resources
  166. func (w *Worker) Close() error {
  167. return nil
  168. }
  169. // ContentStore returns the wrapped content store
  170. func (w *Worker) ContentStore() *containerdsnapshot.Store {
  171. return w.Opt.ContentStore
  172. }
  173. // LeaseManager returns the wrapped lease manager
  174. func (w *Worker) LeaseManager() *leaseutil.Manager {
  175. return w.Opt.LeaseManager
  176. }
  177. // LoadRef loads a reference by ID
  178. func (w *Worker) LoadRef(ctx context.Context, id string, hidden bool) (cache.ImmutableRef, error) {
  179. var opts []cache.RefOption
  180. if hidden {
  181. opts = append(opts, cache.NoUpdateLastUsed)
  182. }
  183. if id == "" {
  184. // results can have nil refs if they are optimized out to be equal to scratch,
  185. // i.e. Diff(A,A) == scratch
  186. return nil, nil
  187. }
  188. return w.CacheManager().Get(ctx, id, nil, opts...)
  189. }
  190. // ResolveOp converts a LLB vertex into a LLB operation
  191. func (w *Worker) ResolveOp(v solver.Vertex, s frontend.FrontendLLBBridge, sm *session.Manager) (solver.Op, error) {
  192. if baseOp, ok := v.Sys().(*pb.Op); ok {
  193. // TODO do we need to pass a value here? Where should it come from? https://github.com/moby/buildkit/commit/b3cf7c43cfefdfd7a945002c0e76b54e346ab6cf
  194. var parallelism *semaphore.Weighted
  195. switch op := baseOp.Op.(type) {
  196. case *pb.Op_Source:
  197. return ops.NewSourceOp(v, op, baseOp.Platform, w.SourceManager, parallelism, sm, w)
  198. case *pb.Op_Exec:
  199. return ops.NewExecOp(v, op, baseOp.Platform, w.CacheManager(), parallelism, sm, w.Executor(), w)
  200. case *pb.Op_File:
  201. return ops.NewFileOp(v, op, w.CacheManager(), parallelism, w)
  202. case *pb.Op_Build:
  203. return ops.NewBuildOp(v, op, s, w)
  204. case *pb.Op_Merge:
  205. return ops.NewMergeOp(v, op, w)
  206. case *pb.Op_Diff:
  207. return ops.NewDiffOp(v, op, w)
  208. }
  209. }
  210. return nil, errors.Errorf("could not resolve %v", v)
  211. }
  212. // ResolveImageConfig returns image config for an image
  213. func (w *Worker) ResolveImageConfig(ctx context.Context, ref string, opt llb.ResolveImageConfigOpt, sm *session.Manager, g session.Group) (string, digest.Digest, []byte, error) {
  214. return w.ImageSource.ResolveImageConfig(ctx, ref, opt, sm, g)
  215. }
  216. // DiskUsage returns disk usage report
  217. func (w *Worker) DiskUsage(ctx context.Context, opt client.DiskUsageInfo) ([]*client.UsageInfo, error) {
  218. return w.CacheManager().DiskUsage(ctx, opt)
  219. }
  220. // Prune deletes reclaimable build cache
  221. func (w *Worker) Prune(ctx context.Context, ch chan client.UsageInfo, info ...client.PruneInfo) error {
  222. return w.CacheManager().Prune(ctx, ch, info...)
  223. }
  224. // Exporter returns exporter by name
  225. func (w *Worker) Exporter(name string, sm *session.Manager) (exporter.Exporter, error) {
  226. switch name {
  227. case mobyexporter.Moby:
  228. return w.Opt.Exporter, nil
  229. case client.ExporterLocal:
  230. return localexporter.New(localexporter.Opt{
  231. SessionManager: sm,
  232. })
  233. case client.ExporterTar:
  234. return tarexporter.New(tarexporter.Opt{
  235. SessionManager: sm,
  236. })
  237. default:
  238. return nil, errors.Errorf("exporter %q could not be found", name)
  239. }
  240. }
  241. // GetRemotes returns the remote snapshot references given a local reference
  242. func (w *Worker) GetRemotes(ctx context.Context, ref cache.ImmutableRef, createIfNeeded bool, _ cacheconfig.RefConfig, all bool, s session.Group) ([]*solver.Remote, error) {
  243. if ref == nil {
  244. return nil, nil
  245. }
  246. var diffIDs []layer.DiffID
  247. var err error
  248. if !createIfNeeded {
  249. diffIDs, err = w.Layers.GetDiffIDs(ctx, ref.ID())
  250. if err != nil {
  251. return nil, err
  252. }
  253. } else {
  254. if err := ref.Finalize(ctx); err != nil {
  255. return nil, err
  256. }
  257. if err := ref.Extract(ctx, s); err != nil {
  258. return nil, err
  259. }
  260. diffIDs, err = w.Layers.EnsureLayer(ctx, ref.ID())
  261. if err != nil {
  262. return nil, err
  263. }
  264. }
  265. descriptors := make([]ocispec.Descriptor, len(diffIDs))
  266. for i, dgst := range diffIDs {
  267. descriptors[i] = ocispec.Descriptor{
  268. MediaType: images.MediaTypeDockerSchema2Layer,
  269. Digest: digest.Digest(dgst),
  270. Size: -1,
  271. }
  272. }
  273. return []*solver.Remote{{
  274. Descriptors: descriptors,
  275. Provider: &emptyProvider{},
  276. }}, nil
  277. }
  278. // PruneCacheMounts removes the current cache snapshots for specified IDs
  279. func (w *Worker) PruneCacheMounts(ctx context.Context, ids []string) error {
  280. mu := mounts.CacheMountsLocker()
  281. mu.Lock()
  282. defer mu.Unlock()
  283. for _, id := range ids {
  284. mds, err := mounts.SearchCacheDir(ctx, w.CacheManager(), id)
  285. if err != nil {
  286. return err
  287. }
  288. for _, md := range mds {
  289. if err := md.SetCachePolicyDefault(); err != nil {
  290. return err
  291. }
  292. if err := md.ClearCacheDirIndex(); err != nil {
  293. return err
  294. }
  295. // if ref is unused try to clean it up right away by releasing it
  296. if mref, err := w.CacheManager().GetMutable(ctx, md.ID()); err == nil {
  297. go mref.Release(context.TODO())
  298. }
  299. }
  300. }
  301. mounts.ClearActiveCacheMounts()
  302. return nil
  303. }
  304. func (w *Worker) getRef(ctx context.Context, diffIDs []layer.DiffID, opts ...cache.RefOption) (cache.ImmutableRef, error) {
  305. var parent cache.ImmutableRef
  306. if len(diffIDs) > 1 {
  307. var err error
  308. parent, err = w.getRef(ctx, diffIDs[:len(diffIDs)-1], opts...)
  309. if err != nil {
  310. return nil, err
  311. }
  312. defer parent.Release(context.TODO())
  313. }
  314. return w.CacheManager().GetByBlob(context.TODO(), ocispec.Descriptor{
  315. Annotations: map[string]string{
  316. "containerd.io/uncompressed": diffIDs[len(diffIDs)-1].String(),
  317. },
  318. }, parent, opts...)
  319. }
// FromRemote converts a remote snapshot reference to a local one.
//
// The remote's descriptors are turned into layers (each must carry the
// "containerd.io/uncompressed" annotation), downloaded through the worker's
// DownloadManager with progress output discarded, and then resolved into a
// chain of cache refs, one per diff ID. The ref for the last descriptor is
// returned; the intermediate refs are released when the function returns.
//
// Errors from layer conversion, download, timestamp parsing or ref lookup are
// returned as-is. A mismatch between the number of downloaded diff IDs and
// the number of requested layers is also an error.
func (w *Worker) FromRemote(ctx context.Context, remote *solver.Remote) (cache.ImmutableRef, error) {
	rootfs, err := getLayers(ctx, remote.Descriptors)
	if err != nil {
		return nil, err
	}
	// Wrap every layer in a download descriptor that pulls the blob from the
	// remote's provider into the worker's content store.
	layers := make([]xfer.DownloadDescriptor, 0, len(rootfs))
	for _, l := range rootfs {
		// ongoing.add(desc)
		layers = append(layers, &layerDescriptor{
			desc:     l.Blob,
			diffID:   layer.DiffID(l.Diff.Digest),
			provider: remote.Provider,
			w:        w,
			pctx:     ctx,
		})
	}
	// On return, delete the layer blobs from the content store again. Errors
	// from Delete are ignored.
	defer func() {
		for _, l := range rootfs {
			w.ContentStore().Delete(context.TODO(), l.Blob.Digest)
		}
	}()
	r := image.NewRootFS()
	rootFS, release, err := w.DownloadManager.Download(ctx, *r, layers, &discardProgress{})
	if err != nil {
		return nil, err
	}
	// Deferred after the blob cleanup above, so release runs before it.
	defer release()
	if len(rootFS.DiffIDs) != len(layers) {
		return nil, errors.Errorf("invalid layer count mismatch %d vs %d", len(rootFS.DiffIDs), len(layers))
	}
	for i := range rootFS.DiffIDs {
		// Creation time defaults to now unless the descriptor carries a
		// labelCreatedAt annotation.
		tm := time.Now()
		if tmstr, ok := remote.Descriptors[i].Annotations[labelCreatedAt]; ok {
			if err := (&tm).UnmarshalText([]byte(tmstr)); err != nil {
				return nil, err
			}
		}
		// Description defaults to "imported <digest>" unless overridden by
		// the "buildkit/description" annotation.
		descr := fmt.Sprintf("imported %s", remote.Descriptors[i].Digest)
		if v, ok := remote.Descriptors[i].Annotations["buildkit/description"]; ok {
			descr = v
		}
		// Look up the ref for the chain made of the first i+1 diff IDs.
		ref, err := w.getRef(ctx, rootFS.DiffIDs[:i+1], cache.WithDescription(descr), cache.WithCreationTime(tm))
		if err != nil {
			return nil, err
		}
		if i == len(remote.Descriptors)-1 {
			return ref, nil
		}
		// Intermediate refs stay held until the function returns.
		defer ref.Release(context.TODO())
	}
	// Reached only when the loop body never returns, e.g. when there are no
	// diff IDs at all.
	return nil, errors.Errorf("unreachable")
}
  373. // Executor returns executor.Executor for running processes
  374. func (w *Worker) Executor() executor.Executor {
  375. return w.Opt.Executor
  376. }
  377. // CacheManager returns cache.Manager for accessing local storage
  378. func (w *Worker) CacheManager() cache.Manager {
  379. return w.Opt.CacheManager
  380. }
  381. type discardProgress struct{}
  382. func (*discardProgress) WriteProgress(_ pkgprogress.Progress) error {
  383. return nil
  384. }
// layerDescriptor describes one layer blob to download in FromRemote. Its
// methods (Key, ID, DiffID, Download, Close, Registered) are what the
// download manager calls on it.
//
// Fetch(ctx context.Context, desc ocispec.Descriptor) (io.ReadCloser, error)
type layerDescriptor struct {
	// provider is the content source the blob is copied from.
	provider content.Provider
	// desc is the descriptor of the layer blob.
	desc ocispec.Descriptor
	// diffID is the layer's diff ID, returned as-is by DiffID.
	diffID layer.DiffID
	// ref      ctdreference.Spec
	// w is the owning worker; its content store and V2 metadata service are used.
	w *Worker
	// pctx is the context captured when the descriptor was built; Download
	// uses it for progress reporting instead of its own ctx argument.
	pctx context.Context
}
  394. func (ld *layerDescriptor) Key() string {
  395. return "v2:" + ld.desc.Digest.String()
  396. }
  397. func (ld *layerDescriptor) ID() string {
  398. return ld.desc.Digest.String()
  399. }
  400. func (ld *layerDescriptor) DiffID() (layer.DiffID, error) {
  401. return ld.diffID, nil
  402. }
  403. func (ld *layerDescriptor) Download(ctx context.Context, progressOutput pkgprogress.Output) (io.ReadCloser, int64, error) {
  404. done := oneOffProgress(ld.pctx, fmt.Sprintf("pulling %s", ld.desc.Digest))
  405. // TODO should this write output to progressOutput? Or use something similar to loggerFromContext()? see https://github.com/moby/buildkit/commit/aa29e7729464f3c2a773e27795e584023c751cb8
  406. discardLogs := func(_ []byte) {}
  407. if err := contentutil.Copy(ctx, ld.w.ContentStore(), ld.provider, ld.desc, "", discardLogs); err != nil {
  408. return nil, 0, done(err)
  409. }
  410. _ = done(nil)
  411. ra, err := ld.w.ContentStore().ReaderAt(ctx, ld.desc)
  412. if err != nil {
  413. return nil, 0, err
  414. }
  415. return io.NopCloser(content.NewReader(ra)), ld.desc.Size, nil
  416. }
// Close is a no-op. The blob deletion it once performed is commented out;
// FromRemote deletes the downloaded blobs from the content store instead.
func (ld *layerDescriptor) Close() {
	// ld.is.ContentStore().Delete(context.TODO(), ld.desc.Digest)
}
  420. func (ld *layerDescriptor) Registered(diffID layer.DiffID) {
  421. // Cache mapping from this layer's DiffID to the blobsum
  422. ld.w.V2MetadataService.Add(diffID, distmetadata.V2Metadata{Digest: ld.desc.Digest})
  423. }
  424. func getLayers(ctx context.Context, descs []ocispec.Descriptor) ([]rootfs.Layer, error) {
  425. layers := make([]rootfs.Layer, len(descs))
  426. for i, desc := range descs {
  427. diffIDStr := desc.Annotations["containerd.io/uncompressed"]
  428. if diffIDStr == "" {
  429. return nil, errors.Errorf("%s missing uncompressed digest", desc.Digest)
  430. }
  431. diffID, err := digest.Parse(diffIDStr)
  432. if err != nil {
  433. return nil, err
  434. }
  435. layers[i].Diff = ocispec.Descriptor{
  436. MediaType: ocispec.MediaTypeImageLayer,
  437. Digest: diffID,
  438. }
  439. layers[i].Blob = ocispec.Descriptor{
  440. MediaType: desc.MediaType,
  441. Digest: desc.Digest,
  442. Size: desc.Size,
  443. }
  444. }
  445. return layers, nil
  446. }
  447. func oneOffProgress(ctx context.Context, id string) func(err error) error {
  448. pw, _, _ := progress.NewFromContext(ctx)
  449. now := time.Now()
  450. st := progress.Status{
  451. Started: &now,
  452. }
  453. _ = pw.Write(id, st)
  454. return func(err error) error {
  455. // TODO: set error on status
  456. now := time.Now()
  457. st.Completed = &now
  458. _ = pw.Write(id, st)
  459. _ = pw.Close()
  460. return err
  461. }
  462. }
  463. type emptyProvider struct{}
  464. func (p *emptyProvider) ReaderAt(ctx context.Context, dec ocispec.Descriptor) (content.ReaderAt, error) {
  465. return nil, errors.Errorf("ReaderAt not implemented for empty provider")
  466. }