|
@@ -86,7 +86,7 @@ func filterPrefix(opts map[string]string, pfx string) map[string]string {
|
|
|
return m
|
|
|
}
|
|
|
|
|
|
-func (gf *gatewayFrontend) Solve(ctx context.Context, llbBridge frontend.FrontendLLBBridge, opts map[string]string, inputs map[string]*opspb.Definition, sid string, sm *session.Manager) (*frontend.Result, error) {
|
|
|
+func (gf *gatewayFrontend) Solve(ctx context.Context, llbBridge frontend.FrontendLLBBridge, exec executor.Executor, opts map[string]string, inputs map[string]*opspb.Definition, sid string, sm *session.Manager) (*frontend.Result, error) {
|
|
|
source, ok := opts[keySource]
|
|
|
if !ok {
|
|
|
return nil, errors.Errorf("no source specified for gateway")
|
|
@@ -141,7 +141,7 @@ func (gf *gatewayFrontend) Solve(ctx context.Context, llbBridge frontend.Fronten
|
|
|
}
|
|
|
}
|
|
|
} else {
|
|
|
- c, err := forwarder.LLBBridgeToGatewayClient(ctx, llbBridge, opts, inputs, gf.workers, sid, sm)
|
|
|
+ c, err := forwarder.LLBBridgeToGatewayClient(ctx, llbBridge, exec, opts, inputs, gf.workers, sid, sm)
|
|
|
if err != nil {
|
|
|
return nil, err
|
|
|
}
|
|
@@ -281,18 +281,13 @@ func (gf *gatewayFrontend) Solve(ctx context.Context, llbBridge frontend.Fronten
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- lbf, ctx, err := serveLLBBridgeForwarder(ctx, llbBridge, gf.workers, inputs, sid, sm)
|
|
|
+ lbf, ctx, err := serveLLBBridgeForwarder(ctx, llbBridge, exec, gf.workers, inputs, sid, sm)
|
|
|
defer lbf.conn.Close() //nolint
|
|
|
if err != nil {
|
|
|
return nil, err
|
|
|
}
|
|
|
defer lbf.Discard()
|
|
|
|
|
|
- w, err := gf.workers.GetDefault()
|
|
|
- if err != nil {
|
|
|
- return nil, err
|
|
|
- }
|
|
|
-
|
|
|
mdmnt, release, err := metadataMount(frontendDef)
|
|
|
if err != nil {
|
|
|
return nil, err
|
|
@@ -305,7 +300,7 @@ func (gf *gatewayFrontend) Solve(ctx context.Context, llbBridge frontend.Fronten
|
|
|
mnts = append(mnts, *mdmnt)
|
|
|
}
|
|
|
|
|
|
- _, err = w.Executor().Run(ctx, "", container.MountWithSession(rootFS, session.NewGroup(sid)), mnts, executor.ProcessInfo{Meta: meta, Stdin: lbf.Stdin, Stdout: lbf.Stdout, Stderr: os.Stderr}, nil)
|
|
|
+ _, err = exec.Run(ctx, "", container.MountWithSession(rootFS, session.NewGroup(sid)), mnts, executor.ProcessInfo{Meta: meta, Stdin: lbf.Stdin, Stdout: lbf.Stdout, Stderr: os.Stderr}, nil)
|
|
|
if err != nil {
|
|
|
if errdefs.IsCanceled(ctx, err) && lbf.isErrServerClosed {
|
|
|
err = errors.Errorf("frontend grpc server closed unexpectedly")
|
|
@@ -434,11 +429,11 @@ func (lbf *llbBridgeForwarder) Result() (*frontend.Result, error) {
|
|
|
return lbf.result, nil
|
|
|
}
|
|
|
|
|
|
-func NewBridgeForwarder(ctx context.Context, llbBridge frontend.FrontendLLBBridge, workers worker.Infos, inputs map[string]*opspb.Definition, sid string, sm *session.Manager) LLBBridgeForwarder {
|
|
|
- return newBridgeForwarder(ctx, llbBridge, workers, inputs, sid, sm)
|
|
|
+func NewBridgeForwarder(ctx context.Context, llbBridge frontend.FrontendLLBBridge, exec executor.Executor, workers worker.Infos, inputs map[string]*opspb.Definition, sid string, sm *session.Manager) LLBBridgeForwarder {
|
|
|
+ return newBridgeForwarder(ctx, llbBridge, exec, workers, inputs, sid, sm)
|
|
|
}
|
|
|
|
|
|
-func newBridgeForwarder(ctx context.Context, llbBridge frontend.FrontendLLBBridge, workers worker.Infos, inputs map[string]*opspb.Definition, sid string, sm *session.Manager) *llbBridgeForwarder {
|
|
|
+func newBridgeForwarder(ctx context.Context, llbBridge frontend.FrontendLLBBridge, exec executor.Executor, workers worker.Infos, inputs map[string]*opspb.Definition, sid string, sm *session.Manager) *llbBridgeForwarder {
|
|
|
lbf := &llbBridgeForwarder{
|
|
|
callCtx: ctx,
|
|
|
llbBridge: llbBridge,
|
|
@@ -451,13 +446,14 @@ func newBridgeForwarder(ctx context.Context, llbBridge frontend.FrontendLLBBridg
|
|
|
sid: sid,
|
|
|
sm: sm,
|
|
|
ctrs: map[string]gwclient.Container{},
|
|
|
+ executor: exec,
|
|
|
}
|
|
|
return lbf
|
|
|
}
|
|
|
|
|
|
-func serveLLBBridgeForwarder(ctx context.Context, llbBridge frontend.FrontendLLBBridge, workers worker.Infos, inputs map[string]*opspb.Definition, sid string, sm *session.Manager) (*llbBridgeForwarder, context.Context, error) {
|
|
|
+func serveLLBBridgeForwarder(ctx context.Context, llbBridge frontend.FrontendLLBBridge, exec executor.Executor, workers worker.Infos, inputs map[string]*opspb.Definition, sid string, sm *session.Manager) (*llbBridgeForwarder, context.Context, error) {
|
|
|
ctx, cancel := context.WithCancel(ctx)
|
|
|
- lbf := newBridgeForwarder(ctx, llbBridge, workers, inputs, sid, sm)
|
|
|
+ lbf := newBridgeForwarder(ctx, llbBridge, exec, workers, inputs, sid, sm)
|
|
|
server := grpc.NewServer(grpc.UnaryInterceptor(grpcerrors.UnaryServerInterceptor), grpc.StreamInterceptor(grpcerrors.StreamServerInterceptor))
|
|
|
grpc_health_v1.RegisterHealthServer(server, health.NewServer())
|
|
|
pb.RegisterLLBBridgeServer(server, lbf)
|
|
@@ -552,6 +548,7 @@ type llbBridgeForwarder struct {
|
|
|
isErrServerClosed bool
|
|
|
sid string
|
|
|
sm *session.Manager
|
|
|
+ executor executor.Executor
|
|
|
*pipe
|
|
|
ctrs map[string]gwclient.Container
|
|
|
ctrsMu sync.Mutex
|
|
@@ -646,12 +643,21 @@ func (lbf *llbBridgeForwarder) registerResultIDs(results ...solver.Result) (ids
|
|
|
func (lbf *llbBridgeForwarder) Solve(ctx context.Context, req *pb.SolveRequest) (*pb.SolveResponse, error) {
|
|
|
var cacheImports []frontend.CacheOptionsEntry
|
|
|
for _, e := range req.CacheImports {
|
|
|
+ if e == nil {
|
|
|
+ return nil, errors.Errorf("invalid nil cache import")
|
|
|
+ }
|
|
|
cacheImports = append(cacheImports, frontend.CacheOptionsEntry{
|
|
|
Type: e.Type,
|
|
|
Attrs: e.Attrs,
|
|
|
})
|
|
|
}
|
|
|
|
|
|
+ for _, p := range req.SourcePolicies {
|
|
|
+ if p == nil {
|
|
|
+ return nil, errors.Errorf("invalid nil source policy")
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
ctx = tracing.ContextWithSpanFromContext(ctx, lbf.callCtx)
|
|
|
res, err := lbf.llbBridge.Solve(ctx, frontend.SolveRequest{
|
|
|
Evaluate: req.Evaluate,
|
|
@@ -1033,7 +1039,7 @@ func (lbf *llbBridgeForwarder) NewContainer(ctx context.Context, in *pb.NewConta
|
|
|
// and we want the context to live for the duration of the container.
|
|
|
group := session.NewGroup(lbf.sid)
|
|
|
|
|
|
- w, err := lbf.workers.GetDefault()
|
|
|
+ cm, err := lbf.workers.DefaultCacheManager()
|
|
|
if err != nil {
|
|
|
return nil, stack.Enable(err)
|
|
|
}
|
|
@@ -1043,7 +1049,7 @@ func (lbf *llbBridgeForwarder) NewContainer(ctx context.Context, in *pb.NewConta
|
|
|
return nil, stack.Enable(err)
|
|
|
}
|
|
|
|
|
|
- ctr, err := container.NewContainer(context.Background(), w, lbf.sm, group, ctrReq)
|
|
|
+ ctr, err := container.NewContainer(context.Background(), cm, lbf.executor, lbf.sm, group, ctrReq)
|
|
|
if err != nil {
|
|
|
return nil, stack.Enable(err)
|
|
|
}
|
|
@@ -1077,6 +1083,12 @@ func (lbf *llbBridgeForwarder) ReleaseContainer(ctx context.Context, in *pb.Rele
|
|
|
}
|
|
|
|
|
|
func (lbf *llbBridgeForwarder) Warn(ctx context.Context, in *pb.WarnRequest) (*pb.WarnResponse, error) {
|
|
|
+ // validate ranges are valid
|
|
|
+ for _, r := range in.Ranges {
|
|
|
+ if r == nil {
|
|
|
+ return nil, status.Errorf(codes.InvalidArgument, "invalid source range")
|
|
|
+ }
|
|
|
+ }
|
|
|
err := lbf.llbBridge.Warn(ctx, in.Digest, string(in.Short), frontend.WarnOpts{
|
|
|
Level: int(in.Level),
|
|
|
SourceInfo: in.Info,
|