diff --git a/daemon/containerd/image_import.go b/daemon/containerd/image_import.go index 38f3df7d9c..288c7202bf 100644 --- a/daemon/containerd/image_import.go +++ b/daemon/containerd/image_import.go @@ -1,18 +1,400 @@ package containerd import ( + "bufio" + "bytes" "context" - "errors" + "encoding/json" "io" + "time" + "github.com/containerd/containerd" + "github.com/containerd/containerd/content" + cerrdefs "github.com/containerd/containerd/errdefs" + "github.com/containerd/containerd/images" + "github.com/containerd/containerd/platforms" + "github.com/docker/distribution/reference" + "github.com/docker/docker/api/types/container" + "github.com/docker/docker/builder/dockerfile" "github.com/docker/docker/errdefs" - specs "github.com/opencontainers/image-spec/specs-go/v1" + "github.com/docker/docker/image" + "github.com/docker/docker/pkg/archive" + "github.com/docker/docker/pkg/pools" + "github.com/google/uuid" + "github.com/opencontainers/go-digest" + "github.com/opencontainers/image-spec/specs-go" + ocispec "github.com/opencontainers/image-spec/specs-go/v1" + "github.com/pkg/errors" + "github.com/sirupsen/logrus" ) -// ImportImage imports an image, getting the archived layer data either from -// inConfig (if src is "-"), or from a URI specified in src. Progress output is -// written to outStream. Repository and tag names can optionally be given in -// the repo and tag arguments, respectively. -func (i *ImageService) ImportImage(ctx context.Context, src string, repository string, platform *specs.Platform, tag string, msg string, inConfig io.ReadCloser, outStream io.Writer, changes []string) error { - return errdefs.NotImplemented(errors.New("not implemented")) +// ImportImage imports an image, getting the archived layer data from layerReader. +// Layer archive is imported as-is if the compression is gzip or zstd. +// Uncompressed, xz and bzip2 archives are recompressed into gzip. +// The image is tagged with the given reference. +// If the platform is nil, the default host platform is used. +// The message is used as the history comment. +// Image configuration is derived from the dockerfile instructions in changes. +func (i *ImageService) ImportImage(ctx context.Context, ref reference.Named, platform *ocispec.Platform, msg string, layerReader io.Reader, changes []string) (image.ID, error) { + refString := "" + if ref != nil { + refString = ref.String() + } + logger := logrus.WithField("ref", refString) + + ctx, release, err := i.client.WithLease(ctx) + if err != nil { + return "", errdefs.System(err) + } + defer release(ctx) + + if platform == nil { + def := platforms.DefaultSpec() + platform = &def + } + + imageConfig, err := dockerfile.BuildFromConfig(ctx, &container.Config{}, changes, platform.OS) + if err != nil { + logger.WithError(err).Debug("failed to process changes") + return "", errdefs.InvalidParameter(err) + } + + cs := i.client.ContentStore() + + compressedDigest, uncompressedDigest, mt, err := saveArchive(ctx, cs, layerReader) + if err != nil { + logger.WithError(err).Debug("failed to write layer blob") + return "", err + } + logger = logger.WithFields(logrus.Fields{ + "compressedDigest": compressedDigest, + "uncompressedDigest": uncompressedDigest, + }) + + size, err := fillUncompressedLabel(ctx, cs, compressedDigest, uncompressedDigest) + if err != nil { + logger.WithError(err).Debug("failed to set uncompressed label on the compressed blob") + return "", err + } + + compressedRootfsDesc := ocispec.Descriptor{ + MediaType: mt, + Digest: compressedDigest, + Size: size, + } + + ociCfg := containerConfigToOciImageConfig(imageConfig) + createdAt := time.Now() + config := ocispec.Image{ + Architecture: platform.Architecture, + OS: platform.OS, + Created: &createdAt, + Author: "", + Config: ociCfg, + RootFS: ocispec.RootFS{ + Type: "layers", + DiffIDs: []digest.Digest{uncompressedDigest}, + }, + History: []ocispec.History{ + { + Created: &createdAt, + CreatedBy: "", + Author: "", + Comment: msg, + EmptyLayer: false, + }, + }, + } + configDesc, err := storeJson(ctx, cs, ocispec.MediaTypeImageConfig, config, nil) + if err != nil { + return "", err + } + + manifest := ocispec.Manifest{ + MediaType: ocispec.MediaTypeImageManifest, + Versioned: specs.Versioned{ + SchemaVersion: 2, + }, + Config: configDesc, + Layers: []ocispec.Descriptor{ + compressedRootfsDesc, + }, + } + manifestDesc, err := storeJson(ctx, cs, ocispec.MediaTypeImageManifest, manifest, map[string]string{ + "containerd.io/gc.ref.content.config": configDesc.Digest.String(), + "containerd.io/gc.ref.content.l.0": compressedDigest.String(), + }) + if err != nil { + return "", err + } + + id := image.ID(manifestDesc.Digest.String()) + img := images.Image{ + Name: refString, + Target: manifestDesc, + CreatedAt: createdAt, + } + if img.Name == "" { + // TODO(vvoland): danglingImageName(manifestDesc.Digest) + img.Name = "dangling@" + manifestDesc.Digest.String() + + } + + err = i.saveImage(ctx, img) + if err != nil { + logger.WithError(err).Debug("failed to save image") + return "", err + } + err = i.unpackImage(ctx, img, *platform) + if err != nil { + logger.WithError(err).Debug("failed to unpack image") + } + + return id, err +} + +// saveArchive saves the archive from bufRd to the content store, compressing it if necessary. +// Returns compressed blob digest, digest of the uncompressed data and media type of the stored blob. +func saveArchive(ctx context.Context, cs content.Store, layerReader io.Reader) (digest.Digest, digest.Digest, string, error) { + // Wrap the reader in buffered reader to allow peeks. + p := pools.BufioReader32KPool + bufRd := p.Get(layerReader) + defer p.Put(bufRd) + + compression, err := detectCompression(bufRd) + if err != nil { + return "", "", "", err + } + + var uncompressedReader io.Reader = bufRd + switch compression { + case archive.Gzip, archive.Zstd: + // If the input is already a compressed layer, just save it as is. + mediaType := ocispec.MediaTypeImageLayerGzip + if compression == archive.Zstd { + mediaType = ocispec.MediaTypeImageLayerZstd + } + + compressedDigest, uncompressedDigest, err := writeCompressedBlob(ctx, cs, mediaType, bufRd) + if err != nil { + return "", "", "", err + } + + return compressedDigest, uncompressedDigest, mediaType, nil + case archive.Bzip2, archive.Xz: + r, err := archive.DecompressStream(bufRd) + if err != nil { + return "", "", "", errdefs.InvalidParameter(err) + } + defer r.Close() + uncompressedReader = r + fallthrough + case archive.Uncompressed: + mediaType := ocispec.MediaTypeImageLayerGzip + compression := archive.Gzip + + compressedDigest, uncompressedDigest, err := compressAndWriteBlob(ctx, cs, compression, mediaType, uncompressedReader) + if err != nil { + return "", "", "", err + } + + return compressedDigest, uncompressedDigest, mediaType, nil + } + + return "", "", "", errdefs.InvalidParameter(errors.New("unsupported archive compression")) +} + +// writeCompressedBlob writes the blob and simultaneously computes the digest of the uncompressed data. +func writeCompressedBlob(ctx context.Context, cs content.Store, mediaType string, bufRd *bufio.Reader) (digest.Digest, digest.Digest, error) { + pr, pw := io.Pipe() + defer pw.Close() + defer pr.Close() + + c := make(chan digest.Digest) + // Start copying the blob to the content store from the pipe and tee it to the pipe. + go func() { + compressedDigest, err := writeBlobAndReturnDigest(ctx, cs, mediaType, io.TeeReader(bufRd, pw)) + pw.CloseWithError(err) + c <- compressedDigest + }() + + digester := digest.Canonical.Digester() + + // Decompress the piped blob. + decompressedStream, err := archive.DecompressStream(pr) + if err == nil { + // Feed the digester with decompressed data. + _, err = io.Copy(digester.Hash(), decompressedStream) + decompressedStream.Close() + } + pr.CloseWithError(err) + + compressedDigest := <-c + if err != nil { + if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { + return "", "", errdefs.Cancelled(err) + } + return "", "", errdefs.System(err) + } + + uncompressedDigest := digester.Digest() + return compressedDigest, uncompressedDigest, nil +} + +// compressAndWriteBlob compresses the uncompressedReader and stores it in the content store. +func compressAndWriteBlob(ctx context.Context, cs content.Store, compression archive.Compression, mediaType string, uncompressedLayerReader io.Reader) (digest.Digest, digest.Digest, error) { + pr, pw := io.Pipe() + defer pr.Close() + defer pw.Close() + + compressor, err := archive.CompressStream(pw, compression) + if err != nil { + return "", "", errdefs.InvalidParameter(err) + } + defer compressor.Close() + + writeChan := make(chan digest.Digest) + // Start copying the blob to the content store from the pipe. + go func() { + digest, err := writeBlobAndReturnDigest(ctx, cs, mediaType, pr) + pr.CloseWithError(err) + writeChan <- digest + }() + + // Copy archive to the pipe and tee it to a digester. + // This will feed the pipe the above goroutine is reading from. + uncompressedDigester := digest.Canonical.Digester() + readFromInputAndDigest := io.TeeReader(uncompressedLayerReader, uncompressedDigester.Hash()) + _, err = io.Copy(compressor, readFromInputAndDigest) + compressor.Close() + pw.CloseWithError(err) + + compressedDigest := <-writeChan + if err != nil { + if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { + return "", "", errdefs.Cancelled(err) + } + return "", "", errdefs.System(err) + } + + return compressedDigest, uncompressedDigester.Digest(), err +} + +// writeBlobAndReturnDigest writes a blob to the content store and returns the digest. +func writeBlobAndReturnDigest(ctx context.Context, cs content.Store, mt string, reader io.Reader) (digest.Digest, error) { + digester := digest.Canonical.Digester() + if err := content.WriteBlob(ctx, cs, uuid.New().String(), io.TeeReader(reader, digester.Hash()), ocispec.Descriptor{MediaType: mt}); err != nil { + return "", errdefs.System(err) + } + return digester.Digest(), nil +} + +// saveImage creates an image in the ImageService or updates it if it exists. +func (i *ImageService) saveImage(ctx context.Context, img images.Image) error { + is := i.client.ImageService() + + if _, err := is.Update(ctx, img); err != nil { + if cerrdefs.IsNotFound(err) { + if _, err := is.Create(ctx, img); err != nil { + return errdefs.Unknown(err) + } + } else { + return errdefs.Unknown(err) + } + } + + return nil +} + +// unpackImage unpacks the image into the snapshotter. +func (i *ImageService) unpackImage(ctx context.Context, img images.Image, platform ocispec.Platform) error { + c8dImg := containerd.NewImageWithPlatform(i.client, img, platforms.Only(platform)) + unpacked, err := c8dImg.IsUnpacked(ctx, i.snapshotter) + if err != nil { + return err + } + if !unpacked { + err = c8dImg.Unpack(ctx, i.snapshotter) + } + + return err +} + +// detectCompression dectects the reader compression type. +func detectCompression(bufRd *bufio.Reader) (archive.Compression, error) { + bs, err := bufRd.Peek(10) + if err != nil && err != io.EOF { + // Note: we'll ignore any io.EOF error because there are some odd + // cases where the layer.tar file will be empty (zero bytes) and + // that results in an io.EOF from the Peek() call. So, in those + // cases we'll just treat it as a non-compressed stream and + // that means just create an empty layer. + // See Issue 18170 + return archive.Uncompressed, errdefs.Unknown(err) + } + + return archive.DetectCompression(bs), nil +} + +// fillUncompressedLabel sets the uncompressed digest label on the compressed blob metadata +// and returns the compressed blob size. +func fillUncompressedLabel(ctx context.Context, cs content.Store, compressedDigest digest.Digest, uncompressedDigest digest.Digest) (int64, error) { + info, err := cs.Info(ctx, compressedDigest) + if err != nil { + return 0, errdefs.Unknown(errors.Wrapf(err, "couldn't open previously written blob")) + } + size := info.Size + info.Labels = map[string]string{"containerd.io/uncompressed": uncompressedDigest.String()} + + _, err = cs.Update(ctx, info, "labels.*") + if err != nil { + return 0, errdefs.System(errors.Wrapf(err, "couldn't set uncompressed label")) + } + return size, nil +} + +// storeJson marshals the provided object as json and stores it. +func storeJson(ctx context.Context, cs content.Ingester, mt string, obj interface{}, labels map[string]string) (ocispec.Descriptor, error) { + configData, err := json.Marshal(obj) + if err != nil { + return ocispec.Descriptor{}, errdefs.InvalidParameter(err) + } + configDigest := digest.FromBytes(configData) + if err != nil { + return ocispec.Descriptor{}, errdefs.InvalidParameter(err) + } + desc := ocispec.Descriptor{ + MediaType: mt, + Digest: configDigest, + Size: int64(len(configData)), + } + + var opts []content.Opt + if labels != nil { + opts = append(opts, content.WithLabels(labels)) + } + + err = content.WriteBlob(ctx, cs, configDigest.String(), bytes.NewReader(configData), desc, opts...) + if err != nil { + return ocispec.Descriptor{}, errdefs.System(err) + } + return desc, nil +} + +func containerConfigToOciImageConfig(cfg *container.Config) ocispec.ImageConfig { + ociCfg := ocispec.ImageConfig{ + User: cfg.User, + Env: cfg.Env, + Entrypoint: cfg.Entrypoint, + Cmd: cfg.Cmd, + Volumes: cfg.Volumes, + WorkingDir: cfg.WorkingDir, + Labels: cfg.Labels, + StopSignal: cfg.StopSignal, + } + for k, v := range cfg.ExposedPorts { + ociCfg.ExposedPorts[string(k)] = v + } + + return ociCfg }