worker.go 9.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331
  1. package worker
  2. import (
  3. "context"
  4. "fmt"
  5. "io"
  6. "io/ioutil"
  7. nethttp "net/http"
  8. "runtime"
  9. "time"
  10. "github.com/containerd/containerd/content"
  11. "github.com/containerd/containerd/platforms"
  12. "github.com/containerd/containerd/rootfs"
  13. "github.com/docker/docker/distribution"
  14. distmetadata "github.com/docker/docker/distribution/metadata"
  15. "github.com/docker/docker/distribution/xfer"
  16. "github.com/docker/docker/image"
  17. "github.com/docker/docker/layer"
  18. pkgprogress "github.com/docker/docker/pkg/progress"
  19. "github.com/moby/buildkit/cache"
  20. "github.com/moby/buildkit/cache/metadata"
  21. "github.com/moby/buildkit/client"
  22. "github.com/moby/buildkit/executor"
  23. "github.com/moby/buildkit/exporter"
  24. "github.com/moby/buildkit/frontend"
  25. "github.com/moby/buildkit/session"
  26. "github.com/moby/buildkit/snapshot"
  27. "github.com/moby/buildkit/solver"
  28. "github.com/moby/buildkit/solver/llbsolver/ops"
  29. "github.com/moby/buildkit/solver/pb"
  30. "github.com/moby/buildkit/source"
  31. "github.com/moby/buildkit/source/git"
  32. "github.com/moby/buildkit/source/http"
  33. "github.com/moby/buildkit/source/local"
  34. "github.com/moby/buildkit/util/contentutil"
  35. "github.com/moby/buildkit/util/progress"
  36. digest "github.com/opencontainers/go-digest"
  37. ocispec "github.com/opencontainers/image-spec/specs-go/v1"
  38. "github.com/pkg/errors"
  39. "github.com/sirupsen/logrus"
  40. )
  41. // Opt defines a structure for creating a worker.
  42. type Opt struct {
  43. ID string
  44. Labels map[string]string
  45. SessionManager *session.Manager
  46. MetadataStore *metadata.Store
  47. Executor executor.Executor
  48. Snapshotter snapshot.Snapshotter
  49. ContentStore content.Store
  50. CacheManager cache.Manager
  51. ImageSource source.Source
  52. Exporters map[string]exporter.Exporter
  53. DownloadManager distribution.RootFSDownloadManager
  54. V2MetadataService distmetadata.V2MetadataService
  55. Transport nethttp.RoundTripper
  56. }
  57. // Worker is a local worker instance with dedicated snapshotter, cache, and so on.
  58. // TODO: s/Worker/OpWorker/g ?
  59. type Worker struct {
  60. Opt
  61. SourceManager *source.Manager
  62. }
  63. // NewWorker instantiates a local worker
  64. func NewWorker(opt Opt) (*Worker, error) {
  65. sm, err := source.NewManager()
  66. if err != nil {
  67. return nil, err
  68. }
  69. cm := opt.CacheManager
  70. sm.Register(opt.ImageSource)
  71. gs, err := git.NewSource(git.Opt{
  72. CacheAccessor: cm,
  73. MetadataStore: opt.MetadataStore,
  74. })
  75. if err == nil {
  76. sm.Register(gs)
  77. } else {
  78. logrus.Warnf("Could not register builder git source: %s", err)
  79. }
  80. hs, err := http.NewSource(http.Opt{
  81. CacheAccessor: cm,
  82. MetadataStore: opt.MetadataStore,
  83. Transport: opt.Transport,
  84. })
  85. if err == nil {
  86. sm.Register(hs)
  87. } else {
  88. logrus.Warnf("Could not register builder http source: %s", err)
  89. }
  90. ss, err := local.NewSource(local.Opt{
  91. SessionManager: opt.SessionManager,
  92. CacheAccessor: cm,
  93. MetadataStore: opt.MetadataStore,
  94. })
  95. if err == nil {
  96. sm.Register(ss)
  97. } else {
  98. logrus.Warnf("Could not register builder local source: %s", err)
  99. }
  100. return &Worker{
  101. Opt: opt,
  102. SourceManager: sm,
  103. }, nil
  104. }
  105. // ID returns worker ID
  106. func (w *Worker) ID() string {
  107. return w.Opt.ID
  108. }
  109. // Labels returns map of all worker labels
  110. func (w *Worker) Labels() map[string]string {
  111. return w.Opt.Labels
  112. }
  113. // Platforms returns one or more platforms supported by the image.
  114. func (w *Worker) Platforms() []ocispec.Platform {
  115. // does not handle lcow
  116. return []ocispec.Platform{platforms.DefaultSpec()}
  117. }
  118. // LoadRef loads a reference by ID
  119. func (w *Worker) LoadRef(id string) (cache.ImmutableRef, error) {
  120. return w.CacheManager.Get(context.TODO(), id)
  121. }
  122. // ResolveOp converts a LLB vertex into a LLB operation
  123. func (w *Worker) ResolveOp(v solver.Vertex, s frontend.FrontendLLBBridge) (solver.Op, error) {
  124. if baseOp, ok := v.Sys().(*pb.Op); ok {
  125. switch op := baseOp.Op.(type) {
  126. case *pb.Op_Source:
  127. return ops.NewSourceOp(v, op, baseOp.Platform, w.SourceManager, w)
  128. case *pb.Op_Exec:
  129. return ops.NewExecOp(v, op, w.CacheManager, w.MetadataStore, w.Executor, w)
  130. case *pb.Op_Build:
  131. return ops.NewBuildOp(v, op, s, w)
  132. }
  133. }
  134. return nil, errors.Errorf("could not resolve %v", v)
  135. }
  136. // ResolveImageConfig returns image config for an image
  137. func (w *Worker) ResolveImageConfig(ctx context.Context, ref string, platform *ocispec.Platform) (digest.Digest, []byte, error) {
  138. // ImageSource is typically source/containerimage
  139. resolveImageConfig, ok := w.ImageSource.(resolveImageConfig)
  140. if !ok {
  141. return "", nil, errors.Errorf("worker %q does not implement ResolveImageConfig", w.ID())
  142. }
  143. return resolveImageConfig.ResolveImageConfig(ctx, ref, platform)
  144. }
  145. // Exec executes a process directly on a worker
  146. func (w *Worker) Exec(ctx context.Context, meta executor.Meta, rootFS cache.ImmutableRef, stdin io.ReadCloser, stdout, stderr io.WriteCloser) error {
  147. active, err := w.CacheManager.New(ctx, rootFS)
  148. if err != nil {
  149. return err
  150. }
  151. defer active.Release(context.TODO())
  152. return w.Executor.Exec(ctx, meta, active, nil, stdin, stdout, stderr)
  153. }
  154. // DiskUsage returns disk usage report
  155. func (w *Worker) DiskUsage(ctx context.Context, opt client.DiskUsageInfo) ([]*client.UsageInfo, error) {
  156. return w.CacheManager.DiskUsage(ctx, opt)
  157. }
  158. // Prune deletes reclaimable build cache
  159. func (w *Worker) Prune(ctx context.Context, ch chan client.UsageInfo) error {
  160. return w.CacheManager.Prune(ctx, ch)
  161. }
  162. // Exporter returns exporter by name
  163. func (w *Worker) Exporter(name string) (exporter.Exporter, error) {
  164. exp, ok := w.Exporters[name]
  165. if !ok {
  166. return nil, errors.Errorf("exporter %q could not be found", name)
  167. }
  168. return exp, nil
  169. }
  170. // GetRemote returns a remote snapshot reference for a local one
  171. func (w *Worker) GetRemote(ctx context.Context, ref cache.ImmutableRef, createIfNeeded bool) (*solver.Remote, error) {
  172. return nil, errors.Errorf("getremote not implemented")
  173. }
  174. // FromRemote converts a remote snapshot reference to a local one
  175. func (w *Worker) FromRemote(ctx context.Context, remote *solver.Remote) (cache.ImmutableRef, error) {
  176. rootfs, err := getLayers(ctx, remote.Descriptors)
  177. if err != nil {
  178. return nil, err
  179. }
  180. layers := make([]xfer.DownloadDescriptor, 0, len(rootfs))
  181. for _, l := range rootfs {
  182. // ongoing.add(desc)
  183. layers = append(layers, &layerDescriptor{
  184. desc: l.Blob,
  185. diffID: layer.DiffID(l.Diff.Digest),
  186. provider: remote.Provider,
  187. w: w,
  188. pctx: ctx,
  189. })
  190. }
  191. defer func() {
  192. for _, l := range rootfs {
  193. w.ContentStore.Delete(context.TODO(), l.Blob.Digest)
  194. }
  195. }()
  196. r := image.NewRootFS()
  197. rootFS, release, err := w.DownloadManager.Download(ctx, *r, runtime.GOOS, layers, &discardProgress{})
  198. if err != nil {
  199. return nil, err
  200. }
  201. defer release()
  202. ref, err := w.CacheManager.GetFromSnapshotter(ctx, string(rootFS.ChainID()), cache.WithDescription(fmt.Sprintf("imported %s", remote.Descriptors[len(remote.Descriptors)-1].Digest)))
  203. if err != nil {
  204. return nil, err
  205. }
  206. return ref, nil
  207. }
  208. type discardProgress struct{}
  209. func (*discardProgress) WriteProgress(_ pkgprogress.Progress) error {
  210. return nil
  211. }
  212. // Fetch(ctx context.Context, desc ocispec.Descriptor) (io.ReadCloser, error)
  213. type layerDescriptor struct {
  214. provider content.Provider
  215. desc ocispec.Descriptor
  216. diffID layer.DiffID
  217. // ref ctdreference.Spec
  218. w *Worker
  219. pctx context.Context
  220. }
  221. func (ld *layerDescriptor) Key() string {
  222. return "v2:" + ld.desc.Digest.String()
  223. }
  224. func (ld *layerDescriptor) ID() string {
  225. return ld.desc.Digest.String()
  226. }
  227. func (ld *layerDescriptor) DiffID() (layer.DiffID, error) {
  228. return ld.diffID, nil
  229. }
  230. func (ld *layerDescriptor) Download(ctx context.Context, progressOutput pkgprogress.Output) (io.ReadCloser, int64, error) {
  231. done := oneOffProgress(ld.pctx, fmt.Sprintf("pulling %s", ld.desc.Digest))
  232. if err := contentutil.Copy(ctx, ld.w.ContentStore, ld.provider, ld.desc); err != nil {
  233. return nil, 0, done(err)
  234. }
  235. done(nil)
  236. ra, err := ld.w.ContentStore.ReaderAt(ctx, ld.desc)
  237. if err != nil {
  238. return nil, 0, err
  239. }
  240. return ioutil.NopCloser(content.NewReader(ra)), ld.desc.Size, nil
  241. }
  242. func (ld *layerDescriptor) Close() {
  243. // ld.is.ContentStore.Delete(context.TODO(), ld.desc.Digest)
  244. }
  245. func (ld *layerDescriptor) Registered(diffID layer.DiffID) {
  246. // Cache mapping from this layer's DiffID to the blobsum
  247. ld.w.V2MetadataService.Add(diffID, distmetadata.V2Metadata{Digest: ld.desc.Digest})
  248. }
  249. func getLayers(ctx context.Context, descs []ocispec.Descriptor) ([]rootfs.Layer, error) {
  250. layers := make([]rootfs.Layer, len(descs))
  251. for i, desc := range descs {
  252. diffIDStr := desc.Annotations["containerd.io/uncompressed"]
  253. if diffIDStr == "" {
  254. return nil, errors.Errorf("%s missing uncompressed digest", desc.Digest)
  255. }
  256. diffID, err := digest.Parse(diffIDStr)
  257. if err != nil {
  258. return nil, err
  259. }
  260. layers[i].Diff = ocispec.Descriptor{
  261. MediaType: ocispec.MediaTypeImageLayer,
  262. Digest: diffID,
  263. }
  264. layers[i].Blob = ocispec.Descriptor{
  265. MediaType: desc.MediaType,
  266. Digest: desc.Digest,
  267. Size: desc.Size,
  268. }
  269. }
  270. return layers, nil
  271. }
  272. func oneOffProgress(ctx context.Context, id string) func(err error) error {
  273. pw, _, _ := progress.FromContext(ctx)
  274. now := time.Now()
  275. st := progress.Status{
  276. Started: &now,
  277. }
  278. pw.Write(id, st)
  279. return func(err error) error {
  280. // TODO: set error on status
  281. now := time.Now()
  282. st.Completed = &now
  283. pw.Write(id, st)
  284. pw.Close()
  285. return err
  286. }
  287. }
  288. type resolveImageConfig interface {
  289. ResolveImageConfig(ctx context.Context, ref string, platform *ocispec.Platform) (digest.Digest, []byte, error)
  290. }