gateway.go 16 KB


  1. package gateway
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "io"
  7. "net"
  8. "os"
  9. "strconv"
  10. "strings"
  11. "sync"
  12. "time"
  13. "github.com/docker/distribution/reference"
  14. apitypes "github.com/moby/buildkit/api/types"
  15. "github.com/moby/buildkit/cache"
  16. cacheutil "github.com/moby/buildkit/cache/util"
  17. "github.com/moby/buildkit/client"
  18. "github.com/moby/buildkit/client/llb"
  19. "github.com/moby/buildkit/executor"
  20. "github.com/moby/buildkit/exporter/containerimage/exptypes"
  21. "github.com/moby/buildkit/frontend"
  22. gw "github.com/moby/buildkit/frontend/gateway/client"
  23. pb "github.com/moby/buildkit/frontend/gateway/pb"
  24. "github.com/moby/buildkit/identity"
  25. "github.com/moby/buildkit/session"
  26. "github.com/moby/buildkit/solver"
  27. opspb "github.com/moby/buildkit/solver/pb"
  28. "github.com/moby/buildkit/util/apicaps"
  29. "github.com/moby/buildkit/util/tracing"
  30. "github.com/moby/buildkit/worker"
  31. specs "github.com/opencontainers/image-spec/specs-go/v1"
  32. "github.com/pkg/errors"
  33. "github.com/sirupsen/logrus"
  34. "golang.org/x/net/http2"
  35. spb "google.golang.org/genproto/googleapis/rpc/status"
  36. "google.golang.org/grpc"
  37. "google.golang.org/grpc/health"
  38. "google.golang.org/grpc/health/grpc_health_v1"
  39. "google.golang.org/grpc/status"
  40. )
  // keySource is the frontend option that names the gateway source. Solve
  // fails when it is absent.
  const keySource = "source"

  // keyDevel is the frontend option whose mere presence makes Solve build the
  // gateway through another frontend instead of pulling it as an image.
  const keyDevel = "gateway-devel"
  45. func NewGatewayFrontend(w frontend.WorkerInfos) frontend.Frontend {
  46. return &gatewayFrontend{
  47. workers: w,
  48. }
  49. }
  50. type gatewayFrontend struct {
  51. workers frontend.WorkerInfos
  52. }
  // filterPrefix returns a new map holding every entry of opts whose key
  // starts with pfx, with that prefix stripped from the key. The result is
  // never nil, even when nothing matches.
  func filterPrefix(opts map[string]string, pfx string) map[string]string {
  	filtered := make(map[string]string)
  	for key, value := range opts {
  		if !strings.HasPrefix(key, pfx) {
  			continue
  		}
  		// HasPrefix guarantees len(key) >= len(pfx), so the slice is safe.
  		filtered[key[len(pfx):]] = value
  	}
  	return filtered
  }
  62. func (gf *gatewayFrontend) Solve(ctx context.Context, llbBridge frontend.FrontendLLBBridge, opts map[string]string) (*frontend.Result, error) {
  63. source, ok := opts[keySource]
  64. if !ok {
  65. return nil, errors.Errorf("no source specified for gateway")
  66. }
  67. sid := session.FromContext(ctx)
  68. _, isDevel := opts[keyDevel]
  69. var img specs.Image
  70. var rootFS cache.ImmutableRef
  71. var readonly bool // TODO: try to switch to read-only by default.
  72. if isDevel {
  73. devRes, err := llbBridge.Solve(session.NewContext(ctx, "gateway:"+sid),
  74. frontend.SolveRequest{
  75. Frontend: source,
  76. FrontendOpt: filterPrefix(opts, "gateway-"),
  77. })
  78. if err != nil {
  79. return nil, err
  80. }
  81. defer func() {
  82. devRes.EachRef(func(ref solver.CachedResult) error {
  83. return ref.Release(context.TODO())
  84. })
  85. }()
  86. if devRes.Ref == nil {
  87. return nil, errors.Errorf("development gateway didn't return default result")
  88. }
  89. workerRef, ok := devRes.Ref.Sys().(*worker.WorkerRef)
  90. if !ok {
  91. return nil, errors.Errorf("invalid ref: %T", devRes.Ref.Sys())
  92. }
  93. rootFS = workerRef.ImmutableRef
  94. config, ok := devRes.Metadata[exptypes.ExporterImageConfigKey]
  95. if ok {
  96. if err := json.Unmarshal(config, &img); err != nil {
  97. return nil, err
  98. }
  99. }
  100. } else {
  101. sourceRef, err := reference.ParseNormalizedNamed(source)
  102. if err != nil {
  103. return nil, err
  104. }
  105. dgst, config, err := llbBridge.ResolveImageConfig(ctx, reference.TagNameOnly(sourceRef).String(), gw.ResolveImageConfigOpt{})
  106. if err != nil {
  107. return nil, err
  108. }
  109. if err := json.Unmarshal(config, &img); err != nil {
  110. return nil, err
  111. }
  112. if dgst != "" {
  113. sourceRef, err = reference.WithDigest(sourceRef, dgst)
  114. if err != nil {
  115. return nil, err
  116. }
  117. }
  118. src := llb.Image(sourceRef.String(), &markTypeFrontend{})
  119. def, err := src.Marshal()
  120. if err != nil {
  121. return nil, err
  122. }
  123. res, err := llbBridge.Solve(ctx, frontend.SolveRequest{
  124. Definition: def.ToPB(),
  125. })
  126. if err != nil {
  127. return nil, err
  128. }
  129. defer func() {
  130. res.EachRef(func(ref solver.CachedResult) error {
  131. return ref.Release(context.TODO())
  132. })
  133. }()
  134. if res.Ref == nil {
  135. return nil, errors.Errorf("gateway source didn't return default result")
  136. }
  137. workerRef, ok := res.Ref.Sys().(*worker.WorkerRef)
  138. if !ok {
  139. return nil, errors.Errorf("invalid ref: %T", res.Ref.Sys())
  140. }
  141. rootFS = workerRef.ImmutableRef
  142. }
  143. lbf, ctx, err := newLLBBridgeForwarder(ctx, llbBridge, gf.workers)
  144. defer lbf.conn.Close()
  145. if err != nil {
  146. return nil, err
  147. }
  148. args := []string{"/run"}
  149. env := []string{}
  150. cwd := "/"
  151. if img.Config.Env != nil {
  152. env = img.Config.Env
  153. }
  154. if img.Config.Entrypoint != nil {
  155. args = img.Config.Entrypoint
  156. }
  157. if img.Config.WorkingDir != "" {
  158. cwd = img.Config.WorkingDir
  159. }
  160. i := 0
  161. for k, v := range opts {
  162. env = append(env, fmt.Sprintf("BUILDKIT_FRONTEND_OPT_%d", i)+"="+k+"="+v)
  163. i++
  164. }
  165. env = append(env, "BUILDKIT_SESSION_ID="+sid)
  166. dt, err := json.Marshal(gf.workers.WorkerInfos())
  167. if err != nil {
  168. return nil, errors.Wrap(err, "failed to marshal workers array")
  169. }
  170. env = append(env, "BUILDKIT_WORKERS="+string(dt))
  171. defer lbf.Discard()
  172. env = append(env, "BUILDKIT_EXPORTEDPRODUCT="+apicaps.ExportedProduct)
  173. meta := executor.Meta{
  174. Env: env,
  175. Args: args,
  176. Cwd: cwd,
  177. ReadonlyRootFS: readonly,
  178. }
  179. if v, ok := img.Config.Labels["moby.buildkit.frontend.network.none"]; ok {
  180. if ok, _ := strconv.ParseBool(v); ok {
  181. meta.NetMode = opspb.NetMode_NONE
  182. }
  183. }
  184. err = llbBridge.Exec(ctx, meta, rootFS, lbf.Stdin, lbf.Stdout, os.Stderr)
  185. if err != nil {
  186. if errors.Cause(err) == context.Canceled && lbf.isErrServerClosed {
  187. err = errors.Errorf("frontend grpc server closed unexpectedly")
  188. }
  189. // An existing error (set via Return rpc) takes
  190. // precedence over this error, which in turn takes
  191. // precedence over a success reported via Return.
  192. lbf.mu.Lock()
  193. if lbf.err == nil {
  194. lbf.result = nil
  195. lbf.err = err
  196. }
  197. lbf.mu.Unlock()
  198. }
  199. return lbf.Result()
  200. }
  201. func (lbf *llbBridgeForwarder) Discard() {
  202. lbf.mu.Lock()
  203. defer lbf.mu.Unlock()
  204. for id, r := range lbf.refs {
  205. if lbf.err == nil && lbf.result != nil {
  206. keep := false
  207. lbf.result.EachRef(func(r2 solver.CachedResult) error {
  208. if r == r2 {
  209. keep = true
  210. }
  211. return nil
  212. })
  213. if keep {
  214. continue
  215. }
  216. }
  217. r.Release(context.TODO())
  218. delete(lbf.refs, id)
  219. }
  220. }
  221. func (lbf *llbBridgeForwarder) Done() <-chan struct{} {
  222. return lbf.doneCh
  223. }
  224. func (lbf *llbBridgeForwarder) setResult(r *frontend.Result, err error) (*pb.ReturnResponse, error) {
  225. lbf.mu.Lock()
  226. defer lbf.mu.Unlock()
  227. if (r == nil) == (err == nil) {
  228. return nil, errors.New("gateway return must be either result or err")
  229. }
  230. if lbf.result != nil || lbf.err != nil {
  231. return nil, errors.New("gateway result is already set")
  232. }
  233. lbf.result = r
  234. lbf.err = err
  235. close(lbf.doneCh)
  236. return &pb.ReturnResponse{}, nil
  237. }
  238. func (lbf *llbBridgeForwarder) Result() (*frontend.Result, error) {
  239. lbf.mu.Lock()
  240. defer lbf.mu.Unlock()
  241. if lbf.result == nil && lbf.err == nil {
  242. return nil, errors.New("no result for incomplete build")
  243. }
  244. if lbf.err != nil {
  245. return nil, lbf.err
  246. }
  247. return lbf.result, nil
  248. }
  249. func NewBridgeForwarder(ctx context.Context, llbBridge frontend.FrontendLLBBridge, workers frontend.WorkerInfos) *llbBridgeForwarder {
  250. lbf := &llbBridgeForwarder{
  251. callCtx: ctx,
  252. llbBridge: llbBridge,
  253. refs: map[string]solver.CachedResult{},
  254. doneCh: make(chan struct{}),
  255. pipe: newPipe(),
  256. workers: workers,
  257. }
  258. return lbf
  259. }
  260. func newLLBBridgeForwarder(ctx context.Context, llbBridge frontend.FrontendLLBBridge, workers frontend.WorkerInfos) (*llbBridgeForwarder, context.Context, error) {
  261. ctx, cancel := context.WithCancel(ctx)
  262. lbf := NewBridgeForwarder(ctx, llbBridge, workers)
  263. server := grpc.NewServer()
  264. grpc_health_v1.RegisterHealthServer(server, health.NewServer())
  265. pb.RegisterLLBBridgeServer(server, lbf)
  266. go func() {
  267. serve(ctx, server, lbf.conn)
  268. select {
  269. case <-ctx.Done():
  270. default:
  271. lbf.isErrServerClosed = true
  272. }
  273. cancel()
  274. }()
  275. return lbf, ctx, nil
  276. }
  277. type pipe struct {
  278. Stdin io.ReadCloser
  279. Stdout io.WriteCloser
  280. conn net.Conn
  281. }
  282. func newPipe() *pipe {
  283. pr1, pw1, _ := os.Pipe()
  284. pr2, pw2, _ := os.Pipe()
  285. return &pipe{
  286. Stdin: pr1,
  287. Stdout: pw2,
  288. conn: &conn{
  289. Reader: pr2,
  290. Writer: pw1,
  291. Closer: pw2,
  292. },
  293. }
  294. }
  295. type conn struct {
  296. io.Reader
  297. io.Writer
  298. io.Closer
  299. }
  300. func (s *conn) LocalAddr() net.Addr {
  301. return dummyAddr{}
  302. }
  303. func (s *conn) RemoteAddr() net.Addr {
  304. return dummyAddr{}
  305. }
  306. func (s *conn) SetDeadline(t time.Time) error {
  307. return nil
  308. }
  309. func (s *conn) SetReadDeadline(t time.Time) error {
  310. return nil
  311. }
  312. func (s *conn) SetWriteDeadline(t time.Time) error {
  313. return nil
  314. }
  // dummyAddr is a fixed placeholder address: network "pipe", address
  // "localhost".
  type dummyAddr struct{}

  // Network reports the placeholder network name.
  func (dummyAddr) Network() string { return "pipe" }

  // String reports the placeholder address.
  func (dummyAddr) String() string { return "localhost" }
  323. type LLBBridgeForwarder interface {
  324. pb.LLBBridgeServer
  325. Done() <-chan struct{}
  326. Result() (*frontend.Result, error)
  327. }
  328. type llbBridgeForwarder struct {
  329. mu sync.Mutex
  330. callCtx context.Context
  331. llbBridge frontend.FrontendLLBBridge
  332. refs map[string]solver.CachedResult
  333. // lastRef solver.CachedResult
  334. // lastRefs map[string]solver.CachedResult
  335. // err error
  336. doneCh chan struct{} // closed when result or err become valid through a call to a Return
  337. result *frontend.Result
  338. err error
  339. exporterAttr map[string][]byte
  340. workers frontend.WorkerInfos
  341. isErrServerClosed bool
  342. *pipe
  343. }
  344. func (lbf *llbBridgeForwarder) ResolveImageConfig(ctx context.Context, req *pb.ResolveImageConfigRequest) (*pb.ResolveImageConfigResponse, error) {
  345. ctx = tracing.ContextWithSpanFromContext(ctx, lbf.callCtx)
  346. var platform *specs.Platform
  347. if p := req.Platform; p != nil {
  348. platform = &specs.Platform{
  349. OS: p.OS,
  350. Architecture: p.Architecture,
  351. Variant: p.Variant,
  352. OSVersion: p.OSVersion,
  353. OSFeatures: p.OSFeatures,
  354. }
  355. }
  356. dgst, dt, err := lbf.llbBridge.ResolveImageConfig(ctx, req.Ref, gw.ResolveImageConfigOpt{
  357. Platform: platform,
  358. ResolveMode: req.ResolveMode,
  359. LogName: req.LogName,
  360. })
  361. if err != nil {
  362. return nil, err
  363. }
  364. return &pb.ResolveImageConfigResponse{
  365. Digest: dgst,
  366. Config: dt,
  367. }, nil
  368. }
  369. func translateLegacySolveRequest(req *pb.SolveRequest) error {
  370. // translates ImportCacheRefs to new CacheImports (v0.4.0)
  371. for _, legacyImportRef := range req.ImportCacheRefsDeprecated {
  372. im := &pb.CacheOptionsEntry{
  373. Type: "registry",
  374. Attrs: map[string]string{"ref": legacyImportRef},
  375. }
  376. // FIXME(AkihiroSuda): skip append if already exists
  377. req.CacheImports = append(req.CacheImports, im)
  378. }
  379. req.ImportCacheRefsDeprecated = nil
  380. return nil
  381. }
  382. func (lbf *llbBridgeForwarder) Solve(ctx context.Context, req *pb.SolveRequest) (*pb.SolveResponse, error) {
  383. if err := translateLegacySolveRequest(req); err != nil {
  384. return nil, err
  385. }
  386. var cacheImports []frontend.CacheOptionsEntry
  387. for _, e := range req.CacheImports {
  388. cacheImports = append(cacheImports, frontend.CacheOptionsEntry{
  389. Type: e.Type,
  390. Attrs: e.Attrs,
  391. })
  392. }
  393. ctx = tracing.ContextWithSpanFromContext(ctx, lbf.callCtx)
  394. res, err := lbf.llbBridge.Solve(ctx, frontend.SolveRequest{
  395. Definition: req.Definition,
  396. Frontend: req.Frontend,
  397. FrontendOpt: req.FrontendOpt,
  398. CacheImports: cacheImports,
  399. })
  400. if err != nil {
  401. return nil, err
  402. }
  403. if len(res.Refs) > 0 && !req.AllowResultReturn {
  404. // this should never happen because old client shouldn't make a map request
  405. return nil, errors.Errorf("solve did not return default result")
  406. }
  407. pbRes := &pb.Result{}
  408. var defaultID string
  409. lbf.mu.Lock()
  410. if res.Refs != nil {
  411. ids := make(map[string]string, len(res.Refs))
  412. for k, ref := range res.Refs {
  413. id := identity.NewID()
  414. if ref == nil {
  415. id = ""
  416. } else {
  417. lbf.refs[id] = ref
  418. }
  419. ids[k] = id
  420. }
  421. pbRes.Result = &pb.Result_Refs{Refs: &pb.RefMap{Refs: ids}}
  422. } else {
  423. id := identity.NewID()
  424. if res.Ref == nil {
  425. id = ""
  426. } else {
  427. lbf.refs[id] = res.Ref
  428. }
  429. defaultID = id
  430. pbRes.Result = &pb.Result_Ref{Ref: id}
  431. }
  432. lbf.mu.Unlock()
  433. // compatibility mode for older clients
  434. if req.Final {
  435. exp := map[string][]byte{}
  436. if err := json.Unmarshal(req.ExporterAttr, &exp); err != nil {
  437. return nil, err
  438. }
  439. for k, v := range res.Metadata {
  440. exp[k] = v
  441. }
  442. lbf.mu.Lock()
  443. lbf.result = &frontend.Result{
  444. Ref: lbf.refs[defaultID],
  445. Metadata: exp,
  446. }
  447. lbf.mu.Unlock()
  448. }
  449. resp := &pb.SolveResponse{
  450. Result: pbRes,
  451. }
  452. if !req.AllowResultReturn {
  453. resp.Ref = defaultID
  454. }
  455. return resp, nil
  456. }
  457. func (lbf *llbBridgeForwarder) ReadFile(ctx context.Context, req *pb.ReadFileRequest) (*pb.ReadFileResponse, error) {
  458. ctx = tracing.ContextWithSpanFromContext(ctx, lbf.callCtx)
  459. lbf.mu.Lock()
  460. ref, ok := lbf.refs[req.Ref]
  461. lbf.mu.Unlock()
  462. if !ok {
  463. return nil, errors.Errorf("no such ref: %v", req.Ref)
  464. }
  465. if ref == nil {
  466. return nil, errors.Wrapf(os.ErrNotExist, "%s not found", req.FilePath)
  467. }
  468. workerRef, ok := ref.Sys().(*worker.WorkerRef)
  469. if !ok {
  470. return nil, errors.Errorf("invalid ref: %T", ref.Sys())
  471. }
  472. newReq := cacheutil.ReadRequest{
  473. Filename: req.FilePath,
  474. }
  475. if r := req.Range; r != nil {
  476. newReq.Range = &cacheutil.FileRange{
  477. Offset: int(r.Offset),
  478. Length: int(r.Length),
  479. }
  480. }
  481. dt, err := cacheutil.ReadFile(ctx, workerRef.ImmutableRef, newReq)
  482. if err != nil {
  483. return nil, err
  484. }
  485. return &pb.ReadFileResponse{Data: dt}, nil
  486. }
  487. func (lbf *llbBridgeForwarder) ReadDir(ctx context.Context, req *pb.ReadDirRequest) (*pb.ReadDirResponse, error) {
  488. ctx = tracing.ContextWithSpanFromContext(ctx, lbf.callCtx)
  489. lbf.mu.Lock()
  490. ref, ok := lbf.refs[req.Ref]
  491. lbf.mu.Unlock()
  492. if !ok {
  493. return nil, errors.Errorf("no such ref: %v", req.Ref)
  494. }
  495. if ref == nil {
  496. return nil, errors.Wrapf(os.ErrNotExist, "%s not found", req.DirPath)
  497. }
  498. workerRef, ok := ref.Sys().(*worker.WorkerRef)
  499. if !ok {
  500. return nil, errors.Errorf("invalid ref: %T", ref.Sys())
  501. }
  502. newReq := cacheutil.ReadDirRequest{
  503. Path: req.DirPath,
  504. IncludePattern: req.IncludePattern,
  505. }
  506. entries, err := cacheutil.ReadDir(ctx, workerRef.ImmutableRef, newReq)
  507. if err != nil {
  508. return nil, err
  509. }
  510. return &pb.ReadDirResponse{Entries: entries}, nil
  511. }
  512. func (lbf *llbBridgeForwarder) StatFile(ctx context.Context, req *pb.StatFileRequest) (*pb.StatFileResponse, error) {
  513. ctx = tracing.ContextWithSpanFromContext(ctx, lbf.callCtx)
  514. lbf.mu.Lock()
  515. ref, ok := lbf.refs[req.Ref]
  516. lbf.mu.Unlock()
  517. if !ok {
  518. return nil, errors.Errorf("no such ref: %v", req.Ref)
  519. }
  520. if ref == nil {
  521. return nil, errors.Wrapf(os.ErrNotExist, "%s not found", req.Path)
  522. }
  523. workerRef, ok := ref.Sys().(*worker.WorkerRef)
  524. if !ok {
  525. return nil, errors.Errorf("invalid ref: %T", ref.Sys())
  526. }
  527. st, err := cacheutil.StatFile(ctx, workerRef.ImmutableRef, req.Path)
  528. if err != nil {
  529. return nil, err
  530. }
  531. return &pb.StatFileResponse{Stat: st}, nil
  532. }
  533. func (lbf *llbBridgeForwarder) Ping(context.Context, *pb.PingRequest) (*pb.PongResponse, error) {
  534. workers := lbf.workers.WorkerInfos()
  535. pbWorkers := make([]*apitypes.WorkerRecord, 0, len(workers))
  536. for _, w := range workers {
  537. pbWorkers = append(pbWorkers, &apitypes.WorkerRecord{
  538. ID: w.ID,
  539. Labels: w.Labels,
  540. Platforms: opspb.PlatformsFromSpec(w.Platforms),
  541. })
  542. }
  543. return &pb.PongResponse{
  544. FrontendAPICaps: pb.Caps.All(),
  545. Workers: pbWorkers,
  546. LLBCaps: opspb.Caps.All(),
  547. }, nil
  548. }
  549. func (lbf *llbBridgeForwarder) Return(ctx context.Context, in *pb.ReturnRequest) (*pb.ReturnResponse, error) {
  550. if in.Error != nil {
  551. return lbf.setResult(nil, status.ErrorProto(&spb.Status{
  552. Code: in.Error.Code,
  553. Message: in.Error.Message,
  554. // Details: in.Error.Details,
  555. }))
  556. } else {
  557. r := &frontend.Result{
  558. Metadata: in.Result.Metadata,
  559. }
  560. switch res := in.Result.Result.(type) {
  561. case *pb.Result_Ref:
  562. ref, err := lbf.convertRef(res.Ref)
  563. if err != nil {
  564. return nil, err
  565. }
  566. r.Ref = ref
  567. case *pb.Result_Refs:
  568. m := map[string]solver.CachedResult{}
  569. for k, v := range res.Refs.Refs {
  570. ref, err := lbf.convertRef(v)
  571. if err != nil {
  572. return nil, err
  573. }
  574. m[k] = ref
  575. }
  576. r.Refs = m
  577. }
  578. return lbf.setResult(r, nil)
  579. }
  580. }
  581. func (lbf *llbBridgeForwarder) convertRef(id string) (solver.CachedResult, error) {
  582. if id == "" {
  583. return nil, nil
  584. }
  585. lbf.mu.Lock()
  586. defer lbf.mu.Unlock()
  587. r, ok := lbf.refs[id]
  588. if !ok {
  589. return nil, errors.Errorf("return reference %s not found", id)
  590. }
  591. return r, nil
  592. }
  593. func serve(ctx context.Context, grpcServer *grpc.Server, conn net.Conn) {
  594. go func() {
  595. <-ctx.Done()
  596. conn.Close()
  597. }()
  598. logrus.Debugf("serving grpc connection")
  599. (&http2.Server{}).ServeConn(conn, &http2.ServeConnOpts{Handler: grpcServer})
  600. }
  601. type markTypeFrontend struct{}
  602. func (*markTypeFrontend) SetImageOption(ii *llb.ImageInfo) {
  603. ii.RecordType = string(client.UsageRecordTypeFrontend)
  604. }