worker.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501
  1. package worker
  2. import (
  3. "context"
  4. "fmt"
  5. "io"
  6. "io/ioutil"
  7. nethttp "net/http"
  8. "runtime"
  9. "strings"
  10. "time"
  11. "github.com/containerd/containerd/content"
  12. "github.com/containerd/containerd/images"
  13. "github.com/containerd/containerd/platforms"
  14. "github.com/containerd/containerd/rootfs"
  15. "github.com/docker/docker/distribution"
  16. distmetadata "github.com/docker/docker/distribution/metadata"
  17. "github.com/docker/docker/distribution/xfer"
  18. "github.com/docker/docker/image"
  19. "github.com/docker/docker/layer"
  20. pkgprogress "github.com/docker/docker/pkg/progress"
  21. "github.com/moby/buildkit/cache"
  22. "github.com/moby/buildkit/cache/metadata"
  23. "github.com/moby/buildkit/client"
  24. "github.com/moby/buildkit/client/llb"
  25. "github.com/moby/buildkit/executor"
  26. "github.com/moby/buildkit/exporter"
  27. localexporter "github.com/moby/buildkit/exporter/local"
  28. tarexporter "github.com/moby/buildkit/exporter/tar"
  29. "github.com/moby/buildkit/frontend"
  30. "github.com/moby/buildkit/session"
  31. "github.com/moby/buildkit/snapshot"
  32. "github.com/moby/buildkit/solver"
  33. "github.com/moby/buildkit/solver/llbsolver/ops"
  34. "github.com/moby/buildkit/solver/pb"
  35. "github.com/moby/buildkit/source"
  36. "github.com/moby/buildkit/source/git"
  37. "github.com/moby/buildkit/source/http"
  38. "github.com/moby/buildkit/source/local"
  39. "github.com/moby/buildkit/util/binfmt_misc"
  40. "github.com/moby/buildkit/util/contentutil"
  41. "github.com/moby/buildkit/util/progress"
  42. digest "github.com/opencontainers/go-digest"
  43. ocispec "github.com/opencontainers/image-spec/specs-go/v1"
  44. "github.com/pkg/errors"
  45. "github.com/sirupsen/logrus"
  46. bolt "go.etcd.io/bbolt"
  47. )
  48. const labelCreatedAt = "buildkit/createdat"
  49. // LayerAccess provides access to a moby layer from a snapshot
  50. type LayerAccess interface {
  51. GetDiffIDs(ctx context.Context, key string) ([]layer.DiffID, error)
  52. EnsureLayer(ctx context.Context, key string) ([]layer.DiffID, error)
  53. }
  54. // Opt defines a structure for creating a worker.
  55. type Opt struct {
  56. ID string
  57. Labels map[string]string
  58. GCPolicy []client.PruneInfo
  59. MetadataStore *metadata.Store
  60. Executor executor.Executor
  61. Snapshotter snapshot.Snapshotter
  62. ContentStore content.Store
  63. CacheManager cache.Manager
  64. ImageSource source.Source
  65. DownloadManager distribution.RootFSDownloadManager
  66. V2MetadataService distmetadata.V2MetadataService
  67. Transport nethttp.RoundTripper
  68. Exporter exporter.Exporter
  69. Layers LayerAccess
  70. Platforms []ocispec.Platform
  71. }
  72. // Worker is a local worker instance with dedicated snapshotter, cache, and so on.
  73. // TODO: s/Worker/OpWorker/g ?
  74. type Worker struct {
  75. Opt
  76. SourceManager *source.Manager
  77. }
  78. // NewWorker instantiates a local worker
  79. func NewWorker(opt Opt) (*Worker, error) {
  80. sm, err := source.NewManager()
  81. if err != nil {
  82. return nil, err
  83. }
  84. cm := opt.CacheManager
  85. sm.Register(opt.ImageSource)
  86. gs, err := git.NewSource(git.Opt{
  87. CacheAccessor: cm,
  88. MetadataStore: opt.MetadataStore,
  89. })
  90. if err == nil {
  91. sm.Register(gs)
  92. } else {
  93. logrus.Warnf("Could not register builder git source: %s", err)
  94. }
  95. hs, err := http.NewSource(http.Opt{
  96. CacheAccessor: cm,
  97. MetadataStore: opt.MetadataStore,
  98. Transport: opt.Transport,
  99. })
  100. if err == nil {
  101. sm.Register(hs)
  102. } else {
  103. logrus.Warnf("Could not register builder http source: %s", err)
  104. }
  105. ss, err := local.NewSource(local.Opt{
  106. CacheAccessor: cm,
  107. MetadataStore: opt.MetadataStore,
  108. })
  109. if err == nil {
  110. sm.Register(ss)
  111. } else {
  112. logrus.Warnf("Could not register builder local source: %s", err)
  113. }
  114. return &Worker{
  115. Opt: opt,
  116. SourceManager: sm,
  117. }, nil
  118. }
  119. // ID returns worker ID
  120. func (w *Worker) ID() string {
  121. return w.Opt.ID
  122. }
  123. // Labels returns map of all worker labels
  124. func (w *Worker) Labels() map[string]string {
  125. return w.Opt.Labels
  126. }
  127. // Platforms returns one or more platforms supported by the image.
  128. func (w *Worker) Platforms(noCache bool) []ocispec.Platform {
  129. if noCache {
  130. pm := make(map[string]struct{}, len(w.Opt.Platforms))
  131. for _, p := range w.Opt.Platforms {
  132. pm[platforms.Format(p)] = struct{}{}
  133. }
  134. for _, p := range binfmt_misc.SupportedPlatforms(noCache) {
  135. if _, ok := pm[p]; !ok {
  136. pp, _ := platforms.Parse(p)
  137. w.Opt.Platforms = append(w.Opt.Platforms, pp)
  138. }
  139. }
  140. }
  141. if len(w.Opt.Platforms) == 0 {
  142. return []ocispec.Platform{platforms.DefaultSpec()}
  143. }
  144. return w.Opt.Platforms
  145. }
  146. // GCPolicy returns automatic GC Policy
  147. func (w *Worker) GCPolicy() []client.PruneInfo {
  148. return w.Opt.GCPolicy
  149. }
  150. // ContentStore returns content store
  151. func (w *Worker) ContentStore() content.Store {
  152. return w.Opt.ContentStore
  153. }
  154. // LoadRef loads a reference by ID
  155. func (w *Worker) LoadRef(id string, hidden bool) (cache.ImmutableRef, error) {
  156. var opts []cache.RefOption
  157. if hidden {
  158. opts = append(opts, cache.NoUpdateLastUsed)
  159. }
  160. return w.CacheManager.Get(context.TODO(), id, opts...)
  161. }
  162. // ResolveOp converts a LLB vertex into a LLB operation
  163. func (w *Worker) ResolveOp(v solver.Vertex, s frontend.FrontendLLBBridge, sm *session.Manager) (solver.Op, error) {
  164. if baseOp, ok := v.Sys().(*pb.Op); ok {
  165. switch op := baseOp.Op.(type) {
  166. case *pb.Op_Source:
  167. return ops.NewSourceOp(v, op, baseOp.Platform, w.SourceManager, sm, w)
  168. case *pb.Op_Exec:
  169. return ops.NewExecOp(v, op, baseOp.Platform, w.CacheManager, sm, w.MetadataStore, w.Executor, w)
  170. case *pb.Op_File:
  171. return ops.NewFileOp(v, op, w.CacheManager, w.MetadataStore, w)
  172. case *pb.Op_Build:
  173. return ops.NewBuildOp(v, op, s, w)
  174. }
  175. }
  176. return nil, errors.Errorf("could not resolve %v", v)
  177. }
  178. // ResolveImageConfig returns image config for an image
  179. func (w *Worker) ResolveImageConfig(ctx context.Context, ref string, opt llb.ResolveImageConfigOpt, sm *session.Manager) (digest.Digest, []byte, error) {
  180. // ImageSource is typically source/containerimage
  181. resolveImageConfig, ok := w.ImageSource.(resolveImageConfig)
  182. if !ok {
  183. return "", nil, errors.Errorf("worker %q does not implement ResolveImageConfig", w.ID())
  184. }
  185. return resolveImageConfig.ResolveImageConfig(ctx, ref, opt, sm)
  186. }
  187. // Exec executes a process directly on a worker
  188. func (w *Worker) Exec(ctx context.Context, meta executor.Meta, rootFS cache.ImmutableRef, stdin io.ReadCloser, stdout, stderr io.WriteCloser) error {
  189. active, err := w.CacheManager.New(ctx, rootFS)
  190. if err != nil {
  191. return err
  192. }
  193. defer active.Release(context.TODO())
  194. return w.Executor.Exec(ctx, meta, active, nil, stdin, stdout, stderr)
  195. }
  196. // DiskUsage returns disk usage report
  197. func (w *Worker) DiskUsage(ctx context.Context, opt client.DiskUsageInfo) ([]*client.UsageInfo, error) {
  198. return w.CacheManager.DiskUsage(ctx, opt)
  199. }
  200. // Prune deletes reclaimable build cache
  201. func (w *Worker) Prune(ctx context.Context, ch chan client.UsageInfo, info ...client.PruneInfo) error {
  202. return w.CacheManager.Prune(ctx, ch, info...)
  203. }
  204. // Exporter returns exporter by name
  205. func (w *Worker) Exporter(name string, sm *session.Manager) (exporter.Exporter, error) {
  206. switch name {
  207. case "moby":
  208. return w.Opt.Exporter, nil
  209. case client.ExporterLocal:
  210. return localexporter.New(localexporter.Opt{
  211. SessionManager: sm,
  212. })
  213. case client.ExporterTar:
  214. return tarexporter.New(tarexporter.Opt{
  215. SessionManager: sm,
  216. })
  217. default:
  218. return nil, errors.Errorf("exporter %q could not be found", name)
  219. }
  220. }
  221. // GetRemote returns a remote snapshot reference for a local one
  222. func (w *Worker) GetRemote(ctx context.Context, ref cache.ImmutableRef, createIfNeeded bool) (*solver.Remote, error) {
  223. var diffIDs []layer.DiffID
  224. var err error
  225. if !createIfNeeded {
  226. diffIDs, err = w.Layers.GetDiffIDs(ctx, ref.ID())
  227. if err != nil {
  228. return nil, err
  229. }
  230. } else {
  231. if err := ref.Finalize(ctx, true); err != nil {
  232. return nil, err
  233. }
  234. diffIDs, err = w.Layers.EnsureLayer(ctx, ref.ID())
  235. if err != nil {
  236. return nil, err
  237. }
  238. }
  239. descriptors := make([]ocispec.Descriptor, len(diffIDs))
  240. for i, dgst := range diffIDs {
  241. descriptors[i] = ocispec.Descriptor{
  242. MediaType: images.MediaTypeDockerSchema2Layer,
  243. Digest: digest.Digest(dgst),
  244. Size: -1,
  245. }
  246. }
  247. return &solver.Remote{
  248. Descriptors: descriptors,
  249. Provider: &emptyProvider{},
  250. }, nil
  251. }
  252. // PruneCacheMounts removes the current cache snapshots for specified IDs
  253. func (w *Worker) PruneCacheMounts(ctx context.Context, ids []string) error {
  254. mu := ops.CacheMountsLocker()
  255. mu.Lock()
  256. defer mu.Unlock()
  257. for _, id := range ids {
  258. id = "cache-dir:" + id
  259. sis, err := w.MetadataStore.Search(id)
  260. if err != nil {
  261. return err
  262. }
  263. for _, si := range sis {
  264. for _, k := range si.Indexes() {
  265. if k == id || strings.HasPrefix(k, id+":") {
  266. if siCached := w.CacheManager.Metadata(si.ID()); siCached != nil {
  267. si = siCached
  268. }
  269. if err := cache.CachePolicyDefault(si); err != nil {
  270. return err
  271. }
  272. si.Queue(func(b *bolt.Bucket) error {
  273. return si.SetValue(b, k, nil)
  274. })
  275. if err := si.Commit(); err != nil {
  276. return err
  277. }
  278. // if ref is unused try to clean it up right away by releasing it
  279. if mref, err := w.CacheManager.GetMutable(ctx, si.ID()); err == nil {
  280. go mref.Release(context.TODO())
  281. }
  282. break
  283. }
  284. }
  285. }
  286. }
  287. ops.ClearActiveCacheMounts()
  288. return nil
  289. }
  290. func (w *Worker) getRef(ctx context.Context, diffIDs []layer.DiffID, opts ...cache.RefOption) (cache.ImmutableRef, error) {
  291. var parent cache.ImmutableRef
  292. if len(diffIDs) > 1 {
  293. var err error
  294. parent, err = w.getRef(ctx, diffIDs[:len(diffIDs)-1], opts...)
  295. if err != nil {
  296. return nil, err
  297. }
  298. defer parent.Release(context.TODO())
  299. }
  300. return w.CacheManager.GetByBlob(context.TODO(), ocispec.Descriptor{
  301. Annotations: map[string]string{
  302. "containerd.io/uncompressed": diffIDs[len(diffIDs)-1].String(),
  303. },
  304. }, parent, opts...)
  305. }
  306. // FromRemote converts a remote snapshot reference to a local one
  307. func (w *Worker) FromRemote(ctx context.Context, remote *solver.Remote) (cache.ImmutableRef, error) {
  308. rootfs, err := getLayers(ctx, remote.Descriptors)
  309. if err != nil {
  310. return nil, err
  311. }
  312. layers := make([]xfer.DownloadDescriptor, 0, len(rootfs))
  313. for _, l := range rootfs {
  314. // ongoing.add(desc)
  315. layers = append(layers, &layerDescriptor{
  316. desc: l.Blob,
  317. diffID: layer.DiffID(l.Diff.Digest),
  318. provider: remote.Provider,
  319. w: w,
  320. pctx: ctx,
  321. })
  322. }
  323. defer func() {
  324. for _, l := range rootfs {
  325. w.ContentStore().Delete(context.TODO(), l.Blob.Digest)
  326. }
  327. }()
  328. r := image.NewRootFS()
  329. rootFS, release, err := w.DownloadManager.Download(ctx, *r, runtime.GOOS, layers, &discardProgress{})
  330. if err != nil {
  331. return nil, err
  332. }
  333. defer release()
  334. if len(rootFS.DiffIDs) != len(layers) {
  335. return nil, errors.Errorf("invalid layer count mismatch %d vs %d", len(rootFS.DiffIDs), len(layers))
  336. }
  337. for i := range rootFS.DiffIDs {
  338. tm := time.Now()
  339. if tmstr, ok := remote.Descriptors[i].Annotations[labelCreatedAt]; ok {
  340. if err := (&tm).UnmarshalText([]byte(tmstr)); err != nil {
  341. return nil, err
  342. }
  343. }
  344. descr := fmt.Sprintf("imported %s", remote.Descriptors[i].Digest)
  345. if v, ok := remote.Descriptors[i].Annotations["buildkit/description"]; ok {
  346. descr = v
  347. }
  348. ref, err := w.getRef(ctx, rootFS.DiffIDs[:i+1], cache.WithDescription(descr), cache.WithCreationTime(tm))
  349. if err != nil {
  350. return nil, err
  351. }
  352. if i == len(remote.Descriptors)-1 {
  353. return ref, nil
  354. }
  355. defer ref.Release(context.TODO())
  356. }
  357. return nil, errors.Errorf("unreachable")
  358. }
  359. type discardProgress struct{}
  360. func (*discardProgress) WriteProgress(_ pkgprogress.Progress) error {
  361. return nil
  362. }
  363. // Fetch(ctx context.Context, desc ocispec.Descriptor) (io.ReadCloser, error)
  364. type layerDescriptor struct {
  365. provider content.Provider
  366. desc ocispec.Descriptor
  367. diffID layer.DiffID
  368. // ref ctdreference.Spec
  369. w *Worker
  370. pctx context.Context
  371. }
  372. func (ld *layerDescriptor) Key() string {
  373. return "v2:" + ld.desc.Digest.String()
  374. }
  375. func (ld *layerDescriptor) ID() string {
  376. return ld.desc.Digest.String()
  377. }
  378. func (ld *layerDescriptor) DiffID() (layer.DiffID, error) {
  379. return ld.diffID, nil
  380. }
  381. func (ld *layerDescriptor) Download(ctx context.Context, progressOutput pkgprogress.Output) (io.ReadCloser, int64, error) {
  382. done := oneOffProgress(ld.pctx, fmt.Sprintf("pulling %s", ld.desc.Digest))
  383. if err := contentutil.Copy(ctx, ld.w.ContentStore(), ld.provider, ld.desc); err != nil {
  384. return nil, 0, done(err)
  385. }
  386. _ = done(nil)
  387. ra, err := ld.w.ContentStore().ReaderAt(ctx, ld.desc)
  388. if err != nil {
  389. return nil, 0, err
  390. }
  391. return ioutil.NopCloser(content.NewReader(ra)), ld.desc.Size, nil
  392. }
  393. func (ld *layerDescriptor) Close() {
  394. // ld.is.ContentStore().Delete(context.TODO(), ld.desc.Digest)
  395. }
  396. func (ld *layerDescriptor) Registered(diffID layer.DiffID) {
  397. // Cache mapping from this layer's DiffID to the blobsum
  398. ld.w.V2MetadataService.Add(diffID, distmetadata.V2Metadata{Digest: ld.desc.Digest})
  399. }
  400. func getLayers(ctx context.Context, descs []ocispec.Descriptor) ([]rootfs.Layer, error) {
  401. layers := make([]rootfs.Layer, len(descs))
  402. for i, desc := range descs {
  403. diffIDStr := desc.Annotations["containerd.io/uncompressed"]
  404. if diffIDStr == "" {
  405. return nil, errors.Errorf("%s missing uncompressed digest", desc.Digest)
  406. }
  407. diffID, err := digest.Parse(diffIDStr)
  408. if err != nil {
  409. return nil, err
  410. }
  411. layers[i].Diff = ocispec.Descriptor{
  412. MediaType: ocispec.MediaTypeImageLayer,
  413. Digest: diffID,
  414. }
  415. layers[i].Blob = ocispec.Descriptor{
  416. MediaType: desc.MediaType,
  417. Digest: desc.Digest,
  418. Size: desc.Size,
  419. }
  420. }
  421. return layers, nil
  422. }
  423. func oneOffProgress(ctx context.Context, id string) func(err error) error {
  424. pw, _, _ := progress.FromContext(ctx)
  425. now := time.Now()
  426. st := progress.Status{
  427. Started: &now,
  428. }
  429. _ = pw.Write(id, st)
  430. return func(err error) error {
  431. // TODO: set error on status
  432. now := time.Now()
  433. st.Completed = &now
  434. _ = pw.Write(id, st)
  435. _ = pw.Close()
  436. return err
  437. }
  438. }
  439. type resolveImageConfig interface {
  440. ResolveImageConfig(ctx context.Context, ref string, opt llb.ResolveImageConfigOpt, sm *session.Manager) (digest.Digest, []byte, error)
  441. }
  442. type emptyProvider struct {
  443. }
  444. func (p *emptyProvider) ReaderAt(ctx context.Context, dec ocispec.Descriptor) (content.ReaderAt, error) {
  445. return nil, errors.Errorf("ReaderAt not implemented for empty provider")
  446. }