瀏覽代碼

Merge pull request #46618 from vvoland/c8d-pull-all-tags-2

c8d/pull: Handle pull all tags (2nd approach)
Sebastiaan van Stijn 1 年之前
父節點
當前提交
d9dce8e0d0
共有 5 個文件被更改,包括 121 次插入57 次删除
  1. 38 4
      daemon/containerd/image_pull.go
  2. 1 1
      daemon/containerd/resolver.go
  3. 5 3
      daemon/containerd/service.go
  4. 76 37
      distribution/pull.go
  5. 1 12
      distribution/pull_v2.go

+ 38 - 4
daemon/containerd/image_pull.go

@@ -15,7 +15,7 @@ import (
 	"github.com/containerd/log"
 	"github.com/distribution/reference"
 	"github.com/docker/docker/api/types/events"
-	"github.com/docker/docker/api/types/registry"
+	registrytypes "github.com/docker/docker/api/types/registry"
 	"github.com/docker/docker/distribution"
 	"github.com/docker/docker/errdefs"
 	"github.com/docker/docker/internal/compatcontext"
@@ -25,8 +25,43 @@ import (
 	"github.com/pkg/errors"
 )
 
-// PullImage initiates a pull operation. ref is the image to pull.
-func (i *ImageService) PullImage(ctx context.Context, ref reference.Named, platform *ocispec.Platform, metaHeaders map[string][]string, authConfig *registry.AuthConfig, outStream io.Writer) error {
+// PullImage initiates a pull operation. baseRef is the image to pull.
+// If reference is not tagged, all tags are pulled.
+func (i *ImageService) PullImage(ctx context.Context, baseRef reference.Named, platform *ocispec.Platform, metaHeaders map[string][]string, authConfig *registrytypes.AuthConfig, outStream io.Writer) error {
+	out := streamformatter.NewJSONProgressOutput(outStream, false)
+
+	if tagged, ok := baseRef.(reference.NamedTagged); ok {
+		return i.pullTag(ctx, tagged, platform, metaHeaders, authConfig, out)
+	}
+
+	tags, err := distribution.Tags(ctx, baseRef, &distribution.Config{
+		RegistryService: i.registryService,
+		MetaHeaders:     metaHeaders,
+		AuthConfig:      authConfig,
+	})
+	if err != nil {
+		return err
+	}
+
+	for _, tag := range tags {
+		ref, err := reference.WithTag(baseRef, tag)
+		if err != nil {
+			log.G(ctx).WithFields(log.Fields{
+				"tag":     tag,
+				"baseRef": baseRef,
+			}).Warn("invalid tag, won't pull")
+			continue
+		}
+
+		if err := i.pullTag(ctx, ref, platform, metaHeaders, authConfig, out); err != nil {
+			return fmt.Errorf("error pulling %s: %w", ref, err)
+		}
+	}
+
+	return nil
+}
+
+func (i *ImageService) pullTag(ctx context.Context, ref reference.NamedTagged, platform *ocispec.Platform, metaHeaders map[string][]string, authConfig *registrytypes.AuthConfig, out progress.Output) error {
 	var opts []containerd.RemoteOpt
 	if platform != nil {
 		opts = append(opts, containerd.WithPlatform(platforms.Format(*platform)))
@@ -53,7 +88,6 @@ func (i *ImageService) PullImage(ctx context.Context, ref reference.Named, platf
 	})
 	opts = append(opts, containerd.WithImageHandler(h))
 
-	out := streamformatter.NewJSONProgressOutput(outStream, false)
 	pp := pullProgress{store: i.client.ContentStore(), showExists: true}
 	finishProgress := jobs.showProgress(ctx, out, pp)
 

+ 1 - 1
daemon/containerd/resolver.go

@@ -31,7 +31,7 @@ func (i *ImageService) newResolverFromAuthConfig(ctx context.Context, authConfig
 	}), tracker
 }
 
-func hostsWrapper(hostsFn docker.RegistryHosts, optAuthConfig *registrytypes.AuthConfig, regService RegistryConfigProvider) docker.RegistryHosts {
+func hostsWrapper(hostsFn docker.RegistryHosts, optAuthConfig *registrytypes.AuthConfig, regService registryResolver) docker.RegistryHosts {
 	var authorizer docker.Authorizer
 	if optAuthConfig != nil {
 		authorizer = authorizerFromAuthConfig(*optAuthConfig)

+ 5 - 3
daemon/containerd/service.go

@@ -30,16 +30,18 @@ type ImageService struct {
 	containers      container.Store
 	snapshotter     string
 	registryHosts   docker.RegistryHosts
-	registryService RegistryConfigProvider
+	registryService registryResolver
 	eventsService   *daemonevents.Events
 	pruneRunning    atomic.Bool
 	refCountMounter snapshotter.Mounter
 	idMapping       idtools.IdentityMapping
 }
 
-type RegistryConfigProvider interface {
+type registryResolver interface {
 	IsInsecureRegistry(host string) bool
 	ResolveRepository(name reference.Named) (*registry.RepositoryInfo, error)
+	LookupPullEndpoints(hostname string) ([]registry.APIEndpoint, error)
+	LookupPushEndpoints(hostname string) ([]registry.APIEndpoint, error)
 }
 
 type ImageServiceConfig struct {
@@ -47,7 +49,7 @@ type ImageServiceConfig struct {
 	Containers      container.Store
 	Snapshotter     string
 	RegistryHosts   docker.RegistryHosts
-	Registry        RegistryConfigProvider
+	Registry        registryResolver
 	EventsService   *daemonevents.Events
 	RefCountMounter snapshotter.Mounter
 	IDMapping       idtools.IdentityMapping

+ 76 - 37
distribution/pull.go

@@ -9,6 +9,7 @@ import (
 	"github.com/docker/docker/api"
 	"github.com/docker/docker/api/types/events"
 	refstore "github.com/docker/docker/reference"
+	"github.com/docker/docker/registry"
 	"github.com/opencontainers/go-digest"
 	"github.com/pkg/errors"
 )
@@ -16,20 +17,79 @@ import (
 // Pull initiates a pull operation. image is the repository name to pull, and
 // tag may be either empty, or indicate a specific tag to pull.
 func Pull(ctx context.Context, ref reference.Named, config *ImagePullConfig, local ContentStore) error {
-	// Resolve the Repository name from fqn to RepositoryInfo
-	repoInfo, err := config.RegistryService.ResolveRepository(ref)
+	repoInfo, err := pullEndpoints(ctx, config.RegistryService, ref, func(ctx context.Context, repoInfo registry.RepositoryInfo, endpoint registry.APIEndpoint) error {
+		log.G(ctx).Debugf("Trying to pull %s from %s", reference.FamiliarName(repoInfo.Name), endpoint.URL)
+		puller := newPuller(endpoint, &repoInfo, config, local)
+		return puller.pull(ctx, ref)
+	})
+
+	if err == nil {
+		config.ImageEventLogger(reference.FamiliarString(ref), reference.FamiliarName(repoInfo.Name), events.ActionPull)
+	}
+
+	return err
+}
+
+// Tags returns available tags for the given image in the remote repository.
+func Tags(ctx context.Context, ref reference.Named, config *Config) ([]string, error) {
+	var tags []string
+	_, err := pullEndpoints(ctx, config.RegistryService, ref, func(ctx context.Context, repoInfo registry.RepositoryInfo, endpoint registry.APIEndpoint) error {
+		repo, err := newRepository(ctx, &repoInfo, endpoint, config.MetaHeaders, config.AuthConfig, "pull")
+		if err != nil {
+			return err
+		}
+
+		tags, err = repo.Tags(ctx).All(ctx)
+		return err
+	})
+
+	return tags, err
+}
+
+// validateRepoName validates the name of a repository.
+func validateRepoName(name reference.Named) error {
+	if reference.FamiliarName(name) == api.NoBaseImageSpecifier {
+		return errors.WithStack(reservedNameError(api.NoBaseImageSpecifier))
+	}
+	return nil
+}
+
+func addDigestReference(store refstore.Store, ref reference.Named, dgst digest.Digest, id digest.Digest) error {
+	dgstRef, err := reference.WithDigest(reference.TrimNamed(ref), dgst)
 	if err != nil {
 		return err
 	}
 
+	if oldTagID, err := store.Get(dgstRef); err == nil {
+		if oldTagID != id {
+			// Updating digests not supported by reference store
+			log.G(context.TODO()).Errorf("Image ID for digest %s changed from %s to %s, cannot update", dgst.String(), oldTagID, id)
+		}
+		return nil
+	} else if err != refstore.ErrDoesNotExist {
+		return err
+	}
+
+	return store.AddDigest(dgstRef, id, true)
+}
+
+func pullEndpoints(ctx context.Context, registryService RegistryResolver, ref reference.Named,
+	f func(context.Context, registry.RepositoryInfo, registry.APIEndpoint) error,
+) (*registry.RepositoryInfo, error) {
+	// Resolve the Repository name from fqn to RepositoryInfo
+	repoInfo, err := registryService.ResolveRepository(ref)
+	if err != nil {
+		return nil, err
+	}
+
 	// makes sure name is not `scratch`
 	if err := validateRepoName(repoInfo.Name); err != nil {
-		return err
+		return repoInfo, err
 	}
 
-	endpoints, err := config.RegistryService.LookupPullEndpoints(reference.Domain(repoInfo.Name))
+	endpoints, err := registryService.LookupPullEndpoints(reference.Domain(repoInfo.Name))
 	if err != nil {
-		return err
+		return repoInfo, err
 	}
 
 	var (
@@ -50,7 +110,14 @@ func Pull(ctx context.Context, ref reference.Named, config *ImagePullConfig, loc
 
 		log.G(ctx).Debugf("Trying to pull %s from %s", reference.FamiliarName(repoInfo.Name), endpoint.URL)
 
-		if err := newPuller(endpoint, repoInfo, config, local).pull(ctx, ref); err != nil {
+		if err := f(ctx, *repoInfo, endpoint); err != nil {
+			if _, ok := err.(fallbackError); !ok && continueOnError(err, endpoint.Mirror) {
+				err = fallbackError{
+					err:         err,
+					transportOK: true,
+				}
+			}
+
 			// Was this pull cancelled? If so, don't try to fall
 			// back.
 			fallback := false
@@ -71,43 +138,15 @@ func Pull(ctx context.Context, ref reference.Named, config *ImagePullConfig, loc
 				continue
 			}
 			log.G(ctx).Errorf("Not continuing with pull after error: %v", err)
-			return translatePullError(err, ref)
+			return repoInfo, translatePullError(err, ref)
 		}
 
-		config.ImageEventLogger(reference.FamiliarString(ref), reference.FamiliarName(repoInfo.Name), events.ActionPull)
-		return nil
+		return repoInfo, nil
 	}
 
 	if lastErr == nil {
 		lastErr = fmt.Errorf("no endpoints found for %s", reference.FamiliarString(ref))
 	}
 
-	return translatePullError(lastErr, ref)
-}
-
-// validateRepoName validates the name of a repository.
-func validateRepoName(name reference.Named) error {
-	if reference.FamiliarName(name) == api.NoBaseImageSpecifier {
-		return errors.WithStack(reservedNameError(api.NoBaseImageSpecifier))
-	}
-	return nil
-}
-
-func addDigestReference(store refstore.Store, ref reference.Named, dgst digest.Digest, id digest.Digest) error {
-	dgstRef, err := reference.WithDigest(reference.TrimNamed(ref), dgst)
-	if err != nil {
-		return err
-	}
-
-	if oldTagID, err := store.Get(dgstRef); err == nil {
-		if oldTagID != id {
-			// Updating digests not supported by reference store
-			log.G(context.TODO()).Errorf("Image ID for digest %s changed from %s to %s, cannot update", dgst.String(), oldTagID, id)
-		}
-		return nil
-	} else if err != refstore.ErrDoesNotExist {
-		return err
-	}
-
-	return store.AddDigest(dgstRef, id, true)
+	return repoInfo, translatePullError(lastErr, ref)
 }

+ 1 - 12
distribution/pull_v2.go

@@ -86,18 +86,7 @@ func (p *puller) pull(ctx context.Context, ref reference.Named) (err error) {
 		return err
 	}
 
-	if err = p.pullRepository(ctx, ref); err != nil {
-		if _, ok := err.(fallbackError); ok {
-			return err
-		}
-		if continueOnError(err, p.endpoint.Mirror) {
-			return fallbackError{
-				err:         err,
-				transportOK: true,
-			}
-		}
-	}
-	return err
+	return p.pullRepository(ctx, ref)
 }
 
 func (p *puller) pullRepository(ctx context.Context, ref reference.Named) (err error) {