Przeglądaj źródła

produce progress events polling ctrd's content.Store

Signed-off-by: Nicolas De Loof <nicolas.deloof@gmail.com>
Signed-off-by: Sebastiaan van Stijn <github@gone.nl>

containerd: Push progress

Signed-off-by: Paweł Gronowski <pawel.gronowski@docker.com>
Nicolas De Loof 3 lat temu
rodzic
commit
4a8c4110e3

+ 31 - 2
daemon/containerd/image_pull.go

@@ -6,11 +6,13 @@ import (
 	"io"
 
 	"github.com/containerd/containerd"
+	"github.com/containerd/containerd/images"
 	"github.com/containerd/containerd/platforms"
 	"github.com/docker/distribution"
 	"github.com/docker/distribution/reference"
 	"github.com/docker/docker/api/types/registry"
 	"github.com/docker/docker/errdefs"
+	"github.com/docker/docker/pkg/streamformatter"
 	"github.com/opencontainers/go-digest"
 	specs "github.com/opencontainers/image-spec/specs-go/v1"
 )
@@ -42,10 +44,37 @@ func (i *ImageService) PullImage(ctx context.Context, image, tagOrDigest string,
 		}
 	}
 
-	resolver := newResolverFromAuthConfig(authConfig)
+	resolver, _ := newResolverFromAuthConfig(authConfig)
 	opts = append(opts, containerd.WithResolver(resolver))
 
-	_, err = i.client.Pull(ctx, ref.String(), opts...)
+	jobs := newJobs()
+	h := images.HandlerFunc(func(ctx context.Context, desc specs.Descriptor) ([]specs.Descriptor, error) {
+		if desc.MediaType != images.MediaTypeDockerSchema1Manifest {
+			jobs.Add(desc)
+		}
+		return nil, nil
+	})
+	opts = append(opts, containerd.WithImageHandler(h))
+
+	out := streamformatter.NewJSONProgressOutput(outStream, false)
+	finishProgress := jobs.showProgress(ctx, out, pullProgress{Store: i.client.ContentStore(), ShowExists: true})
+	defer finishProgress()
+
+	img, err := i.client.Pull(ctx, ref.String(), opts...)
+	if err != nil {
+		return err
+	}
+
+	unpacked, err := img.IsUnpacked(ctx, i.snapshotter)
+	if err != nil {
+		return err
+	}
+
+	if !unpacked {
+		if err := img.Unpack(ctx, i.snapshotter); err != nil {
+			return err
+		}
+	}
 	return err
 }
 

+ 150 - 0
daemon/containerd/progress.go

