distribution: remove Puller interface, remove redundant V1 checks
It's only used internally, so we can refer to the implementation itself. Given that RegistryService.LookupPullEndpoints now only returns V2 endpoints, we no longer need to check if an endpoint is possibly V1. Also rename some types that had "v2" in their name, now that we only support v2. Signed-off-by: Sebastiaan van Stijn <github@gone.nl>
This commit is contained in:
parent
074e41679d
commit
41999abcbe
6 changed files with 74 additions and 104 deletions
|
@ -15,38 +15,24 @@ import (
|
|||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
// Puller is an interface that abstracts pulling for different API versions.
|
||||
type Puller interface {
|
||||
// Pull tries to pull the image referenced by `tag`
|
||||
// Pull returns an error if any, as well as a boolean that determines whether to retry Pull on the next configured endpoint.
|
||||
//
|
||||
Pull(ctx context.Context, ref reference.Named) error
|
||||
}
|
||||
|
||||
// newPuller returns a Puller interface that will pull from a v2 registry.
|
||||
func newPuller(endpoint registry.APIEndpoint, repoInfo *registry.RepositoryInfo, imagePullConfig *ImagePullConfig, local ContentStore) (Puller, error) {
|
||||
switch endpoint.Version {
|
||||
case registry.APIVersion2:
|
||||
return &v2Puller{
|
||||
V2MetadataService: metadata.NewV2MetadataService(imagePullConfig.MetadataStore),
|
||||
endpoint: endpoint,
|
||||
config: imagePullConfig,
|
||||
repoInfo: repoInfo,
|
||||
manifestStore: &manifestStore{
|
||||
local: local,
|
||||
},
|
||||
}, nil
|
||||
case registry.APIVersion1:
|
||||
return nil, fmt.Errorf("protocol version %d no longer supported. Please contact admins of registry %s", endpoint.Version, endpoint.URL)
|
||||
// newPuller returns a puller to pull from a v2 registry.
|
||||
func newPuller(endpoint registry.APIEndpoint, repoInfo *registry.RepositoryInfo, config *ImagePullConfig, local ContentStore) *puller {
|
||||
return &puller{
|
||||
metadataService: metadata.NewV2MetadataService(config.MetadataStore),
|
||||
endpoint: endpoint,
|
||||
config: config,
|
||||
repoInfo: repoInfo,
|
||||
manifestStore: &manifestStore{
|
||||
local: local,
|
||||
},
|
||||
}
|
||||
return nil, fmt.Errorf("unknown version %d for registry %s", endpoint.Version, endpoint.URL)
|
||||
}
|
||||
|
||||
// Pull initiates a pull operation. image is the repository name to pull, and
|
||||
// tag may be either empty, or indicate a specific tag to pull.
|
||||
func Pull(ctx context.Context, ref reference.Named, imagePullConfig *ImagePullConfig, local ContentStore) error {
|
||||
func Pull(ctx context.Context, ref reference.Named, config *ImagePullConfig, local ContentStore) error {
|
||||
// Resolve the Repository name from fqn to RepositoryInfo
|
||||
repoInfo, err := imagePullConfig.RegistryService.ResolveRepository(ref)
|
||||
repoInfo, err := config.RegistryService.ResolveRepository(ref)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -56,7 +42,7 @@ func Pull(ctx context.Context, ref reference.Named, imagePullConfig *ImagePullCo
|
|||
return err
|
||||
}
|
||||
|
||||
endpoints, err := imagePullConfig.RegistryService.LookupPullEndpoints(reference.Domain(repoInfo.Name))
|
||||
endpoints, err := config.RegistryService.LookupPullEndpoints(reference.Domain(repoInfo.Name))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -77,15 +63,9 @@ func Pull(ctx context.Context, ref reference.Named, imagePullConfig *ImagePullCo
|
|||
}
|
||||
}
|
||||
|
||||
logrus.Debugf("Trying to pull %s from %s %s", reference.FamiliarName(repoInfo.Name), endpoint.URL, endpoint.Version)
|
||||
logrus.Debugf("Trying to pull %s from %s", reference.FamiliarName(repoInfo.Name), endpoint.URL)
|
||||
|
||||
puller, err := newPuller(endpoint, repoInfo, imagePullConfig, local)
|
||||
if err != nil {
|
||||
lastErr = err
|
||||
continue
|
||||
}
|
||||
|
||||
if err := puller.Pull(ctx, ref); err != nil {
|
||||
if err := newPuller(endpoint, repoInfo, config, local).pull(ctx, ref); err != nil {
|
||||
// Was this pull cancelled? If so, don't try to fall
|
||||
// back.
|
||||
fallback := false
|
||||
|
@ -109,7 +89,7 @@ func Pull(ctx context.Context, ref reference.Named, imagePullConfig *ImagePullCo
|
|||
return translatePullError(err, ref)
|
||||
}
|
||||
|
||||
imagePullConfig.ImageEventLogger(reference.FamiliarString(ref), reference.FamiliarName(repoInfo.Name), "pull")
|
||||
config.ImageEventLogger(reference.FamiliarString(ref), reference.FamiliarName(repoInfo.Name), "pull")
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
@ -52,16 +52,16 @@ func (e imageConfigPullError) Error() string {
|
|||
return "error pulling image configuration: " + e.Err.Error()
|
||||
}
|
||||
|
||||
type v2Puller struct {
|
||||
V2MetadataService metadata.V2MetadataService
|
||||
endpoint registry.APIEndpoint
|
||||
config *ImagePullConfig
|
||||
repoInfo *registry.RepositoryInfo
|
||||
repo distribution.Repository
|
||||
manifestStore *manifestStore
|
||||
type puller struct {
|
||||
metadataService metadata.V2MetadataService
|
||||
endpoint registry.APIEndpoint
|
||||
config *ImagePullConfig
|
||||
repoInfo *registry.RepositoryInfo
|
||||
repo distribution.Repository
|
||||
manifestStore *manifestStore
|
||||
}
|
||||
|
||||
func (p *v2Puller) Pull(ctx context.Context, ref reference.Named) (err error) {
|
||||
func (p *puller) pull(ctx context.Context, ref reference.Named) (err error) {
|
||||
// TODO(tiborvass): was ReceiveTimeout
|
||||
p.repo, err = NewV2Repository(ctx, p.repoInfo, p.endpoint, p.config.MetaHeaders, p.config.AuthConfig, "pull")
|
||||
if err != nil {
|
||||
|
@ -74,7 +74,7 @@ func (p *v2Puller) Pull(ctx context.Context, ref reference.Named) (err error) {
|
|||
return err
|
||||
}
|
||||
|
||||
if err = p.pullV2Repository(ctx, ref); err != nil {
|
||||
if err = p.pullRepository(ctx, ref); err != nil {
|
||||
if _, ok := err.(fallbackError); ok {
|
||||
return err
|
||||
}
|
||||
|
@ -88,10 +88,10 @@ func (p *v2Puller) Pull(ctx context.Context, ref reference.Named) (err error) {
|
|||
return err
|
||||
}
|
||||
|
||||
func (p *v2Puller) pullV2Repository(ctx context.Context, ref reference.Named) (err error) {
|
||||
func (p *puller) pullRepository(ctx context.Context, ref reference.Named) (err error) {
|
||||
var layersDownloaded bool
|
||||
if !reference.IsNameOnly(ref) {
|
||||
layersDownloaded, err = p.pullV2Tag(ctx, ref, p.config.Platform)
|
||||
layersDownloaded, err = p.pullTag(ctx, ref, p.config.Platform)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -106,7 +106,7 @@ func (p *v2Puller) pullV2Repository(ctx context.Context, ref reference.Named) (e
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
pulledNew, err := p.pullV2Tag(ctx, tagRef, p.config.Platform)
|
||||
pulledNew, err := p.pullTag(ctx, tagRef, p.config.Platform)
|
||||
if err != nil {
|
||||
// Since this is the pull-all-tags case, don't
|
||||
// allow an error pulling a particular tag to
|
||||
|
@ -127,33 +127,33 @@ func (p *v2Puller) pullV2Repository(ctx context.Context, ref reference.Named) (e
|
|||
return nil
|
||||
}
|
||||
|
||||
type v2LayerDescriptor struct {
|
||||
digest digest.Digest
|
||||
diffID layer.DiffID
|
||||
repoInfo *registry.RepositoryInfo
|
||||
repo distribution.Repository
|
||||
V2MetadataService metadata.V2MetadataService
|
||||
tmpFile *os.File
|
||||
verifier digest.Verifier
|
||||
src distribution.Descriptor
|
||||
type layerDescriptor struct {
|
||||
digest digest.Digest
|
||||
diffID layer.DiffID
|
||||
repoInfo *registry.RepositoryInfo
|
||||
repo distribution.Repository
|
||||
metadataService metadata.V2MetadataService
|
||||
tmpFile *os.File
|
||||
verifier digest.Verifier
|
||||
src distribution.Descriptor
|
||||
}
|
||||
|
||||
func (ld *v2LayerDescriptor) Key() string {
|
||||
func (ld *layerDescriptor) Key() string {
|
||||
return "v2:" + ld.digest.String()
|
||||
}
|
||||
|
||||
func (ld *v2LayerDescriptor) ID() string {
|
||||
func (ld *layerDescriptor) ID() string {
|
||||
return stringid.TruncateID(ld.digest.String())
|
||||
}
|
||||
|
||||
func (ld *v2LayerDescriptor) DiffID() (layer.DiffID, error) {
|
||||
func (ld *layerDescriptor) DiffID() (layer.DiffID, error) {
|
||||
if ld.diffID != "" {
|
||||
return ld.diffID, nil
|
||||
}
|
||||
return ld.V2MetadataService.GetDiffID(ld.digest)
|
||||
return ld.metadataService.GetDiffID(ld.digest)
|
||||
}
|
||||
|
||||
func (ld *v2LayerDescriptor) Download(ctx context.Context, progressOutput progress.Output) (io.ReadCloser, int64, error) {
|
||||
func (ld *layerDescriptor) Download(ctx context.Context, progressOutput progress.Output) (io.ReadCloser, int64, error) {
|
||||
logrus.Debugf("pulling blob %q", ld.digest)
|
||||
|
||||
var (
|
||||
|
@ -291,7 +291,7 @@ func (ld *v2LayerDescriptor) Download(ctx context.Context, progressOutput progre
|
|||
}), size, nil
|
||||
}
|
||||
|
||||
func (ld *v2LayerDescriptor) Close() {
|
||||
func (ld *layerDescriptor) Close() {
|
||||
if ld.tmpFile != nil {
|
||||
ld.tmpFile.Close()
|
||||
if err := os.RemoveAll(ld.tmpFile.Name()); err != nil {
|
||||
|
@ -300,7 +300,7 @@ func (ld *v2LayerDescriptor) Close() {
|
|||
}
|
||||
}
|
||||
|
||||
func (ld *v2LayerDescriptor) truncateDownloadFile() error {
|
||||
func (ld *layerDescriptor) truncateDownloadFile() error {
|
||||
// Need a new hash context since we will be redoing the download
|
||||
ld.verifier = nil
|
||||
|
||||
|
@ -317,13 +317,12 @@ func (ld *v2LayerDescriptor) truncateDownloadFile() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (ld *v2LayerDescriptor) Registered(diffID layer.DiffID) {
|
||||
func (ld *layerDescriptor) Registered(diffID layer.DiffID) {
|
||||
// Cache mapping from this layer's DiffID to the blobsum
|
||||
ld.V2MetadataService.Add(diffID, metadata.V2Metadata{Digest: ld.digest, SourceRepository: ld.repoInfo.Name.Name()})
|
||||
_ = ld.metadataService.Add(diffID, metadata.V2Metadata{Digest: ld.digest, SourceRepository: ld.repoInfo.Name.Name()})
|
||||
}
|
||||
|
||||
func (p *v2Puller) pullV2Tag(ctx context.Context, ref reference.Named, platform *specs.Platform) (tagUpdated bool, err error) {
|
||||
|
||||
func (p *puller) pullTag(ctx context.Context, ref reference.Named, platform *specs.Platform) (tagUpdated bool, err error) {
|
||||
var (
|
||||
tagOrDigest string // Used for logging/progress only
|
||||
dgst digest.Digest
|
||||
|
@ -477,7 +476,7 @@ func (p *v2Puller) pullV2Tag(ctx context.Context, ref reference.Named, platform
|
|||
|
||||
// validateMediaType validates if the given mediaType is accepted by the puller's
|
||||
// configuration.
|
||||
func (p *v2Puller) validateMediaType(mediaType string) error {
|
||||
func (p *puller) validateMediaType(mediaType string) error {
|
||||
var allowedMediaTypes []string
|
||||
if len(p.config.Schema2Types) > 0 {
|
||||
allowedMediaTypes = p.config.Schema2Types
|
||||
|
@ -497,7 +496,7 @@ func (p *v2Puller) validateMediaType(mediaType string) error {
|
|||
return invalidManifestClassError{mediaType, configClass}
|
||||
}
|
||||
|
||||
func (p *v2Puller) pullSchema1(ctx context.Context, ref reference.Reference, unverifiedManifest *schema1.SignedManifest, platform *specs.Platform) (id digest.Digest, manifestDigest digest.Digest, err error) {
|
||||
func (p *puller) pullSchema1(ctx context.Context, ref reference.Reference, unverifiedManifest *schema1.SignedManifest, platform *specs.Platform) (id digest.Digest, manifestDigest digest.Digest, err error) {
|
||||
if platform != nil {
|
||||
// Early bath if the requested OS doesn't match that of the configuration.
|
||||
// This avoids doing the download, only to potentially fail later.
|
||||
|
@ -550,11 +549,11 @@ func (p *v2Puller) pullSchema1(ctx context.Context, ref reference.Reference, unv
|
|||
continue
|
||||
}
|
||||
|
||||
layerDescriptor := &v2LayerDescriptor{
|
||||
digest: blobSum,
|
||||
repoInfo: p.repoInfo,
|
||||
repo: p.repo,
|
||||
V2MetadataService: p.V2MetadataService,
|
||||
layerDescriptor := &layerDescriptor{
|
||||
digest: blobSum,
|
||||
repoInfo: p.repoInfo,
|
||||
repo: p.repo,
|
||||
metadataService: p.metadataService,
|
||||
}
|
||||
|
||||
descriptors = append(descriptors, layerDescriptor)
|
||||
|
@ -581,7 +580,7 @@ func (p *v2Puller) pullSchema1(ctx context.Context, ref reference.Reference, unv
|
|||
return imageID, manifestDigest, nil
|
||||
}
|
||||
|
||||
func (p *v2Puller) pullSchema2Layers(ctx context.Context, target distribution.Descriptor, layers []distribution.Descriptor, platform *specs.Platform) (id digest.Digest, err error) {
|
||||
func (p *puller) pullSchema2Layers(ctx context.Context, target distribution.Descriptor, layers []distribution.Descriptor, platform *specs.Platform) (id digest.Digest, err error) {
|
||||
if _, err := p.config.ImageStore.Get(ctx, target.Digest); err == nil {
|
||||
// If the image already exists locally, no need to pull
|
||||
// anything.
|
||||
|
@ -596,12 +595,12 @@ func (p *v2Puller) pullSchema2Layers(ctx context.Context, target distribution.De
|
|||
if err := d.Digest.Validate(); err != nil {
|
||||
return "", errors.Wrapf(err, "could not validate layer digest %q", d.Digest)
|
||||
}
|
||||
layerDescriptor := &v2LayerDescriptor{
|
||||
digest: d.Digest,
|
||||
repo: p.repo,
|
||||
repoInfo: p.repoInfo,
|
||||
V2MetadataService: p.V2MetadataService,
|
||||
src: d,
|
||||
layerDescriptor := &layerDescriptor{
|
||||
digest: d.Digest,
|
||||
repo: p.repo,
|
||||
repoInfo: p.repoInfo,
|
||||
metadataService: p.metadataService,
|
||||
src: d,
|
||||
}
|
||||
|
||||
descriptors = append(descriptors, layerDescriptor)
|
||||
|
@ -674,7 +673,7 @@ func (p *v2Puller) pullSchema2Layers(ctx context.Context, target distribution.De
|
|||
// Populate diff ids in descriptors to avoid downloading foreign layers
|
||||
// which have been side loaded
|
||||
for i := range descriptors {
|
||||
descriptors[i].(*v2LayerDescriptor).diffID = configRootFS.DiffIDs[i]
|
||||
descriptors[i].(*layerDescriptor).diffID = configRootFS.DiffIDs[i]
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -756,7 +755,7 @@ func (p *v2Puller) pullSchema2Layers(ctx context.Context, target distribution.De
|
|||
return imageID, nil
|
||||
}
|
||||
|
||||
func (p *v2Puller) pullSchema2(ctx context.Context, ref reference.Named, mfst *schema2.DeserializedManifest, platform *specs.Platform) (id digest.Digest, manifestDigest digest.Digest, err error) {
|
||||
func (p *puller) pullSchema2(ctx context.Context, ref reference.Named, mfst *schema2.DeserializedManifest, platform *specs.Platform) (id digest.Digest, manifestDigest digest.Digest, err error) {
|
||||
manifestDigest, err = schema2ManifestDigest(ref, mfst)
|
||||
if err != nil {
|
||||
return "", "", err
|
||||
|
@ -765,7 +764,7 @@ func (p *v2Puller) pullSchema2(ctx context.Context, ref reference.Named, mfst *s
|
|||
return id, manifestDigest, err
|
||||
}
|
||||
|
||||
func (p *v2Puller) pullOCI(ctx context.Context, ref reference.Named, mfst *ocischema.DeserializedManifest, platform *specs.Platform) (id digest.Digest, manifestDigest digest.Digest, err error) {
|
||||
func (p *puller) pullOCI(ctx context.Context, ref reference.Named, mfst *ocischema.DeserializedManifest, platform *specs.Platform) (id digest.Digest, manifestDigest digest.Digest, err error) {
|
||||
manifestDigest, err = schema2ManifestDigest(ref, mfst)
|
||||
if err != nil {
|
||||
return "", "", err
|
||||
|
@ -795,7 +794,7 @@ func receiveConfig(s ImageConfigStore, configChan <-chan []byte, errChan <-chan
|
|||
|
||||
// pullManifestList handles "manifest lists" which point to various
|
||||
// platform-specific manifests.
|
||||
func (p *v2Puller) pullManifestList(ctx context.Context, ref reference.Named, mfstList *manifestlist.DeserializedManifestList, pp *specs.Platform) (id digest.Digest, manifestListDigest digest.Digest, err error) {
|
||||
func (p *puller) pullManifestList(ctx context.Context, ref reference.Named, mfstList *manifestlist.DeserializedManifestList, pp *specs.Platform) (id digest.Digest, manifestListDigest digest.Digest, err error) {
|
||||
manifestListDigest, err = schema2ManifestDigest(ref, mfstList)
|
||||
if err != nil {
|
||||
return "", "", err
|
||||
|
@ -874,7 +873,7 @@ const (
|
|||
defaultMaxSchemaPullAttempts = 5
|
||||
)
|
||||
|
||||
func (p *v2Puller) pullSchema2Config(ctx context.Context, dgst digest.Digest) (configJSON []byte, err error) {
|
||||
func (p *puller) pullSchema2Config(ctx context.Context, dgst digest.Digest) (configJSON []byte, err error) {
|
||||
blobs := p.repo.Blobs(ctx)
|
||||
err = retry(ctx, defaultMaxSchemaPullAttempts, defaultSchemaPullBackoff, func(ctx context.Context) (err error) {
|
||||
configJSON, err = blobs.Get(ctx, dgst)
|
||||
|
|
|
@ -322,7 +322,7 @@ func TestPullSchema2Config(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func testNewPuller(t *testing.T, rawurl string) *v2Puller {
|
||||
func testNewPuller(t *testing.T, rawurl string) *puller {
|
||||
t.Helper()
|
||||
|
||||
uri, err := url.Parse(rawurl)
|
||||
|
@ -358,12 +358,7 @@ func testNewPuller(t *testing.T, rawurl string) *v2Puller {
|
|||
},
|
||||
}
|
||||
|
||||
puller, err := newPuller(endpoint, repoInfo, imagePullConfig, nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
p := puller.(*v2Puller)
|
||||
|
||||
p := newPuller(endpoint, repoInfo, imagePullConfig, nil)
|
||||
p.repo, err = NewV2Repository(context.Background(), p.repoInfo, p.endpoint, p.config.MetaHeaders, p.config.AuthConfig, "pull")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
|
|
|
@ -14,7 +14,7 @@ import (
|
|||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
func (ld *v2LayerDescriptor) open(ctx context.Context) (distribution.ReadSeekCloser, error) {
|
||||
func (ld *layerDescriptor) open(ctx context.Context) (distribution.ReadSeekCloser, error) {
|
||||
blobs := ld.repo.Blobs(ctx)
|
||||
return blobs.Open(ctx, ld.digest)
|
||||
}
|
||||
|
|
|
@ -22,16 +22,16 @@ import (
|
|||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
var _ distribution.Describable = &v2LayerDescriptor{}
|
||||
var _ distribution.Describable = &layerDescriptor{}
|
||||
|
||||
func (ld *v2LayerDescriptor) Descriptor() distribution.Descriptor {
|
||||
func (ld *layerDescriptor) Descriptor() distribution.Descriptor {
|
||||
if ld.src.MediaType == schema2.MediaTypeForeignLayer && len(ld.src.URLs) > 0 {
|
||||
return ld.src
|
||||
}
|
||||
return distribution.Descriptor{}
|
||||
}
|
||||
|
||||
func (ld *v2LayerDescriptor) open(ctx context.Context) (distribution.ReadSeekCloser, error) {
|
||||
func (ld *layerDescriptor) open(ctx context.Context) (distribution.ReadSeekCloser, error) {
|
||||
blobs := ld.repo.Blobs(ctx)
|
||||
rsc, err := blobs.Open(ctx, ld.digest)
|
||||
|
||||
|
|
|
@ -68,11 +68,7 @@ func testTokenPassThru(t *testing.T, ts *httptest.Server) {
|
|||
},
|
||||
},
|
||||
}
|
||||
puller, err := newPuller(endpoint, repoInfo, imagePullConfig, nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
p := puller.(*v2Puller)
|
||||
p := newPuller(endpoint, repoInfo, imagePullConfig, nil)
|
||||
ctx := context.Background()
|
||||
p.repo, err = NewV2Repository(ctx, p.repoInfo, p.endpoint, p.config.MetaHeaders, p.config.AuthConfig, "pull")
|
||||
if err != nil {
|
||||
|
@ -82,7 +78,7 @@ func testTokenPassThru(t *testing.T, ts *httptest.Server) {
|
|||
logrus.Debug("About to pull")
|
||||
// We expect it to fail, since we haven't mock'd the full registry exchange in our handler above
|
||||
tag, _ := reference.WithTag(n, "tag_goes_here")
|
||||
_ = p.pullV2Repository(ctx, tag)
|
||||
_ = p.pullRepository(ctx, tag)
|
||||
}
|
||||
|
||||
func TestTokenPassThru(t *testing.T) {
|
||||
|
|
Loading…
Add table
Reference in a new issue