builder: experimental buildkit base

Signed-off-by: Tonis Tiigi <tonistiigi@gmail.com>
Tonis Tiigi 2018-04-17 15:50:17 -07:00
parent 27fa0e8a7b
commit 22f7caee03
8 changed files with 942 additions and 110 deletions


@@ -3,11 +3,13 @@ package build // import "github.com/docker/docker/api/server/backend/build"
import (
"context"
"fmt"
"strings"
"github.com/docker/distribution/reference"
"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/backend"
"github.com/docker/docker/builder"
buildkit "github.com/docker/docker/builder/builder-next"
"github.com/docker/docker/builder/fscache"
"github.com/docker/docker/image"
"github.com/docker/docker/pkg/stringid"
@@ -30,24 +32,39 @@ type Backend struct {
builder Builder
fsCache *fscache.FSCache
imageComponent ImageComponent
buildkit *buildkit.Builder
}
// NewBackend creates a new build backend from components
func NewBackend(components ImageComponent, builder Builder, fsCache *fscache.FSCache) (*Backend, error) {
return &Backend{imageComponent: components, builder: builder, fsCache: fsCache}, nil
func NewBackend(components ImageComponent, builder Builder, fsCache *fscache.FSCache, buildkit *buildkit.Builder) (*Backend, error) {
return &Backend{imageComponent: components, builder: builder, fsCache: fsCache, buildkit: buildkit}, nil
}
// Build builds an image from a Source
func (b *Backend) Build(ctx context.Context, config backend.BuildConfig) (string, error) {
options := config.Options
useBuildKit := false
if strings.HasPrefix(options.SessionID, "buildkit:") {
useBuildKit = true
options.SessionID = strings.TrimPrefix(options.SessionID, "buildkit:")
}
tagger, err := NewTagger(b.imageComponent, config.ProgressWriter.StdoutFormatter, options.Tags)
if err != nil {
return "", err
}
build, err := b.builder.Build(ctx, config)
if err != nil {
return "", err
var build *builder.Result
if useBuildKit {
build, err = b.buildkit.Build(ctx, config)
if err != nil {
return "", err
}
} else {
build, err = b.builder.Build(ctx, config)
if err != nil {
return "", err
}
}
var imageID = build.ImageID
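Note: the backend above opts into BuildKit only when the client encodes it in the session ID, via a temporary "buildkit:" prefix that is stripped before the session is used. A minimal, self-contained sketch of that routing check (the helper name is hypothetical; only the prefix convention comes from the code above):

package main

import (
	"fmt"
	"strings"
)

// selectBuilder mirrors the check in Backend.Build above: a session ID of the
// form "buildkit:<id>" selects the BuildKit path, and the prefix is stripped
// before the session ID is used. The helper itself is illustrative only.
func selectBuilder(sessionID string) (useBuildKit bool, cleanID string) {
	if strings.HasPrefix(sessionID, "buildkit:") {
		return true, strings.TrimPrefix(sessionID, "buildkit:")
	}
	return false, sessionID
}

func main() {
	use, id := selectBuilder("buildkit:9f8e7d6c")
	fmt.Println(use, id) // true 9f8e7d6c
}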


@@ -0,0 +1,416 @@
package buildkit
import (
"context"
"encoding/json"
"io"
"os"
"path/filepath"
"sync"
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/content/local"
"github.com/docker/docker/api/types/backend"
"github.com/docker/docker/builder"
"github.com/docker/docker/builder/builder-next/containerimage"
containerimageexp "github.com/docker/docker/builder/builder-next/exporter"
"github.com/docker/docker/builder/builder-next/snapshot"
mobyworker "github.com/docker/docker/builder/builder-next/worker"
"github.com/docker/docker/daemon/graphdriver"
"github.com/docker/docker/daemon/images"
"github.com/docker/docker/pkg/jsonmessage"
controlapi "github.com/moby/buildkit/api/services/control"
"github.com/moby/buildkit/cache"
"github.com/moby/buildkit/cache/metadata"
"github.com/moby/buildkit/control"
"github.com/moby/buildkit/executor/runcexecutor"
"github.com/moby/buildkit/exporter"
"github.com/moby/buildkit/frontend"
"github.com/moby/buildkit/frontend/dockerfile"
"github.com/moby/buildkit/identity"
"github.com/moby/buildkit/session"
"github.com/moby/buildkit/snapshot/blobmapping"
"github.com/moby/buildkit/solver-next/boltdbcachestorage"
"github.com/moby/buildkit/worker"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
netcontext "golang.org/x/net/context"
"golang.org/x/sync/errgroup"
grpcmetadata "google.golang.org/grpc/metadata"
)
// Builder defines an interface for running a build
// type Builder interface {
// Build(context.Context, backend.BuildConfig) (*builder.Result, error)
// }
// Result is the output produced by a Builder
// type Result struct {
// ImageID string
// // FromImage Image
// }
type Opt struct {
SessionManager *session.Manager
Root string
Dist images.DistributionServices
}
type Builder struct {
controller *control.Controller
results *results
}
func New(opt Opt) (*Builder, error) {
results := newResultsGetter()
c, err := newController(opt, results.ch)
if err != nil {
return nil, err
}
b := &Builder{
controller: c,
results: results,
}
return b, nil
}
func (b *Builder) Build(ctx context.Context, opt backend.BuildConfig) (*builder.Result, error) {
id := identity.NewID()
attrs := map[string]string{
"ref": id,
}
frontendAttrs := map[string]string{}
if opt.Options.Target != "" {
frontendAttrs["target"] = opt.Options.Target
}
if opt.Options.Dockerfile != "" && opt.Options.Dockerfile != "." {
frontendAttrs["filename"] = opt.Options.Dockerfile
}
if opt.Options.RemoteContext != "" {
frontendAttrs["context"] = opt.Options.RemoteContext
}
logrus.Debugf("frontend: %+v", frontendAttrs)
for k, v := range opt.Options.BuildArgs {
if v == nil {
continue
}
frontendAttrs["build-arg:"+k] = *v
}
req := &controlapi.SolveRequest{
Ref: id,
Exporter: "image",
ExporterAttrs: attrs,
Frontend: "dockerfile.v0",
FrontendAttrs: frontendAttrs,
Session: opt.Options.SessionID,
}
eg, ctx := errgroup.WithContext(ctx)
eg.Go(func() error {
_, err := b.controller.Solve(ctx, req)
return err
})
ch := make(chan *controlapi.StatusResponse)
eg.Go(func() error {
defer close(ch)
return b.controller.Status(&controlapi.StatusRequest{
Ref: id,
}, &statusProxy{ctx: ctx, ch: ch})
})
eg.Go(func() error {
for sr := range ch {
dt, err := sr.Marshal()
if err != nil {
return err
}
auxJSONBytes, err := json.Marshal(dt)
if err != nil {
return err
}
auxJSON := new(json.RawMessage)
*auxJSON = auxJSONBytes
msgJSON, err := json.Marshal(&jsonmessage.JSONMessage{ID: "buildkit-trace", Aux: auxJSON})
if err != nil {
return err
}
msgJSON = append(msgJSON, []byte("\r\n")...)
n, err := opt.ProgressWriter.Output.Write(msgJSON)
if err != nil {
return err
}
if n != len(msgJSON) {
return io.ErrShortWrite
}
}
return nil
})
out := &builder.Result{}
eg.Go(func() error {
res, err := b.results.wait(ctx, id)
if err != nil {
return err
}
out.ImageID = string(res.ID)
return nil
})
if err := eg.Wait(); err != nil {
return nil, err
}
return out, nil
}
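The Build method above streams BuildKit status back to the client as jsonmessage lines with ID "buildkit-trace": each controlapi.StatusResponse is protobuf-marshalled, the bytes are JSON-encoded (so they appear as base64) in the Aux field, and each message is terminated by "\r\n". A rough sketch of reading such a line back, using a local stand-in struct instead of jsonmessage.JSONMessage and a dummy payload rather than a real StatusResponse:

package main

import (
	"bufio"
	"encoding/json"
	"fmt"
	"strings"
)

// traceLine mirrors only the jsonmessage.JSONMessage fields used above.
type traceLine struct {
	ID  string          `json:"id"`
	Aux json.RawMessage `json:"aux,omitempty"`
}

func main() {
	// One JSON object per line, terminated by "\r\n", as written to
	// opt.ProgressWriter.Output above. The aux payload here is dummy bytes.
	stream := "{\"id\":\"buildkit-trace\",\"aux\":\"ChAKBnZlcnRleA==\"}\r\n"

	sc := bufio.NewScanner(strings.NewReader(stream))
	for sc.Scan() {
		var m traceLine
		if err := json.Unmarshal([]byte(sc.Text()), &m); err != nil || m.ID != "buildkit-trace" {
			continue
		}
		// The aux value is a JSON string holding base64 of the marshalled
		// controlapi.StatusResponse; unmarshalling into []byte decodes it.
		var payload []byte
		if err := json.Unmarshal(m.Aux, &payload); err != nil {
			continue
		}
		fmt.Printf("status payload: %d bytes\n", len(payload))
		// A real consumer would unmarshal payload with the controlapi protobuf type.
	}
}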
func newController(opt Opt, reporter chan containerimageexp.Result) (*control.Controller, error) {
if err := os.MkdirAll(opt.Root, 0700); err != nil {
return nil, err
}
dist := opt.Dist
root := opt.Root
var driver graphdriver.Driver
if ls, ok := dist.LayerStore.(interface {
Driver() graphdriver.Driver
}); ok {
driver = ls.Driver()
} else {
return nil, errors.Errorf("could not access graphdriver")
}
sbase, err := snapshot.NewSnapshotter(snapshot.Opt{
GraphDriver: driver,
LayerStore: dist.LayerStore,
Root: root,
})
if err != nil {
return nil, err
}
store, err := local.NewStore(filepath.Join(root, "content"))
if err != nil {
return nil, err
}
store = &contentStoreNoLabels{store}
md, err := metadata.NewStore(filepath.Join(root, "metadata.db"))
if err != nil {
return nil, err
}
snapshotter := blobmapping.NewSnapshotter(blobmapping.Opt{
Content: store,
Snapshotter: sbase,
MetadataStore: md,
})
cm, err := cache.NewManager(cache.ManagerOpt{
Snapshotter: snapshotter,
MetadataStore: md,
})
if err != nil {
return nil, err
}
src, err := containerimage.NewSource(containerimage.SourceOpt{
SessionManager: opt.SessionManager,
CacheAccessor: cm,
ContentStore: store,
DownloadManager: dist.DownloadManager,
MetadataStore: dist.V2MetadataService,
})
if err != nil {
return nil, err
}
exec, err := runcexecutor.New(runcexecutor.Opt{
Root: filepath.Join(root, "executor"),
CommandCandidates: []string{"docker-runc", "runc"},
})
if err != nil {
return nil, err
}
differ, ok := sbase.(containerimageexp.Differ)
if !ok {
return nil, errors.Errorf("snapshotter doesn't support differ")
}
exp, err := containerimageexp.New(containerimageexp.Opt{
ImageStore: dist.ImageStore,
ReferenceStore: dist.ReferenceStore,
Differ: differ,
Reporter: reporter,
})
if err != nil {
return nil, err
}
cacheStorage, err := boltdbcachestorage.NewStore(filepath.Join(opt.Root, "cache.db"))
if err != nil {
return nil, err
}
frontends := map[string]frontend.Frontend{}
frontends["dockerfile.v0"] = dockerfile.NewDockerfileFrontend()
// frontends["gateway.v0"] = gateway.NewGatewayFrontend()
// mdb := ctdmetadata.NewDB(db, c, map[string]ctdsnapshot.Snapshotter{
// "moby": s,
// })
// if err := mdb.Init(context.TODO()); err != nil {
// return opt, err
// }
//
// throttledGC := throttle.Throttle(time.Second, func() {
// if _, err := mdb.GarbageCollect(context.TODO()); err != nil {
// logrus.Errorf("GC error: %+v", err)
// }
// })
//
// gc := func(ctx context.Context) error {
// throttledGC()
// return nil
// }
wopt := mobyworker.WorkerOpt{
ID: "moby",
SessionManager: opt.SessionManager,
MetadataStore: md,
ContentStore: store,
CacheManager: cm,
Snapshotter: snapshotter,
Executor: exec,
ImageSource: src,
Exporters: map[string]exporter.Exporter{
"image": exp,
},
}
wc := &worker.Controller{}
w, err := mobyworker.NewWorker(wopt)
if err != nil {
return nil, err
}
wc.Add(w)
return control.NewController(control.Opt{
SessionManager: opt.SessionManager,
WorkerController: wc,
Frontends: frontends,
CacheKeyStorage: cacheStorage,
// CacheExporter: ce,
// CacheImporter: ci,
})
}
type statusProxy struct {
ctx context.Context
ch chan *controlapi.StatusResponse
}
func (sp *statusProxy) SetHeader(_ grpcmetadata.MD) error {
return nil
}
func (sp *statusProxy) SendHeader(_ grpcmetadata.MD) error {
return nil
}
func (sp *statusProxy) SetTrailer(_ grpcmetadata.MD) {
}
func (sp *statusProxy) Send(resp *controlapi.StatusResponse) error {
return sp.SendMsg(resp)
}
func (sp *statusProxy) Context() netcontext.Context {
return sp.ctx
}
func (sp *statusProxy) SendMsg(m interface{}) error {
if sr, ok := m.(*controlapi.StatusResponse); ok {
sp.ch <- sr
}
return nil
}
func (sp *statusProxy) RecvMsg(m interface{}) error {
return io.EOF
}
type results struct {
ch chan containerimageexp.Result
res map[string]containerimageexp.Result
mu sync.Mutex
cond *sync.Cond
}
func newResultsGetter() *results {
r := &results{
ch: make(chan containerimageexp.Result),
res: map[string]containerimageexp.Result{},
}
r.cond = sync.NewCond(&r.mu)
go func() {
for res := range r.ch {
r.mu.Lock()
r.res[res.Ref] = res
r.cond.Broadcast()
r.mu.Unlock()
}
}()
return r
}
func (r *results) wait(ctx context.Context, ref string) (*containerimageexp.Result, error) {
done := make(chan struct{})
defer close(done)
go func() {
select {
case <-ctx.Done():
r.mu.Lock()
r.cond.Broadcast()
r.mu.Unlock()
case <-done:
}
}()
r.mu.Lock()
for {
select {
case <-ctx.Done():
r.mu.Unlock()
return nil, ctx.Err()
default:
}
res, ok := r.res[ref]
if ok {
r.mu.Unlock()
return &res, nil
}
r.cond.Wait()
}
}
type contentStoreNoLabels struct {
content.Store
}
func (c *contentStoreNoLabels) Update(ctx context.Context, info content.Info, fieldpaths ...string) (content.Info, error) {
return content.Info{}, nil
}
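The results helper above is a small rendezvous: the exporter publishes Result values on a channel, a goroutine indexes them by Ref under a mutex, and wait blocks on a sync.Cond until the requested ref appears or the context is cancelled (a watcher goroutine broadcasts on cancellation so Wait wakes up). A stripped-down sketch of the same pattern with plain strings; names and types are illustrative, not the docker ones:

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

type waiter struct {
	ch   chan string
	res  map[string]string
	mu   sync.Mutex
	cond *sync.Cond
}

func newWaiter() *waiter {
	w := &waiter{ch: make(chan string), res: map[string]string{}}
	w.cond = sync.NewCond(&w.mu)
	go func() {
		// Index incoming results by ref and wake any waiters.
		for ref := range w.ch {
			w.mu.Lock()
			w.res[ref] = "result-for-" + ref
			w.cond.Broadcast()
			w.mu.Unlock()
		}
	}()
	return w
}

func (w *waiter) wait(ctx context.Context, ref string) (string, error) {
	done := make(chan struct{})
	defer close(done)
	go func() {
		// Wake the cond.Wait loop below if the context is cancelled.
		select {
		case <-ctx.Done():
			w.mu.Lock()
			w.cond.Broadcast()
			w.mu.Unlock()
		case <-done:
		}
	}()
	w.mu.Lock()
	defer w.mu.Unlock()
	for {
		if err := ctx.Err(); err != nil {
			return "", err
		}
		if v, ok := w.res[ref]; ok {
			return v, nil
		}
		w.cond.Wait()
	}
}

func main() {
	w := newWaiter()
	go func() {
		time.Sleep(10 * time.Millisecond)
		w.ch <- "ref1" // stands in for the exporter reporting a Result
	}()
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	fmt.Println(w.wait(ctx, "ref1"))
}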


@@ -5,6 +5,7 @@ import (
"encoding/json"
"fmt"
"io"
"io/ioutil"
"runtime"
"sync"
"time"
@@ -22,7 +23,6 @@ import (
"github.com/docker/docker/distribution/xfer"
"github.com/docker/docker/image"
"github.com/docker/docker/layer"
"github.com/docker/docker/pkg/ioutils"
pkgprogress "github.com/docker/docker/pkg/progress"
"github.com/docker/docker/reference"
"github.com/moby/buildkit/cache"
@@ -36,8 +36,8 @@ import (
digest "github.com/opencontainers/go-digest"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
netcontext "golang.org/x/net/context"
"golang.org/x/time/rate"
)
type SourceOpt struct {
@@ -92,23 +92,22 @@ func (is *imageSource) getCredentialsFromSession(ctx context.Context) func(strin
}
func (is *imageSource) ResolveImageConfig(ctx context.Context, ref string) (digest.Digest, []byte, error) {
// type t struct {
// dgst digest.Digest
// dt []byte
// }
// res, err := is.g.Do(ctx, ref, func(ctx context.Context) (interface{}, error) {
// dgst, dt, err := imageutil.Config(ctx, ref, is.getResolver(ctx), is.ContentStore)
// if err != nil {
// return nil, err
// }
// return &t{dgst: dgst, dt: dt}, nil
// })
// if err != nil {
// return "", nil, err
// }
// typed := res.(*t)
// return typed.dgst, typed.dt, nil
return "", nil, errors.Errorf("not-implemented")
type t struct {
dgst digest.Digest
dt []byte
}
res, err := is.g.Do(ctx, ref, func(ctx context.Context) (interface{}, error) {
dgst, dt, err := imageutil.Config(ctx, ref, is.getResolver(ctx), is.ContentStore, "")
if err != nil {
return nil, err
}
return &t{dgst: dgst, dt: dt}, nil
})
if err != nil {
return "", nil, err
}
typed := res.(*t)
return typed.dgst, typed.dt, nil
}
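ResolveImageConfig above is restored to deduplicate concurrent lookups of the same ref through is.g.Do (BuildKit's flightcontrol group) before calling imageutil.Config. A rough stand-in for that behaviour using golang.org/x/sync/singleflight; flightcontrol is context-aware while singleflight is not, and the resolver body here is a placeholder:

package main

import (
	"fmt"
	"sync"
	"time"

	"golang.org/x/sync/singleflight"
)

func main() {
	var g singleflight.Group
	var mu sync.Mutex
	calls := 0

	// resolve collapses concurrent requests for the same ref into one call,
	// the way is.g.Do collapses config resolution above.
	resolve := func(ref string) (string, error) {
		v, err, _ := g.Do(ref, func() (interface{}, error) {
			mu.Lock()
			calls++
			mu.Unlock()
			time.Sleep(50 * time.Millisecond) // placeholder for the registry round-trip
			return "sha256:deadbeef", nil     // placeholder digest
		})
		if err != nil {
			return "", err
		}
		return v.(string), nil
	}

	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if _, err := resolve("docker.io/library/busybox:latest"); err != nil {
				panic(err)
			}
		}()
	}
	wg.Wait()
	fmt.Println("resolver calls:", calls) // typically 1 for 10 concurrent callers
}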
func (is *imageSource) Resolve(ctx context.Context, id source.Identifier) (source.SourceInstance, error) {
@@ -189,7 +188,17 @@ func (p *puller) Snapshot(ctx context.Context) (cache.ImmutableRef, error) {
pctx, stopProgress := context.WithCancel(ctx)
go showProgress(pctx, ongoing, p.is.ContentStore)
pw, _, ctx := progress.FromContext(ctx)
defer pw.Close()
progressDone := make(chan struct{})
go func() {
showProgress(pctx, ongoing, p.is.ContentStore, pw)
close(progressDone)
}()
defer func() {
<-progressDone
}()
fetcher, err := p.resolver.Fetcher(ctx, p.ref)
if err != nil {
@@ -213,14 +222,33 @@ func (p *puller) Snapshot(ctx context.Context) (cache.ImmutableRef, error) {
return nil, nil
}),
}
// var schema1Converter *schema1.Converter
// if p.desc.MediaType == images.MediaTypeDockerSchema1Manifest {
// schema1Converter = schema1.NewConverter(p.is.ContentStore, fetcher)
// handlers = append(handlers, schema1Converter)
// } else {
// handlers = append(handlers,
// remotes.FetchHandler(p.is.ContentStore, fetcher),
//
// images.ChildrenHandler(p.is.ContentStore),
// )
// }
//
var schema1Converter *schema1.Converter
if p.desc.MediaType == images.MediaTypeDockerSchema1Manifest {
schema1Converter = schema1.NewConverter(p.is.ContentStore, fetcher)
handlers = append(handlers, schema1Converter)
} else {
// Get all the children for a descriptor
childrenHandler := images.ChildrenHandler(p.is.ContentStore)
// Set any children labels for that content
childrenHandler = images.SetChildrenLabels(p.is.ContentStore, childrenHandler)
// Filter the children by the platform
childrenHandler = images.FilterPlatforms(childrenHandler, platforms.Default())
handlers = append(handlers,
remotes.FetchHandler(p.is.ContentStore, fetcher),
images.ChildrenHandler(p.is.ContentStore, platforms.Default()),
childrenHandler,
)
}
@@ -228,7 +256,7 @@ func (p *puller) Snapshot(ctx context.Context) (cache.ImmutableRef, error) {
stopProgress()
return nil, err
}
stopProgress()
defer stopProgress()
mfst, err := images.Manifest(ctx, p.is.ContentStore, p.desc, platforms.Default())
if err != nil {
@@ -255,16 +283,41 @@ func (p *puller) Snapshot(ctx context.Context) (cache.ImmutableRef, error) {
}
pchan := make(chan pkgprogress.Progress, 10)
defer close(pchan)
go func() {
m := map[string]struct {
st time.Time
limiter *rate.Limiter
}{}
for p := range pchan {
logrus.Debugf("progress %+v", p)
if p.Action == "Extracting" {
st, ok := m[p.ID]
if !ok {
st.st = time.Now()
st.limiter = rate.NewLimiter(rate.Every(100*time.Millisecond), 1)
m[p.ID] = st
}
var end *time.Time
if p.LastUpdate || st.limiter.Allow() {
if p.LastUpdate {
tm := time.Now()
end = &tm
}
pw.Write("extracting "+p.ID, progress.Status{
Action: "extract",
Started: &st.st,
Completed: end,
})
}
}
}
}()
layers := make([]xfer.DownloadDescriptor, 0, len(mfst.Layers))
for i, desc := range mfst.Layers {
ongoing.add(desc)
layers = append(layers, &layerDescriptor{
desc: desc,
diffID: layer.DiffID(img.RootFS.DiffIDs[i]),
@@ -274,11 +327,19 @@ func (p *puller) Snapshot(ctx context.Context) (cache.ImmutableRef, error) {
})
}
defer func() {
<-progressDone
for _, desc := range mfst.Layers {
p.is.ContentStore.Delete(context.TODO(), desc.Digest)
}
}()
r := image.NewRootFS()
rootFS, release, err := p.is.DownloadManager.Download(ctx, *r, runtime.GOOS, layers, pkgprogress.ChanOutput(pchan))
if err != nil {
return nil, err
}
stopProgress()
ref, err := p.is.CacheAccessor.Get(ctx, string(rootFS.ChainID()), cache.WithDescription(fmt.Sprintf("pulled from %s", p.ref)))
release()
@@ -317,8 +378,9 @@ func (ld *layerDescriptor) Download(ctx netcontext.Context, progressOutput pkgpr
}
defer rc.Close()
// TODO: progress
if err := content.WriteBlob(ctx, ld.is.ContentStore, ld.desc.Digest.String(), rc, ld.desc.Size, ld.desc.Digest); err != nil {
refKey := remotes.MakeRefKey(ctx, ld.desc)
if err := content.WriteBlob(ctx, ld.is.ContentStore, refKey, rc, ld.desc.Size, ld.desc.Digest); err != nil {
return nil, 0, err
}
@@ -327,9 +389,7 @@ func (ld *layerDescriptor) Download(ctx netcontext.Context, progressOutput pkgpr
return nil, 0, err
}
return ioutils.NewReadCloserWrapper(content.NewReader(ra), func() error {
return ld.is.ContentStore.Delete(context.TODO(), ld.desc.Digest)
}), ld.desc.Size, nil
return ioutil.NopCloser(content.NewReader(ra)), ld.desc.Size, nil
}
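The extraction progress goroutine added earlier in this file throttles per-layer updates with a rate.Limiter: at most one write every 100ms per layer, plus a forced final write on LastUpdate, so the JSON progress stream stays small. A minimal sketch of that gating, using the same golang.org/x/time/rate package this file imports (the loop body is a placeholder for writing a progress.Status):

package main

import (
	"fmt"
	"time"

	"golang.org/x/time/rate"
)

func main() {
	// One update per 100ms with a burst of 1, the same shape as the
	// per-layer limiter in the extraction loop above.
	lim := rate.NewLimiter(rate.Every(100*time.Millisecond), 1)
	sent := 0
	for i := 0; i < 50; i++ {
		last := i == 49
		if last || lim.Allow() {
			sent++ // a real caller would pw.Write a progress.Status here
		}
		time.Sleep(10 * time.Millisecond)
	}
	fmt.Println("updates emitted:", sent) // roughly 6 or 7 of the 50 iterations
}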
func (ld *layerDescriptor) Close() {
@@ -341,7 +401,7 @@ func (ld *layerDescriptor) Registered(diffID layer.DiffID) {
ld.is.MetadataStore.Add(diffID, metadata.V2Metadata{Digest: ld.desc.Digest, SourceRepository: ld.ref.Locator})
}
func showProgress(ctx context.Context, ongoing *jobs, cs content.Store) {
func showProgress(ctx context.Context, ongoing *jobs, cs content.Store, pw progress.Writer) {
var (
ticker = time.NewTicker(100 * time.Millisecond)
statuses = map[string]statusInfo{}
@@ -349,9 +409,6 @@ func showProgress(ctx context.Context, ongoing *jobs, cs content.Store) {
)
defer ticker.Stop()
pw, _, ctx := progress.FromContext(ctx)
defer pw.Close()
for {
select {
case <-ticker.C:
@@ -371,7 +428,7 @@ func showProgress(ctx context.Context, ongoing *jobs, cs content.Store) {
actives := make(map[string]statusInfo)
if !done {
active, err := cs.ListStatuses(ctx, "")
active, err := cs.ListStatuses(ctx)
if err != nil {
// log.G(ctx).WithError(err).Error("active check failed")
continue
@@ -407,9 +464,9 @@ func showProgress(ctx context.Context, ongoing *jobs, cs content.Store) {
info, err := cs.Info(context.TODO(), j.Digest)
if err != nil {
if errdefs.IsNotFound(err) {
pw.Write(j.Digest.String(), progress.Status{
Action: "waiting",
})
// pw.Write(j.Digest.String(), progress.Status{
// Action: "waiting",
// })
continue
}
} else {


@@ -23,10 +23,17 @@ type Differ interface {
EnsureLayer(ctx context.Context, key string) ([]layer.DiffID, error)
}
// TODO: this needs to be handled differently (return from solve)
type Result struct {
Ref string
ID image.ID
}
type Opt struct {
ImageStore image.Store
ReferenceStore reference.Store
Differ Differ
Reporter chan Result
}
type imageExporter struct {
@@ -50,6 +57,8 @@ func (e *imageExporter) Resolve(ctx context.Context, opt map[string]string) (exp
i.targetName = ref
case exporterImageConfig:
i.config = []byte(v)
case "ref":
i.ref = v
default:
logrus.Warnf("image exporter: unknown option %s", k)
}
@@ -61,6 +70,7 @@ type imageExporterInstance struct {
*imageExporter
targetName distref.Named
config []byte
ref string
}
func (e *imageExporterInstance) Name() string {
@@ -131,5 +141,9 @@ func (e *imageExporterInstance) Export(ctx context.Context, ref cache.ImmutableR
}
}
if e.opt.Reporter != nil {
e.opt.Reporter <- Result{ID: id, Ref: e.ref}
}
return nil
}
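The exporter now recognizes a "ref" option (the builder passes the solve ID through SolveRequest.ExporterAttrs) and, when a Reporter channel is configured, publishes a Result carrying both the exported image ID and that ref once Export succeeds; the builder matches the result to its build by ref. A stripped-down sketch of that plumbing with local stand-in types, not the docker ones:

package main

import "fmt"

// result mirrors the exporter Result above: the exported image ID plus the
// solve ref it was produced for.
type result struct {
	Ref string
	ID  string
}

// export stands in for imageExporterInstance.Export: it pretends to write an
// image and, if a reporter channel was wired in, publishes the outcome keyed
// by the "ref" attribute the exporter was resolved with.
func export(attrs map[string]string, reporter chan<- result) {
	imageID := "sha256:0123abcd" // placeholder for the ID written to the image store
	if reporter != nil {
		reporter <- result{Ref: attrs["ref"], ID: imageID}
	}
}

func main() {
	reporter := make(chan result, 1)
	export(map[string]string{"ref": "solve-42"}, reporter)
	res := <-reporter
	fmt.Printf("solve %s produced image %s\n", res.Ref, res.ID)
}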


@@ -65,12 +65,12 @@ func patchImageConfig(dt []byte, dps []digest.Digest, history []ocispec.History)
}
m["history"] = dt
now := time.Now()
dt, err = json.Marshal(&now)
if err != nil {
return nil, errors.Wrap(err, "failed to marshal creation time")
}
m["created"] = dt
// now := time.Now()
// dt, err = json.Marshal(&now)
// if err != nil {
// return nil, errors.Wrap(err, "failed to marshal creation time")
// }
// m["created"] = dt
dt, err = json.Marshal(m)
return dt, errors.Wrap(err, "failed to marshal config after patch")
@@ -104,9 +104,9 @@ func normalizeLayersAndHistory(diffs []digest.Digest, history []ocispec.History,
if len(diffs) > historyLayers {
// some history items are missing. add them based on the ref metadata
for _, msg := range getRefDesciptions(ref, len(diffs)-historyLayers) {
tm := time.Now().UTC()
// tm := time.Now().UTC()
history = append(history, ocispec.History{
Created: &tm,
// Created: &tm,
CreatedBy: msg,
Comment: "buildkit.exporter.image.v0",
})


@@ -15,7 +15,6 @@ import (
"github.com/moby/buildkit/snapshot"
digest "github.com/opencontainers/go-digest"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
var keyParent = []byte("parent")
@@ -121,8 +120,24 @@ func (s *snapshotter) getLayer(key string) (layer.Layer, error) {
if !ok {
id, ok := s.chainID(key)
if !ok {
s.mu.Unlock()
return nil, nil
if err := s.db.View(func(tx *bolt.Tx) error {
b := tx.Bucket([]byte(key))
if b == nil {
return nil
}
v := b.Get(keyChainID)
if v != nil {
id = layer.ChainID(v)
}
return nil
}); err != nil {
s.mu.Unlock()
return nil, err
}
if id == "" {
s.mu.Unlock()
return nil, nil
}
}
var err error
l, err = s.opt.LayerStore.Get(id)
@@ -132,7 +147,7 @@ func (s *snapshotter) getLayer(key string) (layer.Layer, error) {
}
s.refs[string(id)] = l
if err := s.db.Update(func(tx *bolt.Tx) error {
_, err := tx.CreateBucketIfNotExists([]byte(id))
_, err := tx.CreateBucketIfNotExists([]byte(key))
return err
}); err != nil {
s.mu.Unlock()
@@ -282,7 +297,8 @@ func (s *snapshotter) Remove(ctx context.Context, key string) error {
return nil
}
return s.opt.GraphDriver.Remove(key)
id, _ := s.getGraphDriverID(key)
return s.opt.GraphDriver.Remove(id)
}
func (s *snapshotter) Commit(ctx context.Context, name, key string, opts ...snapshots.Opt) error {
@@ -298,7 +314,7 @@ func (s *snapshotter) Commit(ctx context.Context, name, key string, opts ...snap
}); err != nil {
return err
}
logrus.Debugf("committed %s as %s", name, key)
// logrus.Debugf("committed %s as %s", name, key)
return nil
}
@@ -307,61 +323,62 @@ func (s *snapshotter) View(ctx context.Context, key, parent string, opts ...snap
}
func (s *snapshotter) Walk(ctx context.Context, fn func(context.Context, snapshots.Info) error) error {
allKeys := map[string]struct{}{}
commitedIDs := map[string]string{}
chainIDs := map[string]layer.ChainID{}
// allKeys := map[string]struct{}{}
// commitedIDs := map[string]string{}
// chainIDs := map[string]layer.ChainID{}
//
// if err := s.db.View(func(tx *bolt.Tx) error {
// tx.ForEach(func(name []byte, b *bolt.Bucket) error {
// allKeys[string(name)] = struct{}{}
// v := b.Get(keyCommitted)
// if v != nil {
// commitedIDs[string(v)] = string(name)
// }
//
// v = b.Get(keyChainID)
// if v != nil {
// logrus.Debugf("loaded layer %s %s", name, v)
// chainIDs[string(name)] = layer.ChainID(v)
// }
// return nil
// })
// return nil
// }); err != nil {
// return err
// }
//
// for k := range allKeys {
// if chainID, ok := chainIDs[k]; ok {
// s.mu.Lock()
// if _, ok := s.refs[k]; !ok {
// l, err := s.opt.LayerStore.Get(chainID)
// if err != nil {
// s.mu.Unlock()
// return err
// }
// s.refs[k] = l
// }
// s.mu.Unlock()
// }
// if _, ok := commitedIDs[k]; ok {
// continue
// }
//
// if _, err := s.getLayer(k); err != nil {
// s.Remove(ctx, k)
// continue
// }
// info, err := s.Stat(ctx, k)
// if err != nil {
// s.Remove(ctx, k)
// continue
// }
// if err := fn(ctx, info); err != nil {
// return err
// }
// }
if err := s.db.View(func(tx *bolt.Tx) error {
tx.ForEach(func(name []byte, b *bolt.Bucket) error {
allKeys[string(name)] = struct{}{}
v := b.Get(keyCommitted)
if v != nil {
commitedIDs[string(v)] = string(name)
}
v = b.Get(keyChainID)
if v != nil {
chainIDs[string(name)] = layer.ChainID(v)
}
return nil
})
return nil
}); err != nil {
return err
}
for k := range allKeys {
if _, ok := commitedIDs[k]; ok {
continue
}
if chainID, ok := chainIDs[k]; ok {
s.mu.Lock()
if _, ok := s.refs[k]; !ok {
l, err := s.opt.LayerStore.Get(chainID)
if err != nil {
s.mu.Unlock()
return err
}
s.refs[k] = l
}
s.mu.Unlock()
}
if _, err := s.getLayer(k); err != nil {
s.Remove(ctx, k)
continue
}
info, err := s.Stat(ctx, k)
if err != nil {
s.Remove(ctx, k)
continue
}
if err := fn(ctx, info); err != nil {
return err
}
}
return nil
return errors.Errorf("not-implemented")
}
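getLayer above gains a fallback that reads the layer chain ID out of a bolt bucket named after the snapshot key (stored under keyChainID), and buckets are now created per key so later lookups succeed. A standalone sketch of that bucket-per-key layout with the same bolt API this file uses; the database path, snapshot key, and chain ID value are all illustrative (import path as vendored at the time; go.etcd.io/bbolt is the maintained fork):

package main

import (
	"fmt"
	"log"

	"github.com/boltdb/bolt"
)

func main() {
	db, err := bolt.Open("/tmp/snapshots.db", 0600, nil)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	key := "snapshot-key"           // one bucket per snapshot key
	keyChainID := []byte("chainid") // mirrors the keyChainID key above

	// Store the layer chain ID under the snapshot's bucket.
	if err := db.Update(func(tx *bolt.Tx) error {
		b, err := tx.CreateBucketIfNotExists([]byte(key))
		if err != nil {
			return err
		}
		return b.Put(keyChainID, []byte("sha256:abc123"))
	}); err != nil {
		log.Fatal(err)
	}

	// Later, recover the chain ID the same way getLayer's fallback does.
	var chainID string
	if err := db.View(func(tx *bolt.Tx) error {
		b := tx.Bucket([]byte(key))
		if b == nil {
			return nil // unknown key: the caller falls back to a nil layer
		}
		if v := b.Get(keyChainID); v != nil {
			chainID = string(v)
		}
		return nil
	}); err != nil {
		log.Fatal(err)
	}
	fmt.Println("chain id:", chainID)
}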
func (s *snapshotter) Update(ctx context.Context, info snapshots.Info, fieldpaths ...string) (snapshots.Info, error) {


@@ -0,0 +1,301 @@
package worker
import (
"context"
"io"
"time"
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/rootfs"
"github.com/moby/buildkit/cache"
"github.com/moby/buildkit/cache/metadata"
"github.com/moby/buildkit/client"
"github.com/moby/buildkit/executor"
"github.com/moby/buildkit/exporter"
"github.com/moby/buildkit/frontend"
"github.com/moby/buildkit/session"
"github.com/moby/buildkit/snapshot"
"github.com/moby/buildkit/solver-next"
"github.com/moby/buildkit/solver-next/llbsolver/ops"
"github.com/moby/buildkit/solver/pb"
"github.com/moby/buildkit/source"
"github.com/moby/buildkit/source/git"
"github.com/moby/buildkit/source/http"
"github.com/moby/buildkit/source/local"
"github.com/moby/buildkit/util/progress"
digest "github.com/opencontainers/go-digest"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
)
// TODO: this file should be removed. containerd defines ContainerdWorker, oci defines OCIWorker. There is no base worker.
// WorkerOpt is specific to a worker.
// See also CommonOpt.
type WorkerOpt struct {
ID string
Labels map[string]string
SessionManager *session.Manager
MetadataStore *metadata.Store
Executor executor.Executor
Snapshotter snapshot.Snapshotter
ContentStore content.Store
CacheManager cache.Manager
ImageSource source.Source
Exporters map[string]exporter.Exporter
// ImageStore images.Store // optional
}
// Worker is a local worker instance with dedicated snapshotter, cache, and so on.
// TODO: s/Worker/OpWorker/g ?
type Worker struct {
WorkerOpt
SourceManager *source.Manager
// Exporters map[string]exporter.Exporter
// ImageSource source.Source
}
// NewWorker instantiates a local worker
func NewWorker(opt WorkerOpt) (*Worker, error) {
sm, err := source.NewManager()
if err != nil {
return nil, err
}
cm := opt.CacheManager
sm.Register(opt.ImageSource)
gs, err := git.NewSource(git.Opt{
CacheAccessor: cm,
MetadataStore: opt.MetadataStore,
})
if err != nil {
return nil, err
}
sm.Register(gs)
hs, err := http.NewSource(http.Opt{
CacheAccessor: cm,
MetadataStore: opt.MetadataStore,
})
if err != nil {
return nil, err
}
sm.Register(hs)
ss, err := local.NewSource(local.Opt{
SessionManager: opt.SessionManager,
CacheAccessor: cm,
MetadataStore: opt.MetadataStore,
})
if err != nil {
return nil, err
}
sm.Register(ss)
return &Worker{
WorkerOpt: opt,
SourceManager: sm,
}, nil
}
func (w *Worker) ID() string {
return w.WorkerOpt.ID
}
func (w *Worker) Labels() map[string]string {
return w.WorkerOpt.Labels
}
func (w *Worker) LoadRef(id string) (cache.ImmutableRef, error) {
return w.CacheManager.Get(context.TODO(), id)
}
func (w *Worker) ResolveOp(v solver.Vertex, s frontend.FrontendLLBBridge) (solver.Op, error) {
switch op := v.Sys().(type) {
case *pb.Op_Source:
return ops.NewSourceOp(v, op, w.SourceManager, w)
case *pb.Op_Exec:
return ops.NewExecOp(v, op, w.CacheManager, w.Executor, w)
case *pb.Op_Build:
return ops.NewBuildOp(v, op, s, w)
default:
return nil, errors.Errorf("could not resolve %v", v)
}
}
func (w *Worker) ResolveImageConfig(ctx context.Context, ref string) (digest.Digest, []byte, error) {
// ImageSource is typically source/containerimage
resolveImageConfig, ok := w.ImageSource.(resolveImageConfig)
if !ok {
return "", nil, errors.Errorf("worker %q does not implement ResolveImageConfig", w.ID())
}
return resolveImageConfig.ResolveImageConfig(ctx, ref)
}
type resolveImageConfig interface {
ResolveImageConfig(ctx context.Context, ref string) (digest.Digest, []byte, error)
}
func (w *Worker) Exec(ctx context.Context, meta executor.Meta, rootFS cache.ImmutableRef, stdin io.ReadCloser, stdout, stderr io.WriteCloser) error {
active, err := w.CacheManager.New(ctx, rootFS)
if err != nil {
return err
}
defer active.Release(context.TODO())
return w.Executor.Exec(ctx, meta, active, nil, stdin, stdout, stderr)
}
func (w *Worker) DiskUsage(ctx context.Context, opt client.DiskUsageInfo) ([]*client.UsageInfo, error) {
return w.CacheManager.DiskUsage(ctx, opt)
}
func (w *Worker) Prune(ctx context.Context, ch chan client.UsageInfo) error {
return w.CacheManager.Prune(ctx, ch)
}
func (w *Worker) Exporter(name string) (exporter.Exporter, error) {
exp, ok := w.Exporters[name]
if !ok {
return nil, errors.Errorf("exporter %q could not be found", name)
}
return exp, nil
}
func (w *Worker) GetRemote(ctx context.Context, ref cache.ImmutableRef) (*solver.Remote, error) {
// diffPairs, err := blobs.GetDiffPairs(ctx, w.ContentStore, w.Snapshotter, w.Differ, ref)
// if err != nil {
// return nil, errors.Wrap(err, "failed calculating diff pairs for exported snapshot")
// }
// if len(diffPairs) == 0 {
// return nil, nil
// }
//
// descs := make([]ocispec.Descriptor, len(diffPairs))
//
// for i, dp := range diffPairs {
// info, err := w.ContentStore.Info(ctx, dp.Blobsum)
// if err != nil {
// return nil, err
// }
// descs[i] = ocispec.Descriptor{
// Digest: dp.Blobsum,
// Size: info.Size,
// MediaType: schema2.MediaTypeLayer,
// Annotations: map[string]string{
// "containerd.io/uncompressed": dp.DiffID.String(),
// },
// }
// }
//
// return &solver.Remote{
// Descriptors: descs,
// Provider: w.ContentStore,
// }, nil
return nil, errors.Errorf("getremote not implemented")
}
func (w *Worker) FromRemote(ctx context.Context, remote *solver.Remote) (cache.ImmutableRef, error) {
// eg, gctx := errgroup.WithContext(ctx)
// for _, desc := range remote.Descriptors {
// func(desc ocispec.Descriptor) {
// eg.Go(func() error {
// done := oneOffProgress(ctx, fmt.Sprintf("pulling %s", desc.Digest))
// return done(contentutil.Copy(gctx, w.ContentStore, remote.Provider, desc))
// })
// }(desc)
// }
//
// if err := eg.Wait(); err != nil {
// return nil, err
// }
//
// csh, release := snapshot.NewCompatibilitySnapshotter(w.Snapshotter)
// defer release()
//
// unpackProgressDone := oneOffProgress(ctx, "unpacking")
// chainID, err := w.unpack(ctx, remote.Descriptors, csh)
// if err != nil {
// return nil, unpackProgressDone(err)
// }
// unpackProgressDone(nil)
//
// return w.CacheManager.Get(ctx, chainID, cache.WithDescription(fmt.Sprintf("imported %s", remote.Descriptors[len(remote.Descriptors)-1].Digest)))
return nil, errors.Errorf("fromremote not implemented")
}
// utility function. could be moved to the constructor logic?
// func Labels(executor, snapshotter string) map[string]string {
// hostname, err := os.Hostname()
// if err != nil {
// hostname = "unknown"
// }
// labels := map[string]string{
// worker.LabelOS: runtime.GOOS,
// worker.LabelArch: runtime.GOARCH,
// worker.LabelExecutor: executor,
// worker.LabelSnapshotter: snapshotter,
// worker.LabelHostname: hostname,
// }
// return labels
// }
//
// // ID reads the worker id from the `workerid` file.
// // If it does not exist, it creates a random one.
// func ID(root string) (string, error) {
// f := filepath.Join(root, "workerid")
// b, err := ioutil.ReadFile(f)
// if err != nil {
// if os.IsNotExist(err) {
// id := identity.NewID()
// err := ioutil.WriteFile(f, []byte(id), 0400)
// return id, err
// } else {
// return "", err
// }
// }
// return string(b), nil
// }
func getLayers(ctx context.Context, descs []ocispec.Descriptor) ([]rootfs.Layer, error) {
layers := make([]rootfs.Layer, len(descs))
for i, desc := range descs {
diffIDStr := desc.Annotations["containerd.io/uncompressed"]
if diffIDStr == "" {
return nil, errors.Errorf("%s missing uncompressed digest", desc.Digest)
}
diffID, err := digest.Parse(diffIDStr)
if err != nil {
return nil, err
}
layers[i].Diff = ocispec.Descriptor{
MediaType: ocispec.MediaTypeImageLayer,
Digest: diffID,
}
layers[i].Blob = ocispec.Descriptor{
MediaType: desc.MediaType,
Digest: desc.Digest,
Size: desc.Size,
}
}
return layers, nil
}
func oneOffProgress(ctx context.Context, id string) func(err error) error {
pw, _, _ := progress.FromContext(ctx)
now := time.Now()
st := progress.Status{
Started: &now,
}
pw.Write(id, st)
return func(err error) error {
// TODO: set error on status
now := time.Now()
st.Completed = &now
pw.Write(id, st)
pw.Close()
return err
}
}
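oneOffProgress above returns a closure: it writes a Started status immediately and, when the closure is later invoked with the operation's error, stamps Completed and passes the error through unchanged. The same one-shot timing pattern with only the standard library (names are illustrative):

package main

import (
	"fmt"
	"time"
)

// oneOff starts a timer for an operation and returns a function that, given
// the operation's error, records the completion time and returns the error
// unchanged, mirroring oneOffProgress above.
func oneOff(id string) func(err error) error {
	started := time.Now()
	fmt.Printf("%s: started\n", id)
	return func(err error) error {
		fmt.Printf("%s: completed in %s (err=%v)\n", id, time.Since(started), err)
		return err
	}
}

func main() {
	done := oneOff("unpacking")
	time.Sleep(20 * time.Millisecond) // the actual work would happen here
	_ = done(nil)
}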


@@ -27,6 +27,7 @@ import (
swarmrouter "github.com/docker/docker/api/server/router/swarm"
systemrouter "github.com/docker/docker/api/server/router/system"
"github.com/docker/docker/api/server/router/volume"
buildkit "github.com/docker/docker/builder/builder-next"
"github.com/docker/docker/builder/dockerfile"
"github.com/docker/docker/builder/fscache"
"github.com/docker/docker/cli/debug"
@@ -270,7 +271,16 @@ func newRouterOptions(config *config.Config, daemon *daemon.Daemon) (routerOptio
return opts, err
}
bb, err := buildbackend.NewBackend(daemon.ImageService(), manager, buildCache)
buildkit, err := buildkit.New(buildkit.Opt{
SessionManager: sm,
Root: filepath.Join(config.Root, "buildkit"),
Dist: daemon.DistributionServices(),
})
if err != nil {
return opts, err
}
bb, err := buildbackend.NewBackend(daemon.ImageService(), manager, buildCache, buildkit)
if err != nil {
return opts, errors.Wrap(err, "failed to create buildmanager")
}