solve.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488
  1. package client
  2. import (
  3. "context"
  4. "encoding/json"
  5. "io"
  6. "os"
  7. "path/filepath"
  8. "strings"
  9. "time"
  10. "github.com/containerd/containerd/content"
  11. contentlocal "github.com/containerd/containerd/content/local"
  12. controlapi "github.com/moby/buildkit/api/services/control"
  13. "github.com/moby/buildkit/client/llb"
  14. "github.com/moby/buildkit/client/ociindex"
  15. "github.com/moby/buildkit/identity"
  16. "github.com/moby/buildkit/session"
  17. sessioncontent "github.com/moby/buildkit/session/content"
  18. "github.com/moby/buildkit/session/filesync"
  19. "github.com/moby/buildkit/session/grpchijack"
  20. "github.com/moby/buildkit/solver/pb"
  21. "github.com/moby/buildkit/util/entitlements"
  22. ocispec "github.com/opencontainers/image-spec/specs-go/v1"
  23. opentracing "github.com/opentracing/opentracing-go"
  24. "github.com/pkg/errors"
  25. "github.com/sirupsen/logrus"
  26. fstypes "github.com/tonistiigi/fsutil/types"
  27. "golang.org/x/sync/errgroup"
  28. )
// SolveOpt carries the caller-supplied options for a single solve request.
type SolveOpt struct {
	// Exports lists the requested exporters. solve currently rejects more
	// than one entry.
	Exports []ExportEntry
	// LocalDirs maps a local source name to a directory on the client. Every
	// value must be an existing directory, and a definition may only use
	// "local://<name>" sources whose name is a key of this map.
	LocalDirs map[string]string
	// SharedKey is passed to session.NewSession when solve creates the session.
	SharedKey string
	// Frontend is sent as the request's Frontend. Solve requires the
	// definition to be nil when it is set and non-nil when it is empty.
	Frontend string
	// FrontendAttrs is sent as the request's FrontendAttrs, together with
	// the attributes derived from the cache options.
	FrontendAttrs map[string]string
	// FrontendInputs is marshaled entry by entry and sent as the request's
	// FrontendInputs under the same keys.
	FrontendInputs map[string]llb.State
	// CacheExports and CacheImports are interpreted by parseCacheOptions.
	CacheExports []CacheOptionsEntry
	CacheImports []CacheOptionsEntry
	// Session lists extra attachables allowed on the session. It is ignored
	// when SessionPreInitialized is true.
	Session []session.Attachable
	// AllowedEntitlements is sent as the request's Entitlements.
	AllowedEntitlements []entitlements.Entitlement
	// SharedSession, when non-nil, is used instead of creating a new session.
	SharedSession *session.Session // TODO: refactor to better session syncing
	// SessionPreInitialized makes solve skip registering attachables and
	// running the session; it requires SharedSession to be set.
	SessionPreInitialized bool // TODO: refactor to better session syncing
}
// ExportEntry describes one exporter and where its result is written on the
// client side.
type ExportEntry struct {
	// Type is sent as the request's Exporter and decides which of Output and
	// OutputDir is required.
	Type string
	// Attrs is sent as the request's ExporterAttrs.
	Attrs map[string]string
	// Output is required for ExporterOCI, ExporterDocker and ExporterTar and
	// rejected for every other exporter type.
	Output func(map[string]string) (io.WriteCloser, error) // for ExporterOCI, ExporterDocker and ExporterTar
	// OutputDir is required for ExporterLocal and rejected for every other
	// exporter type.
	OutputDir string // for ExporterLocal
}
// CacheOptionsEntry describes one cache exporter or importer.
type CacheOptionsEntry struct {
	// Type names the cache backend. parseCacheOptions treats "local" and
	// "registry" specially and forwards every other type unchanged.
	Type string
	// Attrs holds backend-specific attributes. parseCacheOptions reads
	// "dest" (local export), "src", "digest" and "tag" (local import) and
	// "ref" (registry).
	Attrs map[string]string
}
  53. // Solve calls Solve on the controller.
  54. // def must be nil if (and only if) opt.Frontend is set.
  55. func (c *Client) Solve(ctx context.Context, def *llb.Definition, opt SolveOpt, statusChan chan *SolveStatus) (*SolveResponse, error) {
  56. defer func() {
  57. if statusChan != nil {
  58. close(statusChan)
  59. }
  60. }()
  61. if opt.Frontend == "" && def == nil {
  62. return nil, errors.New("invalid empty definition")
  63. }
  64. if opt.Frontend != "" && def != nil {
  65. return nil, errors.Errorf("invalid definition for frontend %s", opt.Frontend)
  66. }
  67. return c.solve(ctx, def, nil, opt, statusChan)
  68. }
