浏览代码

Merge pull request #44756 from rumpl/containerd-image-pull

containerd integration: image pull
Bjorn Neergaard 2 年之前
父节点
当前提交
6d212fa045
共有 6 个文件被更改,包括 267 次插入25 次删除
  1. 19 1
      daemon/containerd/image_pull.go
  2. 150 0
      daemon/containerd/progress.go
  3. 67 18
      daemon/containerd/resolver.go
  4. 15 5
      daemon/containerd/service.go
  5. 9 1
      daemon/daemon.go
  6. 7 0
      registry/service.go

+ 19 - 1
daemon/containerd/image_pull.go

@@ -6,11 +6,13 @@ import (
 	"io"
 
 	"github.com/containerd/containerd"
+	"github.com/containerd/containerd/images"
 	"github.com/containerd/containerd/platforms"
 	"github.com/docker/distribution"
 	"github.com/docker/distribution/reference"
 	"github.com/docker/docker/api/types/registry"
 	"github.com/docker/docker/errdefs"
+	"github.com/docker/docker/pkg/streamformatter"
 	"github.com/opencontainers/go-digest"
 	specs "github.com/opencontainers/image-spec/specs-go/v1"
 )
@@ -42,9 +44,25 @@ func (i *ImageService) PullImage(ctx context.Context, image, tagOrDigest string,
 		}
 	}
 
-	resolver := newResolverFromAuthConfig(authConfig)
+	resolver, _ := i.newResolverFromAuthConfig(authConfig)
 	opts = append(opts, containerd.WithResolver(resolver))
 
+	jobs := newJobs()
+	h := images.HandlerFunc(func(ctx context.Context, desc specs.Descriptor) ([]specs.Descriptor, error) {
+		if desc.MediaType != images.MediaTypeDockerSchema1Manifest {
+			jobs.Add(desc)
+		}
+		return nil, nil
+	})
+	opts = append(opts, containerd.WithImageHandler(h))
+
+	out := streamformatter.NewJSONProgressOutput(outStream, false)
+	finishProgress := jobs.showProgress(ctx, out, pullProgress{Store: i.client.ContentStore(), ShowExists: true})
+	defer finishProgress()
+
+	opts = append(opts, containerd.WithPullUnpack)
+	opts = append(opts, containerd.WithPullSnapshotter(i.snapshotter))
+
 	_, err = i.client.Pull(ctx, ref.String(), opts...)
 	return err
 }

+ 150 - 0
daemon/containerd/progress.go

@@ -0,0 +1,150 @@
+package containerd
+
+import (
+	"context"
+	"errors"
+	"sync"
+	"time"
+
+	"github.com/containerd/containerd/content"
+	cerrdefs "github.com/containerd/containerd/errdefs"
+	"github.com/containerd/containerd/remotes"
+	"github.com/docker/docker/pkg/progress"
+	"github.com/docker/docker/pkg/stringid"
+	"github.com/opencontainers/go-digest"
+	ocispec "github.com/opencontainers/image-spec/specs-go/v1"
+	"github.com/sirupsen/logrus"
+)
+
+type progressUpdater interface {
+	UpdateProgress(context.Context, *jobs, progress.Output, time.Time) error
+}
+
+type jobs struct {
+	descs map[digest.Digest]ocispec.Descriptor
+	mu    sync.Mutex
+}
+
+// newJobs creates a new instance of the job status tracker
+func newJobs() *jobs {
+	return &jobs{
+		descs: map[digest.Digest]ocispec.Descriptor{},
+	}
+}
+
+func (j *jobs) showProgress(ctx context.Context, out progress.Output, updater progressUpdater) func() {
+	ctx, cancelProgress := context.WithCancel(ctx)
+
+	start := time.Now()
+
+	go func() {
+		ticker := time.NewTicker(100 * time.Millisecond)
+		defer ticker.Stop()
+
+		for {
+			select {
+			case <-ticker.C:
+				if err := updater.UpdateProgress(ctx, j, out, start); err != nil {
+					if !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) {
+						logrus.WithError(err).Error("Updating progress failed")
+					}
+					return
+				}
+			case <-ctx.Done():
+				return
+			}
+		}
+	}()
+
+	return cancelProgress
+}
+
+// Add adds a descriptor to be tracked
+func (j *jobs) Add(desc ocispec.Descriptor) {
+	j.mu.Lock()
+	defer j.mu.Unlock()
+
+	if _, ok := j.descs[desc.Digest]; ok {
+		return
+	}
+	j.descs[desc.Digest] = desc
+}
+
+// Remove removes a descriptor
+func (j *jobs) Remove(desc ocispec.Descriptor) {
+	j.mu.Lock()
+	defer j.mu.Unlock()
+
+	delete(j.descs, desc.Digest)
+}
+
+// Jobs returns a list of all tracked descriptors
+func (j *jobs) Jobs() []ocispec.Descriptor {
+	j.mu.Lock()
+	defer j.mu.Unlock()
+
+	descs := make([]ocispec.Descriptor, 0, len(j.descs))
+	for _, d := range j.descs {
+		descs = append(descs, d)
+	}
+	return descs
+}
+
+type pullProgress struct {
+	Store      content.Store
+	ShowExists bool
+}
+
+func (p pullProgress) UpdateProgress(ctx context.Context, ongoing *jobs, out progress.Output, start time.Time) error {
+	actives, err := p.Store.ListStatuses(ctx, "")
+	if err != nil {
+		if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
+			return err
+		}
+		logrus.WithError(err).Error("status check failed")
+		return nil
+	}
+	pulling := make(map[string]content.Status, len(actives))
+
+	// update status of status entries!
+	for _, status := range actives {
+		pulling[status.Ref] = status
+	}
+
+	for _, j := range ongoing.Jobs() {
+		key := remotes.MakeRefKey(ctx, j)
+		if info, ok := pulling[key]; ok {
+			out.WriteProgress(progress.Progress{
+				ID:      stringid.TruncateID(j.Digest.Encoded()),
+				Action:  "Downloading",
+				Current: info.Offset,
+				Total:   info.Total,
+			})
+			continue
+		}
+
+		info, err := p.Store.Info(ctx, j.Digest)
+		if err != nil {
+			if !cerrdefs.IsNotFound(err) {
+				return err
+			}
+		} else if info.CreatedAt.After(start) {
+			out.WriteProgress(progress.Progress{
+				ID:         stringid.TruncateID(j.Digest.Encoded()),
+				Action:     "Download complete",
+				HideCounts: true,
+				LastUpdate: true,
+			})
+			ongoing.Remove(j)
+		} else if p.ShowExists {
+			out.WriteProgress(progress.Progress{
+				ID:         stringid.TruncateID(j.Digest.Encoded()),
+				Action:     "Exists",
+				HideCounts: true,
+				LastUpdate: true,
+			})
+			ongoing.Remove(j)
+		}
+	}
+	return nil
+}

