tarexport: Plumb ctx, add OTEL spans, handle cancellation

Pass `context.Context` through `tarexport.Load` and `tarexport.Save`.
Create OTEL spans for the most time consuming operations.

Also, handle context cancellations to actually end saving/loading when
the operation is cancelled - before this PR the daemon would still be
performing the operation even though the user already cancelled it.

Signed-off-by: Paweł Gronowski <pawel.gronowski@docker.com>
This commit is contained in:
Paweł Gronowski 2024-03-25 14:31:57 +01:00
parent bfdb8918f9
commit 082acbcbac
No known key found for this signature in database
GPG key ID: B85EFCFE26DEF92A
5 changed files with 121 additions and 21 deletions

View file

@ -16,7 +16,7 @@ import (
// outStream is the writer which the images are written to.
func (i *ImageService) ExportImage(ctx context.Context, names []string, outStream io.Writer) error {
imageExporter := tarexport.NewTarExporter(i.imageStore, i.layerStore, i.referenceStore, i)
return imageExporter.Save(names, outStream)
return imageExporter.Save(ctx, names, outStream)
}
func (i *ImageService) PerformWithBaseFS(ctx context.Context, c *container.Container, fn func(root string) error) error {
@ -46,5 +46,5 @@ func (i *ImageService) PerformWithBaseFS(ctx context.Context, c *container.Conta
// ball containing images and metadata.
func (i *ImageService) LoadImage(ctx context.Context, inTar io.ReadCloser, outStream io.Writer, quiet bool) error {
imageExporter := tarexport.NewTarExporter(i.imageStore, i.layerStore, i.referenceStore, i)
return imageExporter.Load(inTar, outStream, quiet)
return imageExporter.Load(ctx, inTar, outStream, quiet)
}

View file

@ -1,6 +1,7 @@
package image // import "github.com/docker/docker/image"
import (
"context"
"encoding/json"
"errors"
"io"
@ -279,9 +280,9 @@ func NewHistory(author, comment, createdBy string, isEmptyLayer bool) History {
// Exporter provides interface for loading and saving images
type Exporter interface {
Load(io.ReadCloser, io.Writer, bool) error
Load(context.Context, io.ReadCloser, io.Writer, bool) error
// TODO: Load(net.Context, io.ReadCloser, <- chan StatusMessage) error
Save([]string, io.Writer) error
Save(context.Context, []string, io.Writer) error
}
// NewFromJSON creates an Image configuration from json.

View file

@ -11,6 +11,7 @@ import (
"reflect"
"runtime"
"github.com/containerd/containerd/tracing"
"github.com/containerd/log"
"github.com/distribution/reference"
"github.com/docker/distribution"
@ -20,6 +21,7 @@ import (
"github.com/docker/docker/layer"
"github.com/docker/docker/pkg/archive"
"github.com/docker/docker/pkg/chrootarchive"
"github.com/docker/docker/pkg/ioutils"
"github.com/docker/docker/pkg/progress"
"github.com/docker/docker/pkg/streamformatter"
"github.com/docker/docker/pkg/stringid"
@ -28,7 +30,13 @@ import (
"github.com/opencontainers/go-digest"
)
func (l *tarexporter) Load(inTar io.ReadCloser, outStream io.Writer, quiet bool) error {
func (l *tarexporter) Load(ctx context.Context, inTar io.ReadCloser, outStream io.Writer, quiet bool) (outErr error) {
ctx, span := tracing.StartSpan(ctx, "tarexport.Load")
defer span.End()
defer func() {
span.SetStatus(outErr)
}()
var progressOutput progress.Output
if !quiet {
progressOutput = streamformatter.NewJSONProgressOutput(outStream, false)
@ -41,9 +49,10 @@ func (l *tarexporter) Load(inTar io.ReadCloser, outStream io.Writer, quiet bool)
}
defer os.RemoveAll(tmpDir)
if err := chrootarchive.Untar(inTar, tmpDir, nil); err != nil {
if err := untar(ctx, inTar, tmpDir); err != nil {
return err
}
// read manifest, if no file then load in legacy mode
manifestPath, err := safePath(tmpDir, manifestFileName)
if err != nil {
@ -72,6 +81,11 @@ func (l *tarexporter) Load(inTar io.ReadCloser, outStream io.Writer, quiet bool)
var imageRefCount int
for _, m := range manifest {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
configPath, err := safePath(tmpDir, m.Config)
if err != nil {
return err
@ -95,6 +109,11 @@ func (l *tarexporter) Load(inTar io.ReadCloser, outStream io.Writer, quiet bool)
}
for i, diffID := range img.RootFS.DiffIDs {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
layerPath, err := safePath(tmpDir, m.Layers[i])
if err != nil {
return err
@ -103,7 +122,7 @@ func (l *tarexporter) Load(inTar io.ReadCloser, outStream io.Writer, quiet bool)
r.Append(diffID)
newLayer, err := l.lss.Get(r.ChainID())
if err != nil {
newLayer, err = l.loadLayer(layerPath, rootFS, diffID.String(), m.LayerSources[diffID], progressOutput)
newLayer, err = l.loadLayer(ctx, layerPath, rootFS, diffID.String(), m.LayerSources[diffID], progressOutput)
if err != nil {
return err
}
@ -155,6 +174,15 @@ func (l *tarexporter) Load(inTar io.ReadCloser, outStream io.Writer, quiet bool)
return nil
}
func untar(ctx context.Context, inTar io.ReadCloser, tmpDir string) error {
_, trace := tracing.StartSpan(ctx, "chrootarchive.Untar")
defer trace.End()
err := chrootarchive.Untar(ioutils.NewCtxReader(ctx, inTar), tmpDir, nil)
trace.SetStatus(err)
return err
}
func (l *tarexporter) setParentID(id, parentID image.ID) error {
img, err := l.is.Get(id)
if err != nil {
@ -170,7 +198,14 @@ func (l *tarexporter) setParentID(id, parentID image.ID) error {
return l.is.SetParent(id, parentID)
}
func (l *tarexporter) loadLayer(filename string, rootFS image.RootFS, id string, foreignSrc distribution.Descriptor, progressOutput progress.Output) (layer.Layer, error) {
func (l *tarexporter) loadLayer(ctx context.Context, filename string, rootFS image.RootFS, id string, foreignSrc distribution.Descriptor, progressOutput progress.Output) (_ layer.Layer, outErr error) {
ctx, span := tracing.StartSpan(ctx, "loadLayer")
span.SetAttributes(tracing.Attribute("image.id", id))
defer span.End()
defer func() {
span.SetStatus(outErr)
}()
// We use sequential file access to avoid depleting the standby list on Windows.
// On Linux, this equates to a regular os.Open.
rawTar, err := sequential.Open(filename)
@ -193,7 +228,7 @@ func (l *tarexporter) loadLayer(filename string, rootFS image.RootFS, id string,
r = rawTar
}
inflatedLayerData, err := archive.DecompressStream(r)
inflatedLayerData, err := archive.DecompressStream(ioutils.NewCtxReader(ctx, r))
if err != nil {
return nil, err
}
@ -332,7 +367,7 @@ func (l *tarexporter) legacyLoadImage(oldID, sourceDir string, loadedMap map[str
if err != nil {
return err
}
newLayer, err := l.loadLayer(layerPath, *rootFS, oldID, distribution.Descriptor{}, progressOutput)
newLayer, err := l.loadLayer(context.TODO(), layerPath, *rootFS, oldID, distribution.Descriptor{}, progressOutput)
if err != nil {
return err
}

View file

@ -11,6 +11,7 @@ import (
"time"
"github.com/containerd/containerd/images"
"github.com/containerd/containerd/tracing"
"github.com/containerd/log"
"github.com/distribution/reference"
"github.com/docker/distribution"
@ -19,6 +20,7 @@ import (
v1 "github.com/docker/docker/image/v1"
"github.com/docker/docker/layer"
"github.com/docker/docker/pkg/archive"
"github.com/docker/docker/pkg/ioutils"
"github.com/docker/docker/pkg/system"
"github.com/moby/sys/sequential"
"github.com/opencontainers/go-digest"
@ -42,20 +44,20 @@ type saveSession struct {
savedConfigs map[string]struct{}
}
func (l *tarexporter) Save(names []string, outStream io.Writer) error {
images, err := l.parseNames(names)
func (l *tarexporter) Save(ctx context.Context, names []string, outStream io.Writer) error {
images, err := l.parseNames(ctx, names)
if err != nil {
return err
}
// Release all the image top layer references
defer l.releaseLayerReferences(images)
return (&saveSession{tarexporter: l, images: images}).save(outStream)
return (&saveSession{tarexporter: l, images: images}).save(ctx, outStream)
}
// parseNames will parse the image names to a map which contains image.ID to *imageDescriptor.
// Each imageDescriptor holds an image top layer reference named 'layerRef'. It is taken here, should be released later.
func (l *tarexporter) parseNames(names []string) (desc map[image.ID]*imageDescriptor, rErr error) {
func (l *tarexporter) parseNames(ctx context.Context, names []string) (desc map[image.ID]*imageDescriptor, rErr error) {
imgDescr := make(map[image.ID]*imageDescriptor)
defer func() {
if rErr != nil {
@ -92,6 +94,12 @@ func (l *tarexporter) parseNames(names []string) (desc map[image.ID]*imageDescri
}
for _, name := range names {
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}
ref, err := reference.ParseAnyReference(name)
if err != nil {
return nil, err
@ -179,7 +187,7 @@ func (l *tarexporter) releaseLayerReferences(imgDescr map[image.ID]*imageDescrip
return nil
}
func (s *saveSession) save(outStream io.Writer) error {
func (s *saveSession) save(ctx context.Context, outStream io.Writer) error {
s.savedConfigs = make(map[string]struct{})
s.savedLayers = make(map[layer.DiffID]distribution.Descriptor)
@ -199,7 +207,13 @@ func (s *saveSession) save(outStream io.Writer) error {
var manifestDescriptors []ocispec.Descriptor
for id, imageDescr := range s.images {
foreignSrcs, err := s.saveImage(id)
select {
case <-ctx.Done():
return ctx.Err()
default:
}
foreignSrcs, err := s.saveImage(ctx, id)
if err != nil {
return err
}
@ -373,17 +387,33 @@ func (s *saveSession) save(outStream io.Writer) error {
return errors.Wrap(err, "error writing oci index file")
}
return s.writeTar(ctx, tempDir, outStream)
}
func (s *saveSession) writeTar(ctx context.Context, tempDir string, outStream io.Writer) error {
ctx, span := tracing.StartSpan(ctx, "writeTar")
defer span.End()
fs, err := archive.Tar(tempDir, archive.Uncompressed)
if err != nil {
span.SetStatus(err)
return err
}
defer fs.Close()
_, err = io.Copy(outStream, fs)
_, err = io.Copy(outStream, ioutils.NewCtxReader(ctx, fs))
span.SetStatus(err)
return err
}
func (s *saveSession) saveImage(id image.ID) (map[layer.DiffID]distribution.Descriptor, error) {
func (s *saveSession) saveImage(ctx context.Context, id image.ID) (_ map[layer.DiffID]distribution.Descriptor, outErr error) {
ctx, span := tracing.StartSpan(ctx, "saveImage")
span.SetAttributes(tracing.Attribute("image.id", id.String()))
defer span.End()
defer func() {
span.SetStatus(outErr)
}()
img := s.images[id].image
if len(img.RootFS.DiffIDs) == 0 {
return nil, fmt.Errorf("empty export - not implemented")
@ -393,6 +423,11 @@ func (s *saveSession) saveImage(id image.ID) (map[layer.DiffID]distribution.Desc
var layers []layer.DiffID
var foreignSrcs map[layer.DiffID]distribution.Descriptor
for i, diffID := range img.RootFS.DiffIDs {
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}
v1ImgCreated := time.Unix(0, 0)
v1Img := image.V1Image{
// This is for backward compatibility used for
@ -415,7 +450,7 @@ func (s *saveSession) saveImage(id image.ID) (map[layer.DiffID]distribution.Desc
}
v1Img.OS = img.OS
src, err := s.saveConfigAndLayer(rootFS.ChainID(), v1Img, img.Created)
src, err := s.saveConfigAndLayer(ctx, rootFS.ChainID(), v1Img, img.Created)
if err != nil {
return nil, err
}
@ -460,7 +495,17 @@ func (s *saveSession) saveImage(id image.ID) (map[layer.DiffID]distribution.Desc
return foreignSrcs, nil
}
func (s *saveSession) saveConfigAndLayer(id layer.ChainID, legacyImg image.V1Image, createdTime *time.Time) (distribution.Descriptor, error) {
func (s *saveSession) saveConfigAndLayer(ctx context.Context, id layer.ChainID, legacyImg image.V1Image, createdTime *time.Time) (_ distribution.Descriptor, outErr error) {
ctx, span := tracing.StartSpan(ctx, "saveConfigAndLayer")
span.SetAttributes(
tracing.Attribute("layer.id", id.String()),
tracing.Attribute("image.id", legacyImg.ID),
)
defer span.End()
defer func() {
span.SetStatus(outErr)
}()
outDir := filepath.Join(s.outDir, ocispec.ImageBlobsDir)
if _, ok := s.savedConfigs[legacyImg.ID]; !ok {
@ -515,7 +560,7 @@ func (s *saveSession) saveConfigAndLayer(id layer.ChainID, legacyImg image.V1Ima
digester := digest.Canonical.Digester()
digestedArch := io.TeeReader(arch, digester.Hash())
tarSize, err := io.Copy(tarFile, digestedArch)
tarSize, err := io.Copy(tarFile, ioutils.NewCtxReader(ctx, digestedArch))
if err != nil {
return distribution.Descriptor{}, err
}

View file

@ -170,3 +170,22 @@ func subsequentCloseWarn(name string) {
log.G(context.TODO()).Errorf("stack trace: %s", string(debug.Stack()))
}
}
type readerCtx struct {
ctx context.Context
r io.Reader
}
func NewCtxReader(ctx context.Context, r io.Reader) io.Reader {
return &readerCtx{
ctx: ctx,
r: r,
}
}
func (r *readerCtx) Read(p []byte) (n int, err error) {
if err := r.ctx.Err(); err != nil {
return 0, err
}
return r.r.Read(p)
}