123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754 |
- package gateway
- import (
- "context"
- "encoding/json"
- "fmt"
- "io"
- "net"
- "os"
- "strconv"
- "strings"
- "sync"
- "time"
- "github.com/docker/distribution/reference"
- apitypes "github.com/moby/buildkit/api/types"
- "github.com/moby/buildkit/cache"
- cacheutil "github.com/moby/buildkit/cache/util"
- "github.com/moby/buildkit/client"
- "github.com/moby/buildkit/client/llb"
- "github.com/moby/buildkit/executor"
- "github.com/moby/buildkit/exporter/containerimage/exptypes"
- "github.com/moby/buildkit/frontend"
- pb "github.com/moby/buildkit/frontend/gateway/pb"
- "github.com/moby/buildkit/identity"
- "github.com/moby/buildkit/session"
- "github.com/moby/buildkit/solver"
- opspb "github.com/moby/buildkit/solver/pb"
- "github.com/moby/buildkit/util/apicaps"
- "github.com/moby/buildkit/util/tracing"
- "github.com/moby/buildkit/worker"
- specs "github.com/opencontainers/image-spec/specs-go/v1"
- "github.com/pkg/errors"
- "github.com/sirupsen/logrus"
- "golang.org/x/net/http2"
- spb "google.golang.org/genproto/googleapis/rpc/status"
- "google.golang.org/grpc"
- "google.golang.org/grpc/health"
- "google.golang.org/grpc/health/grpc_health_v1"
- "google.golang.org/grpc/status"
- )
- const (
- keySource = "source"
- keyDevel = "gateway-devel"
- )
- func NewGatewayFrontend(w frontend.WorkerInfos) frontend.Frontend {
- return &gatewayFrontend{
- workers: w,
- }
- }
- type gatewayFrontend struct {
- workers frontend.WorkerInfos
- }
- func filterPrefix(opts map[string]string, pfx string) map[string]string {
- m := map[string]string{}
- for k, v := range opts {
- if strings.HasPrefix(k, pfx) {
- m[strings.TrimPrefix(k, pfx)] = v
- }
- }
- return m
- }
- func (gf *gatewayFrontend) Solve(ctx context.Context, llbBridge frontend.FrontendLLBBridge, opts map[string]string, inputs map[string]*opspb.Definition) (*frontend.Result, error) {
- source, ok := opts[keySource]
- if !ok {
- return nil, errors.Errorf("no source specified for gateway")
- }
- sid := session.FromContext(ctx)
- _, isDevel := opts[keyDevel]
- var img specs.Image
- var rootFS cache.ImmutableRef
- var readonly bool // TODO: try to switch to read-only by default.
- if isDevel {
- devRes, err := llbBridge.Solve(session.NewContext(ctx, "gateway:"+sid),
- frontend.SolveRequest{
- Frontend: source,
- FrontendOpt: filterPrefix(opts, "gateway-"),
- FrontendInputs: inputs,
- })
- if err != nil {
- return nil, err
- }
- defer func() {
- devRes.EachRef(func(ref solver.ResultProxy) error {
- return ref.Release(context.TODO())
- })
- }()
- if devRes.Ref == nil {
- return nil, errors.Errorf("development gateway didn't return default result")
- }
- res, err := devRes.Ref.Result(ctx)
- if err != nil {
- return nil, err
- }
- workerRef, ok := res.Sys().(*worker.WorkerRef)
- if !ok {
- return nil, errors.Errorf("invalid ref: %T", res.Sys())
- }
- rootFS = workerRef.ImmutableRef
- config, ok := devRes.Metadata[exptypes.ExporterImageConfigKey]
- if ok {
- if err := json.Unmarshal(config, &img); err != nil {
- return nil, err
- }
- }
- } else {
- sourceRef, err := reference.ParseNormalizedNamed(source)
- if err != nil {
- return nil, err
- }
- dgst, config, err := llbBridge.ResolveImageConfig(ctx, reference.TagNameOnly(sourceRef).String(), llb.ResolveImageConfigOpt{})
- if err != nil {
- return nil, err
- }
- if err := json.Unmarshal(config, &img); err != nil {
- return nil, err
- }
- if dgst != "" {
- sourceRef, err = reference.WithDigest(sourceRef, dgst)
- if err != nil {
- return nil, err
- }
- }
- src := llb.Image(sourceRef.String(), &markTypeFrontend{})
- def, err := src.Marshal(ctx)
- if err != nil {
- return nil, err
- }
- res, err := llbBridge.Solve(ctx, frontend.SolveRequest{
- Definition: def.ToPB(),
- })
- if err != nil {
- return nil, err
- }
- defer func() {
- res.EachRef(func(ref solver.ResultProxy) error {
- return ref.Release(context.TODO())
- })
- }()
- if res.Ref == nil {
- return nil, errors.Errorf("gateway source didn't return default result")
- }
- r, err := res.Ref.Result(ctx)
- if err != nil {
- return nil, err
- }
- workerRef, ok := r.Sys().(*worker.WorkerRef)
- if !ok {
- return nil, errors.Errorf("invalid ref: %T", r.Sys())
- }
- rootFS = workerRef.ImmutableRef
- }
- lbf, ctx, err := newLLBBridgeForwarder(ctx, llbBridge, gf.workers, inputs)
- defer lbf.conn.Close()
- if err != nil {
- return nil, err
- }
- args := []string{"/run"}
- env := []string{}
- cwd := "/"
- if img.Config.Env != nil {
- env = img.Config.Env
- }
- if img.Config.Entrypoint != nil {
- args = img.Config.Entrypoint
- }
- if img.Config.WorkingDir != "" {
- cwd = img.Config.WorkingDir
- }
- i := 0
- for k, v := range opts {
- env = append(env, fmt.Sprintf("BUILDKIT_FRONTEND_OPT_%d", i)+"="+k+"="+v)
- i++
- }
- env = append(env, "BUILDKIT_SESSION_ID="+sid)
- dt, err := json.Marshal(gf.workers.WorkerInfos())
- if err != nil {
- return nil, errors.Wrap(err, "failed to marshal workers array")
- }
- env = append(env, "BUILDKIT_WORKERS="+string(dt))
- defer lbf.Discard()
- env = append(env, "BUILDKIT_EXPORTEDPRODUCT="+apicaps.ExportedProduct)
- meta := executor.Meta{
- Env: env,
- Args: args,
- Cwd: cwd,
- ReadonlyRootFS: readonly,
- }
- if v, ok := img.Config.Labels["moby.buildkit.frontend.network.none"]; ok {
- if ok, _ := strconv.ParseBool(v); ok {
- meta.NetMode = opspb.NetMode_NONE
- }
- }
- err = llbBridge.Exec(ctx, meta, rootFS, lbf.Stdin, lbf.Stdout, os.Stderr)
- if err != nil {
- if errors.Cause(err) == context.Canceled && lbf.isErrServerClosed {
- err = errors.Errorf("frontend grpc server closed unexpectedly")
- }
- // An existing error (set via Return rpc) takes
- // precedence over this error, which in turn takes
- // precedence over a success reported via Return.
- lbf.mu.Lock()
- if lbf.err == nil {
- lbf.result = nil
- lbf.err = err
- }
- lbf.mu.Unlock()
- }
- return lbf.Result()
- }
- func (lbf *llbBridgeForwarder) Discard() {
- lbf.mu.Lock()
- defer lbf.mu.Unlock()
- for id, r := range lbf.refs {
- if lbf.err == nil && lbf.result != nil {
- keep := false
- lbf.result.EachRef(func(r2 solver.ResultProxy) error {
- if r == r2 {
- keep = true
- }
- return nil
- })
- if keep {
- continue
- }
- }
- r.Release(context.TODO())
- delete(lbf.refs, id)
- }
- }
- func (lbf *llbBridgeForwarder) Done() <-chan struct{} {
- return lbf.doneCh
- }
- func (lbf *llbBridgeForwarder) setResult(r *frontend.Result, err error) (*pb.ReturnResponse, error) {
- lbf.mu.Lock()
- defer lbf.mu.Unlock()
- if (r == nil) == (err == nil) {
- return nil, errors.New("gateway return must be either result or err")
- }
- if lbf.result != nil || lbf.err != nil {
- return nil, errors.New("gateway result is already set")
- }
- lbf.result = r
- lbf.err = err
- close(lbf.doneCh)
- return &pb.ReturnResponse{}, nil
- }
- func (lbf *llbBridgeForwarder) Result() (*frontend.Result, error) {
- lbf.mu.Lock()
- defer lbf.mu.Unlock()
- if lbf.result == nil && lbf.err == nil {
- return nil, errors.New("no result for incomplete build")
- }
- if lbf.err != nil {
- return nil, lbf.err
- }
- return lbf.result, nil
- }
- func NewBridgeForwarder(ctx context.Context, llbBridge frontend.FrontendLLBBridge, workers frontend.WorkerInfos, inputs map[string]*opspb.Definition) *llbBridgeForwarder {
- lbf := &llbBridgeForwarder{
- callCtx: ctx,
- llbBridge: llbBridge,
- refs: map[string]solver.ResultProxy{},
- doneCh: make(chan struct{}),
- pipe: newPipe(),
- workers: workers,
- inputs: inputs,
- }
- return lbf
- }
- func newLLBBridgeForwarder(ctx context.Context, llbBridge frontend.FrontendLLBBridge, workers frontend.WorkerInfos, inputs map[string]*opspb.Definition) (*llbBridgeForwarder, context.Context, error) {
- ctx, cancel := context.WithCancel(ctx)
- lbf := NewBridgeForwarder(ctx, llbBridge, workers, inputs)
- server := grpc.NewServer()
- grpc_health_v1.RegisterHealthServer(server, health.NewServer())
- pb.RegisterLLBBridgeServer(server, lbf)
- go func() {
- serve(ctx, server, lbf.conn)
- select {
- case <-ctx.Done():
- default:
- lbf.isErrServerClosed = true
- }
- cancel()
- }()
- return lbf, ctx, nil
- }
- type pipe struct {
- Stdin io.ReadCloser
- Stdout io.WriteCloser
- conn net.Conn
- }
- func newPipe() *pipe {
- pr1, pw1, _ := os.Pipe()
- pr2, pw2, _ := os.Pipe()
- return &pipe{
- Stdin: pr1,
- Stdout: pw2,
- conn: &conn{
- Reader: pr2,
- Writer: pw1,
- Closer: pw2,
- },
- }
- }
- type conn struct {
- io.Reader
- io.Writer
- io.Closer
- }
- func (s *conn) LocalAddr() net.Addr {
- return dummyAddr{}
- }
- func (s *conn) RemoteAddr() net.Addr {
- return dummyAddr{}
- }
- func (s *conn) SetDeadline(t time.Time) error {
- return nil
- }
- func (s *conn) SetReadDeadline(t time.Time) error {
- return nil
- }
- func (s *conn) SetWriteDeadline(t time.Time) error {
- return nil
- }
- type dummyAddr struct {
- }
- func (d dummyAddr) Network() string {
- return "pipe"
- }
- func (d dummyAddr) String() string {
- return "localhost"
- }
- type LLBBridgeForwarder interface {
- pb.LLBBridgeServer
- Done() <-chan struct{}
- Result() (*frontend.Result, error)
- }
- type llbBridgeForwarder struct {
- mu sync.Mutex
- callCtx context.Context
- llbBridge frontend.FrontendLLBBridge
- refs map[string]solver.ResultProxy
- // lastRef solver.CachedResult
- // lastRefs map[string]solver.CachedResult
- // err error
- doneCh chan struct{} // closed when result or err become valid through a call to a Return
- result *frontend.Result
- err error
- exporterAttr map[string][]byte
- workers frontend.WorkerInfos
- inputs map[string]*opspb.Definition
- isErrServerClosed bool
- *pipe
- }
- func (lbf *llbBridgeForwarder) ResolveImageConfig(ctx context.Context, req *pb.ResolveImageConfigRequest) (*pb.ResolveImageConfigResponse, error) {
- ctx = tracing.ContextWithSpanFromContext(ctx, lbf.callCtx)
- var platform *specs.Platform
- if p := req.Platform; p != nil {
- platform = &specs.Platform{
- OS: p.OS,
- Architecture: p.Architecture,
- Variant: p.Variant,
- OSVersion: p.OSVersion,
- OSFeatures: p.OSFeatures,
- }
- }
- dgst, dt, err := lbf.llbBridge.ResolveImageConfig(ctx, req.Ref, llb.ResolveImageConfigOpt{
- Platform: platform,
- ResolveMode: req.ResolveMode,
- LogName: req.LogName,
- })
- if err != nil {
- return nil, err
- }
- return &pb.ResolveImageConfigResponse{
- Digest: dgst,
- Config: dt,
- }, nil
- }
- func translateLegacySolveRequest(req *pb.SolveRequest) error {
- // translates ImportCacheRefs to new CacheImports (v0.4.0)
- for _, legacyImportRef := range req.ImportCacheRefsDeprecated {
- im := &pb.CacheOptionsEntry{
- Type: "registry",
- Attrs: map[string]string{"ref": legacyImportRef},
- }
- // FIXME(AkihiroSuda): skip append if already exists
- req.CacheImports = append(req.CacheImports, im)
- }
- req.ImportCacheRefsDeprecated = nil
- return nil
- }
- func (lbf *llbBridgeForwarder) Solve(ctx context.Context, req *pb.SolveRequest) (*pb.SolveResponse, error) {
- if err := translateLegacySolveRequest(req); err != nil {
- return nil, err
- }
- var cacheImports []frontend.CacheOptionsEntry
- for _, e := range req.CacheImports {
- cacheImports = append(cacheImports, frontend.CacheOptionsEntry{
- Type: e.Type,
- Attrs: e.Attrs,
- })
- }
- ctx = tracing.ContextWithSpanFromContext(ctx, lbf.callCtx)
- res, err := lbf.llbBridge.Solve(ctx, frontend.SolveRequest{
- Definition: req.Definition,
- Frontend: req.Frontend,
- FrontendOpt: req.FrontendOpt,
- FrontendInputs: req.FrontendInputs,
- CacheImports: cacheImports,
- })
- if err != nil {
- return nil, err
- }
- if len(res.Refs) > 0 && !req.AllowResultReturn {
- // this should never happen because old client shouldn't make a map request
- return nil, errors.Errorf("solve did not return default result")
- }
- pbRes := &pb.Result{}
- var defaultID string
- lbf.mu.Lock()
- if res.Refs != nil {
- ids := make(map[string]string, len(res.Refs))
- defs := make(map[string]*opspb.Definition, len(res.Refs))
- for k, ref := range res.Refs {
- id := identity.NewID()
- if ref == nil {
- id = ""
- } else {
- lbf.refs[id] = ref
- }
- ids[k] = id
- defs[k] = ref.Definition()
- }
- if req.AllowResultArrayRef {
- refMap := make(map[string]*pb.Ref, len(res.Refs))
- for k, id := range ids {
- refMap[k] = &pb.Ref{Id: id, Def: defs[k]}
- }
- pbRes.Result = &pb.Result_Refs{Refs: &pb.RefMap{Refs: refMap}}
- } else {
- pbRes.Result = &pb.Result_RefsDeprecated{RefsDeprecated: &pb.RefMapDeprecated{Refs: ids}}
- }
- } else {
- ref := res.Ref
- id := identity.NewID()
- var def *opspb.Definition
- if ref == nil {
- id = ""
- } else {
- def = ref.Definition()
- lbf.refs[id] = ref
- }
- defaultID = id
- if req.AllowResultArrayRef {
- pbRes.Result = &pb.Result_Ref{Ref: &pb.Ref{Id: id, Def: def}}
- } else {
- pbRes.Result = &pb.Result_RefDeprecated{RefDeprecated: id}
- }
- }
- lbf.mu.Unlock()
- // compatibility mode for older clients
- if req.Final {
- exp := map[string][]byte{}
- if err := json.Unmarshal(req.ExporterAttr, &exp); err != nil {
- return nil, err
- }
- for k, v := range res.Metadata {
- exp[k] = v
- }
- lbf.mu.Lock()
- lbf.result = &frontend.Result{
- Ref: lbf.refs[defaultID],
- Metadata: exp,
- }
- lbf.mu.Unlock()
- }
- resp := &pb.SolveResponse{
- Result: pbRes,
- }
- if !req.AllowResultReturn {
- resp.Ref = defaultID
- }
- return resp, nil
- }
- func (lbf *llbBridgeForwarder) ReadFile(ctx context.Context, req *pb.ReadFileRequest) (*pb.ReadFileResponse, error) {
- ctx = tracing.ContextWithSpanFromContext(ctx, lbf.callCtx)
- lbf.mu.Lock()
- ref, ok := lbf.refs[req.Ref]
- lbf.mu.Unlock()
- if !ok {
- return nil, errors.Errorf("no such ref: %v", req.Ref)
- }
- if ref == nil {
- return nil, errors.Wrapf(os.ErrNotExist, "%s not found", req.FilePath)
- }
- r, err := ref.Result(ctx)
- if err != nil {
- return nil, err
- }
- workerRef, ok := r.Sys().(*worker.WorkerRef)
- if !ok {
- return nil, errors.Errorf("invalid ref: %T", r.Sys())
- }
- newReq := cacheutil.ReadRequest{
- Filename: req.FilePath,
- }
- if r := req.Range; r != nil {
- newReq.Range = &cacheutil.FileRange{
- Offset: int(r.Offset),
- Length: int(r.Length),
- }
- }
- dt, err := cacheutil.ReadFile(ctx, workerRef.ImmutableRef, newReq)
- if err != nil {
- return nil, err
- }
- return &pb.ReadFileResponse{Data: dt}, nil
- }
- func (lbf *llbBridgeForwarder) ReadDir(ctx context.Context, req *pb.ReadDirRequest) (*pb.ReadDirResponse, error) {
- ctx = tracing.ContextWithSpanFromContext(ctx, lbf.callCtx)
- lbf.mu.Lock()
- ref, ok := lbf.refs[req.Ref]
- lbf.mu.Unlock()
- if !ok {
- return nil, errors.Errorf("no such ref: %v", req.Ref)
- }
- if ref == nil {
- return nil, errors.Wrapf(os.ErrNotExist, "%s not found", req.DirPath)
- }
- r, err := ref.Result(ctx)
- if err != nil {
- return nil, err
- }
- workerRef, ok := r.Sys().(*worker.WorkerRef)
- if !ok {
- return nil, errors.Errorf("invalid ref: %T", r.Sys())
- }
- newReq := cacheutil.ReadDirRequest{
- Path: req.DirPath,
- IncludePattern: req.IncludePattern,
- }
- entries, err := cacheutil.ReadDir(ctx, workerRef.ImmutableRef, newReq)
- if err != nil {
- return nil, err
- }
- return &pb.ReadDirResponse{Entries: entries}, nil
- }
- func (lbf *llbBridgeForwarder) StatFile(ctx context.Context, req *pb.StatFileRequest) (*pb.StatFileResponse, error) {
- ctx = tracing.ContextWithSpanFromContext(ctx, lbf.callCtx)
- lbf.mu.Lock()
- ref, ok := lbf.refs[req.Ref]
- lbf.mu.Unlock()
- if !ok {
- return nil, errors.Errorf("no such ref: %v", req.Ref)
- }
- if ref == nil {
- return nil, errors.Wrapf(os.ErrNotExist, "%s not found", req.Path)
- }
- r, err := ref.Result(ctx)
- if err != nil {
- return nil, err
- }
- workerRef, ok := r.Sys().(*worker.WorkerRef)
- if !ok {
- return nil, errors.Errorf("invalid ref: %T", r.Sys())
- }
- st, err := cacheutil.StatFile(ctx, workerRef.ImmutableRef, req.Path)
- if err != nil {
- return nil, err
- }
- return &pb.StatFileResponse{Stat: st}, nil
- }
- func (lbf *llbBridgeForwarder) Ping(context.Context, *pb.PingRequest) (*pb.PongResponse, error) {
- workers := lbf.workers.WorkerInfos()
- pbWorkers := make([]*apitypes.WorkerRecord, 0, len(workers))
- for _, w := range workers {
- pbWorkers = append(pbWorkers, &apitypes.WorkerRecord{
- ID: w.ID,
- Labels: w.Labels,
- Platforms: opspb.PlatformsFromSpec(w.Platforms),
- })
- }
- return &pb.PongResponse{
- FrontendAPICaps: pb.Caps.All(),
- Workers: pbWorkers,
- LLBCaps: opspb.Caps.All(),
- }, nil
- }
- func (lbf *llbBridgeForwarder) Return(ctx context.Context, in *pb.ReturnRequest) (*pb.ReturnResponse, error) {
- if in.Error != nil {
- return lbf.setResult(nil, status.ErrorProto(&spb.Status{
- Code: in.Error.Code,
- Message: in.Error.Message,
- // Details: in.Error.Details,
- }))
- } else {
- r := &frontend.Result{
- Metadata: in.Result.Metadata,
- }
- switch res := in.Result.Result.(type) {
- case *pb.Result_RefDeprecated:
- ref, err := lbf.convertRef(res.RefDeprecated)
- if err != nil {
- return nil, err
- }
- r.Ref = ref
- case *pb.Result_RefsDeprecated:
- m := map[string]solver.ResultProxy{}
- for k, id := range res.RefsDeprecated.Refs {
- ref, err := lbf.convertRef(id)
- if err != nil {
- return nil, err
- }
- m[k] = ref
- }
- r.Refs = m
- case *pb.Result_Ref:
- ref, err := lbf.convertRef(res.Ref.Id)
- if err != nil {
- return nil, err
- }
- r.Ref = ref
- case *pb.Result_Refs:
- m := map[string]solver.ResultProxy{}
- for k, ref := range res.Refs.Refs {
- ref, err := lbf.convertRef(ref.Id)
- if err != nil {
- return nil, err
- }
- m[k] = ref
- }
- r.Refs = m
- }
- return lbf.setResult(r, nil)
- }
- }
- func (lbf *llbBridgeForwarder) Inputs(ctx context.Context, in *pb.InputsRequest) (*pb.InputsResponse, error) {
- return &pb.InputsResponse{
- Definitions: lbf.inputs,
- }, nil
- }
- func (lbf *llbBridgeForwarder) convertRef(id string) (solver.ResultProxy, error) {
- if id == "" {
- return nil, nil
- }
- lbf.mu.Lock()
- defer lbf.mu.Unlock()
- r, ok := lbf.refs[id]
- if !ok {
- return nil, errors.Errorf("return reference %s not found", id)
- }
- return r, nil
- }
- func serve(ctx context.Context, grpcServer *grpc.Server, conn net.Conn) {
- go func() {
- <-ctx.Done()
- conn.Close()
- }()
- logrus.Debugf("serving grpc connection")
- (&http2.Server{}).ServeConn(conn, &http2.ServeConnOpts{Handler: grpcServer})
- }
- type markTypeFrontend struct{}
- func (*markTypeFrontend) SetImageOption(ii *llb.ImageInfo) {
- ii.RecordType = string(client.UsageRecordTypeFrontend)
- }
|