|
@@ -11,9 +11,10 @@ package converter
|
|
|
|
|
|
import (
|
|
|
"archive/tar"
|
|
|
+ "bytes"
|
|
|
"compress/gzip"
|
|
|
"context"
|
|
|
- "encoding/json"
|
|
|
+ "encoding/binary"
|
|
|
"fmt"
|
|
|
"io"
|
|
|
"os"
|
|
@@ -24,10 +25,12 @@ import (
|
|
|
"github.com/containerd/containerd/archive"
|
|
|
"github.com/containerd/containerd/archive/compression"
|
|
|
"github.com/containerd/containerd/content"
|
|
|
+ "github.com/containerd/containerd/errdefs"
|
|
|
"github.com/containerd/containerd/images"
|
|
|
"github.com/containerd/containerd/images/converter"
|
|
|
"github.com/containerd/containerd/labels"
|
|
|
"github.com/containerd/fifo"
|
|
|
+ "github.com/klauspost/compress/zstd"
|
|
|
"github.com/opencontainers/go-digest"
|
|
|
"github.com/opencontainers/image-spec/identity"
|
|
|
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
|
|
@@ -35,11 +38,14 @@ import (
|
|
|
"golang.org/x/sync/errgroup"
|
|
|
|
|
|
"github.com/containerd/nydus-snapshotter/pkg/converter/tool"
|
|
|
- "github.com/containerd/nydus-snapshotter/pkg/errdefs"
|
|
|
+ "github.com/containerd/nydus-snapshotter/pkg/label"
|
|
|
)
|
|
|
|
|
|
-const bootstrapNameInTar = "image.boot"
|
|
|
-const blobNameInTar = "image.blob"
|
|
|
+const EntryBlob = "image.blob"
|
|
|
+const EntryBootstrap = "image.boot"
|
|
|
+const EntryBlobMeta = "blob.meta"
|
|
|
+const EntryBlobMetaHeader = "blob.meta.header"
|
|
|
+const EntryTOC = "rafs.blob.toc"
|
|
|
|
|
|
const envNydusBuilder = "NYDUS_BUILDER"
|
|
|
const envNydusWorkDir = "NYDUS_WORKDIR"
|
|
@@ -113,152 +119,190 @@ func unpackOciTar(ctx context.Context, dst string, reader io.Reader) error {
|
|
|
return nil
|
|
|
}
|
|
|
|
|
|
-// Unpack a Nydus formatted tar stream into a directory.
|
|
|
-func unpackNydusTar(ctx context.Context, bootDst, blobDst string, ra content.ReaderAt) error {
|
|
|
+// unpackNydusBlob unpacks a Nydus formatted tar stream into a directory.
|
|
|
+// unpackBlob indicates whether to unpack blob data.
|
|
|
+func unpackNydusBlob(bootDst, blobDst string, ra content.ReaderAt, unpackBlob bool) error {
|
|
|
boot, err := os.OpenFile(bootDst, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644)
|
|
|
if err != nil {
|
|
|
return errors.Wrapf(err, "write to bootstrap %s", bootDst)
|
|
|
}
|
|
|
defer boot.Close()
|
|
|
|
|
|
- if err = unpackBootstrapFromNydusTar(ctx, ra, boot); err != nil {
|
|
|
+ if _, err = UnpackEntry(ra, EntryBootstrap, boot); err != nil {
|
|
|
return errors.Wrap(err, "unpack bootstrap from nydus")
|
|
|
}
|
|
|
|
|
|
- blob, err := os.OpenFile(blobDst, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644)
|
|
|
- if err != nil {
|
|
|
- return errors.Wrapf(err, "write to blob %s", blobDst)
|
|
|
- }
|
|
|
- defer blob.Close()
|
|
|
+ if unpackBlob {
|
|
|
+ blob, err := os.OpenFile(blobDst, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644)
|
|
|
+ if err != nil {
|
|
|
+ return errors.Wrapf(err, "write to blob %s", blobDst)
|
|
|
+ }
|
|
|
+ defer blob.Close()
|
|
|
|
|
|
- if err = unpackBlobFromNydusTar(ctx, ra, blob); err != nil {
|
|
|
- return errors.Wrap(err, "unpack blob from nydus")
|
|
|
+ if _, err = UnpackEntry(ra, EntryBlob, blob); err != nil {
|
|
|
+ if errors.Is(err, ErrNotFound) {
|
|
|
+ // The nydus layer may contain only bootstrap and no blob
|
|
|
+ // data, which should be ignored.
|
|
|
+ return nil
|
|
|
+ }
|
|
|
+ return errors.Wrap(err, "unpack blob from nydus")
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
return nil
|
|
|
}
|
|
|
|
|
|
-// Unpack the bootstrap from nydus formatted tar stream (blob + bootstrap).
|
|
|
-// The nydus formatted tar stream is a tar-like structure that arranges the
|
|
|
-// data as follows:
|
|
|
-//
|
|
|
-// `blob_data | blob_tar_header | bootstrap_data | bootstrap_tar_header`
|
|
|
-func unpackBootstrapFromNydusTar(ctx context.Context, ra content.ReaderAt, target io.Writer) error {
|
|
|
- cur := ra.Size()
|
|
|
- reader := newSeekReader(ra)
|
|
|
-
|
|
|
+func seekFileByTarHeader(ra content.ReaderAt, targetName string, handle func(io.Reader, *tar.Header) error) error {
|
|
|
const headerSize = 512
|
|
|
|
|
|
- // Seek from tail to head of nydus formatted tar stream to find nydus
|
|
|
- // bootstrap data.
|
|
|
- for {
|
|
|
- if headerSize > cur {
|
|
|
- return fmt.Errorf("invalid tar format at pos %d", cur)
|
|
|
- }
|
|
|
+ if headerSize > ra.Size() {
|
|
|
+ return fmt.Errorf("invalid nydus tar size %d", ra.Size())
|
|
|
+ }
|
|
|
|
|
|
- // Try to seek to the part of tar header.
|
|
|
- var err error
|
|
|
- cur, err = reader.Seek(cur-headerSize, io.SeekCurrent)
|
|
|
+ cur := ra.Size() - headerSize
|
|
|
+ reader := newSeekReader(ra)
|
|
|
+
|
|
|
+ // Seek from tail to head of nydus formatted tar stream to find
|
|
|
+ // target data.
|
|
|
+ for {
|
|
|
+ // Try to seek the part of tar header.
|
|
|
+ _, err := reader.Seek(cur, io.SeekStart)
|
|
|
if err != nil {
|
|
|
- return errors.Wrapf(err, "seek to %d for tar header", cur-headerSize)
|
|
|
+ return errors.Wrapf(err, "seek %d for nydus tar header", cur)
|
|
|
}
|
|
|
|
|
|
- tr := tar.NewReader(reader)
|
|
|
// Parse tar header.
|
|
|
+ tr := tar.NewReader(reader)
|
|
|
hdr, err := tr.Next()
|
|
|
if err != nil {
|
|
|
- return errors.Wrap(err, "parse tar header")
|
|
|
+ return errors.Wrap(err, "parse nydus tar header")
|
|
|
}
|
|
|
|
|
|
- if hdr.Name == bootstrapNameInTar {
|
|
|
- // Try to seek to the part of tar data (bootstrap_data).
|
|
|
- if hdr.Size > cur {
|
|
|
- return fmt.Errorf("invalid tar format at pos %d", cur)
|
|
|
- }
|
|
|
- bootstrapOffset := cur - hdr.Size
|
|
|
- _, err = reader.Seek(bootstrapOffset, io.SeekStart)
|
|
|
+ if cur < hdr.Size {
|
|
|
+ return fmt.Errorf("invalid nydus tar data, name %s, size %d", hdr.Name, hdr.Size)
|
|
|
+ }
|
|
|
+
|
|
|
+ if hdr.Name == targetName {
|
|
|
+ // Try to seek the part of tar data.
|
|
|
+ _, err = reader.Seek(cur-hdr.Size, io.SeekStart)
|
|
|
if err != nil {
|
|
|
- return errors.Wrap(err, "seek to bootstrap data offset")
|
|
|
+ return errors.Wrap(err, "seek target data offset")
|
|
|
}
|
|
|
+ dataReader := io.NewSectionReader(reader, cur-hdr.Size, hdr.Size)
|
|
|
|
|
|
- // Copy tar data (bootstrap_data) to provided target writer.
|
|
|
- if _, err := io.CopyN(target, reader, hdr.Size); err != nil {
|
|
|
- return errors.Wrap(err, "copy bootstrap data to reader")
|
|
|
+ if err := handle(dataReader, hdr); err != nil {
|
|
|
+ return errors.Wrap(err, "handle target data")
|
|
|
}
|
|
|
|
|
|
return nil
|
|
|
}
|
|
|
|
|
|
- if cur == hdr.Size {
|
|
|
+ cur = cur - hdr.Size - headerSize
|
|
|
+ if cur < 0 {
|
|
|
break
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- return fmt.Errorf("can't find bootstrap in nydus tar")
|
|
|
+ return errors.Wrapf(ErrNotFound, "can't find target %s by seeking tar", targetName)
|
|
|
}
|
|
|
|
|
|
-// Unpack the blob from nydus formatted tar stream (blob + bootstrap).
|
|
|
-// The nydus formatted tar stream is a tar-like structure that arranges the
|
|
|
-// data as follows:
|
|
|
-//
|
|
|
-// `blob_data | blob_tar_header | bootstrap_data | bootstrap_tar_header`
|
|
|
-func unpackBlobFromNydusTar(ctx context.Context, ra content.ReaderAt, target io.Writer) error {
|
|
|
- cur := ra.Size()
|
|
|
- reader := newSeekReader(ra)
|
|
|
-
|
|
|
- const headerSize = 512
|
|
|
-
|
|
|
- // Seek from tail to head of nydus formatted tar stream to find nydus
|
|
|
- // bootstrap data.
|
|
|
- for {
|
|
|
- if headerSize > cur {
|
|
|
- break
|
|
|
- }
|
|
|
+func seekFileByTOC(ra content.ReaderAt, targetName string, handle func(io.Reader, *tar.Header) error) (*TOCEntry, error) {
|
|
|
+ entrySize := 128
|
|
|
+ var tocEntry *TOCEntry
|
|
|
|
|
|
- // Try to seek to the part of tar header.
|
|
|
- var err error
|
|
|
- cur, err = reader.Seek(cur-headerSize, io.SeekStart)
|
|
|
+ err := seekFileByTarHeader(ra, EntryTOC, func(tocEntryDataReader io.Reader, _ *tar.Header) error {
|
|
|
+ entryData, err := io.ReadAll(tocEntryDataReader)
|
|
|
if err != nil {
|
|
|
- return errors.Wrapf(err, "seek to %d for tar header", cur-headerSize)
|
|
|
+ return errors.Wrap(err, "read toc entries")
|
|
|
}
|
|
|
-
|
|
|
- tr := tar.NewReader(reader)
|
|
|
- // Parse tar header.
|
|
|
- hdr, err := tr.Next()
|
|
|
- if err != nil {
|
|
|
- return errors.Wrap(err, "parse tar header")
|
|
|
+ if len(entryData)%entrySize != 0 {
|
|
|
+ return fmt.Errorf("invalid entries length %d", len(entryData))
|
|
|
}
|
|
|
|
|
|
- if hdr.Name == bootstrapNameInTar {
|
|
|
- if hdr.Size > cur {
|
|
|
- return fmt.Errorf("invalid tar format at pos %d", cur)
|
|
|
+ count := len(entryData) / entrySize
|
|
|
+ for i := 0; i < count; i++ {
|
|
|
+ var entry TOCEntry
|
|
|
+ r := bytes.NewReader(entryData[i*entrySize : i*entrySize+entrySize])
|
|
|
+ if err := binary.Read(r, binary.LittleEndian, &entry); err != nil {
|
|
|
+ return errors.Wrap(err, "read toc entries")
|
|
|
}
|
|
|
- cur, err = reader.Seek(cur-hdr.Size, io.SeekStart)
|
|
|
- if err != nil {
|
|
|
- return errors.Wrap(err, "seek to bootstrap data offset")
|
|
|
- }
|
|
|
- } else if hdr.Name == blobNameInTar {
|
|
|
- if hdr.Size > cur {
|
|
|
- return fmt.Errorf("invalid tar format at pos %d", cur)
|
|
|
- }
|
|
|
- _, err = reader.Seek(cur-hdr.Size, io.SeekStart)
|
|
|
- if err != nil {
|
|
|
- return errors.Wrap(err, "seek to blob data offset")
|
|
|
- }
|
|
|
- if _, err := io.CopyN(target, reader, hdr.Size); err != nil {
|
|
|
- return errors.Wrap(err, "copy blob data to reader")
|
|
|
+ if entry.GetName() == targetName {
|
|
|
+ compressor, err := entry.GetCompressor()
|
|
|
+ if err != nil {
|
|
|
+ return errors.Wrap(err, "get compressor of entry")
|
|
|
+ }
|
|
|
+ compressedOffset := int64(entry.GetCompressedOffset())
|
|
|
+ compressedSize := int64(entry.GetCompressedSize())
|
|
|
+ sr := io.NewSectionReader(ra, compressedOffset, compressedSize)
|
|
|
+
|
|
|
+ var rd io.Reader
|
|
|
+ switch compressor {
|
|
|
+ case CompressorZstd:
|
|
|
+ decoder, err := zstd.NewReader(sr)
|
|
|
+ if err != nil {
|
|
|
+ return errors.Wrap(err, "seek to target data offset")
|
|
|
+ }
|
|
|
+ defer decoder.Close()
|
|
|
+ rd = decoder
|
|
|
+ case CompressorNone:
|
|
|
+ rd = sr
|
|
|
+ default:
|
|
|
+ return fmt.Errorf("unsupported compressor %x", compressor)
|
|
|
+ }
|
|
|
+
|
|
|
+ if err := handle(rd, nil); err != nil {
|
|
|
+ return errors.Wrap(err, "handle target entry data")
|
|
|
+ }
|
|
|
+
|
|
|
+ tocEntry = &entry
|
|
|
+
|
|
|
+ return nil
|
|
|
}
|
|
|
- return nil
|
|
|
}
|
|
|
+
|
|
|
+ return errors.Wrapf(ErrNotFound, "can't find target %s by seeking TOC", targetName)
|
|
|
+ })
|
|
|
+
|
|
|
+ return tocEntry, err
|
|
|
+}
|
|
|
+
|
|
|
+// Unpack the file from nydus formatted tar stream.
|
|
|
+// The nydus formatted tar stream is a tar-like structure that arranges the
|
|
|
+// data as follows:
|
|
|
+//
|
|
|
+// `data | tar_header | ... | data | tar_header | [toc_entry | ... | toc_entry | tar_header]`
|
|
|
+func UnpackEntry(ra content.ReaderAt, targetName string, target io.Writer) (*TOCEntry, error) {
|
|
|
+ handle := func(dataReader io.Reader, _ *tar.Header) error {
|
|
|
+ // Copy data to provided target writer.
|
|
|
+ if _, err := io.Copy(target, dataReader); err != nil {
|
|
|
+ return errors.Wrap(err, "copy target data to reader")
|
|
|
+ }
|
|
|
+
|
|
|
+ return nil
|
|
|
}
|
|
|
|
|
|
- return nil
|
|
|
+ return seekFile(ra, targetName, handle)
|
|
|
+}
|
|
|
+
|
|
|
+func seekFile(ra content.ReaderAt, targetName string, handle func(io.Reader, *tar.Header) error) (*TOCEntry, error) {
|
|
|
+ // Try seek target data by TOC.
|
|
|
+ entry, err := seekFileByTOC(ra, targetName, handle)
|
|
|
+ if err != nil {
|
|
|
+ if !errors.Is(err, ErrNotFound) {
|
|
|
+ return nil, errors.Wrap(err, "seek file by TOC")
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ return entry, nil
|
|
|
+ }
|
|
|
+
|
|
|
+ // Seek target data by tar header, ensure compatible with old rafs blob format.
|
|
|
+ return nil, seekFileByTarHeader(ra, targetName, handle)
|
|
|
}
|
|
|
|
|
|
// Pack converts an OCI tar stream to nydus formatted stream with a tar-like
|
|
|
// structure that arranges the data as follows:
|
|
|
//
|
|
|
-// `blob_data | blob_tar_header | bootstrap_data | bootstrap_tar_header`
|
|
|
+// `data | tar_header | data | tar_header | [toc_entry | ... | toc_entry | tar_header]`
|
|
|
//
|
|
|
// The caller should write OCI tar stream into the returned `io.WriteCloser`,
|
|
|
// then the Pack method will write the nydus formatted stream to `dest`
|
|
@@ -267,6 +311,24 @@ func unpackBlobFromNydusTar(ctx context.Context, ra content.ReaderAt, target io.
|
|
|
// Important: the caller must check `io.WriteCloser.Close() == nil` to ensure
|
|
|
// the conversion workflow is finished.
|
|
|
func Pack(ctx context.Context, dest io.Writer, opt PackOption) (io.WriteCloser, error) {
|
|
|
+ if opt.FsVersion == "" {
|
|
|
+ opt.FsVersion = "6"
|
|
|
+ }
|
|
|
+
|
|
|
+ builderPath := getBuilder(opt.BuilderPath)
|
|
|
+ opt.features = tool.DetectFeatures(builderPath, []tool.Feature{tool.FeatureTar2Rafs})
|
|
|
+
|
|
|
+ if opt.OCIRef {
|
|
|
+ if opt.FsVersion == "6" {
|
|
|
+ return packFromTar(ctx, dest, opt)
|
|
|
+ }
|
|
|
+ return nil, fmt.Errorf("oci ref can only be supported by fs version 6")
|
|
|
+ }
|
|
|
+
|
|
|
+ if opt.features.Contains(tool.FeatureTar2Rafs) {
|
|
|
+ return packFromTar(ctx, dest, opt)
|
|
|
+ }
|
|
|
+
|
|
|
workDir, err := ensureWorkDir(opt.WorkDir)
|
|
|
if err != nil {
|
|
|
return nil, errors.Wrap(err, "ensure work directory")
|
|
@@ -295,9 +357,7 @@ func Pack(ctx context.Context, dest io.Writer, opt PackOption) (io.WriteCloser,
|
|
|
}()
|
|
|
|
|
|
wc := newWriteCloser(pw, func() error {
|
|
|
- defer func() {
|
|
|
- os.RemoveAll(workDir)
|
|
|
- }()
|
|
|
+ defer os.RemoveAll(workDir)
|
|
|
|
|
|
// Because PipeWriter#Close is called does not mean that the PipeReader
|
|
|
// has finished reading all the data, and unpack may not be complete yet,
|
|
@@ -313,15 +373,19 @@ func Pack(ctx context.Context, dest io.Writer, opt PackOption) (io.WriteCloser,
|
|
|
|
|
|
go func() {
|
|
|
err := tool.Pack(tool.PackOption{
|
|
|
- BuilderPath: getBuilder(opt.BuilderPath),
|
|
|
+ BuilderPath: builderPath,
|
|
|
|
|
|
BlobPath: blobPath,
|
|
|
FsVersion: opt.FsVersion,
|
|
|
SourcePath: sourceDir,
|
|
|
ChunkDictPath: opt.ChunkDictPath,
|
|
|
PrefetchPatterns: opt.PrefetchPatterns,
|
|
|
+ AlignedChunk: opt.AlignedChunk,
|
|
|
+ ChunkSize: opt.ChunkSize,
|
|
|
Compressor: opt.Compressor,
|
|
|
Timeout: opt.Timeout,
|
|
|
+
|
|
|
+ Features: opt.features,
|
|
|
})
|
|
|
if err != nil {
|
|
|
pw.CloseWithError(errors.Wrapf(err, "convert blob for %s", sourceDir))
|
|
@@ -341,6 +405,117 @@ func Pack(ctx context.Context, dest io.Writer, opt PackOption) (io.WriteCloser,
|
|
|
return wc, nil
|
|
|
}
|
|
|
|
|
|
+func packFromTar(ctx context.Context, dest io.Writer, opt PackOption) (io.WriteCloser, error) {
|
|
|
+ workDir, err := ensureWorkDir(opt.WorkDir)
|
|
|
+ if err != nil {
|
|
|
+ return nil, errors.Wrap(err, "ensure work directory")
|
|
|
+ }
|
|
|
+ defer func() {
|
|
|
+ if err != nil {
|
|
|
+ os.RemoveAll(workDir)
|
|
|
+ }
|
|
|
+ }()
|
|
|
+
|
|
|
+ rafsBlobPath := filepath.Join(workDir, "blob.rafs")
|
|
|
+ rafsBlobFifo, err := fifo.OpenFifo(ctx, rafsBlobPath, syscall.O_CREAT|syscall.O_RDONLY|syscall.O_NONBLOCK, 0644)
|
|
|
+ if err != nil {
|
|
|
+ return nil, errors.Wrapf(err, "create fifo file")
|
|
|
+ }
|
|
|
+
|
|
|
+ tarBlobPath := filepath.Join(workDir, "blob.targz")
|
|
|
+ tarBlobFifo, err := fifo.OpenFifo(ctx, tarBlobPath, syscall.O_CREAT|syscall.O_WRONLY|syscall.O_NONBLOCK, 0644)
|
|
|
+ if err != nil {
|
|
|
+ defer rafsBlobFifo.Close()
|
|
|
+ return nil, errors.Wrapf(err, "create fifo file")
|
|
|
+ }
|
|
|
+
|
|
|
+ pr, pw := io.Pipe()
|
|
|
+ eg := errgroup.Group{}
|
|
|
+
|
|
|
+ wc := newWriteCloser(pw, func() error {
|
|
|
+ defer os.RemoveAll(workDir)
|
|
|
+ if err := eg.Wait(); err != nil {
|
|
|
+ return errors.Wrapf(err, "convert nydus ref")
|
|
|
+ }
|
|
|
+ return nil
|
|
|
+ })
|
|
|
+
|
|
|
+ eg.Go(func() error {
|
|
|
+ defer tarBlobFifo.Close()
|
|
|
+ buffer := bufPool.Get().(*[]byte)
|
|
|
+ defer bufPool.Put(buffer)
|
|
|
+ if _, err := io.CopyBuffer(tarBlobFifo, pr, *buffer); err != nil {
|
|
|
+ return errors.Wrapf(err, "copy targz to fifo")
|
|
|
+ }
|
|
|
+ return nil
|
|
|
+ })
|
|
|
+
|
|
|
+ eg.Go(func() error {
|
|
|
+ defer rafsBlobFifo.Close()
|
|
|
+ buffer := bufPool.Get().(*[]byte)
|
|
|
+ defer bufPool.Put(buffer)
|
|
|
+ if _, err := io.CopyBuffer(dest, rafsBlobFifo, *buffer); err != nil {
|
|
|
+ return errors.Wrapf(err, "copy blob meta fifo to nydus blob")
|
|
|
+ }
|
|
|
+ return nil
|
|
|
+ })
|
|
|
+
|
|
|
+ eg.Go(func() error {
|
|
|
+ var err error
|
|
|
+ if opt.OCIRef {
|
|
|
+ err = tool.Pack(tool.PackOption{
|
|
|
+ BuilderPath: getBuilder(opt.BuilderPath),
|
|
|
+
|
|
|
+ OCIRef: opt.OCIRef,
|
|
|
+ BlobPath: rafsBlobPath,
|
|
|
+ SourcePath: tarBlobPath,
|
|
|
+ Timeout: opt.Timeout,
|
|
|
+
|
|
|
+ Features: opt.features,
|
|
|
+ })
|
|
|
+ } else {
|
|
|
+ err = tool.Pack(tool.PackOption{
|
|
|
+ BuilderPath: getBuilder(opt.BuilderPath),
|
|
|
+
|
|
|
+ BlobPath: rafsBlobPath,
|
|
|
+ FsVersion: opt.FsVersion,
|
|
|
+ SourcePath: tarBlobPath,
|
|
|
+ ChunkDictPath: opt.ChunkDictPath,
|
|
|
+ PrefetchPatterns: opt.PrefetchPatterns,
|
|
|
+ AlignedChunk: opt.AlignedChunk,
|
|
|
+ ChunkSize: opt.ChunkSize,
|
|
|
+ BatchSize: opt.BatchSize,
|
|
|
+ Compressor: opt.Compressor,
|
|
|
+ Timeout: opt.Timeout,
|
|
|
+
|
|
|
+ Features: opt.features,
|
|
|
+ })
|
|
|
+ }
|
|
|
+ if err != nil {
|
|
|
+ // Without handling the returned error because we just only
|
|
|
+ // focus on the command exit status in `tool.Pack`.
|
|
|
+ wc.Close()
|
|
|
+ }
|
|
|
+ return errors.Wrapf(err, "call builder")
|
|
|
+ })
|
|
|
+
|
|
|
+ return wc, nil
|
|
|
+}
|
|
|
+
|
|
|
+func calcBlobTOCDigest(ra content.ReaderAt) (*digest.Digest, error) {
|
|
|
+ digester := digest.Canonical.Digester()
|
|
|
+ if err := seekFileByTarHeader(ra, EntryTOC, func(tocData io.Reader, _ *tar.Header) error {
|
|
|
+ if _, err := io.Copy(digester.Hash(), tocData); err != nil {
|
|
|
+ return errors.Wrap(err, "calc toc data and header digest")
|
|
|
+ }
|
|
|
+ return nil
|
|
|
+ }); err != nil {
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
+ tocDigest := digester.Digest()
|
|
|
+ return &tocDigest, nil
|
|
|
+}
|
|
|
+
|
|
|
// Merge multiple nydus bootstraps (from each layer of image) to a final
|
|
|
// bootstrap. And due to the possibility of enabling the `ChunkDictPath`
|
|
|
// option causes the data deduplication, it will return the actual blob
|
|
@@ -352,22 +527,40 @@ func Merge(ctx context.Context, layers []Layer, dest io.Writer, opt MergeOption)
|
|
|
}
|
|
|
defer os.RemoveAll(workDir)
|
|
|
|
|
|
- eg, ctx := errgroup.WithContext(ctx)
|
|
|
+ getBootstrapPath := func(layerIdx int) string {
|
|
|
+ digestHex := layers[layerIdx].Digest.Hex()
|
|
|
+ if originalDigest := layers[layerIdx].OriginalDigest; originalDigest != nil {
|
|
|
+ return filepath.Join(workDir, originalDigest.Hex())
|
|
|
+ }
|
|
|
+ return filepath.Join(workDir, digestHex)
|
|
|
+ }
|
|
|
+
|
|
|
+ eg, _ := errgroup.WithContext(ctx)
|
|
|
sourceBootstrapPaths := []string{}
|
|
|
+ rafsBlobDigests := []string{}
|
|
|
+ rafsBlobSizes := []int64{}
|
|
|
+ rafsBlobTOCDigests := []string{}
|
|
|
for idx := range layers {
|
|
|
- sourceBootstrapPaths = append(sourceBootstrapPaths, filepath.Join(workDir, layers[idx].Digest.Hex()))
|
|
|
+ sourceBootstrapPaths = append(sourceBootstrapPaths, getBootstrapPath(idx))
|
|
|
+ if layers[idx].OriginalDigest != nil {
|
|
|
+ rafsBlobDigests = append(rafsBlobDigests, layers[idx].Digest.Hex())
|
|
|
+ rafsBlobSizes = append(rafsBlobSizes, layers[idx].ReaderAt.Size())
|
|
|
+ rafsBlobTOCDigest, err := calcBlobTOCDigest(layers[idx].ReaderAt)
|
|
|
+ if err != nil {
|
|
|
+ return nil, errors.Wrapf(err, "calc blob toc digest for layer %s", layers[idx].Digest)
|
|
|
+ }
|
|
|
+ rafsBlobTOCDigests = append(rafsBlobTOCDigests, rafsBlobTOCDigest.Hex())
|
|
|
+ }
|
|
|
eg.Go(func(idx int) func() error {
|
|
|
return func() error {
|
|
|
- layer := layers[idx]
|
|
|
-
|
|
|
// Use the hex hash string of whole tar blob as the bootstrap name.
|
|
|
- bootstrap, err := os.Create(filepath.Join(workDir, layer.Digest.Hex()))
|
|
|
+ bootstrap, err := os.Create(getBootstrapPath(idx))
|
|
|
if err != nil {
|
|
|
return errors.Wrap(err, "create source bootstrap")
|
|
|
}
|
|
|
defer bootstrap.Close()
|
|
|
|
|
|
- if err := unpackBootstrapFromNydusTar(ctx, layer.ReaderAt, bootstrap); err != nil {
|
|
|
+ if _, err := UnpackEntry(layers[idx].ReaderAt, EntryBootstrap, bootstrap); err != nil {
|
|
|
return errors.Wrap(err, "unpack nydus tar")
|
|
|
}
|
|
|
|
|
@@ -386,11 +579,16 @@ func Merge(ctx context.Context, layers []Layer, dest io.Writer, opt MergeOption)
|
|
|
BuilderPath: getBuilder(opt.BuilderPath),
|
|
|
|
|
|
SourceBootstrapPaths: sourceBootstrapPaths,
|
|
|
- TargetBootstrapPath: targetBootstrapPath,
|
|
|
- ChunkDictPath: opt.ChunkDictPath,
|
|
|
- PrefetchPatterns: opt.PrefetchPatterns,
|
|
|
- OutputJSONPath: filepath.Join(workDir, "merge-output.json"),
|
|
|
- Timeout: opt.Timeout,
|
|
|
+ RafsBlobDigests: rafsBlobDigests,
|
|
|
+ RafsBlobSizes: rafsBlobSizes,
|
|
|
+ RafsBlobTOCDigests: rafsBlobTOCDigests,
|
|
|
+
|
|
|
+ TargetBootstrapPath: targetBootstrapPath,
|
|
|
+ ChunkDictPath: opt.ChunkDictPath,
|
|
|
+ ParentBootstrapPath: opt.ParentBootstrapPath,
|
|
|
+ PrefetchPatterns: opt.PrefetchPatterns,
|
|
|
+ OutputJSONPath: filepath.Join(workDir, "merge-output.json"),
|
|
|
+ Timeout: opt.Timeout,
|
|
|
})
|
|
|
if err != nil {
|
|
|
return nil, errors.Wrap(err, "merge bootstrap")
|
|
@@ -399,7 +597,7 @@ func Merge(ctx context.Context, layers []Layer, dest io.Writer, opt MergeOption)
|
|
|
var rc io.ReadCloser
|
|
|
|
|
|
if opt.WithTar {
|
|
|
- rc, err = packToTar(targetBootstrapPath, fmt.Sprintf("image/%s", bootstrapNameInTar), false)
|
|
|
+ rc, err = packToTar(targetBootstrapPath, fmt.Sprintf("image/%s", EntryBootstrap), false)
|
|
|
if err != nil {
|
|
|
return nil, errors.Wrap(err, "pack bootstrap to tar")
|
|
|
}
|
|
@@ -428,8 +626,8 @@ func Unpack(ctx context.Context, ra content.ReaderAt, dest io.Writer, opt Unpack
|
|
|
}
|
|
|
defer os.RemoveAll(workDir)
|
|
|
|
|
|
- bootPath, blobPath := filepath.Join(workDir, bootstrapNameInTar), filepath.Join(workDir, blobNameInTar)
|
|
|
- if err = unpackNydusTar(ctx, bootPath, blobPath, ra); err != nil {
|
|
|
+ bootPath, blobPath := filepath.Join(workDir, EntryBootstrap), filepath.Join(workDir, EntryBlob)
|
|
|
+ if err = unpackNydusBlob(bootPath, blobPath, ra, !opt.Stream); err != nil {
|
|
|
return errors.Wrap(err, "unpack nydus tar")
|
|
|
}
|
|
|
|
|
@@ -440,16 +638,35 @@ func Unpack(ctx context.Context, ra content.ReaderAt, dest io.Writer, opt Unpack
|
|
|
}
|
|
|
defer blobFifo.Close()
|
|
|
|
|
|
+ unpackOpt := tool.UnpackOption{
|
|
|
+ BuilderPath: getBuilder(opt.BuilderPath),
|
|
|
+ BootstrapPath: bootPath,
|
|
|
+ BlobPath: blobPath,
|
|
|
+ TarPath: tarPath,
|
|
|
+ Timeout: opt.Timeout,
|
|
|
+ }
|
|
|
+
|
|
|
+ if opt.Stream {
|
|
|
+ proxy, err := setupContentStoreProxy(opt.WorkDir, ra)
|
|
|
+ if err != nil {
|
|
|
+ return errors.Wrap(err, "new content store proxy")
|
|
|
+ }
|
|
|
+ defer proxy.close()
|
|
|
+
|
|
|
+ // generate backend config file
|
|
|
+ backendConfigStr := fmt.Sprintf(`{"version":2,"backend":{"type":"http-proxy","http-proxy":{"addr":"%s"}}}`, proxy.socketPath)
|
|
|
+ backendConfigPath := filepath.Join(workDir, "backend-config.json")
|
|
|
+ if err := os.WriteFile(backendConfigPath, []byte(backendConfigStr), 0644); err != nil {
|
|
|
+ return errors.Wrap(err, "write backend config")
|
|
|
+ }
|
|
|
+ unpackOpt.BlobPath = ""
|
|
|
+ unpackOpt.BackendConfigPath = backendConfigPath
|
|
|
+ }
|
|
|
+
|
|
|
unpackErrChan := make(chan error)
|
|
|
go func() {
|
|
|
defer close(unpackErrChan)
|
|
|
- err := tool.Unpack(tool.UnpackOption{
|
|
|
- BuilderPath: getBuilder(opt.BuilderPath),
|
|
|
- BootstrapPath: bootPath,
|
|
|
- BlobPath: blobPath,
|
|
|
- TarPath: tarPath,
|
|
|
- Timeout: opt.Timeout,
|
|
|
- })
|
|
|
+ err := tool.Unpack(unpackOpt)
|
|
|
if err != nil {
|
|
|
blobFifo.Close()
|
|
|
unpackErrChan <- err
|
|
@@ -476,11 +693,11 @@ func IsNydusBlobAndExists(ctx context.Context, cs content.Store, desc ocispec.De
|
|
|
return false
|
|
|
}
|
|
|
|
|
|
- return IsNydusBlob(ctx, desc)
|
|
|
+ return IsNydusBlob(desc)
|
|
|
}
|
|
|
|
|
|
-// IsNydusBlob returns true when the specified descriptor is nydus blob format.
|
|
|
-func IsNydusBlob(ctx context.Context, desc ocispec.Descriptor) bool {
|
|
|
+// IsNydusBlob returns true when the specified descriptor is nydus blob layer.
|
|
|
+func IsNydusBlob(desc ocispec.Descriptor) bool {
|
|
|
if desc.Annotations == nil {
|
|
|
return false
|
|
|
}
|
|
@@ -489,6 +706,16 @@ func IsNydusBlob(ctx context.Context, desc ocispec.Descriptor) bool {
|
|
|
return hasAnno
|
|
|
}
|
|
|
|
|
|
+// IsNydusBootstrap returns true when the specified descriptor is nydus bootstrap layer.
|
|
|
+func IsNydusBootstrap(desc ocispec.Descriptor) bool {
|
|
|
+ if desc.Annotations == nil {
|
|
|
+ return false
|
|
|
+ }
|
|
|
+
|
|
|
+ _, hasAnno := desc.Annotations[LayerAnnotationNydusBootstrap]
|
|
|
+ return hasAnno
|
|
|
+}
|
|
|
+
|
|
|
// LayerConvertFunc returns a function which converts an OCI image layer to
|
|
|
// a nydus blob layer, and set the media type to "application/vnd.oci.image.layer.nydus.blob.v1".
|
|
|
func LayerConvertFunc(opt PackOption) converter.ConvertFunc {
|
|
@@ -497,6 +724,11 @@ func LayerConvertFunc(opt PackOption) converter.ConvertFunc {
|
|
|
return nil, nil
|
|
|
}
|
|
|
|
|
|
+ // Skip the conversion of nydus layer.
|
|
|
+ if IsNydusBlob(desc) || IsNydusBootstrap(desc) {
|
|
|
+ return nil, nil
|
|
|
+ }
|
|
|
+
|
|
|
ra, err := cs.ReaderAt(ctx, desc)
|
|
|
if err != nil {
|
|
|
return nil, errors.Wrap(err, "get source blob reader")
|
|
@@ -511,9 +743,14 @@ func LayerConvertFunc(opt PackOption) converter.ConvertFunc {
|
|
|
}
|
|
|
defer dst.Close()
|
|
|
|
|
|
- tr, err := compression.DecompressStream(rdr)
|
|
|
- if err != nil {
|
|
|
- return nil, errors.Wrap(err, "decompress blob stream")
|
|
|
+ var tr io.ReadCloser
|
|
|
+ if opt.OCIRef {
|
|
|
+ tr = io.NopCloser(rdr)
|
|
|
+ } else {
|
|
|
+ tr, err = compression.DecompressStream(rdr)
|
|
|
+ if err != nil {
|
|
|
+ return nil, errors.Wrap(err, "decompress blob stream")
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
digester := digest.SHA256.Digester()
|
|
@@ -574,14 +811,12 @@ func LayerConvertFunc(opt PackOption) converter.ConvertFunc {
|
|
|
},
|
|
|
}
|
|
|
|
|
|
- if opt.Backend != nil {
|
|
|
- blobRa, err := cs.ReaderAt(ctx, newDesc)
|
|
|
- if err != nil {
|
|
|
- return nil, errors.Wrap(err, "get nydus blob reader")
|
|
|
- }
|
|
|
- defer blobRa.Close()
|
|
|
+ if opt.OCIRef {
|
|
|
+ newDesc.Annotations[label.NydusRefLayer] = desc.Digest.String()
|
|
|
+ }
|
|
|
|
|
|
- if err := opt.Backend.Push(ctx, blobRa, blobDigest); err != nil {
|
|
|
+ if opt.Backend != nil {
|
|
|
+ if err := opt.Backend.Push(ctx, cs, newDesc); err != nil {
|
|
|
return nil, errors.Wrap(err, "push to storage backend")
|
|
|
}
|
|
|
}
|
|
@@ -595,11 +830,15 @@ func LayerConvertFunc(opt PackOption) converter.ConvertFunc {
|
|
|
// the index conversion and the manifest conversion.
|
|
|
func ConvertHookFunc(opt MergeOption) converter.ConvertHookFunc {
|
|
|
return func(ctx context.Context, cs content.Store, orgDesc ocispec.Descriptor, newDesc *ocispec.Descriptor) (*ocispec.Descriptor, error) {
|
|
|
+ // If the previous conversion did not occur, the `newDesc` may be nil.
|
|
|
+ if newDesc == nil {
|
|
|
+ return &orgDesc, nil
|
|
|
+ }
|
|
|
switch {
|
|
|
case images.IsIndexType(newDesc.MediaType):
|
|
|
return convertIndex(ctx, cs, orgDesc, newDesc)
|
|
|
case images.IsManifestType(newDesc.MediaType):
|
|
|
- return convertManifest(ctx, cs, newDesc, opt)
|
|
|
+ return convertManifest(ctx, cs, orgDesc, newDesc, opt)
|
|
|
default:
|
|
|
return newDesc, nil
|
|
|
}
|
|
@@ -636,6 +875,13 @@ func convertIndex(ctx context.Context, cs content.Store, orgDesc ocispec.Descrip
|
|
|
manifest.Platform.OSFeatures = append(manifest.Platform.OSFeatures, ManifestOSFeatureNydus)
|
|
|
index.Manifests[i] = manifest
|
|
|
}
|
|
|
+
|
|
|
+ // If the converted manifest list contains only one manifest,
|
|
|
+ // convert it directly to manifest.
|
|
|
+ if len(index.Manifests) == 1 {
|
|
|
+ return &index.Manifests[0], nil
|
|
|
+ }
|
|
|
+
|
|
|
// Update image index in content store.
|
|
|
newIndexDesc, err := writeJSON(ctx, cs, index, *newDesc, indexLabels)
|
|
|
if err != nil {
|
|
@@ -644,10 +890,23 @@ func convertIndex(ctx context.Context, cs content.Store, orgDesc ocispec.Descrip
|
|
|
return newIndexDesc, nil
|
|
|
}
|
|
|
|
|
|
+// isNydusImage checks if the last layer is nydus bootstrap,
|
|
|
+// so that we can ensure it is a nydus image.
|
|
|
+func isNydusImage(manifest *ocispec.Manifest) bool {
|
|
|
+ layers := manifest.Layers
|
|
|
+ if len(layers) != 0 {
|
|
|
+ desc := layers[len(layers)-1]
|
|
|
+ if IsNydusBootstrap(desc) {
|
|
|
+ return true
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return false
|
|
|
+}
|
|
|
+
|
|
|
// convertManifest merges all the nydus blob layers into a
|
|
|
// nydus bootstrap layer, update the image config,
|
|
|
// and modify the image manifest.
|
|
|
-func convertManifest(ctx context.Context, cs content.Store, newDesc *ocispec.Descriptor, opt MergeOption) (*ocispec.Descriptor, error) {
|
|
|
+func convertManifest(ctx context.Context, cs content.Store, oldDesc ocispec.Descriptor, newDesc *ocispec.Descriptor, opt MergeOption) (*ocispec.Descriptor, error) {
|
|
|
var manifest ocispec.Manifest
|
|
|
manifestDesc := *newDesc
|
|
|
manifestLabels, err := readJSON(ctx, cs, &manifest, manifestDesc)
|
|
@@ -655,14 +914,21 @@ func convertManifest(ctx context.Context, cs content.Store, newDesc *ocispec.Des
|
|
|
return nil, errors.Wrap(err, "read manifest json")
|
|
|
}
|
|
|
|
|
|
+ if isNydusImage(&manifest) {
|
|
|
+ return &manifestDesc, nil
|
|
|
+ }
|
|
|
+
|
|
|
+ // This option needs to be enabled for image scenario.
|
|
|
+ opt.WithTar = true
|
|
|
+
|
|
|
+ // If the original image is already an OCI type, we should forcibly set the
|
|
|
+ // bootstrap layer to the OCI type.
|
|
|
+ if !opt.OCI && oldDesc.MediaType == ocispec.MediaTypeImageManifest {
|
|
|
+ opt.OCI = true
|
|
|
+ }
|
|
|
+
|
|
|
// Append bootstrap layer to manifest.
|
|
|
- bootstrapDesc, blobDescs, err := MergeLayers(ctx, cs, manifest.Layers, MergeOption{
|
|
|
- BuilderPath: opt.BuilderPath,
|
|
|
- WorkDir: opt.WorkDir,
|
|
|
- ChunkDictPath: opt.ChunkDictPath,
|
|
|
- FsVersion: opt.FsVersion,
|
|
|
- WithTar: true,
|
|
|
- })
|
|
|
+ bootstrapDesc, blobDescs, err := MergeLayers(ctx, cs, manifest.Layers, opt)
|
|
|
if err != nil {
|
|
|
return nil, errors.Wrap(err, "merge nydus layers")
|
|
|
}
|
|
@@ -678,7 +944,8 @@ func convertManifest(ctx context.Context, cs content.Store, newDesc *ocispec.Des
|
|
|
// Affected by chunk dict, the blob list referenced by final bootstrap
|
|
|
// are from different layers, part of them are from original layers, part
|
|
|
// from chunk dict bootstrap, so we need to rewrite manifest's layers here.
|
|
|
- manifest.Layers = append(blobDescs, *bootstrapDesc)
|
|
|
+ blobDescs := append(blobDescs, *bootstrapDesc)
|
|
|
+ manifest.Layers = blobDescs
|
|
|
}
|
|
|
|
|
|
// Update the gc label of bootstrap layer
|
|
@@ -691,8 +958,13 @@ func convertManifest(ctx context.Context, cs content.Store, newDesc *ocispec.Des
|
|
|
if err != nil {
|
|
|
return nil, errors.Wrap(err, "read image config")
|
|
|
}
|
|
|
+ bootstrapHistory := ocispec.History{
|
|
|
+ CreatedBy: "Nydus Converter",
|
|
|
+ Comment: "Nydus Bootstrap Layer",
|
|
|
+ }
|
|
|
if opt.Backend != nil {
|
|
|
config.RootFS.DiffIDs = []digest.Digest{digest.Digest(bootstrapDesc.Annotations[LayerAnnotationUncompressed])}
|
|
|
+ config.History = []ocispec.History{bootstrapHistory}
|
|
|
} else {
|
|
|
config.RootFS.DiffIDs = make([]digest.Digest, 0, len(manifest.Layers))
|
|
|
for i, layer := range manifest.Layers {
|
|
@@ -700,6 +972,9 @@ func convertManifest(ctx context.Context, cs content.Store, newDesc *ocispec.Des
|
|
|
// Remove useless annotation.
|
|
|
delete(manifest.Layers[i].Annotations, LayerAnnotationUncompressed)
|
|
|
}
|
|
|
+ // Append history item for bootstrap layer, to ensure the history consistency.
|
|
|
+ // See https://github.com/distribution/distribution/blob/e5d5810851d1f17a5070e9b6f940d8af98ea3c29/manifest/schema1/config_builder.go#L136
|
|
|
+ config.History = append(config.History, bootstrapHistory)
|
|
|
}
|
|
|
// Update image config in content store.
|
|
|
newConfigDesc, err := writeJSON(ctx, cs, config, manifest.Config, configLabels)
|
|
@@ -710,6 +985,11 @@ func convertManifest(ctx context.Context, cs content.Store, newDesc *ocispec.Des
|
|
|
// Update the config gc label
|
|
|
manifestLabels[configGCLabelKey] = newConfigDesc.Digest.String()
|
|
|
|
|
|
+ // Associate a reference to the original OCI manifest.
|
|
|
+ // See the `subject` field description in
|
|
|
+ // https://github.com/opencontainers/image-spec/blob/main/manifest.md#image-manifest-property-descriptions
|
|
|
+ manifest.Subject = &oldDesc
|
|
|
+
|
|
|
// Update image manifest in content store.
|
|
|
newManifestDesc, err := writeJSON(ctx, cs, manifest, manifestDesc, manifestLabels)
|
|
|
if err != nil {
|
|
@@ -726,33 +1006,45 @@ func MergeLayers(ctx context.Context, cs content.Store, descs []ocispec.Descript
|
|
|
layers := []Layer{}
|
|
|
|
|
|
var chainID digest.Digest
|
|
|
- for _, blobDesc := range descs {
|
|
|
- ra, err := cs.ReaderAt(ctx, blobDesc)
|
|
|
+ nydusBlobDigests := []digest.Digest{}
|
|
|
+ for _, nydusBlobDesc := range descs {
|
|
|
+ ra, err := cs.ReaderAt(ctx, nydusBlobDesc)
|
|
|
if err != nil {
|
|
|
- return nil, nil, errors.Wrapf(err, "get reader for blob %q", blobDesc.Digest)
|
|
|
+ return nil, nil, errors.Wrapf(err, "get reader for blob %q", nydusBlobDesc.Digest)
|
|
|
}
|
|
|
defer ra.Close()
|
|
|
+ var originalDigest *digest.Digest
|
|
|
+ if opt.OCIRef {
|
|
|
+ digestStr := nydusBlobDesc.Annotations[label.NydusRefLayer]
|
|
|
+ _originalDigest, err := digest.Parse(digestStr)
|
|
|
+ if err != nil {
|
|
|
+ return nil, nil, errors.Wrapf(err, "invalid label %s=%s", label.NydusRefLayer, digestStr)
|
|
|
+ }
|
|
|
+ originalDigest = &_originalDigest
|
|
|
+ }
|
|
|
layers = append(layers, Layer{
|
|
|
- Digest: blobDesc.Digest,
|
|
|
- ReaderAt: ra,
|
|
|
+ Digest: nydusBlobDesc.Digest,
|
|
|
+ OriginalDigest: originalDigest,
|
|
|
+ ReaderAt: ra,
|
|
|
})
|
|
|
if chainID == "" {
|
|
|
- chainID = identity.ChainID([]digest.Digest{blobDesc.Digest})
|
|
|
+ chainID = identity.ChainID([]digest.Digest{nydusBlobDesc.Digest})
|
|
|
} else {
|
|
|
- chainID = identity.ChainID([]digest.Digest{chainID, blobDesc.Digest})
|
|
|
+ chainID = identity.ChainID([]digest.Digest{chainID, nydusBlobDesc.Digest})
|
|
|
}
|
|
|
+ nydusBlobDigests = append(nydusBlobDigests, nydusBlobDesc.Digest)
|
|
|
}
|
|
|
|
|
|
// Merge all nydus bootstraps into a final nydus bootstrap.
|
|
|
pr, pw := io.Pipe()
|
|
|
- blobDigestChan := make(chan []digest.Digest, 1)
|
|
|
+ originalBlobDigestChan := make(chan []digest.Digest, 1)
|
|
|
go func() {
|
|
|
defer pw.Close()
|
|
|
- blobDigests, err := Merge(ctx, layers, pw, opt)
|
|
|
+ originalBlobDigests, err := Merge(ctx, layers, pw, opt)
|
|
|
if err != nil {
|
|
|
pw.CloseWithError(errors.Wrapf(err, "merge nydus bootstrap"))
|
|
|
}
|
|
|
- blobDigestChan <- blobDigests
|
|
|
+ originalBlobDigestChan <- originalBlobDigests
|
|
|
}()
|
|
|
|
|
|
// Compress final nydus bootstrap to tar.gz and write into content store.
|
|
@@ -791,10 +1083,17 @@ func MergeLayers(ctx context.Context, cs content.Store, descs []ocispec.Descript
|
|
|
return nil, nil, errors.Wrap(err, "get info from content store")
|
|
|
}
|
|
|
|
|
|
- blobDigests := <-blobDigestChan
|
|
|
+ originalBlobDigests := <-originalBlobDigestChan
|
|
|
blobDescs := []ocispec.Descriptor{}
|
|
|
- blobIDs := []string{}
|
|
|
- for _, blobDigest := range blobDigests {
|
|
|
+
|
|
|
+ var blobDigests []digest.Digest
|
|
|
+ if opt.OCIRef {
|
|
|
+ blobDigests = nydusBlobDigests
|
|
|
+ } else {
|
|
|
+ blobDigests = originalBlobDigests
|
|
|
+ }
|
|
|
+
|
|
|
+ for idx, blobDigest := range blobDigests {
|
|
|
blobInfo, err := cs.Info(ctx, blobDigest)
|
|
|
if err != nil {
|
|
|
return nil, nil, errors.Wrap(err, "get info from content store")
|
|
@@ -808,30 +1107,29 @@ func MergeLayers(ctx context.Context, cs content.Store, descs []ocispec.Descript
|
|
|
LayerAnnotationNydusBlob: "true",
|
|
|
},
|
|
|
}
|
|
|
+ if opt.OCIRef {
|
|
|
+ blobDesc.Annotations[label.NydusRefLayer] = layers[idx].OriginalDigest.String()
|
|
|
+ }
|
|
|
blobDescs = append(blobDescs, blobDesc)
|
|
|
- blobIDs = append(blobIDs, blobDigest.Hex())
|
|
|
- }
|
|
|
-
|
|
|
- blobIDsBytes, err := json.Marshal(blobIDs)
|
|
|
- if err != nil {
|
|
|
- return nil, nil, errors.Wrap(err, "marshal blob ids")
|
|
|
}
|
|
|
|
|
|
if opt.FsVersion == "" {
|
|
|
- opt.FsVersion = "5"
|
|
|
+ opt.FsVersion = "6"
|
|
|
+ }
|
|
|
+ mediaType := images.MediaTypeDockerSchema2LayerGzip
|
|
|
+ if opt.OCI {
|
|
|
+ mediaType = ocispec.MediaTypeImageLayerGzip
|
|
|
}
|
|
|
|
|
|
bootstrapDesc := ocispec.Descriptor{
|
|
|
Digest: compressedDgst,
|
|
|
Size: bootstrapInfo.Size,
|
|
|
- MediaType: ocispec.MediaTypeImageLayerGzip,
|
|
|
+ MediaType: mediaType,
|
|
|
Annotations: map[string]string{
|
|
|
LayerAnnotationUncompressed: uncompressedDgst.Digest().String(),
|
|
|
LayerAnnotationFSVersion: opt.FsVersion,
|
|
|
// Use this annotation to identify nydus bootstrap layer.
|
|
|
LayerAnnotationNydusBootstrap: "true",
|
|
|
- // Track all blob digests for nydus snapshotter.
|
|
|
- LayerAnnotationNydusBlobIDs: string(blobIDsBytes),
|
|
|
},
|
|
|
}
|
|
|
|