Merge pull request #47580 from vvoland/c8d-list-slow

c8d/list: Generate image summary concurrently
Commit 4531a371f2 by Paweł Gronowski, 2024-03-19 13:32:52 +01:00, committed by GitHub
4 changed files with 267 additions and 58 deletions
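
In short: the per-image summary generation in the containerd-backed image list is now fanned out over a bounded errgroup, with a mutex guarding the shared result slices. Below is a minimal, self-contained sketch of that pattern, not the daemon's actual code; the summary type and the buildSummary/buildAll helpers are hypothetical stand-ins for imagetypes.Summary and ImageService.imageSummary:

package main

import (
    "context"
    "fmt"
    "runtime"
    "sync"

    "golang.org/x/sync/errgroup"
)

type summary struct{ id int }

// buildSummary stands in for the per-image work (ImageService.imageSummary in the diff).
func buildSummary(ctx context.Context, id int) (*summary, error) {
    return &summary{id: id}, nil
}

func buildAll(ctx context.Context, ids []int) ([]*summary, error) {
    var (
        mu        sync.Mutex
        summaries = make([]*summary, 0, len(ids))
    )
    eg, egCtx := errgroup.WithContext(ctx)
    eg.SetLimit(runtime.NumCPU() * 2) // cap the number of in-flight goroutines

    for _, id := range ids {
        id := id // copy the loop variable; required before Go 1.22
        eg.Go(func() error {
            s, err := buildSummary(egCtx, id)
            if err != nil {
                return err // first error cancels egCtx and is reported by Wait
            }
            mu.Lock()
            summaries = append(summaries, s)
            mu.Unlock()
            return nil
        })
    }
    if err := eg.Wait(); err != nil {
        return nil, err
    }
    return summaries, nil
}

func main() {
    s, _ := buildAll(context.Background(), []int{1, 2, 3})
    fmt.Println(len(s))
}

Two properties of errgroup are worth keeping in mind when reading the diff: the context returned by errgroup.WithContext is cancelled as soon as any goroutine returns an error, and Wait returns that first error, so a failing imageSummary still aborts the whole listing, much like the old sequential early return. The img := img line in the diff plays the same role as id := id above. Goroutines finish in arbitrary order, so any caller that needs a stable order has to sort the collected summaries afterwards.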

View file

@ -3,8 +3,10 @@ package containerd
 import (
     "context"
     "encoding/json"
+    "runtime"
     "sort"
     "strings"
+    "sync"
     "time"
 
     "github.com/containerd/containerd/content"
@ -27,6 +29,7 @@ import (
"github.com/opencontainers/image-spec/identity"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
"golang.org/x/sync/errgroup"
)
// Subset of ocispec.Image that only contains Labels
@ -91,16 +94,6 @@ func (i *ImageService) Images(ctx context.Context, opts imagetypes.ListOptions)
         return usage.Size, nil
     }
 
-    var (
-        summaries = make([]*imagetypes.Summary, 0, len(imgs))
-        root      []*[]digest.Digest
-        layers    map[digest.Digest]int
-    )
-    if opts.SharedSize {
-        root = make([]*[]digest.Digest, 0, len(imgs))
-        layers = make(map[digest.Digest]int)
-    }
-
     uniqueImages := map[digest.Digest]images.Image{}
     tagsByDigest := map[digest.Digest][]string{}
     intermediateImages := map[digest.Digest]struct{}{}
@ -152,24 +145,48 @@ func (i *ImageService) Images(ctx context.Context, opts imagetypes.ListOptions)
         tagsByDigest[dgst] = append(tagsByDigest[dgst], reference.FamiliarString(ref))
     }
 
+    resultsMut := sync.Mutex{}
+    eg, egCtx := errgroup.WithContext(ctx)
+    eg.SetLimit(runtime.NumCPU() * 2)
+
+    var (
+        summaries = make([]*imagetypes.Summary, 0, len(imgs))
+        root      []*[]digest.Digest
+        layers    map[digest.Digest]int
+    )
+    if opts.SharedSize {
+        root = make([]*[]digest.Digest, 0, len(imgs))
+        layers = make(map[digest.Digest]int)
+    }
+
     for _, img := range uniqueImages {
-        image, allChainsIDs, err := i.imageSummary(ctx, img, platformMatcher, opts, tagsByDigest)
-        if err != nil {
-            return nil, err
-        }
-        // No error, but image should be skipped.
-        if image == nil {
-            continue
-        }
-
-        summaries = append(summaries, image)
-
-        if opts.SharedSize {
-            root = append(root, &allChainsIDs)
-            for _, id := range allChainsIDs {
-                layers[id] = layers[id] + 1
-            }
-        }
+        img := img
+        eg.Go(func() error {
+            image, allChainsIDs, err := i.imageSummary(egCtx, img, platformMatcher, opts, tagsByDigest)
+            if err != nil {
+                return err
+            }
+            // No error, but image should be skipped.
+            if image == nil {
+                return nil
+            }
+
+            resultsMut.Lock()
+            summaries = append(summaries, image)
+
+            if opts.SharedSize {
+                root = append(root, &allChainsIDs)
+                for _, id := range allChainsIDs {
+                    layers[id] = layers[id] + 1
+                }
+            }
+            resultsMut.Unlock()
+            return nil
+        })
     }
 
+    if err := eg.Wait(); err != nil {
+        return nil, err
+    }
+
     if opts.SharedSize {

View file

@ -3,16 +3,20 @@ package containerd
 import (
     "context"
     "fmt"
+    "math/rand"
     "os"
     "path/filepath"
     "sort"
+    "strconv"
     "testing"
+    "time"
 
     "github.com/containerd/containerd"
     "github.com/containerd/containerd/content"
     "github.com/containerd/containerd/images"
     "github.com/containerd/containerd/metadata"
     "github.com/containerd/containerd/namespaces"
+    "github.com/containerd/containerd/platforms"
     "github.com/containerd/containerd/snapshots"
     "github.com/containerd/log/logtest"
     imagetypes "github.com/docker/docker/api/types/image"
@ -37,6 +41,52 @@ func imagesFromIndex(index ...*ocispec.Index) []images.Image {
return imgs
}
func BenchmarkImageList(b *testing.B) {
populateStore := func(ctx context.Context, is *ImageService, dir string, count int) {
// Use constant seed for reproducibility
src := rand.NewSource(1982731263716)
for i := 0; i < count; i++ {
platform := platforms.DefaultSpec()
// 20% of the images get an architecture other than the host's
if i%5 == 0 {
platform.Architecture = "other"
}
idx, err := specialimage.RandomSinglePlatform(dir, platform, src)
assert.NilError(b, err)
imgs := imagesFromIndex(idx)
for _, desc := range imgs {
_, err := is.images.Create(ctx, desc)
assert.NilError(b, err)
}
}
}
for _, count := range []int{10, 100, 1000} {
csDir := b.TempDir()
ctx := namespaces.WithNamespace(context.TODO(), "testing-"+strconv.Itoa(count))
cs := &delayedStore{
store: &blobsDirContentStore{blobs: filepath.Join(csDir, "blobs/sha256")},
overhead: 500 * time.Microsecond,
}
is := fakeImageService(b, ctx, cs)
populateStore(ctx, is, csDir, count)
b.Run(strconv.Itoa(count)+"-images", func(b *testing.B) {
for i := 0; i < b.N; i++ {
_, err := is.Images(ctx, imagetypes.ListOptions{All: true})
assert.NilError(b, err)
}
})
}
}
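
With a standard Go toolchain this benchmark can be run from the package directory with something like "go test -run '^$' -bench BenchmarkImageList" (the exact invocation is an assumption, not part of the change). The 500-microsecond overhead injected by delayedStore, defined later in this file, stands in for the gRPC round trip of a real containerd content store, so the benefit of generating the summaries concurrently shows up even against a fast local blob directory.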
func TestImageList(t *testing.T) {
ctx := namespaces.WithNamespace(context.TODO(), "testing")
@ -53,19 +103,17 @@ func TestImageList(t *testing.T) {
cs := &blobsDirContentStore{blobs: filepath.Join(blobsDir, "blobs/sha256")}
snapshotter := &testSnapshotterService{}
for _, tc := range []struct {
name string
images []images.Image
opts imagetypes.ListOptions
check func(*testing.T, []*imagetypes.Summary) // Change the type of the check function
check func(*testing.T, []*imagetypes.Summary)
}{
{
name: "one multi-layer image",
images: imagesFromIndex(multilayer),
check: func(t *testing.T, all []*imagetypes.Summary) { // Change the type of the check function
check: func(t *testing.T, all []*imagetypes.Summary) {
assert.Check(t, is.Len(all, 1))
assert.Check(t, is.Equal(all[0].ID, multilayer.Manifests[0].Digest.String()))
@ -75,7 +123,7 @@ func TestImageList(t *testing.T) {
         {
             name:   "one image with two platforms is still one entry",
             images: imagesFromIndex(twoplatform),
-            check: func(t *testing.T, all []*imagetypes.Summary) { // Change the type of the check function
+            check: func(t *testing.T, all []*imagetypes.Summary) {
                 assert.Check(t, is.Len(all, 1))
                 assert.Check(t, is.Equal(all[0].ID, twoplatform.Manifests[0].Digest.String()))
@ -85,7 +133,7 @@ func TestImageList(t *testing.T) {
         {
             name:   "two images are two entries",
             images: imagesFromIndex(multilayer, twoplatform),
-            check: func(t *testing.T, all []*imagetypes.Summary) { // Change the type of the check function
+            check: func(t *testing.T, all []*imagetypes.Summary) {
                 assert.Check(t, is.Len(all, 2))
                 assert.Check(t, is.Equal(all[0].ID, multilayer.Manifests[0].Digest.String()))
@ -106,31 +154,7 @@ func TestImageList(t *testing.T) {
         tc := tc
         t.Run(tc.name, func(t *testing.T) {
             ctx := logtest.WithT(ctx, t)
 
-            mdb := newTestDB(ctx, t)
-
-            snapshotters := map[string]snapshots.Snapshotter{
-                containerd.DefaultSnapshotter: snapshotter,
-            }
-
-            service := &ImageService{
-                images:              metadata.NewImageStore(mdb),
-                containers:          emptyTestContainerStore(),
-                content:             cs,
-                eventsService:       daemonevents.New(),
-                snapshotterServices: snapshotters,
-                snapshotter:         containerd.DefaultSnapshotter,
-            }
-
-            // containerd.Image gets the services directly from containerd.Client
-            // so we need to create a "fake" containerd.Client with the test services.
-            c8dCli, err := containerd.New("", containerd.WithServices(
-                containerd.WithImageStore(service.images),
-                containerd.WithContentStore(cs),
-                containerd.WithSnapshotters(snapshotters),
-            ))
-            assert.NilError(t, err)
-            service.client = c8dCli
+            service := fakeImageService(t, ctx, cs)
 
             for _, img := range tc.images {
                 _, err := service.images.Create(ctx, img)
@ -156,6 +180,37 @@ func TestImageList(t *testing.T) {
}
func fakeImageService(t testing.TB, ctx context.Context, cs content.Store) *ImageService {
snapshotter := &testSnapshotterService{}
mdb := newTestDB(ctx, t)
snapshotters := map[string]snapshots.Snapshotter{
containerd.DefaultSnapshotter: snapshotter,
}
service := &ImageService{
images: metadata.NewImageStore(mdb),
containers: emptyTestContainerStore(),
content: cs,
eventsService: daemonevents.New(),
snapshotterServices: snapshotters,
snapshotter: containerd.DefaultSnapshotter,
}
// containerd.Image gets the services directly from containerd.Client
// so we need to create a "fake" containerd.Client with the test services.
c8dCli, err := containerd.New("", containerd.WithServices(
containerd.WithImageStore(service.images),
containerd.WithContentStore(cs),
containerd.WithSnapshotters(snapshotters),
))
assert.NilError(t, err)
service.client = c8dCli
return service
}
type blobsDirContentStore struct {
blobs string
}
@ -251,3 +306,63 @@ func (s *blobsDirContentStore) Info(ctx context.Context, dgst digest.Digest) (co
func (s *blobsDirContentStore) Update(ctx context.Context, info content.Info, fieldpaths ...string) (content.Info, error) {
return content.Info{}, fmt.Errorf("read-only")
}
// delayedStore is a content store wrapper that adds a constant delay to all
// operations in order to imitate gRPC overhead.
//
// The delay is constant to make the benchmark results more reproducible.
// Since the content store may be accessed concurrently, a random delay would
// be order-dependent.
type delayedStore struct {
store content.Store
overhead time.Duration
}
func (s *delayedStore) delay() {
time.Sleep(s.overhead)
}
func (s *delayedStore) ReaderAt(ctx context.Context, desc ocispec.Descriptor) (content.ReaderAt, error) {
s.delay()
return s.store.ReaderAt(ctx, desc)
}
func (s *delayedStore) Writer(ctx context.Context, opts ...content.WriterOpt) (content.Writer, error) {
s.delay()
return s.store.Writer(ctx, opts...)
}
func (s *delayedStore) Status(ctx context.Context, st string) (content.Status, error) {
s.delay()
return s.store.Status(ctx, st)
}
func (s *delayedStore) Delete(ctx context.Context, dgst digest.Digest) error {
s.delay()
return s.store.Delete(ctx, dgst)
}
func (s *delayedStore) ListStatuses(ctx context.Context, filters ...string) ([]content.Status, error) {
s.delay()
return s.store.ListStatuses(ctx, filters...)
}
func (s *delayedStore) Abort(ctx context.Context, ref string) error {
s.delay()
return s.store.Abort(ctx, ref)
}
func (s *delayedStore) Walk(ctx context.Context, fn content.WalkFunc, filters ...string) error {
s.delay()
return s.store.Walk(ctx, fn, filters...)
}
func (s *delayedStore) Info(ctx context.Context, dgst digest.Digest) (content.Info, error) {
s.delay()
return s.store.Info(ctx, dgst)
}
func (s *delayedStore) Update(ctx context.Context, info content.Info, fieldpaths ...string) (content.Info, error) {
s.delay()
return s.store.Update(ctx, info, fieldpaths...)
}

View file

@ -280,7 +280,7 @@ func digestFor(i int64) digest.Digest {
     return dgstr.Digest()
 }
 
-func newTestDB(ctx context.Context, t *testing.T) *metadata.DB {
+func newTestDB(ctx context.Context, t testing.TB) *metadata.DB {
     t.Helper()
     p := filepath.Join(t.TempDir(), "metadata")

View file

@ -0,0 +1,77 @@
package specialimage
import (
"math/rand"
"strconv"
"github.com/distribution/reference"
"github.com/opencontainers/go-digest"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
)
func RandomSinglePlatform(dir string, platform ocispec.Platform, source rand.Source) (*ocispec.Index, error) {
r := rand.New(source) //nolint:gosec // Ignore G404: Use of weak random number generator (math/rand instead of crypto/rand)
imageRef := "random-" + strconv.FormatInt(r.Int63(), 10) + ":latest"
layerCount := r.Intn(8)
var layers []ocispec.Descriptor
for i := 0; i < layerCount; i++ {
layerDesc, err := writeLayerWithOneFile(dir, "layer-"+strconv.Itoa(i), []byte(strconv.Itoa(i)))
if err != nil {
return nil, err
}
layers = append(layers, layerDesc)
}
configDesc, err := writeJsonBlob(dir, ocispec.MediaTypeImageConfig, ocispec.Image{
Platform: platform,
Config: ocispec.ImageConfig{
Env: []string{"PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin"},
},
RootFS: ocispec.RootFS{
Type: "layers",
DiffIDs: layersToDigests(layers),
},
})
if err != nil {
return nil, err
}
manifest := ocispec.Manifest{
MediaType: ocispec.MediaTypeImageManifest,
Config: configDesc,
Layers: layers,
}
legacyManifests := []manifestItem{
{
Config: blobPath(configDesc),
RepoTags: []string{imageRef},
Layers: blobPaths(layers),
},
}
ref, err := reference.ParseNormalizedNamed(imageRef)
if err != nil {
return nil, err
}
return singlePlatformImage(dir, ref, manifest, legacyManifests)
}
func layersToDigests(layers []ocispec.Descriptor) []digest.Digest {
var digests []digest.Digest
for _, l := range layers {
digests = append(digests, l.Digest)
}
return digests
}
func blobPaths(descriptors []ocispec.Descriptor) []string {
var paths []string
for _, d := range descriptors {
paths = append(paths, blobPath(d))
}
return paths
}