Merge pull request #44628 from vvoland/c8d-import-upstream
daemon/c8d: Implement import
This commit is contained in:
commit
228f82fcda
5 changed files with 473 additions and 87 deletions
|
@ -4,6 +4,7 @@ import (
|
|||
"context"
|
||||
"io"
|
||||
|
||||
"github.com/docker/distribution/reference"
|
||||
"github.com/docker/docker/api/types"
|
||||
"github.com/docker/docker/api/types/filters"
|
||||
"github.com/docker/docker/api/types/image"
|
||||
|
@ -31,7 +32,7 @@ type imageBackend interface {
|
|||
|
||||
type importExportBackend interface {
|
||||
LoadImage(ctx context.Context, inTar io.ReadCloser, outStream io.Writer, quiet bool) error
|
||||
ImportImage(ctx context.Context, src string, repository string, platform *specs.Platform, tag string, msg string, inConfig io.ReadCloser, outStream io.Writer, changes []string) error
|
||||
ImportImage(ctx context.Context, ref reference.Named, platform *specs.Platform, msg string, layerReader io.Reader, changes []string) (dockerimage.ID, error)
|
||||
ExportImage(ctx context.Context, names []string, outStream io.Writer) error
|
||||
}
|
||||
|
||||
|
|
|
@ -2,7 +2,9 @@ package image // import "github.com/docker/docker/api/server/router/image"
|
|||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
@ -15,9 +17,11 @@ import (
|
|||
opts "github.com/docker/docker/api/types/image"
|
||||
"github.com/docker/docker/api/types/registry"
|
||||
"github.com/docker/docker/api/types/versions"
|
||||
"github.com/docker/docker/builder/remotecontext"
|
||||
"github.com/docker/docker/errdefs"
|
||||
"github.com/docker/docker/image"
|
||||
"github.com/docker/docker/pkg/ioutils"
|
||||
"github.com/docker/docker/pkg/progress"
|
||||
"github.com/docker/docker/pkg/streamformatter"
|
||||
specs "github.com/opencontainers/image-spec/specs-go/v1"
|
||||
"github.com/pkg/errors"
|
||||
|
@ -33,7 +37,7 @@ func (ir *imageRouter) postImagesCreate(ctx context.Context, w http.ResponseWrit
|
|||
img = r.Form.Get("fromImage")
|
||||
repo = r.Form.Get("repo")
|
||||
tag = r.Form.Get("tag")
|
||||
message = r.Form.Get("message")
|
||||
comment = r.Form.Get("message")
|
||||
progressErr error
|
||||
output = ioutils.NewWriteFlusher(w)
|
||||
platform *specs.Platform
|
||||
|
@ -67,7 +71,61 @@ func (ir *imageRouter) postImagesCreate(ctx context.Context, w http.ResponseWrit
|
|||
progressErr = ir.backend.PullImage(ctx, img, tag, platform, metaHeaders, authConfig, output)
|
||||
} else { // import
|
||||
src := r.Form.Get("fromSrc")
|
||||
progressErr = ir.backend.ImportImage(ctx, src, repo, platform, tag, message, r.Body, output, r.Form["changes"])
|
||||
|
||||
var ref reference.Named
|
||||
if repo != "" {
|
||||
var err error
|
||||
ref, err = reference.ParseNormalizedNamed(repo)
|
||||
if err != nil {
|
||||
return errdefs.InvalidParameter(err)
|
||||
}
|
||||
if _, isDigested := ref.(reference.Digested); isDigested {
|
||||
return errdefs.InvalidParameter(errors.New("cannot import digest reference"))
|
||||
}
|
||||
|
||||
if tag != "" {
|
||||
ref, err = reference.WithTag(ref, tag)
|
||||
if err != nil {
|
||||
return errdefs.InvalidParameter(err)
|
||||
}
|
||||
} else {
|
||||
ref = reference.TagNameOnly(ref)
|
||||
}
|
||||
}
|
||||
|
||||
if len(comment) == 0 {
|
||||
comment = "Imported from " + src
|
||||
}
|
||||
|
||||
var layerReader io.ReadCloser
|
||||
defer r.Body.Close()
|
||||
if src == "-" {
|
||||
layerReader = r.Body
|
||||
} else {
|
||||
if len(strings.Split(src, "://")) == 1 {
|
||||
src = "http://" + src
|
||||
}
|
||||
u, err := url.Parse(src)
|
||||
if err != nil {
|
||||
return errdefs.InvalidParameter(err)
|
||||
}
|
||||
|
||||
resp, err := remotecontext.GetWithStatusError(u.String())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
output.Write(streamformatter.FormatStatus("", "Downloading from %s", u))
|
||||
progressOutput := streamformatter.NewJSONProgressOutput(output, true)
|
||||
layerReader = progress.NewProgressReader(resp.Body, progressOutput, resp.ContentLength, "", "Importing")
|
||||
defer layerReader.Close()
|
||||
}
|
||||
|
||||
var id image.ID
|
||||
id, progressErr = ir.backend.ImportImage(ctx, ref, platform, comment, layerReader, r.Form["changes"])
|
||||
|
||||
if progressErr == nil {
|
||||
output.Write(streamformatter.FormatStatus("", id.String()))
|
||||
}
|
||||
}
|
||||
if progressErr != nil {
|
||||
if !output.Flushed() {
|
||||
|
|
|
@ -1,18 +1,400 @@
|
|||
package containerd
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"bytes"
|
||||
"context"
|
||||
"errors"
|
||||
"encoding/json"
|
||||
"io"
|
||||
"time"
|
||||
|
||||
"github.com/containerd/containerd"
|
||||
"github.com/containerd/containerd/content"
|
||||
cerrdefs "github.com/containerd/containerd/errdefs"
|
||||
"github.com/containerd/containerd/images"
|
||||
"github.com/containerd/containerd/platforms"
|
||||
"github.com/docker/distribution/reference"
|
||||
"github.com/docker/docker/api/types/container"
|
||||
"github.com/docker/docker/builder/dockerfile"
|
||||
"github.com/docker/docker/errdefs"
|
||||
specs "github.com/opencontainers/image-spec/specs-go/v1"
|
||||
"github.com/docker/docker/image"
|
||||
"github.com/docker/docker/pkg/archive"
|
||||
"github.com/docker/docker/pkg/pools"
|
||||
"github.com/google/uuid"
|
||||
"github.com/opencontainers/go-digest"
|
||||
"github.com/opencontainers/image-spec/specs-go"
|
||||
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
// ImportImage imports an image, getting the archived layer data either from
|
||||
// inConfig (if src is "-"), or from a URI specified in src. Progress output is
|
||||
// written to outStream. Repository and tag names can optionally be given in
|
||||
// the repo and tag arguments, respectively.
|
||||
func (i *ImageService) ImportImage(ctx context.Context, src string, repository string, platform *specs.Platform, tag string, msg string, inConfig io.ReadCloser, outStream io.Writer, changes []string) error {
|
||||
return errdefs.NotImplemented(errors.New("not implemented"))
|
||||
// ImportImage imports an image, getting the archived layer data from layerReader.
|
||||
// Layer archive is imported as-is if the compression is gzip or zstd.
|
||||
// Uncompressed, xz and bzip2 archives are recompressed into gzip.
|
||||
// The image is tagged with the given reference.
|
||||
// If the platform is nil, the default host platform is used.
|
||||
// The message is used as the history comment.
|
||||
// Image configuration is derived from the dockerfile instructions in changes.
|
||||
func (i *ImageService) ImportImage(ctx context.Context, ref reference.Named, platform *ocispec.Platform, msg string, layerReader io.Reader, changes []string) (image.ID, error) {
|
||||
refString := ""
|
||||
if ref != nil {
|
||||
refString = ref.String()
|
||||
}
|
||||
logger := logrus.WithField("ref", refString)
|
||||
|
||||
ctx, release, err := i.client.WithLease(ctx)
|
||||
if err != nil {
|
||||
return "", errdefs.System(err)
|
||||
}
|
||||
defer release(ctx)
|
||||
|
||||
if platform == nil {
|
||||
def := platforms.DefaultSpec()
|
||||
platform = &def
|
||||
}
|
||||
|
||||
imageConfig, err := dockerfile.BuildFromConfig(ctx, &container.Config{}, changes, platform.OS)
|
||||
if err != nil {
|
||||
logger.WithError(err).Debug("failed to process changes")
|
||||
return "", errdefs.InvalidParameter(err)
|
||||
}
|
||||
|
||||
cs := i.client.ContentStore()
|
||||
|
||||
compressedDigest, uncompressedDigest, mt, err := saveArchive(ctx, cs, layerReader)
|
||||
if err != nil {
|
||||
logger.WithError(err).Debug("failed to write layer blob")
|
||||
return "", err
|
||||
}
|
||||
logger = logger.WithFields(logrus.Fields{
|
||||
"compressedDigest": compressedDigest,
|
||||
"uncompressedDigest": uncompressedDigest,
|
||||
})
|
||||
|
||||
size, err := fillUncompressedLabel(ctx, cs, compressedDigest, uncompressedDigest)
|
||||
if err != nil {
|
||||
logger.WithError(err).Debug("failed to set uncompressed label on the compressed blob")
|
||||
return "", err
|
||||
}
|
||||
|
||||
compressedRootfsDesc := ocispec.Descriptor{
|
||||
MediaType: mt,
|
||||
Digest: compressedDigest,
|
||||
Size: size,
|
||||
}
|
||||
|
||||
ociCfg := containerConfigToOciImageConfig(imageConfig)
|
||||
createdAt := time.Now()
|
||||
config := ocispec.Image{
|
||||
Architecture: platform.Architecture,
|
||||
OS: platform.OS,
|
||||
Created: &createdAt,
|
||||
Author: "",
|
||||
Config: ociCfg,
|
||||
RootFS: ocispec.RootFS{
|
||||
Type: "layers",
|
||||
DiffIDs: []digest.Digest{uncompressedDigest},
|
||||
},
|
||||
History: []ocispec.History{
|
||||
{
|
||||
Created: &createdAt,
|
||||
CreatedBy: "",
|
||||
Author: "",
|
||||
Comment: msg,
|
||||
EmptyLayer: false,
|
||||
},
|
||||
},
|
||||
}
|
||||
configDesc, err := storeJson(ctx, cs, ocispec.MediaTypeImageConfig, config, nil)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
manifest := ocispec.Manifest{
|
||||
MediaType: ocispec.MediaTypeImageManifest,
|
||||
Versioned: specs.Versioned{
|
||||
SchemaVersion: 2,
|
||||
},
|
||||
Config: configDesc,
|
||||
Layers: []ocispec.Descriptor{
|
||||
compressedRootfsDesc,
|
||||
},
|
||||
}
|
||||
manifestDesc, err := storeJson(ctx, cs, ocispec.MediaTypeImageManifest, manifest, map[string]string{
|
||||
"containerd.io/gc.ref.content.config": configDesc.Digest.String(),
|
||||
"containerd.io/gc.ref.content.l.0": compressedDigest.String(),
|
||||
})
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
id := image.ID(manifestDesc.Digest.String())
|
||||
img := images.Image{
|
||||
Name: refString,
|
||||
Target: manifestDesc,
|
||||
CreatedAt: createdAt,
|
||||
}
|
||||
if img.Name == "" {
|
||||
// TODO(vvoland): danglingImageName(manifestDesc.Digest)
|
||||
img.Name = "dangling@" + manifestDesc.Digest.String()
|
||||
|
||||
}
|
||||
|
||||
err = i.saveImage(ctx, img)
|
||||
if err != nil {
|
||||
logger.WithError(err).Debug("failed to save image")
|
||||
return "", err
|
||||
}
|
||||
err = i.unpackImage(ctx, img, *platform)
|
||||
if err != nil {
|
||||
logger.WithError(err).Debug("failed to unpack image")
|
||||
}
|
||||
|
||||
return id, err
|
||||
}
|
||||
|
||||
// saveArchive saves the archive from bufRd to the content store, compressing it if necessary.
|
||||
// Returns compressed blob digest, digest of the uncompressed data and media type of the stored blob.
|
||||
func saveArchive(ctx context.Context, cs content.Store, layerReader io.Reader) (digest.Digest, digest.Digest, string, error) {
|
||||
// Wrap the reader in buffered reader to allow peeks.
|
||||
p := pools.BufioReader32KPool
|
||||
bufRd := p.Get(layerReader)
|
||||
defer p.Put(bufRd)
|
||||
|
||||
compression, err := detectCompression(bufRd)
|
||||
if err != nil {
|
||||
return "", "", "", err
|
||||
}
|
||||
|
||||
var uncompressedReader io.Reader = bufRd
|
||||
switch compression {
|
||||
case archive.Gzip, archive.Zstd:
|
||||
// If the input is already a compressed layer, just save it as is.
|
||||
mediaType := ocispec.MediaTypeImageLayerGzip
|
||||
if compression == archive.Zstd {
|
||||
mediaType = ocispec.MediaTypeImageLayerZstd
|
||||
}
|
||||
|
||||
compressedDigest, uncompressedDigest, err := writeCompressedBlob(ctx, cs, mediaType, bufRd)
|
||||
if err != nil {
|
||||
return "", "", "", err
|
||||
}
|
||||
|
||||
return compressedDigest, uncompressedDigest, mediaType, nil
|
||||
case archive.Bzip2, archive.Xz:
|
||||
r, err := archive.DecompressStream(bufRd)
|
||||
if err != nil {
|
||||
return "", "", "", errdefs.InvalidParameter(err)
|
||||
}
|
||||
defer r.Close()
|
||||
uncompressedReader = r
|
||||
fallthrough
|
||||
case archive.Uncompressed:
|
||||
mediaType := ocispec.MediaTypeImageLayerGzip
|
||||
compression := archive.Gzip
|
||||
|
||||
compressedDigest, uncompressedDigest, err := compressAndWriteBlob(ctx, cs, compression, mediaType, uncompressedReader)
|
||||
if err != nil {
|
||||
return "", "", "", err
|
||||
}
|
||||
|
||||
return compressedDigest, uncompressedDigest, mediaType, nil
|
||||
}
|
||||
|
||||
return "", "", "", errdefs.InvalidParameter(errors.New("unsupported archive compression"))
|
||||
}
|
||||
|
||||
// writeCompressedBlob writes the blob and simultaneously computes the digest of the uncompressed data.
|
||||
func writeCompressedBlob(ctx context.Context, cs content.Store, mediaType string, bufRd *bufio.Reader) (digest.Digest, digest.Digest, error) {
|
||||
pr, pw := io.Pipe()
|
||||
defer pw.Close()
|
||||
defer pr.Close()
|
||||
|
||||
c := make(chan digest.Digest)
|
||||
// Start copying the blob to the content store from the pipe and tee it to the pipe.
|
||||
go func() {
|
||||
compressedDigest, err := writeBlobAndReturnDigest(ctx, cs, mediaType, io.TeeReader(bufRd, pw))
|
||||
pw.CloseWithError(err)
|
||||
c <- compressedDigest
|
||||
}()
|
||||
|
||||
digester := digest.Canonical.Digester()
|
||||
|
||||
// Decompress the piped blob.
|
||||
decompressedStream, err := archive.DecompressStream(pr)
|
||||
if err == nil {
|
||||
// Feed the digester with decompressed data.
|
||||
_, err = io.Copy(digester.Hash(), decompressedStream)
|
||||
decompressedStream.Close()
|
||||
}
|
||||
pr.CloseWithError(err)
|
||||
|
||||
compressedDigest := <-c
|
||||
if err != nil {
|
||||
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
|
||||
return "", "", errdefs.Cancelled(err)
|
||||
}
|
||||
return "", "", errdefs.System(err)
|
||||
}
|
||||
|
||||
uncompressedDigest := digester.Digest()
|
||||
return compressedDigest, uncompressedDigest, nil
|
||||
}
|
||||
|
||||
// compressAndWriteBlob compresses the uncompressedReader and stores it in the content store.
|
||||
func compressAndWriteBlob(ctx context.Context, cs content.Store, compression archive.Compression, mediaType string, uncompressedLayerReader io.Reader) (digest.Digest, digest.Digest, error) {
|
||||
pr, pw := io.Pipe()
|
||||
defer pr.Close()
|
||||
defer pw.Close()
|
||||
|
||||
compressor, err := archive.CompressStream(pw, compression)
|
||||
if err != nil {
|
||||
return "", "", errdefs.InvalidParameter(err)
|
||||
}
|
||||
defer compressor.Close()
|
||||
|
||||
writeChan := make(chan digest.Digest)
|
||||
// Start copying the blob to the content store from the pipe.
|
||||
go func() {
|
||||
digest, err := writeBlobAndReturnDigest(ctx, cs, mediaType, pr)
|
||||
pr.CloseWithError(err)
|
||||
writeChan <- digest
|
||||
}()
|
||||
|
||||
// Copy archive to the pipe and tee it to a digester.
|
||||
// This will feed the pipe the above goroutine is reading from.
|
||||
uncompressedDigester := digest.Canonical.Digester()
|
||||
readFromInputAndDigest := io.TeeReader(uncompressedLayerReader, uncompressedDigester.Hash())
|
||||
_, err = io.Copy(compressor, readFromInputAndDigest)
|
||||
compressor.Close()
|
||||
pw.CloseWithError(err)
|
||||
|
||||
compressedDigest := <-writeChan
|
||||
if err != nil {
|
||||
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
|
||||
return "", "", errdefs.Cancelled(err)
|
||||
}
|
||||
return "", "", errdefs.System(err)
|
||||
}
|
||||
|
||||
return compressedDigest, uncompressedDigester.Digest(), err
|
||||
}
|
||||
|
||||
// writeBlobAndReturnDigest writes a blob to the content store and returns the digest.
|
||||
func writeBlobAndReturnDigest(ctx context.Context, cs content.Store, mt string, reader io.Reader) (digest.Digest, error) {
|
||||
digester := digest.Canonical.Digester()
|
||||
if err := content.WriteBlob(ctx, cs, uuid.New().String(), io.TeeReader(reader, digester.Hash()), ocispec.Descriptor{MediaType: mt}); err != nil {
|
||||
return "", errdefs.System(err)
|
||||
}
|
||||
return digester.Digest(), nil
|
||||
}
|
||||
|
||||
// saveImage creates an image in the ImageService or updates it if it exists.
|
||||
func (i *ImageService) saveImage(ctx context.Context, img images.Image) error {
|
||||
is := i.client.ImageService()
|
||||
|
||||
if _, err := is.Update(ctx, img); err != nil {
|
||||
if cerrdefs.IsNotFound(err) {
|
||||
if _, err := is.Create(ctx, img); err != nil {
|
||||
return errdefs.Unknown(err)
|
||||
}
|
||||
} else {
|
||||
return errdefs.Unknown(err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// unpackImage unpacks the image into the snapshotter.
|
||||
func (i *ImageService) unpackImage(ctx context.Context, img images.Image, platform ocispec.Platform) error {
|
||||
c8dImg := containerd.NewImageWithPlatform(i.client, img, platforms.Only(platform))
|
||||
unpacked, err := c8dImg.IsUnpacked(ctx, i.snapshotter)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !unpacked {
|
||||
err = c8dImg.Unpack(ctx, i.snapshotter)
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
// detectCompression dectects the reader compression type.
|
||||
func detectCompression(bufRd *bufio.Reader) (archive.Compression, error) {
|
||||
bs, err := bufRd.Peek(10)
|
||||
if err != nil && err != io.EOF {
|
||||
// Note: we'll ignore any io.EOF error because there are some odd
|
||||
// cases where the layer.tar file will be empty (zero bytes) and
|
||||
// that results in an io.EOF from the Peek() call. So, in those
|
||||
// cases we'll just treat it as a non-compressed stream and
|
||||
// that means just create an empty layer.
|
||||
// See Issue 18170
|
||||
return archive.Uncompressed, errdefs.Unknown(err)
|
||||
}
|
||||
|
||||
return archive.DetectCompression(bs), nil
|
||||
}
|
||||
|
||||
// fillUncompressedLabel sets the uncompressed digest label on the compressed blob metadata
|
||||
// and returns the compressed blob size.
|
||||
func fillUncompressedLabel(ctx context.Context, cs content.Store, compressedDigest digest.Digest, uncompressedDigest digest.Digest) (int64, error) {
|
||||
info, err := cs.Info(ctx, compressedDigest)
|
||||
if err != nil {
|
||||
return 0, errdefs.Unknown(errors.Wrapf(err, "couldn't open previously written blob"))
|
||||
}
|
||||
size := info.Size
|
||||
info.Labels = map[string]string{"containerd.io/uncompressed": uncompressedDigest.String()}
|
||||
|
||||
_, err = cs.Update(ctx, info, "labels.*")
|
||||
if err != nil {
|
||||
return 0, errdefs.System(errors.Wrapf(err, "couldn't set uncompressed label"))
|
||||
}
|
||||
return size, nil
|
||||
}
|
||||
|
||||
// storeJson marshals the provided object as json and stores it.
|
||||
func storeJson(ctx context.Context, cs content.Ingester, mt string, obj interface{}, labels map[string]string) (ocispec.Descriptor, error) {
|
||||
configData, err := json.Marshal(obj)
|
||||
if err != nil {
|
||||
return ocispec.Descriptor{}, errdefs.InvalidParameter(err)
|
||||
}
|
||||
configDigest := digest.FromBytes(configData)
|
||||
if err != nil {
|
||||
return ocispec.Descriptor{}, errdefs.InvalidParameter(err)
|
||||
}
|
||||
desc := ocispec.Descriptor{
|
||||
MediaType: mt,
|
||||
Digest: configDigest,
|
||||
Size: int64(len(configData)),
|
||||
}
|
||||
|
||||
var opts []content.Opt
|
||||
if labels != nil {
|
||||
opts = append(opts, content.WithLabels(labels))
|
||||
}
|
||||
|
||||
err = content.WriteBlob(ctx, cs, configDigest.String(), bytes.NewReader(configData), desc, opts...)
|
||||
if err != nil {
|
||||
return ocispec.Descriptor{}, errdefs.System(err)
|
||||
}
|
||||
return desc, nil
|
||||
}
|
||||
|
||||
func containerConfigToOciImageConfig(cfg *container.Config) ocispec.ImageConfig {
|
||||
ociCfg := ocispec.ImageConfig{
|
||||
User: cfg.User,
|
||||
Env: cfg.Env,
|
||||
Entrypoint: cfg.Entrypoint,
|
||||
Cmd: cfg.Cmd,
|
||||
Volumes: cfg.Volumes,
|
||||
WorkingDir: cfg.WorkingDir,
|
||||
Labels: cfg.Labels,
|
||||
StopSignal: cfg.StopSignal,
|
||||
}
|
||||
for k, v := range cfg.ExposedPorts {
|
||||
ociCfg.ExposedPorts[string(k)] = v
|
||||
}
|
||||
|
||||
return ociCfg
|
||||
}
|
||||
|
|
|
@ -36,7 +36,7 @@ type ImageService interface {
|
|||
LogImageEventWithAttributes(imageID, refName, action string, attributes map[string]string)
|
||||
CountImages() int
|
||||
ImagesPrune(ctx context.Context, pruneFilters filters.Args) (*types.ImagesPruneReport, error)
|
||||
ImportImage(ctx context.Context, src string, repository string, platform *v1.Platform, tag string, msg string, inConfig io.ReadCloser, outStream io.Writer, changes []string) error
|
||||
ImportImage(ctx context.Context, ref reference.Named, platform *v1.Platform, msg string, layerReader io.Reader, changes []string) (image.ID, error)
|
||||
TagImage(imageName, repository, tag string) (string, error)
|
||||
TagImageWithReference(imageID image.ID, newTag reference.Named) error
|
||||
GetImage(ctx context.Context, refOrID string, options imagetype.GetImageOpts) (*image.Image, error)
|
||||
|
|
|
@ -4,102 +4,49 @@ import (
|
|||
"context"
|
||||
"encoding/json"
|
||||
"io"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/containerd/containerd/platforms"
|
||||
"github.com/docker/distribution/reference"
|
||||
"github.com/docker/docker/api/types/container"
|
||||
"github.com/docker/docker/builder/dockerfile"
|
||||
"github.com/docker/docker/builder/remotecontext"
|
||||
"github.com/docker/docker/dockerversion"
|
||||
"github.com/docker/docker/errdefs"
|
||||
"github.com/docker/docker/image"
|
||||
"github.com/docker/docker/layer"
|
||||
"github.com/docker/docker/pkg/archive"
|
||||
"github.com/docker/docker/pkg/progress"
|
||||
"github.com/docker/docker/pkg/streamformatter"
|
||||
"github.com/docker/docker/pkg/system"
|
||||
specs "github.com/opencontainers/image-spec/specs-go/v1"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
// ImportImage imports an image, getting the archived layer data either from
|
||||
// inConfig (if src is "-"), or from a URI specified in src. Progress output is
|
||||
// written to outStream. Repository and tag names can optionally be given in
|
||||
// the repo and tag arguments, respectively.
|
||||
func (i *ImageService) ImportImage(ctx context.Context, src string, repository string, platform *specs.Platform, tag string, msg string, inConfig io.ReadCloser, outStream io.Writer, changes []string) error {
|
||||
var (
|
||||
rc io.ReadCloser
|
||||
resp *http.Response
|
||||
newRef reference.Named
|
||||
)
|
||||
|
||||
if repository != "" {
|
||||
var err error
|
||||
newRef, err = reference.ParseNormalizedNamed(repository)
|
||||
if err != nil {
|
||||
return errdefs.InvalidParameter(err)
|
||||
}
|
||||
if _, isCanonical := newRef.(reference.Canonical); isCanonical {
|
||||
return errdefs.InvalidParameter(errors.New("cannot import digest reference"))
|
||||
}
|
||||
|
||||
if tag != "" {
|
||||
newRef, err = reference.WithTag(newRef, tag)
|
||||
if err != nil {
|
||||
return errdefs.InvalidParameter(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Normalize platform - default to the operating system and architecture if not supplied.
|
||||
// ImportImage imports an image, getting the archived layer data from layerReader.
|
||||
// Uncompressed layer archive is passed to the layerStore and handled by the
|
||||
// underlying graph driver.
|
||||
// Image is tagged with the given reference.
|
||||
// If the platform is nil, the default host platform is used.
|
||||
// Message is used as the image's history comment.
|
||||
// Image configuration is derived from the dockerfile instructions in changes.
|
||||
func (i *ImageService) ImportImage(ctx context.Context, newRef reference.Named, platform *specs.Platform, msg string, layerReader io.Reader, changes []string) (image.ID, error) {
|
||||
if platform == nil {
|
||||
p := platforms.DefaultSpec()
|
||||
platform = &p
|
||||
def := platforms.DefaultSpec()
|
||||
platform = &def
|
||||
}
|
||||
if !system.IsOSSupported(platform.OS) {
|
||||
return errdefs.InvalidParameter(system.ErrNotSupportedOperatingSystem)
|
||||
return "", errdefs.InvalidParameter(system.ErrNotSupportedOperatingSystem)
|
||||
}
|
||||
|
||||
config, err := dockerfile.BuildFromConfig(ctx, &container.Config{}, changes, platform.OS)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if src == "-" {
|
||||
rc = inConfig
|
||||
} else {
|
||||
inConfig.Close()
|
||||
if len(strings.Split(src, "://")) == 1 {
|
||||
src = "http://" + src
|
||||
}
|
||||
u, err := url.Parse(src)
|
||||
if err != nil {
|
||||
return errdefs.InvalidParameter(err)
|
||||
}
|
||||
|
||||
resp, err = remotecontext.GetWithStatusError(u.String())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
outStream.Write(streamformatter.FormatStatus("", "Downloading from %s", u))
|
||||
progressOutput := streamformatter.NewJSONProgressOutput(outStream, true)
|
||||
rc = progress.NewProgressReader(resp.Body, progressOutput, resp.ContentLength, "", "Importing")
|
||||
return "", errdefs.InvalidParameter(err)
|
||||
}
|
||||
|
||||
defer rc.Close()
|
||||
if len(msg) == 0 {
|
||||
msg = "Imported from " + src
|
||||
}
|
||||
|
||||
inflatedLayerData, err := archive.DecompressStream(rc)
|
||||
inflatedLayerData, err := archive.DecompressStream(layerReader)
|
||||
if err != nil {
|
||||
return err
|
||||
return "", err
|
||||
}
|
||||
l, err := i.layerStore.Register(inflatedLayerData, "")
|
||||
if err != nil {
|
||||
return err
|
||||
return "", err
|
||||
}
|
||||
defer layer.ReleaseAndLog(i.layerStore, l)
|
||||
|
||||
|
@ -124,22 +71,20 @@ func (i *ImageService) ImportImage(ctx context.Context, src string, repository s
|
|||
}},
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
return "", err
|
||||
}
|
||||
|
||||
id, err := i.imageStore.Create(imgConfig)
|
||||
if err != nil {
|
||||
return err
|
||||
return "", err
|
||||
}
|
||||
|
||||
// FIXME: connect with commit code and call refstore directly
|
||||
if newRef != nil {
|
||||
if err := i.TagImageWithReference(id, newRef); err != nil {
|
||||
return err
|
||||
return "", err
|
||||
}
|
||||
}
|
||||
|
||||
i.LogImageEvent(id.String(), id.String(), "import")
|
||||
outStream.Write(streamformatter.FormatStatus("", id.String()))
|
||||
return nil
|
||||
return id, nil
|
||||
}
|
||||
|
|
Loading…
Add table
Reference in a new issue