builder.go 14 KB


  1. package buildkit
  2. import (
  3. "context"
  4. "fmt"
  5. "io"
  6. "net"
  7. "strconv"
  8. "strings"
  9. "sync"
  10. "time"
  11. "github.com/containerd/containerd/platforms"
  12. "github.com/containerd/containerd/remotes/docker"
  13. "github.com/docker/docker/api/types"
  14. "github.com/docker/docker/api/types/backend"
  15. "github.com/docker/docker/builder"
  16. "github.com/docker/docker/daemon/config"
  17. "github.com/docker/docker/daemon/images"
  18. "github.com/docker/docker/libnetwork"
  19. "github.com/docker/docker/pkg/idtools"
  20. "github.com/docker/docker/pkg/streamformatter"
  21. controlapi "github.com/moby/buildkit/api/services/control"
  22. "github.com/moby/buildkit/client"
  23. "github.com/moby/buildkit/control"
  24. "github.com/moby/buildkit/identity"
  25. "github.com/moby/buildkit/session"
  26. "github.com/moby/buildkit/util/entitlements"
  27. "github.com/moby/buildkit/util/tracing"
  28. "github.com/pkg/errors"
  29. "golang.org/x/sync/errgroup"
  30. "google.golang.org/grpc"
  31. grpcmetadata "google.golang.org/grpc/metadata"
  32. )
  33. type errMultipleFilterValues struct{}
  34. func (errMultipleFilterValues) Error() string { return "filters expect only one value" }
  35. func (errMultipleFilterValues) InvalidParameter() {}
  36. type errConflictFilter struct {
  37. a, b string
  38. }
  39. func (e errConflictFilter) Error() string {
  40. return fmt.Sprintf("conflicting filters: %q and %q", e.a, e.b)
  41. }
  42. func (errConflictFilter) InvalidParameter() {}
  43. var cacheFields = map[string]bool{
  44. "id": true,
  45. "parent": true,
  46. "type": true,
  47. "description": true,
  48. "inuse": true,
  49. "shared": true,
  50. "private": true,
  51. // fields from buildkit that are not exposed
  52. "mutable": false,
  53. "immutable": false,
  54. }
  55. // Opt is option struct required for creating the builder
  56. type Opt struct {
  57. SessionManager *session.Manager
  58. Root string
  59. Dist images.DistributionServices
  60. NetworkController libnetwork.NetworkController
  61. DefaultCgroupParent string
  62. RegistryHosts docker.RegistryHosts
  63. BuilderConfig config.BuilderConfig
  64. Rootless bool
  65. IdentityMapping *idtools.IdentityMapping
  66. DNSConfig config.DNSConfig
  67. ApparmorProfile string
  68. }
  69. // Builder can build using BuildKit backend
  70. type Builder struct {
  71. controller *control.Controller
  72. reqBodyHandler *reqBodyHandler
  73. mu sync.Mutex
  74. jobs map[string]*buildJob
  75. }
  76. // New creates a new builder
  77. func New(opt Opt) (*Builder, error) {
  78. reqHandler := newReqBodyHandler(tracing.DefaultTransport)
  79. if opt.IdentityMapping != nil && opt.IdentityMapping.Empty() {
  80. opt.IdentityMapping = nil
  81. }
  82. c, err := newController(reqHandler, opt)
  83. if err != nil {
  84. return nil, err
  85. }
  86. b := &Builder{
  87. controller: c,
  88. reqBodyHandler: reqHandler,
  89. jobs: map[string]*buildJob{},
  90. }
  91. return b, nil
  92. }
  93. // RegisterGRPC registers controller to the grpc server.
  94. func (b *Builder) RegisterGRPC(s *grpc.Server) {
  95. b.controller.Register(s)
  96. }
  97. // Cancel cancels a build using ID
  98. func (b *Builder) Cancel(ctx context.Context, id string) error {
  99. b.mu.Lock()
  100. if j, ok := b.jobs[id]; ok && j.cancel != nil {
  101. j.cancel()
  102. }
  103. b.mu.Unlock()
  104. return nil
  105. }
  106. // DiskUsage returns a report about space used by build cache
  107. func (b *Builder) DiskUsage(ctx context.Context) ([]*types.BuildCache, error) {
  108. duResp, err := b.controller.DiskUsage(ctx, &controlapi.DiskUsageRequest{})
  109. if err != nil {
  110. return nil, err
  111. }
  112. var items []*types.BuildCache
  113. for _, r := range duResp.Record {
  114. items = append(items, &types.BuildCache{
  115. ID: r.ID,
  116. Parent: r.Parent,
  117. Type: r.RecordType,
  118. Description: r.Description,
  119. InUse: r.InUse,
  120. Shared: r.Shared,
  121. Size: r.Size_,
  122. CreatedAt: r.CreatedAt,
  123. LastUsedAt: r.LastUsedAt,
  124. UsageCount: int(r.UsageCount),
  125. })
  126. }
  127. return items, nil
  128. }
  129. // Prune clears all reclaimable build cache
  130. func (b *Builder) Prune(ctx context.Context, opts types.BuildCachePruneOptions) (int64, []string, error) {
  131. ch := make(chan *controlapi.UsageRecord)
  132. eg, ctx := errgroup.WithContext(ctx)
  133. validFilters := make(map[string]bool, 1+len(cacheFields))
  134. validFilters["unused-for"] = true
  135. validFilters["until"] = true
  136. validFilters["label"] = true // TODO(tiborvass): handle label
  137. validFilters["label!"] = true // TODO(tiborvass): handle label!
  138. for k, v := range cacheFields {
  139. validFilters[k] = v
  140. }
  141. if err := opts.Filters.Validate(validFilters); err != nil {
  142. return 0, nil, err
  143. }
  144. pi, err := toBuildkitPruneInfo(opts)
  145. if err != nil {
  146. return 0, nil, err
  147. }
  148. eg.Go(func() error {
  149. defer close(ch)
  150. return b.controller.Prune(&controlapi.PruneRequest{
  151. All: pi.All,
  152. KeepDuration: int64(pi.KeepDuration),
  153. KeepBytes: pi.KeepBytes,
  154. Filter: pi.Filter,
  155. }, &pruneProxy{
  156. streamProxy: streamProxy{ctx: ctx},
  157. ch: ch,
  158. })
  159. })
  160. var size int64
  161. var cacheIDs []string
  162. eg.Go(func() error {
  163. for r := range ch {
  164. size += r.Size_
  165. cacheIDs = append(cacheIDs, r.ID)
  166. }
  167. return nil
  168. })
  169. if err := eg.Wait(); err != nil {
  170. return 0, nil, err
  171. }
  172. return size, cacheIDs, nil
  173. }
  174. // Build executes a build request
  175. func (b *Builder) Build(ctx context.Context, opt backend.BuildConfig) (*builder.Result, error) {
  176. var rc = opt.Source
  177. if buildID := opt.Options.BuildID; buildID != "" {
  178. b.mu.Lock()
  179. upload := false
  180. if strings.HasPrefix(buildID, "upload-request:") {
  181. upload = true
  182. buildID = strings.TrimPrefix(buildID, "upload-request:")
  183. }
  184. if _, ok := b.jobs[buildID]; !ok {
  185. b.jobs[buildID] = newBuildJob()
  186. }
  187. j := b.jobs[buildID]
  188. var cancel func()
  189. ctx, cancel = context.WithCancel(ctx)
  190. j.cancel = cancel
  191. b.mu.Unlock()
  192. if upload {
  193. ctx2, cancel := context.WithTimeout(ctx, 5*time.Second)
  194. defer cancel()
  195. err := j.SetUpload(ctx2, rc)
  196. return nil, err
  197. }
  198. if remoteContext := opt.Options.RemoteContext; remoteContext == "upload-request" {
  199. ctx2, cancel := context.WithTimeout(ctx, 5*time.Second)
  200. defer cancel()
  201. var err error
  202. rc, err = j.WaitUpload(ctx2)
  203. if err != nil {
  204. return nil, err
  205. }
  206. opt.Options.RemoteContext = ""
  207. }
  208. defer func() {
  209. b.mu.Lock()
  210. delete(b.jobs, buildID)
  211. b.mu.Unlock()
  212. }()
  213. }
  214. var out builder.Result
  215. id := identity.NewID()
  216. frontendAttrs := map[string]string{}
  217. if opt.Options.Target != "" {
  218. frontendAttrs["target"] = opt.Options.Target
  219. }
  220. if opt.Options.Dockerfile != "" && opt.Options.Dockerfile != "." {
  221. frontendAttrs["filename"] = opt.Options.Dockerfile
  222. }
  223. if opt.Options.RemoteContext != "" {
  224. if opt.Options.RemoteContext != "client-session" {
  225. frontendAttrs["context"] = opt.Options.RemoteContext
  226. }
  227. } else {
  228. url, cancel := b.reqBodyHandler.newRequest(rc)
  229. defer cancel()
  230. frontendAttrs["context"] = url
  231. }
  232. cacheFrom := append([]string{}, opt.Options.CacheFrom...)
  233. frontendAttrs["cache-from"] = strings.Join(cacheFrom, ",")
  234. for k, v := range opt.Options.BuildArgs {
  235. if v == nil {
  236. continue
  237. }
  238. frontendAttrs["build-arg:"+k] = *v
  239. }
  240. for k, v := range opt.Options.Labels {
  241. frontendAttrs["label:"+k] = v
  242. }
  243. if opt.Options.NoCache {
  244. frontendAttrs["no-cache"] = ""
  245. }
  246. if opt.Options.PullParent {
  247. frontendAttrs["image-resolve-mode"] = "pull"
  248. } else {
  249. frontendAttrs["image-resolve-mode"] = "default"
  250. }
  251. if opt.Options.Platform != "" {
  252. // same as in newBuilder in builder/dockerfile.builder.go
  253. // TODO: remove once opt.Options.Platform is of type specs.Platform
  254. _, err := platforms.Parse(opt.Options.Platform)
  255. if err != nil {
  256. return nil, err
  257. }
  258. frontendAttrs["platform"] = opt.Options.Platform
  259. }
  260. switch opt.Options.NetworkMode {
  261. case "host", "none":
  262. frontendAttrs["force-network-mode"] = opt.Options.NetworkMode
  263. case "", "default":
  264. default:
  265. return nil, errors.Errorf("network mode %q not supported by buildkit", opt.Options.NetworkMode)
  266. }
  267. extraHosts, err := toBuildkitExtraHosts(opt.Options.ExtraHosts)
  268. if err != nil {
  269. return nil, err
  270. }
  271. frontendAttrs["add-hosts"] = extraHosts
  272. exporterName := ""
  273. exporterAttrs := map[string]string{}
  274. if len(opt.Options.Outputs) > 1 {
  275. return nil, errors.Errorf("multiple outputs not supported")
  276. } else if len(opt.Options.Outputs) == 0 {
  277. exporterName = "moby"
  278. } else {
  279. // cacheonly is a special type for triggering skipping all exporters
  280. if opt.Options.Outputs[0].Type != "cacheonly" {
  281. exporterName = opt.Options.Outputs[0].Type
  282. exporterAttrs = opt.Options.Outputs[0].Attrs
  283. }
  284. }
  285. if exporterName == "moby" {
  286. if len(opt.Options.Tags) > 0 {
  287. exporterAttrs["name"] = strings.Join(opt.Options.Tags, ",")
  288. }
  289. }
  290. cache := controlapi.CacheOptions{}
  291. if inlineCache := opt.Options.BuildArgs["BUILDKIT_INLINE_CACHE"]; inlineCache != nil {
  292. if b, err := strconv.ParseBool(*inlineCache); err == nil && b {
  293. cache.Exports = append(cache.Exports, &controlapi.CacheOptionsEntry{
  294. Type: "inline",
  295. })
  296. }
  297. }
  298. req := &controlapi.SolveRequest{
  299. Ref: id,
  300. Exporter: exporterName,
  301. ExporterAttrs: exporterAttrs,
  302. Frontend: "dockerfile.v0",
  303. FrontendAttrs: frontendAttrs,
  304. Session: opt.Options.SessionID,
  305. Cache: cache,
  306. }
  307. if opt.Options.NetworkMode == "host" {
  308. req.Entitlements = append(req.Entitlements, entitlements.EntitlementNetworkHost)
  309. }
  310. aux := streamformatter.AuxFormatter{Writer: opt.ProgressWriter.Output}
  311. eg, ctx := errgroup.WithContext(ctx)
  312. eg.Go(func() error {
  313. resp, err := b.controller.Solve(ctx, req)
  314. if err != nil {
  315. return err
  316. }
  317. if exporterName != "moby" {
  318. return nil
  319. }
  320. id, ok := resp.ExporterResponse["containerimage.digest"]
  321. if !ok {
  322. return errors.Errorf("missing image id")
  323. }
  324. out.ImageID = id
  325. return aux.Emit("moby.image.id", types.BuildResult{ID: id})
  326. })
  327. ch := make(chan *controlapi.StatusResponse)
  328. eg.Go(func() error {
  329. defer close(ch)
  330. // streamProxy.ctx is not set to ctx because when request is cancelled,
  331. // only the build request has to be cancelled, not the status request.
  332. stream := &statusProxy{streamProxy: streamProxy{ctx: context.TODO()}, ch: ch}
  333. return b.controller.Status(&controlapi.StatusRequest{Ref: id}, stream)
  334. })
  335. eg.Go(func() error {
  336. for sr := range ch {
  337. dt, err := sr.Marshal()
  338. if err != nil {
  339. return err
  340. }
  341. if err := aux.Emit("moby.buildkit.trace", dt); err != nil {
  342. return err
  343. }
  344. }
  345. return nil
  346. })
  347. if err := eg.Wait(); err != nil {
  348. return nil, err
  349. }
  350. return &out, nil
  351. }
  352. type streamProxy struct {
  353. ctx context.Context
  354. }
  355. func (sp *streamProxy) SetHeader(_ grpcmetadata.MD) error {
  356. return nil
  357. }
  358. func (sp *streamProxy) SendHeader(_ grpcmetadata.MD) error {
  359. return nil
  360. }
  361. func (sp *streamProxy) SetTrailer(_ grpcmetadata.MD) {
  362. }
  363. func (sp *streamProxy) Context() context.Context {
  364. return sp.ctx
  365. }
  366. func (sp *streamProxy) RecvMsg(m interface{}) error {
  367. return io.EOF
  368. }
  369. type statusProxy struct {
  370. streamProxy
  371. ch chan *controlapi.StatusResponse
  372. }
  373. func (sp *statusProxy) Send(resp *controlapi.StatusResponse) error {
  374. return sp.SendMsg(resp)
  375. }
  376. func (sp *statusProxy) SendMsg(m interface{}) error {
  377. if sr, ok := m.(*controlapi.StatusResponse); ok {
  378. sp.ch <- sr
  379. }
  380. return nil
  381. }
  382. type pruneProxy struct {
  383. streamProxy
  384. ch chan *controlapi.UsageRecord
  385. }
  386. func (sp *pruneProxy) Send(resp *controlapi.UsageRecord) error {
  387. return sp.SendMsg(resp)
  388. }
  389. func (sp *pruneProxy) SendMsg(m interface{}) error {
  390. if sr, ok := m.(*controlapi.UsageRecord); ok {
  391. sp.ch <- sr
  392. }
  393. return nil
  394. }
  395. type wrapRC struct {
  396. io.ReadCloser
  397. once sync.Once
  398. err error
  399. waitCh chan struct{}
  400. }
  401. func (w *wrapRC) Read(b []byte) (int, error) {
  402. n, err := w.ReadCloser.Read(b)
  403. if err != nil {
  404. e := err
  405. if e == io.EOF {
  406. e = nil
  407. }
  408. w.close(e)
  409. }
  410. return n, err
  411. }
  412. func (w *wrapRC) Close() error {
  413. err := w.ReadCloser.Close()
  414. w.close(err)
  415. return err
  416. }
  417. func (w *wrapRC) close(err error) {
  418. w.once.Do(func() {
  419. w.err = err
  420. close(w.waitCh)
  421. })
  422. }
  423. func (w *wrapRC) wait() error {
  424. <-w.waitCh
  425. return w.err
  426. }
  427. type buildJob struct {
  428. cancel func()
  429. waitCh chan func(io.ReadCloser) error
  430. }
  431. func newBuildJob() *buildJob {
  432. return &buildJob{waitCh: make(chan func(io.ReadCloser) error)}
  433. }
  434. func (j *buildJob) WaitUpload(ctx context.Context) (io.ReadCloser, error) {
  435. done := make(chan struct{})
  436. var upload io.ReadCloser
  437. fn := func(rc io.ReadCloser) error {
  438. w := &wrapRC{ReadCloser: rc, waitCh: make(chan struct{})}
  439. upload = w
  440. close(done)
  441. return w.wait()
  442. }
  443. select {
  444. case <-ctx.Done():
  445. return nil, ctx.Err()
  446. case j.waitCh <- fn:
  447. <-done
  448. return upload, nil
  449. }
  450. }
  451. func (j *buildJob) SetUpload(ctx context.Context, rc io.ReadCloser) error {
  452. select {
  453. case <-ctx.Done():
  454. return ctx.Err()
  455. case fn := <-j.waitCh:
  456. return fn(rc)
  457. }
  458. }
  459. // toBuildkitExtraHosts converts hosts from docker key:value format to buildkit's csv format
  460. func toBuildkitExtraHosts(inp []string) (string, error) {
  461. if len(inp) == 0 {
  462. return "", nil
  463. }
  464. hosts := make([]string, 0, len(inp))
  465. for _, h := range inp {
  466. parts := strings.Split(h, ":")
  467. if len(parts) != 2 || parts[0] == "" || net.ParseIP(parts[1]) == nil {
  468. return "", errors.Errorf("invalid host %s", h)
  469. }
  470. hosts = append(hosts, parts[0]+"="+parts[1])
  471. }
  472. return strings.Join(hosts, ","), nil
  473. }
  474. func toBuildkitPruneInfo(opts types.BuildCachePruneOptions) (client.PruneInfo, error) {
  475. var until time.Duration
  476. untilValues := opts.Filters.Get("until") // canonical
  477. unusedForValues := opts.Filters.Get("unused-for") // deprecated synonym for "until" filter
  478. if len(untilValues) > 0 && len(unusedForValues) > 0 {
  479. return client.PruneInfo{}, errConflictFilter{"until", "unused-for"}
  480. }
  481. filterKey := "until"
  482. if len(unusedForValues) > 0 {
  483. filterKey = "unused-for"
  484. }
  485. untilValues = append(untilValues, unusedForValues...)
  486. switch len(untilValues) {
  487. case 0:
  488. // nothing to do
  489. case 1:
  490. var err error
  491. until, err = time.ParseDuration(untilValues[0])
  492. if err != nil {
  493. return client.PruneInfo{}, errors.Wrapf(err, "%q filter expects a duration (e.g., '24h')", filterKey)
  494. }
  495. default:
  496. return client.PruneInfo{}, errMultipleFilterValues{}
  497. }
  498. bkFilter := make([]string, 0, opts.Filters.Len())
  499. for cacheField := range cacheFields {
  500. if opts.Filters.Contains(cacheField) {
  501. values := opts.Filters.Get(cacheField)
  502. switch len(values) {
  503. case 0:
  504. bkFilter = append(bkFilter, cacheField)
  505. case 1:
  506. if cacheField == "id" {
  507. bkFilter = append(bkFilter, cacheField+"~="+values[0])
  508. } else {
  509. bkFilter = append(bkFilter, cacheField+"=="+values[0])
  510. }
  511. default:
  512. return client.PruneInfo{}, errMultipleFilterValues{}
  513. }
  514. }
  515. }
  516. return client.PruneInfo{
  517. All: opts.All,
  518. KeepDuration: until,
  519. KeepBytes: opts.KeepStorage,
  520. Filter: []string{strings.Join(bkFilter, ",")},
  521. }, nil
  522. }