diff --git a/daemon/images/image_exporter.go b/daemon/images/image_exporter.go index 0a863dd604..0c41d80e61 100644 --- a/daemon/images/image_exporter.go +++ b/daemon/images/image_exporter.go @@ -16,7 +16,7 @@ import ( // outStream is the writer which the images are written to. func (i *ImageService) ExportImage(ctx context.Context, names []string, outStream io.Writer) error { imageExporter := tarexport.NewTarExporter(i.imageStore, i.layerStore, i.referenceStore, i) - return imageExporter.Save(names, outStream) + return imageExporter.Save(ctx, names, outStream) } func (i *ImageService) PerformWithBaseFS(ctx context.Context, c *container.Container, fn func(root string) error) error { @@ -46,5 +46,5 @@ func (i *ImageService) PerformWithBaseFS(ctx context.Context, c *container.Conta // ball containing images and metadata. func (i *ImageService) LoadImage(ctx context.Context, inTar io.ReadCloser, outStream io.Writer, quiet bool) error { imageExporter := tarexport.NewTarExporter(i.imageStore, i.layerStore, i.referenceStore, i) - return imageExporter.Load(inTar, outStream, quiet) + return imageExporter.Load(ctx, inTar, outStream, quiet) } diff --git a/image/image.go b/image/image.go index 9bfa8602f2..77b6731f5b 100644 --- a/image/image.go +++ b/image/image.go @@ -1,6 +1,7 @@ package image // import "github.com/docker/docker/image" import ( + "context" "encoding/json" "errors" "io" @@ -279,9 +280,9 @@ func NewHistory(author, comment, createdBy string, isEmptyLayer bool) History { // Exporter provides interface for loading and saving images type Exporter interface { - Load(io.ReadCloser, io.Writer, bool) error + Load(context.Context, io.ReadCloser, io.Writer, bool) error // TODO: Load(net.Context, io.ReadCloser, <- chan StatusMessage) error - Save([]string, io.Writer) error + Save(context.Context, []string, io.Writer) error } // NewFromJSON creates an Image configuration from json. diff --git a/image/tarexport/load.go b/image/tarexport/load.go index fe09f7191e..6f47998178 100644 --- a/image/tarexport/load.go +++ b/image/tarexport/load.go @@ -11,6 +11,7 @@ import ( "reflect" "runtime" + "github.com/containerd/containerd/tracing" "github.com/containerd/log" "github.com/distribution/reference" "github.com/docker/distribution" @@ -20,6 +21,7 @@ import ( "github.com/docker/docker/layer" "github.com/docker/docker/pkg/archive" "github.com/docker/docker/pkg/chrootarchive" + "github.com/docker/docker/pkg/ioutils" "github.com/docker/docker/pkg/progress" "github.com/docker/docker/pkg/streamformatter" "github.com/docker/docker/pkg/stringid" @@ -28,7 +30,13 @@ import ( "github.com/opencontainers/go-digest" ) -func (l *tarexporter) Load(inTar io.ReadCloser, outStream io.Writer, quiet bool) error { +func (l *tarexporter) Load(ctx context.Context, inTar io.ReadCloser, outStream io.Writer, quiet bool) (outErr error) { + ctx, span := tracing.StartSpan(ctx, "tarexport.Load") + defer span.End() + defer func() { + span.SetStatus(outErr) + }() + var progressOutput progress.Output if !quiet { progressOutput = streamformatter.NewJSONProgressOutput(outStream, false) @@ -41,9 +49,10 @@ func (l *tarexporter) Load(inTar io.ReadCloser, outStream io.Writer, quiet bool) } defer os.RemoveAll(tmpDir) - if err := chrootarchive.Untar(inTar, tmpDir, nil); err != nil { + if err := untar(ctx, inTar, tmpDir); err != nil { return err } + // read manifest, if no file then load in legacy mode manifestPath, err := safePath(tmpDir, manifestFileName) if err != nil { @@ -72,6 +81,11 @@ func (l *tarexporter) Load(inTar io.ReadCloser, outStream io.Writer, quiet bool) var imageRefCount int for _, m := range manifest { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } configPath, err := safePath(tmpDir, m.Config) if err != nil { return err @@ -95,6 +109,11 @@ func (l *tarexporter) Load(inTar io.ReadCloser, outStream io.Writer, quiet bool) } for i, diffID := range img.RootFS.DiffIDs { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } layerPath, err := safePath(tmpDir, m.Layers[i]) if err != nil { return err @@ -103,7 +122,7 @@ func (l *tarexporter) Load(inTar io.ReadCloser, outStream io.Writer, quiet bool) r.Append(diffID) newLayer, err := l.lss.Get(r.ChainID()) if err != nil { - newLayer, err = l.loadLayer(layerPath, rootFS, diffID.String(), m.LayerSources[diffID], progressOutput) + newLayer, err = l.loadLayer(ctx, layerPath, rootFS, diffID.String(), m.LayerSources[diffID], progressOutput) if err != nil { return err } @@ -155,6 +174,15 @@ func (l *tarexporter) Load(inTar io.ReadCloser, outStream io.Writer, quiet bool) return nil } +func untar(ctx context.Context, inTar io.ReadCloser, tmpDir string) error { + _, trace := tracing.StartSpan(ctx, "chrootarchive.Untar") + defer trace.End() + + err := chrootarchive.Untar(ioutils.NewCtxReader(ctx, inTar), tmpDir, nil) + trace.SetStatus(err) + return err +} + func (l *tarexporter) setParentID(id, parentID image.ID) error { img, err := l.is.Get(id) if err != nil { @@ -170,7 +198,14 @@ func (l *tarexporter) setParentID(id, parentID image.ID) error { return l.is.SetParent(id, parentID) } -func (l *tarexporter) loadLayer(filename string, rootFS image.RootFS, id string, foreignSrc distribution.Descriptor, progressOutput progress.Output) (layer.Layer, error) { +func (l *tarexporter) loadLayer(ctx context.Context, filename string, rootFS image.RootFS, id string, foreignSrc distribution.Descriptor, progressOutput progress.Output) (_ layer.Layer, outErr error) { + ctx, span := tracing.StartSpan(ctx, "loadLayer") + span.SetAttributes(tracing.Attribute("image.id", id)) + defer span.End() + defer func() { + span.SetStatus(outErr) + }() + // We use sequential file access to avoid depleting the standby list on Windows. // On Linux, this equates to a regular os.Open. rawTar, err := sequential.Open(filename) @@ -193,7 +228,7 @@ func (l *tarexporter) loadLayer(filename string, rootFS image.RootFS, id string, r = rawTar } - inflatedLayerData, err := archive.DecompressStream(r) + inflatedLayerData, err := archive.DecompressStream(ioutils.NewCtxReader(ctx, r)) if err != nil { return nil, err } @@ -332,7 +367,7 @@ func (l *tarexporter) legacyLoadImage(oldID, sourceDir string, loadedMap map[str if err != nil { return err } - newLayer, err := l.loadLayer(layerPath, *rootFS, oldID, distribution.Descriptor{}, progressOutput) + newLayer, err := l.loadLayer(context.TODO(), layerPath, *rootFS, oldID, distribution.Descriptor{}, progressOutput) if err != nil { return err } diff --git a/image/tarexport/save.go b/image/tarexport/save.go index 103c8b3766..a22c40e597 100644 --- a/image/tarexport/save.go +++ b/image/tarexport/save.go @@ -11,6 +11,7 @@ import ( "time" "github.com/containerd/containerd/images" + "github.com/containerd/containerd/tracing" "github.com/containerd/log" "github.com/distribution/reference" "github.com/docker/distribution" @@ -19,6 +20,7 @@ import ( v1 "github.com/docker/docker/image/v1" "github.com/docker/docker/layer" "github.com/docker/docker/pkg/archive" + "github.com/docker/docker/pkg/ioutils" "github.com/docker/docker/pkg/system" "github.com/moby/sys/sequential" "github.com/opencontainers/go-digest" @@ -42,20 +44,20 @@ type saveSession struct { savedConfigs map[string]struct{} } -func (l *tarexporter) Save(names []string, outStream io.Writer) error { - images, err := l.parseNames(names) +func (l *tarexporter) Save(ctx context.Context, names []string, outStream io.Writer) error { + images, err := l.parseNames(ctx, names) if err != nil { return err } // Release all the image top layer references defer l.releaseLayerReferences(images) - return (&saveSession{tarexporter: l, images: images}).save(outStream) + return (&saveSession{tarexporter: l, images: images}).save(ctx, outStream) } // parseNames will parse the image names to a map which contains image.ID to *imageDescriptor. // Each imageDescriptor holds an image top layer reference named 'layerRef'. It is taken here, should be released later. -func (l *tarexporter) parseNames(names []string) (desc map[image.ID]*imageDescriptor, rErr error) { +func (l *tarexporter) parseNames(ctx context.Context, names []string) (desc map[image.ID]*imageDescriptor, rErr error) { imgDescr := make(map[image.ID]*imageDescriptor) defer func() { if rErr != nil { @@ -92,6 +94,12 @@ func (l *tarexporter) parseNames(names []string) (desc map[image.ID]*imageDescri } for _, name := range names { + select { + case <-ctx.Done(): + return nil, ctx.Err() + default: + } + ref, err := reference.ParseAnyReference(name) if err != nil { return nil, err @@ -179,7 +187,7 @@ func (l *tarexporter) releaseLayerReferences(imgDescr map[image.ID]*imageDescrip return nil } -func (s *saveSession) save(outStream io.Writer) error { +func (s *saveSession) save(ctx context.Context, outStream io.Writer) error { s.savedConfigs = make(map[string]struct{}) s.savedLayers = make(map[layer.DiffID]distribution.Descriptor) @@ -199,7 +207,13 @@ func (s *saveSession) save(outStream io.Writer) error { var manifestDescriptors []ocispec.Descriptor for id, imageDescr := range s.images { - foreignSrcs, err := s.saveImage(id) + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + foreignSrcs, err := s.saveImage(ctx, id) if err != nil { return err } @@ -370,17 +384,33 @@ func (s *saveSession) save(outStream io.Writer) error { return errors.Wrap(err, "error writing oci index file") } + return s.writeTar(ctx, tempDir, outStream) +} + +func (s *saveSession) writeTar(ctx context.Context, tempDir string, outStream io.Writer) error { + ctx, span := tracing.StartSpan(ctx, "writeTar") + defer span.End() + fs, err := archive.Tar(tempDir, archive.Uncompressed) if err != nil { + span.SetStatus(err) return err } defer fs.Close() - _, err = io.Copy(outStream, fs) + _, err = io.Copy(outStream, ioutils.NewCtxReader(ctx, fs)) + span.SetStatus(err) return err } -func (s *saveSession) saveImage(id image.ID) (map[layer.DiffID]distribution.Descriptor, error) { +func (s *saveSession) saveImage(ctx context.Context, id image.ID) (_ map[layer.DiffID]distribution.Descriptor, outErr error) { + ctx, span := tracing.StartSpan(ctx, "saveImage") + span.SetAttributes(tracing.Attribute("image.id", id.String())) + defer span.End() + defer func() { + span.SetStatus(outErr) + }() + img := s.images[id].image if len(img.RootFS.DiffIDs) == 0 { return nil, fmt.Errorf("empty export - not implemented") @@ -390,6 +420,11 @@ func (s *saveSession) saveImage(id image.ID) (map[layer.DiffID]distribution.Desc var layers []layer.DiffID var foreignSrcs map[layer.DiffID]distribution.Descriptor for i, diffID := range img.RootFS.DiffIDs { + select { + case <-ctx.Done(): + return nil, ctx.Err() + default: + } v1ImgCreated := time.Unix(0, 0) v1Img := image.V1Image{ // This is for backward compatibility used for @@ -412,7 +447,7 @@ func (s *saveSession) saveImage(id image.ID) (map[layer.DiffID]distribution.Desc } v1Img.OS = img.OS - src, err := s.saveConfigAndLayer(rootFS.ChainID(), v1Img, img.Created) + src, err := s.saveConfigAndLayer(ctx, rootFS.ChainID(), v1Img, img.Created) if err != nil { return nil, err } @@ -457,7 +492,17 @@ func (s *saveSession) saveImage(id image.ID) (map[layer.DiffID]distribution.Desc return foreignSrcs, nil } -func (s *saveSession) saveConfigAndLayer(id layer.ChainID, legacyImg image.V1Image, createdTime *time.Time) (distribution.Descriptor, error) { +func (s *saveSession) saveConfigAndLayer(ctx context.Context, id layer.ChainID, legacyImg image.V1Image, createdTime *time.Time) (_ distribution.Descriptor, outErr error) { + ctx, span := tracing.StartSpan(ctx, "saveConfigAndLayer") + span.SetAttributes( + tracing.Attribute("layer.id", id.String()), + tracing.Attribute("image.id", legacyImg.ID), + ) + defer span.End() + defer func() { + span.SetStatus(outErr) + }() + outDir := filepath.Join(s.outDir, ocispec.ImageBlobsDir) if _, ok := s.savedConfigs[legacyImg.ID]; !ok { @@ -512,7 +557,7 @@ func (s *saveSession) saveConfigAndLayer(id layer.ChainID, legacyImg image.V1Ima digester := digest.Canonical.Digester() digestedArch := io.TeeReader(arch, digester.Hash()) - tarSize, err := io.Copy(tarFile, digestedArch) + tarSize, err := io.Copy(tarFile, ioutils.NewCtxReader(ctx, digestedArch)) if err != nil { return distribution.Descriptor{}, err } diff --git a/pkg/ioutils/readers.go b/pkg/ioutils/readers.go index e03d3fee75..e977cbc7ae 100644 --- a/pkg/ioutils/readers.go +++ b/pkg/ioutils/readers.go @@ -170,3 +170,22 @@ func subsequentCloseWarn(name string) { log.G(context.TODO()).Errorf("stack trace: %s", string(debug.Stack())) } } + +type readerCtx struct { + ctx context.Context + r io.Reader +} + +func NewCtxReader(ctx context.Context, r io.Reader) io.Reader { + return &readerCtx{ + ctx: ctx, + r: r, + } +} + +func (r *readerCtx) Read(p []byte) (n int, err error) { + if err := r.ctx.Err(); err != nil { + return 0, err + } + return r.r.Read(p) +}