// runGatewayCB is an optional callback for solve. When non-nil it is invoked
// in its own goroutine with the ref of the solve request and the session in
// use; solve rejects it in combination with a non-nil definition.
type runGatewayCB func(ref string, s *session.Session) error
  70. func (c *Client) solve(ctx context.Context, def *llb.Definition, runGateway runGatewayCB, opt SolveOpt, statusChan chan *SolveStatus) (*SolveResponse, error) {
  71. if def != nil && runGateway != nil {
  72. return nil, errors.New("invalid with def and cb")
  73. }
  74. syncedDirs, err := prepareSyncedDirs(def, opt.LocalDirs)
  75. if err != nil {
  76. return nil, err
  77. }
  78. ref := identity.NewID()
  79. eg, ctx := errgroup.WithContext(ctx)
  80. statusContext, cancelStatus := context.WithCancel(context.Background())
  81. defer cancelStatus()
  82. if span := opentracing.SpanFromContext(ctx); span != nil {
  83. statusContext = opentracing.ContextWithSpan(statusContext, span)
  84. }
  85. s := opt.SharedSession
  86. if s == nil {
  87. if opt.SessionPreInitialized {
  88. return nil, errors.Errorf("no session provided for preinitialized option")
  89. }
  90. s, err = session.NewSession(statusContext, defaultSessionName(), opt.SharedKey)
  91. if err != nil {
  92. return nil, errors.Wrap(err, "failed to create session")
  93. }
  94. }
  95. cacheOpt, err := parseCacheOptions(opt)
  96. if err != nil {
  97. return nil, err
  98. }
  99. var ex ExportEntry
  100. if len(opt.Exports) > 1 {
  101. return nil, errors.New("currently only single Exports can be specified")
  102. }
  103. if len(opt.Exports) == 1 {
  104. ex = opt.Exports[0]
  105. }
  106. if !opt.SessionPreInitialized {
  107. if len(syncedDirs) > 0 {
  108. s.Allow(filesync.NewFSSyncProvider(syncedDirs))
  109. }
  110. for _, a := range opt.Session {
  111. s.Allow(a)
  112. }
  113. switch ex.Type {
  114. case ExporterLocal:
  115. if ex.Output != nil {
  116. return nil, errors.New("output file writer is not supported by local exporter")
  117. }
  118. if ex.OutputDir == "" {
  119. return nil, errors.New("output directory is required for local exporter")
  120. }
  121. s.Allow(filesync.NewFSSyncTargetDir(ex.OutputDir))
  122. case ExporterOCI, ExporterDocker, ExporterTar:
  123. if ex.OutputDir != "" {
  124. return nil, errors.Errorf("output directory %s is not supported by %s exporter", ex.OutputDir, ex.Type)
  125. }
  126. if ex.Output == nil {
  127. return nil, errors.Errorf("output file writer is required for %s exporter", ex.Type)
  128. }
  129. s.Allow(filesync.NewFSSyncTarget(ex.Output))
  130. default:
  131. if ex.Output != nil {
  132. return nil, errors.Errorf("output file writer is not supported by %s exporter", ex.Type)
  133. }
  134. if ex.OutputDir != "" {
  135. return nil, errors.Errorf("output directory %s is not supported by %s exporter", ex.OutputDir, ex.Type)
  136. }
  137. }
  138. if len(cacheOpt.contentStores) > 0 {
  139. s.Allow(sessioncontent.NewAttachable(cacheOpt.contentStores))
  140. }
  141. eg.Go(func() error {
  142. return s.Run(statusContext, grpchijack.Dialer(c.controlClient()))
  143. })
  144. }
  145. for k, v := range cacheOpt.frontendAttrs {
  146. opt.FrontendAttrs[k] = v
  147. }
  148. solveCtx, cancelSolve := context.WithCancel(ctx)
  149. var res *SolveResponse
  150. eg.Go(func() error {
  151. ctx := solveCtx
  152. defer cancelSolve()
  153. defer func() { // make sure the Status ends cleanly on build errors
  154. go func() {
  155. <-time.After(3 * time.Second)
  156. cancelStatus()
  157. }()
  158. logrus.Debugf("stopping session")
  159. s.Close()
  160. }()
  161. var pbd *pb.Definition
  162. if def != nil {
  163. pbd = def.ToPB()
  164. }
  165. frontendInputs := make(map[string]*pb.Definition)
  166. for key, st := range opt.FrontendInputs {
  167. def, err := st.Marshal(ctx)
  168. if err != nil {
  169. return err
  170. }
  171. frontendInputs[key] = def.ToPB()
  172. }
  173. resp, err := c.controlClient().Solve(ctx, &controlapi.SolveRequest{
  174. Ref: ref,
  175. Definition: pbd,
  176. Exporter: ex.Type,
  177. ExporterAttrs: ex.Attrs,
  178. Session: s.ID(),
  179. Frontend: opt.Frontend,
  180. FrontendAttrs: opt.FrontendAttrs,
  181. FrontendInputs: frontendInputs,
  182. Cache: cacheOpt.options,
  183. Entitlements: opt.AllowedEntitlements,
  184. })
  185. if err != nil {
  186. return errors.Wrap(err, "failed to solve")
  187. }
  188. res = &SolveResponse{
  189. ExporterResponse: resp.ExporterResponse,
  190. }
  191. return nil
  192. })
  193. if runGateway != nil {
  194. eg.Go(func() error {
  195. err := runGateway(ref, s)
  196. if err == nil {
  197. return nil
  198. }
  199. // If the callback failed then the main
  200. // `Solve` (called above) should error as
  201. // well. However as a fallback we wait up to
  202. // 5s for that to happen before failing this
  203. // goroutine.
  204. select {
  205. case <-solveCtx.Done():
  206. case <-time.After(5 * time.Second):
  207. cancelSolve()
  208. }
  209. return err
  210. })
  211. }
  212. eg.Go(func() error {
  213. stream, err := c.controlClient().Status(statusContext, &controlapi.StatusRequest{
  214. Ref: ref,
  215. })
  216. if err != nil {
  217. return errors.Wrap(err, "failed to get status")
  218. }
  219. for {
  220. resp, err := stream.Recv()
  221. if err != nil {
  222. if err == io.EOF {
  223. return nil
  224. }
  225. return errors.Wrap(err, "failed to receive status")
  226. }
  227. s := SolveStatus{}
  228. for _, v := range resp.Vertexes {
  229. s.Vertexes = append(s.Vertexes, &Vertex{
  230. Digest: v.Digest,
  231. Inputs: v.Inputs,
  232. Name: v.Name,
  233. Started: v.Started,
  234. Completed: v.Completed,
  235. Error: v.Error,
  236. Cached: v.Cached,
  237. })
  238. }
  239. for _, v := range resp.Statuses {
  240. s.Statuses = append(s.Statuses, &VertexStatus{
  241. ID: v.ID,
  242. Vertex: v.Vertex,
  243. Name: v.Name,
  244. Total: v.Total,
  245. Current: v.Current,
  246. Timestamp: v.Timestamp,
  247. Started: v.Started,
  248. Completed: v.Completed,
  249. })
  250. }
  251. for _, v := range resp.Logs {
  252. s.Logs = append(s.Logs, &VertexLog{
  253. Vertex: v.Vertex,
  254. Stream: int(v.Stream),
  255. Data: v.Msg,
  256. Timestamp: v.Timestamp,
  257. })
  258. }
  259. if statusChan != nil {
  260. statusChan <- &s
  261. }
  262. }
  263. })
  264. if err := eg.Wait(); err != nil {
  265. return nil, err
  266. }
  267. // Update index.json of exported cache content store
  268. // FIXME(AkihiroSuda): dedupe const definition of cache/remotecache.ExporterResponseManifestDesc = "cache.manifest"
  269. if manifestDescJSON := res.ExporterResponse["cache.manifest"]; manifestDescJSON != "" {
  270. var manifestDesc ocispec.Descriptor
  271. if err = json.Unmarshal([]byte(manifestDescJSON), &manifestDesc); err != nil {
  272. return nil, err
  273. }
  274. for indexJSONPath, tag := range cacheOpt.indicesToUpdate {
  275. if err = ociindex.PutDescToIndexJSONFileLocked(indexJSONPath, manifestDesc, tag); err != nil {
  276. return nil, err
  277. }
  278. }
  279. }
  280. return res, nil
  281. }
  282. func prepareSyncedDirs(def *llb.Definition, localDirs map[string]string) ([]filesync.SyncedDir, error) {
  283. for _, d := range localDirs {
  284. fi, err := os.Stat(d)
  285. if err != nil {
  286. return nil, errors.Wrapf(err, "could not find %s", d)
  287. }
  288. if !fi.IsDir() {
  289. return nil, errors.Errorf("%s not a directory", d)
  290. }
  291. }
  292. resetUIDAndGID := func(p string, st *fstypes.Stat) bool {
  293. st.Uid = 0
  294. st.Gid = 0
  295. return true
  296. }
  297. dirs := make([]filesync.SyncedDir, 0, len(localDirs))
  298. if def == nil {
  299. for name, d := range localDirs {
  300. dirs = append(dirs, filesync.SyncedDir{Name: name, Dir: d, Map: resetUIDAndGID})
  301. }
  302. } else {
  303. for _, dt := range def.Def {
  304. var op pb.Op
  305. if err := (&op).Unmarshal(dt); err != nil {
  306. return nil, errors.Wrap(err, "failed to parse llb proto op")
  307. }
  308. if src := op.GetSource(); src != nil {
  309. if strings.HasPrefix(src.Identifier, "local://") { // TODO: just make a type property
  310. name := strings.TrimPrefix(src.Identifier, "local://")
  311. d, ok := localDirs[name]
  312. if !ok {
  313. return nil, errors.Errorf("local directory %s not enabled", name)
  314. }
  315. dirs = append(dirs, filesync.SyncedDir{Name: name, Dir: d, Map: resetUIDAndGID}) // TODO: excludes
  316. }
  317. }
  318. }
  319. }
  320. return dirs, nil
  321. }
  322. func defaultSessionName() string {
  323. wd, err := os.Getwd()
  324. if err != nil {
  325. return "unknown"
  326. }
  327. return filepath.Base(wd)
  328. }
