package containerd

import (
	"bufio"
	"bytes"
	"context"
	"encoding/json"
	"io"
	"time"

	"github.com/containerd/containerd"
	"github.com/containerd/containerd/content"
	cerrdefs "github.com/containerd/containerd/errdefs"
	"github.com/containerd/containerd/images"
	"github.com/containerd/containerd/log"
	"github.com/containerd/containerd/platforms"
	"github.com/docker/distribution/reference"
	"github.com/docker/docker/api/types/container"
	"github.com/docker/docker/builder/dockerfile"
	"github.com/docker/docker/errdefs"
	"github.com/docker/docker/image"
	"github.com/docker/docker/pkg/archive"
	"github.com/docker/docker/pkg/pools"
	"github.com/google/uuid"
	"github.com/opencontainers/go-digest"
	"github.com/opencontainers/image-spec/specs-go"
	ocispec "github.com/opencontainers/image-spec/specs-go/v1"
	"github.com/pkg/errors"
	"github.com/sirupsen/logrus"
)

// ImportImage imports an image, getting the archived layer data from layerReader.
// The layer archive is imported as-is if the compression is gzip or zstd.
// Uncompressed, xz and bzip2 archives are recompressed into gzip.
// The image is tagged with the given reference.
// If the platform is nil, the default host platform is used.
// The message is used as the history comment.
// Image configuration is derived from the Dockerfile instructions in changes.
func (i *ImageService) ImportImage(ctx context.Context, ref reference.Named, platform *ocispec.Platform, msg string, layerReader io.Reader, changes []string) (image.ID, error) {
	refString := ""
	if ref != nil {
		refString = ref.String()
	}
	logger := log.G(ctx).WithField("ref", refString)

	ctx, release, err := i.client.WithLease(ctx)
	if err != nil {
		return "", errdefs.System(err)
	}
	defer release(ctx)

	if platform == nil {
		def := platforms.DefaultSpec()
		platform = &def
	}

	imageConfig, err := dockerfile.BuildFromConfig(ctx, &container.Config{}, changes, platform.OS)
	if err != nil {
		logger.WithError(err).Debug("failed to process changes")
		return "", errdefs.InvalidParameter(err)
	}

	cs := i.client.ContentStore()

	compressedDigest, uncompressedDigest, mt, err := saveArchive(ctx, cs, layerReader)
	if err != nil {
		logger.WithError(err).Debug("failed to write layer blob")
		return "", err
	}
	logger = logger.WithFields(logrus.Fields{
		"compressedDigest":   compressedDigest,
		"uncompressedDigest": uncompressedDigest,
	})

	size, err := fillUncompressedLabel(ctx, cs, compressedDigest, uncompressedDigest)
	if err != nil {
		logger.WithError(err).Debug("failed to set uncompressed label on the compressed blob")
		return "", err
	}

	compressedRootfsDesc := ocispec.Descriptor{
		MediaType: mt,
		Digest:    compressedDigest,
		Size:      size,
	}

	ociCfg := containerConfigToOciImageConfig(imageConfig)
	createdAt := time.Now()
	config := ocispec.Image{
		Platform: *platform,
		Created:  &createdAt,
		Author:   "",
		Config:   ociCfg,
		RootFS: ocispec.RootFS{
			Type:    "layers",
			DiffIDs: []digest.Digest{uncompressedDigest},
		},
		History: []ocispec.History{
			{
				Created:    &createdAt,
				CreatedBy:  "",
				Author:     "",
				Comment:    msg,
				EmptyLayer: false,
			},
		},
	}
	configDesc, err := storeJson(ctx, cs, ocispec.MediaTypeImageConfig, config, nil)
	if err != nil {
		return "", err
	}

	manifest := ocispec.Manifest{
		MediaType: ocispec.MediaTypeImageManifest,
		Versioned: specs.Versioned{
			SchemaVersion: 2,
		},
		Config: configDesc,
		Layers: []ocispec.Descriptor{
			compressedRootfsDesc,
		},
	}
	manifestDesc, err := storeJson(ctx, cs, ocispec.MediaTypeImageManifest, manifest, map[string]string{
		"containerd.io/gc.ref.content.config": configDesc.Digest.String(),
		"containerd.io/gc.ref.content.l.0":    compressedDigest.String(),
	})
	if err != nil {
		return "", err
	}

	id := image.ID(manifestDesc.Digest.String())
	img := images.Image{
		Name:      refString,
		Target:    manifestDesc,
		CreatedAt: createdAt,
	}
	if img.Name == "" {
		img.Name = danglingImageName(manifestDesc.Digest)
	}

	err = i.saveImage(ctx, img)
	if err != nil {
		logger.WithError(err).Debug("failed to save image")
		return "", err
	}

	err = i.unpackImage(ctx, img, *platform)
	if err != nil {
		logger.WithError(err).Debug("failed to unpack image")
	} else {
		i.LogImageEvent(id.String(), id.String(), "import")
	}

	return id, err
}
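
// The sketch below is an illustrative, hypothetical caller of ImportImage and
// is not part of the original code: "svc" is assumed to be an initialized
// *ImageService, "rootfs.tar.gz" a layer tarball on disk, and the tag and
// change strings are made up. Error handling is elided.
//
//	ref, _ := reference.ParseNormalizedNamed("myimage:latest")
//	f, _ := os.Open("rootfs.tar.gz")
//	defer f.Close()
//	// nil platform means the default host platform is used.
//	id, err := svc.ImportImage(ctx, ref, nil, "imported from rootfs.tar.gz", f,
//		[]string{`CMD ["/bin/sh"]`, `ENV FOO=bar`})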

// saveArchive saves the archive from layerReader to the content store, compressing it if necessary.
// It returns the compressed blob digest, the digest of the uncompressed data and the media type of the stored blob.
func saveArchive(ctx context.Context, cs content.Store, layerReader io.Reader) (digest.Digest, digest.Digest, string, error) {
	// Wrap the reader in a buffered reader to allow peeks.
	p := pools.BufioReader32KPool
	bufRd := p.Get(layerReader)
	defer p.Put(bufRd)

	compression, err := detectCompression(bufRd)
	if err != nil {
		return "", "", "", err
	}

	var uncompressedReader io.Reader = bufRd
	switch compression {
	case archive.Gzip, archive.Zstd:
		// If the input is already a compressed layer, just save it as-is.
		mediaType := ocispec.MediaTypeImageLayerGzip
		if compression == archive.Zstd {
			mediaType = ocispec.MediaTypeImageLayerZstd
		}

		compressedDigest, uncompressedDigest, err := writeCompressedBlob(ctx, cs, mediaType, bufRd)
		if err != nil {
			return "", "", "", err
		}

		return compressedDigest, uncompressedDigest, mediaType, nil
	case archive.Bzip2, archive.Xz:
		r, err := archive.DecompressStream(bufRd)
		if err != nil {
			return "", "", "", errdefs.InvalidParameter(err)
		}
		defer r.Close()
		uncompressedReader = r
		fallthrough
	case archive.Uncompressed:
		mediaType := ocispec.MediaTypeImageLayerGzip
		compression := archive.Gzip

		compressedDigest, uncompressedDigest, err := compressAndWriteBlob(ctx, cs, compression, mediaType, uncompressedReader)
		if err != nil {
			return "", "", "", err
		}

		return compressedDigest, uncompressedDigest, mediaType, nil
	}

	return "", "", "", errdefs.InvalidParameter(errors.New("unsupported archive compression"))
}
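
// Note on the switch above (explanatory, not part of the original comments):
// bzip2 and xz inputs are first wrapped in a decompressing reader and then
// deliberately fall through to the archive.Uncompressed case, so they are
// re-encoded as gzip. As a result the content store only ever holds gzip or
// zstd layer blobs, matching the media types advertised in the manifest.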

// writeCompressedBlob writes the blob and simultaneously computes the digest of the uncompressed data.
func writeCompressedBlob(ctx context.Context, cs content.Store, mediaType string, bufRd *bufio.Reader) (digest.Digest, digest.Digest, error) {
	pr, pw := io.Pipe()
	defer pw.Close()
	defer pr.Close()

	c := make(chan digest.Digest)
	// Copy the blob from bufRd to the content store while teeing the same bytes into the pipe.
	go func() {
		compressedDigest, err := writeBlobAndReturnDigest(ctx, cs, mediaType, io.TeeReader(bufRd, pw))
		pw.CloseWithError(err)
		c <- compressedDigest
	}()

	digester := digest.Canonical.Digester()

	// Decompress the piped blob.
	decompressedStream, err := archive.DecompressStream(pr)
	if err == nil {
		// Feed the digester with decompressed data.
		_, err = io.Copy(digester.Hash(), decompressedStream)
		decompressedStream.Close()
	}
	pr.CloseWithError(err)

	compressedDigest := <-c
	if err != nil {
		if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
			return "", "", errdefs.Cancelled(err)
		}
		return "", "", errdefs.System(err)
	}

	uncompressedDigest := digester.Digest()
	return compressedDigest, uncompressedDigest, nil
}
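
// How writeCompressedBlob is wired (explanatory note, not part of the original
// comments): the goroutine streams the compressed bytes into the content store
// while a TeeReader duplicates them into the pipe; the main goroutine
// decompresses the piped copy to digest the uncompressed payload. The receive
// from c happens before the error check, so the content-store write always
// finishes (or fails) before the function returns and the goroutine is never
// left blocked sending on the channel.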

// compressAndWriteBlob compresses the uncompressedReader and stores it in the content store.
func compressAndWriteBlob(ctx context.Context, cs content.Store, compression archive.Compression, mediaType string, uncompressedLayerReader io.Reader) (digest.Digest, digest.Digest, error) {
	pr, pw := io.Pipe()
	defer pr.Close()
	defer pw.Close()

	compressor, err := archive.CompressStream(pw, compression)
	if err != nil {
		return "", "", errdefs.InvalidParameter(err)
	}
	defer compressor.Close()

	writeChan := make(chan digest.Digest)
	// Start copying the blob to the content store from the pipe.
	go func() {
		digest, err := writeBlobAndReturnDigest(ctx, cs, mediaType, pr)
		pr.CloseWithError(err)
		writeChan <- digest
	}()

	// Copy the archive to the pipe and tee it to a digester.
	// This will feed the pipe the above goroutine is reading from.
	uncompressedDigester := digest.Canonical.Digester()
	readFromInputAndDigest := io.TeeReader(uncompressedLayerReader, uncompressedDigester.Hash())
	_, err = io.Copy(compressor, readFromInputAndDigest)
	compressor.Close()
	pw.CloseWithError(err)

	compressedDigest := <-writeChan
	if err != nil {
		if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
			return "", "", errdefs.Cancelled(err)
		}
		return "", "", errdefs.System(err)
	}

	return compressedDigest, uncompressedDigester.Digest(), nil
}
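
// Data flow in compressAndWriteBlob (explanatory note, not part of the
// original comments): the uncompressed input is teed into uncompressedDigester
// (which yields the digest later used as the layer DiffID) while being fed
// through the compressor into the pipe; the goroutine drains the pipe into the
// content store, where writeBlobAndReturnDigest computes the digest of the
// compressed bytes. Both digests are therefore produced in a single pass over
// the input.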

// writeBlobAndReturnDigest writes a blob to the content store and returns the digest.
func writeBlobAndReturnDigest(ctx context.Context, cs content.Store, mt string, reader io.Reader) (digest.Digest, error) {
	digester := digest.Canonical.Digester()
	if err := content.WriteBlob(ctx, cs, uuid.New().String(), io.TeeReader(reader, digester.Hash()), ocispec.Descriptor{MediaType: mt}); err != nil {
		return "", errdefs.System(err)
	}
	return digester.Digest(), nil
}

// saveImage creates an image in the ImageService or updates it if it already exists.
func (i *ImageService) saveImage(ctx context.Context, img images.Image) error {
	is := i.client.ImageService()

	if _, err := is.Update(ctx, img); err != nil {
		if cerrdefs.IsNotFound(err) {
			if _, err := is.Create(ctx, img); err != nil {
				return errdefs.Unknown(err)
			}
		} else {
			return errdefs.Unknown(err)
		}
	}

	return nil
}

// unpackImage unpacks the image into the snapshotter.
func (i *ImageService) unpackImage(ctx context.Context, img images.Image, platform ocispec.Platform) error {
	c8dImg := containerd.NewImageWithPlatform(i.client, img, platforms.Only(platform))

	unpacked, err := c8dImg.IsUnpacked(ctx, i.snapshotter)
	if err != nil {
		return err
	}
	if !unpacked {
		err = c8dImg.Unpack(ctx, i.snapshotter)
	}
	return err
}

// detectCompression detects the compression type of the reader.
func detectCompression(bufRd *bufio.Reader) (archive.Compression, error) {
	bs, err := bufRd.Peek(10)
	if err != nil && err != io.EOF {
		// Note: we'll ignore any io.EOF error because there are some odd
		// cases where the layer.tar file will be empty (zero bytes) and
		// that results in an io.EOF from the Peek() call. So, in those
		// cases we'll just treat it as a non-compressed stream and
		// that means just create an empty layer.
		// See Issue 18170
		return archive.Uncompressed, errdefs.Unknown(err)
	}

	return archive.DetectCompression(bs), nil
}
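
// For reference (explanatory note): archive.DetectCompression matches
// well-known magic bytes at the start of the stream (approximately: gzip
// 1F 8B, bzip2 "BZh", xz FD 37 7A 58 5A 00, zstd 28 B5 2F FD), so the 10-byte
// peek above covers all supported formats. An empty input peeks zero bytes and
// is classified as archive.Uncompressed, which is how empty layers end up
// being imported.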

// fillUncompressedLabel sets the uncompressed digest label on the compressed blob metadata
// and returns the compressed blob size.
func fillUncompressedLabel(ctx context.Context, cs content.Store, compressedDigest digest.Digest, uncompressedDigest digest.Digest) (int64, error) {
	info, err := cs.Info(ctx, compressedDigest)
	if err != nil {
		return 0, errdefs.Unknown(errors.Wrapf(err, "couldn't open previously written blob"))
	}
	size := info.Size

	info.Labels = map[string]string{"containerd.io/uncompressed": uncompressedDigest.String()}
	_, err = cs.Update(ctx, info, "labels.*")
	if err != nil {
		return 0, errdefs.System(errors.Wrapf(err, "couldn't set uncompressed label"))
	}

	return size, nil
}
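
// The "containerd.io/uncompressed" label set above is the conventional
// containerd content-store label recording the digest of a compressed blob's
// uncompressed contents, so later consumers (for example the unpacker) can map
// the blob to its DiffID without decompressing it again.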

// storeJson marshals the provided object as JSON and writes it to the content store.
func storeJson(ctx context.Context, cs content.Ingester, mt string, obj interface{}, labels map[string]string) (ocispec.Descriptor, error) {
	configData, err := json.Marshal(obj)
	if err != nil {
		return ocispec.Descriptor{}, errdefs.InvalidParameter(err)
	}
	configDigest := digest.FromBytes(configData)
	desc := ocispec.Descriptor{
		MediaType: mt,
		Digest:    configDigest,
		Size:      int64(len(configData)),
	}

	var opts []content.Opt
	if labels != nil {
		opts = append(opts, content.WithLabels(labels))
	}

	err = content.WriteBlob(ctx, cs, configDigest.String(), bytes.NewReader(configData), desc, opts...)
	if err != nil {
		return ocispec.Descriptor{}, errdefs.System(err)
	}
	return desc, nil
}
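
// The labels parameter is how callers attach containerd garbage-collection
// hints (explanatory note): ImportImage stores the manifest with
// "containerd.io/gc.ref.content.*" labels pointing at the config and layer
// digests, which keeps those blobs referenced and thus not garbage collected
// for as long as the manifest exists. The config itself is stored without
// labels because the manifest's reference is what protects it.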

func containerConfigToOciImageConfig(cfg *container.Config) ocispec.ImageConfig {
	ociCfg := ocispec.ImageConfig{
		User:        cfg.User,
		Env:         cfg.Env,
		Entrypoint:  cfg.Entrypoint,
		Cmd:         cfg.Cmd,
		Volumes:     cfg.Volumes,
		WorkingDir:  cfg.WorkingDir,
		Labels:      cfg.Labels,
		StopSignal:  cfg.StopSignal,
		ArgsEscaped: cfg.ArgsEscaped,
	}

	if len(cfg.ExposedPorts) > 0 {
		ociCfg.ExposedPorts = map[string]struct{}{}
		for k, v := range cfg.ExposedPorts {
			ociCfg.ExposedPorts[string(k)] = v
		}
	}

	return ociCfg
}