@@ -0,0 +1,150 @@
+package containerd
+
+import (
+	"context"
+	"errors"
+	"sync"
+	"time"
+
+	"github.com/containerd/containerd/content"
+	cerrdefs "github.com/containerd/containerd/errdefs"
+	"github.com/containerd/containerd/remotes"
+	"github.com/docker/docker/pkg/progress"
+	"github.com/docker/docker/pkg/stringid"
+	"github.com/opencontainers/go-digest"
+	ocispec "github.com/opencontainers/image-spec/specs-go/v1"
+	"github.com/sirupsen/logrus"
+)
+
+type progressUpdater interface {
+	UpdateProgress(context.Context, *jobs, progress.Output, time.Time) error
+}
+
+type jobs struct {
+	descs map[digest.Digest]ocispec.Descriptor
+	mu    sync.Mutex
+}
+
+// newJobs creates a new instance of the job status tracker
+func newJobs() *jobs {
+	return &jobs{
+		descs: map[digest.Digest]ocispec.Descriptor{},
+	}
+}
+
+func (j *jobs) showProgress(ctx context.Context, out progress.Output, updater progressUpdater) func() {
+	ctx, cancelProgress := context.WithCancel(ctx)
+
+	start := time.Now()
+
+	go func() {
+		ticker := time.NewTicker(100 * time.Millisecond)
+		defer ticker.Stop()
+
+		for {
+			select {
+			case <-ticker.C:
+				if err := updater.UpdateProgress(ctx, j, out, start); err != nil {
+					if !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) {
+						logrus.WithError(err).Error("Updating progress failed")
+					}
+					return
+				}
+			case <-ctx.Done():
+				return
+			}
+		}
+	}()
+
+	return cancelProgress
+}
+
+// Add adds a descriptor to be tracked
+func (j *jobs) Add(desc ocispec.Descriptor) {
+	j.mu.Lock()
+	defer j.mu.Unlock()
+
+	if _, ok := j.descs[desc.Digest]; ok {
+		return
+	}
+	j.descs[desc.Digest] = desc
+}
+
+// Remove removes a descriptor
+func (j *jobs) Remove(desc ocispec.Descriptor) {
+	j.mu.Lock()
+	defer j.mu.Unlock()
+
+	delete(j.descs, desc.Digest)
+}
+
+// Jobs returns a list of all tracked descriptors
+func (j *jobs) Jobs() []ocispec.Descriptor {
+	j.mu.Lock()
+	defer j.mu.Unlock()
+
+	descs := make([]ocispec.Descriptor, 0, len(j.descs))
+	for _, d := range j.descs {
+		descs = append(descs, d)
+	}
+	return descs
+}
+
+type pullProgress struct {
+	Store      content.Store
+	ShowExists bool
+}
+
+func (p pullProgress) UpdateProgress(ctx context.Context, ongoing *jobs, out progress.Output, start time.Time) error {
+	actives, err := p.Store.ListStatuses(ctx, "")
+	if err != nil {
+		if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
+			return err
+		}
+		logrus.WithError(err).Error("status check failed")
+		return nil
+	}
+	pulling := make(map[string]content.Status, len(actives))
+
+	// update status of status entries!
+	for _, status := range actives {
+		pulling[status.Ref] = status
+	}
+
+	for _, j := range ongoing.Jobs() {
+		key := remotes.MakeRefKey(ctx, j)
+		if info, ok := pulling[key]; ok {
+			out.WriteProgress(progress.Progress{
+				ID:      stringid.TruncateID(j.Digest.Encoded()),
+				Action:  "Downloading",
+				Current: info.Offset,
+				Total:   info.Total,
+			})
+			continue
+		}
+
+		info, err := p.Store.Info(ctx, j.Digest)
+		if err != nil {
+			if !cerrdefs.IsNotFound(err) {
+				return err
+			}
+		} else if info.CreatedAt.After(start) {
+			out.WriteProgress(progress.Progress{
+				ID:         stringid.TruncateID(j.Digest.Encoded()),
+				Action:     "Download complete",
+				HideCounts: true,
+				LastUpdate: true,
+			})
+			ongoing.Remove(j)
+		} else if p.ShowExists {
+			out.WriteProgress(progress.Progress{
+				ID:         stringid.TruncateID(j.Digest.Encoded()),
+				Action:     "Exists",
+				HideCounts: true,
+				LastUpdate: true,
+			})
+			ongoing.Remove(j)
+		}
+	}
+	return nil
+}

+ 7 - 3
daemon/containerd/resolver.go

@@ -8,8 +8,9 @@ import (
 	"github.com/sirupsen/logrus"
 )
 
-func newResolverFromAuthConfig(authConfig *registrytypes.AuthConfig) remotes.Resolver {
+func newResolverFromAuthConfig(authConfig *registrytypes.AuthConfig) (remotes.Resolver, docker.StatusTracker) {
 	opts := []docker.RegistryOpt{}
+
 	if authConfig != nil {
 		cfgHost := registry.ConvertToHostname(authConfig.ServerAddress)
 		if cfgHost == registry.IndexHostname {
@@ -29,7 +30,10 @@ func newResolverFromAuthConfig(authConfig *registrytypes.AuthConfig) remotes.Res
 		opts = append(opts, docker.WithAuthorizer(authorizer))
 	}
 
+	tracker := docker.NewInMemoryTracker()
+
 	return docker.NewResolver(docker.ResolverOptions{
-		Hosts: docker.ConfigureDefaultRegistries(opts...),
-	})
+		Hosts:   docker.ConfigureDefaultRegistries(opts...),
+		Tracker: tracker,
+	}), tracker
 }