+ 67 - 18
daemon/containerd/resolver.go

@@ -1,6 +1,9 @@
 package containerd
 
 import (
+	"net/http"
+	"strings"
+
 	"github.com/containerd/containerd/remotes"
 	"github.com/containerd/containerd/remotes/docker"
 	registrytypes "github.com/docker/docker/api/types/registry"
@@ -8,28 +11,74 @@ import (
 	"github.com/sirupsen/logrus"
 )
 
-func newResolverFromAuthConfig(authConfig *registrytypes.AuthConfig) remotes.Resolver {
-	opts := []docker.RegistryOpt{}
-	if authConfig != nil {
-		cfgHost := registry.ConvertToHostname(authConfig.ServerAddress)
-		if cfgHost == registry.IndexHostname {
-			cfgHost = registry.DefaultRegistryHost
+func (i *ImageService) newResolverFromAuthConfig(authConfig *registrytypes.AuthConfig) (remotes.Resolver, docker.StatusTracker) {
+	tracker := docker.NewInMemoryTracker()
+	hostsFn := i.registryHosts.RegistryHosts()
+
+	hosts := hostsWrapper(hostsFn, authConfig, i.registryService)
+
+	return docker.NewResolver(docker.ResolverOptions{
+		Hosts:   hosts,
+		Tracker: tracker,
+	}), tracker
+}
+
+func hostsWrapper(hostsFn docker.RegistryHosts, authConfig *registrytypes.AuthConfig, regService registry.Service) docker.RegistryHosts {
+	return func(n string) ([]docker.RegistryHost, error) {
+		hosts, err := hostsFn(n)
+		if err != nil {
+			return nil, err
 		}
-		authorizer := docker.NewDockerAuthorizer(docker.WithAuthCreds(func(host string) (string, string, error) {
-			if cfgHost != host {
-				logrus.WithField("host", host).WithField("cfgHost", cfgHost).Warn("Host doesn't match")
-				return "", "", nil
-			}
-			if authConfig.IdentityToken != "" {
-				return "", authConfig.IdentityToken, nil
+
+		for i := range hosts {
+			if hosts[i].Authorizer == nil {
+				var opts []docker.AuthorizerOpt
+				if authConfig != nil {
+					opts = append(opts, authorizationCredsFromAuthConfig(*authConfig))
+				}
+				hosts[i].Authorizer = docker.NewDockerAuthorizer(opts...)
+
+				isInsecure := regService.IsInsecureRegistry(hosts[i].Host)
+				if hosts[i].Client.Transport != nil && isInsecure {
+					hosts[i].Client.Transport = httpFallback{super: hosts[i].Client.Transport}
+				}
 			}
-			return authConfig.Username, authConfig.Password, nil
-		}))
+		}
+		return hosts, nil
+	}
+}
 
-		opts = append(opts, docker.WithAuthorizer(authorizer))
+func authorizationCredsFromAuthConfig(authConfig registrytypes.AuthConfig) docker.AuthorizerOpt {
+	cfgHost := registry.ConvertToHostname(authConfig.ServerAddress)
+	if cfgHost == registry.IndexHostname {
+		cfgHost = registry.DefaultRegistryHost
 	}
 
-	return docker.NewResolver(docker.ResolverOptions{
-		Hosts: docker.ConfigureDefaultRegistries(opts...),
+	return docker.WithAuthCreds(func(host string) (string, string, error) {
+		if cfgHost != host {
+			logrus.WithField("host", host).WithField("cfgHost", cfgHost).Warn("Host doesn't match")
+			return "", "", nil
+		}
+		if authConfig.IdentityToken != "" {
+			return "", authConfig.IdentityToken, nil
+		}
+		return authConfig.Username, authConfig.Password, nil
 	})
 }
+
+type httpFallback struct {
+	super http.RoundTripper
+}
+
+func (f httpFallback) RoundTrip(r *http.Request) (*http.Response, error) {
+	resp, err := f.super.RoundTrip(r)
+	if err != nil {
+		if strings.Contains(err.Error(), "http: server gave HTTP response to HTTPS client") {
+			plain := r.Clone(r.Context())
+			plain.URL.Scheme = "http"
+			return http.DefaultTransport.RoundTrip(plain)
+		}
+	}
+
+	return resp, err
+}

+ 15 - 5
daemon/containerd/service.go

@@ -5,26 +5,36 @@ import (
 
 	"github.com/containerd/containerd"
 	"github.com/containerd/containerd/plugin"
+	"github.com/containerd/containerd/remotes/docker"
 	"github.com/containerd/containerd/snapshots"
 	"github.com/docker/docker/container"
 	"github.com/docker/docker/daemon/images"
 	"github.com/docker/docker/errdefs"
 	"github.com/docker/docker/image"
 	"github.com/docker/docker/layer"
+	"github.com/docker/docker/registry"
 	"github.com/pkg/errors"
 )
 
 // ImageService implements daemon.ImageService
 type ImageService struct {
-	client      *containerd.Client
-	snapshotter string
+	client          *containerd.Client
+	snapshotter     string
+	registryHosts   RegistryHostsProvider
+	registryService registry.Service
+}
+
+type RegistryHostsProvider interface {
+	RegistryHosts() docker.RegistryHosts
 }
 
 // NewService creates a new ImageService.
-func NewService(c *containerd.Client, snapshotter string) *ImageService {
+func NewService(c *containerd.Client, snapshotter string, hostsProvider RegistryHostsProvider, registry registry.Service) *ImageService {
 	return &ImageService{
-		client:      c,
-		snapshotter: snapshotter,
+		client:          c,
+		snapshotter:     snapshotter,
+		registryHosts:   hostsProvider,
+		registryService: registry,
 	}
 }
 

+ 9 - 1
daemon/daemon.go

@@ -14,6 +14,7 @@ import (
 	"path"
 	"path/filepath"
 	"runtime"
+	"strings"
 	"sync"
 	"time"
 
@@ -177,6 +178,13 @@ func (daemon *Daemon) RegistryHosts() docker.RegistryHosts {
 
 	for _, v := range daemon.configStore.InsecureRegistries {
 		u, err := url.Parse(v)
+		if err != nil && !strings.HasPrefix(v, "http://") && !strings.HasPrefix(v, "https://") {
+			originalErr := err
+			u, err = url.Parse("http://" + v)
+			if err != nil {
+				err = originalErr
+			}
+		}
 		c := resolverconfig.RegistryConfig{}
 		if err == nil {
 			v = u.Host
@@ -994,7 +1002,7 @@ func NewDaemon(ctx context.Context, config *config.Config, pluginStore *plugin.S
 		if err := configureKernelSecuritySupport(config, driverName); err != nil {
 			return nil, err
 		}
-		d.imageService = ctrd.NewService(d.containerdCli, driverName)
+		d.imageService = ctrd.NewService(d.containerdCli, driverName, d, d.registryService)
 	} else {
 		layerStore, err := layer.NewStoreFromOptions(layer.StoreOptions{
 			Root:                      config.Root,

+ 7 - 0
registry/service.go

@@ -26,6 +26,7 @@ type Service interface {
 	LoadAllowNondistributableArtifacts([]string) error
 	LoadMirrors([]string) error
 	LoadInsecureRegistries([]string) error
+	IsInsecureRegistry(string) bool
 }
 
 // defaultService is a registry service. It tracks configuration data such as a list
@@ -232,3 +233,9 @@ func (s *defaultService) LookupPushEndpoints(hostname string) (endpoints []APIEn
 	}
 	return endpoints, err
 }
+
+// IsInsecureRegistry returns true if the registry at given host is configured as
+// insecure registry.
+func (s *defaultService) IsInsecureRegistry(host string) bool {
+	return !s.config.isSecureIndex(host)
+}