// cacheOptions is the result of parseCacheOptions: the cache settings in the
// form solve needs them.
type cacheOptions struct {
	// options is sent as the Cache field of the solve request.
	options controlapi.CacheOptions
	// contentStores holds the local content stores that solve exposes on the
	// session.
	contentStores map[string]content.Store // key: ID of content store ("local:" + csDir)
	// indicesToUpdate lists the index.json files that solve updates with the
	// exported cache manifest after a successful build.
	indicesToUpdate map[string]string // key: index.JSON file name, value: tag
	// frontendAttrs holds the "cache-from" and "cache-imports" attributes
	// that are added to the frontend attributes when a frontend is used.
	frontendAttrs map[string]string
}
  335. func parseCacheOptions(opt SolveOpt) (*cacheOptions, error) {
  336. var (
  337. cacheExports []*controlapi.CacheOptionsEntry
  338. cacheImports []*controlapi.CacheOptionsEntry
  339. // legacy API is used for registry caches, because the daemon might not support the new API
  340. legacyExportRef string
  341. legacyImportRefs []string
  342. )
  343. contentStores := make(map[string]content.Store)
  344. indicesToUpdate := make(map[string]string) // key: index.JSON file name, value: tag
  345. frontendAttrs := make(map[string]string)
  346. legacyExportAttrs := make(map[string]string)
  347. for _, ex := range opt.CacheExports {
  348. if ex.Type == "local" {
  349. csDir := ex.Attrs["dest"]
  350. if csDir == "" {
  351. return nil, errors.New("local cache exporter requires dest")
  352. }
  353. if err := os.MkdirAll(csDir, 0755); err != nil {
  354. return nil, err
  355. }
  356. cs, err := contentlocal.NewStore(csDir)
  357. if err != nil {
  358. return nil, err
  359. }
  360. contentStores["local:"+csDir] = cs
  361. // TODO(AkihiroSuda): support custom index JSON path and tag
  362. indexJSONPath := filepath.Join(csDir, "index.json")
  363. indicesToUpdate[indexJSONPath] = "latest"
  364. }
  365. if ex.Type == "registry" && legacyExportRef == "" {
  366. legacyExportRef = ex.Attrs["ref"]
  367. for k, v := range ex.Attrs {
  368. if k != "ref" {
  369. legacyExportAttrs[k] = v
  370. }
  371. }
  372. } else {
  373. cacheExports = append(cacheExports, &controlapi.CacheOptionsEntry{
  374. Type: ex.Type,
  375. Attrs: ex.Attrs,
  376. })
  377. }
  378. }
  379. for _, im := range opt.CacheImports {
  380. attrs := im.Attrs
  381. if im.Type == "local" {
  382. csDir := im.Attrs["src"]
  383. if csDir == "" {
  384. return nil, errors.New("local cache importer requires src")
  385. }
  386. cs, err := contentlocal.NewStore(csDir)
  387. if err != nil {
  388. logrus.Warning("local cache import at " + csDir + " not found due to err: " + err.Error())
  389. continue
  390. }
  391. // if digest is not specified, load from "latest" tag
  392. if attrs["digest"] == "" {
  393. idx, err := ociindex.ReadIndexJSONFileLocked(filepath.Join(csDir, "index.json"))
  394. if err != nil {
  395. logrus.Warning("local cache import at " + csDir + " not found due to err: " + err.Error())
  396. continue
  397. }
  398. for _, m := range idx.Manifests {
  399. if (m.Annotations[ocispec.AnnotationRefName] == "latest" && attrs["tag"] == "") || (attrs["tag"] != "" && m.Annotations[ocispec.AnnotationRefName] == attrs["tag"]) {
  400. attrs["digest"] = string(m.Digest)
  401. break
  402. }
  403. }
  404. if attrs["digest"] == "" {
  405. return nil, errors.New("local cache importer requires either explicit digest, \"latest\" tag or custom tag on index.json")
  406. }
  407. }
  408. contentStores["local:"+csDir] = cs
  409. }
  410. if im.Type == "registry" {
  411. legacyImportRef := attrs["ref"]
  412. legacyImportRefs = append(legacyImportRefs, legacyImportRef)
  413. } else {
  414. cacheImports = append(cacheImports, &controlapi.CacheOptionsEntry{
  415. Type: im.Type,
  416. Attrs: attrs,
  417. })
  418. }
  419. }
  420. if opt.Frontend != "" {
  421. // use legacy API for registry importers, because the frontend might not support the new API
  422. if len(legacyImportRefs) > 0 {
  423. frontendAttrs["cache-from"] = strings.Join(legacyImportRefs, ",")
  424. }
  425. // use new API for other importers
  426. if len(cacheImports) > 0 {
  427. s, err := json.Marshal(cacheImports)
  428. if err != nil {
  429. return nil, err
  430. }
  431. frontendAttrs["cache-imports"] = string(s)
  432. }
  433. }
  434. res := cacheOptions{
  435. options: controlapi.CacheOptions{
  436. // old API (for registry caches, planned to be removed in early 2019)
  437. ExportRefDeprecated: legacyExportRef,
  438. ExportAttrsDeprecated: legacyExportAttrs,
  439. ImportRefsDeprecated: legacyImportRefs,
  440. // new API
  441. Exports: cacheExports,
  442. Imports: cacheImports,
  443. },
  444. contentStores: contentStores,
  445. indicesToUpdate: indicesToUpdate,
  446. frontendAttrs: frontendAttrs,
  447. }
  448. return &res, nil
  449. }