Merge pull request #43365 from thaJeztah/cleanup_distribution

distribution: remove v1 leftovers, and refactor to reduce public api/interface
This commit is contained in:
Sebastiaan van Stijn 2022-04-26 23:45:38 +02:00 committed by GitHub
commit 9184f0b5e4
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
15 changed files with 305 additions and 372 deletions

View file

@ -122,7 +122,6 @@ func (i *ImageService) pullImageWithReference(ctx context.Context, ref reference
ReferenceStore: i.referenceStore,
},
DownloadManager: i.downloadManager,
Schema2Types: distribution.ImageTypes,
Platform: platform,
}
@ -134,35 +133,12 @@ func (i *ImageService) pullImageWithReference(ctx context.Context, ref reference
// GetRepository returns a repository from the registry.
func (i *ImageService) GetRepository(ctx context.Context, ref reference.Named, authConfig *types.AuthConfig) (dist.Repository, error) {
// get repository info
repoInfo, err := i.registryService.ResolveRepository(ref)
if err != nil {
return nil, errdefs.InvalidParameter(err)
}
// makes sure name is not empty or `scratch`
if err := distribution.ValidateRepoName(repoInfo.Name); err != nil {
return nil, errdefs.InvalidParameter(err)
}
// get endpoints
endpoints, err := i.registryService.LookupPullEndpoints(reference.Domain(repoInfo.Name))
if err != nil {
return nil, err
}
// retrieve repository
var (
repository dist.Repository
lastError error
)
for _, endpoint := range endpoints {
repository, lastError = distribution.NewV2Repository(ctx, repoInfo, endpoint, nil, authConfig, "pull")
if lastError == nil {
break
}
}
return repository, lastError
return distribution.GetRepository(ctx, ref, &distribution.ImagePullConfig{
Config: distribution.Config{
AuthConfig: authConfig,
RegistryService: i.registryService,
},
})
}
func tempLease(ctx context.Context, mgr leases.Manager) (context.Context, func(context.Context) error, error) {

View file

@ -57,8 +57,9 @@ type ImagePullConfig struct {
// DownloadManager manages concurrent pulls.
DownloadManager *xfer.LayerDownloadManager
// Schema2Types is the valid schema2 configuration types allowed
// by the pull operation.
// Schema2Types is an optional list of valid schema2 configuration types
// allowed by the pull operation. If omitted, the default list of accepted
// types is used.
Schema2Types []string
// Platform is the requested platform of the image being pulled
Platform *specs.Platform
@ -86,8 +87,6 @@ type ImagePushConfig struct {
type ImageConfigStore interface {
Put(context.Context, []byte) (digest.Digest, error)
Get(context.Context, digest.Digest) ([]byte, error)
RootFSFromConfig([]byte) (*image.RootFS, error)
PlatformFromConfig([]byte) (*specs.Platform, error)
}
// PushLayerProvider provides layers to be pushed by ChainID.
@ -132,7 +131,7 @@ func (s *imageConfigStore) Get(_ context.Context, d digest.Digest) ([]byte, erro
return img.RawJSON(), nil
}
func (s *imageConfigStore) RootFSFromConfig(c []byte) (*image.RootFS, error) {
func rootFSFromConfig(c []byte) (*image.RootFS, error) {
var unmarshalledConfig image.Image
if err := json.Unmarshal(c, &unmarshalledConfig); err != nil {
return nil, err
@ -140,7 +139,7 @@ func (s *imageConfigStore) RootFSFromConfig(c []byte) (*image.RootFS, error) {
return unmarshalledConfig.RootFS, nil
}
func (s *imageConfigStore) PlatformFromConfig(c []byte) (*specs.Platform, error) {
func platformFromConfig(c []byte) (*specs.Platform, error) {
var unmarshalledConfig image.Image
if err := json.Unmarshal(c, &unmarshalledConfig); err != nil {
return nil, err
@ -153,7 +152,12 @@ func (s *imageConfigStore) PlatformFromConfig(c []byte) (*specs.Platform, error)
if !system.IsOSSupported(os) {
return nil, errors.Wrapf(system.ErrNotSupportedOperatingSystem, "image operating system %q cannot be used on this platform", os)
}
return &specs.Platform{OS: os, Architecture: unmarshalledConfig.Architecture, Variant: unmarshalledConfig.Variant, OSVersion: unmarshalledConfig.OSVersion}, nil
return &specs.Platform{
OS: os,
Architecture: unmarshalledConfig.Architecture,
Variant: unmarshalledConfig.Variant,
OSVersion: unmarshalledConfig.OSVersion,
}, nil
}
type storeLayerProvider struct {

View file

@ -18,17 +18,6 @@ import (
"github.com/sirupsen/logrus"
)
// ErrNoSupport is an error type used for errors indicating that an operation
// is not supported. It encapsulates a more specific error.
type ErrNoSupport struct{ Err error }
func (e ErrNoSupport) Error() string {
if e.Err == nil {
return "not supported"
}
return e.Err.Error()
}
// fallbackError wraps an error that can possibly allow fallback to a different
// endpoint.
type fallbackError struct {
@ -74,18 +63,18 @@ func (e notFoundError) Cause() error {
return e.cause
}
// TranslatePullError is used to convert an error from a registry pull
// translatePullError is used to convert an error from a registry pull
// operation to an error representing the entire pull operation. Any error
// information which is not used by the returned error gets output to
// log at info level.
func TranslatePullError(err error, ref reference.Named) error {
func translatePullError(err error, ref reference.Named) error {
switch v := err.(type) {
case errcode.Errors:
if len(v) != 0 {
for _, extra := range v[1:] {
logrus.Infof("Ignoring extra error returned from registry: %v", extra)
logrus.WithError(extra).Infof("Ignoring extra error returned from registry")
}
return TranslatePullError(v[0], ref)
return translatePullError(v[0], ref)
}
case errcode.Error:
switch v.Code {
@ -93,7 +82,7 @@ func TranslatePullError(err error, ref reference.Named) error {
return notFoundError{v, ref}
}
case xfer.DoNotRetry:
return TranslatePullError(v.Err, ref)
return translatePullError(v.Err, ref)
}
return errdefs.Unknown(err)
@ -125,14 +114,12 @@ func continueOnError(err error, mirrorEndpoint bool) bool {
return true
}
return continueOnError(v[0], mirrorEndpoint)
case ErrNoSupport:
return continueOnError(v.Err, mirrorEndpoint)
case errcode.Error:
return mirrorEndpoint
case *client.UnexpectedHTTPResponseError:
return true
case ImageConfigPullError:
// ImageConfigPullError only happens with v2 images, v1 fallback is
case imageConfigPullError:
// imageConfigPullError only happens with v2 images, v1 fallback is
// unnecessary.
// Failures from a mirror endpoint should result in fallback to the
// canonical repo.

View file

@ -18,15 +18,13 @@ var alwaysContinue = []error{
errUnexpected,
// nested
errcode.Errors{errUnexpected},
ErrNoSupport{Err: errUnexpected},
}
var continueFromMirrorEndpoint = []error{
ImageConfigPullError{},
imageConfigPullError{},
errcode.Error{},
// nested
errcode.Errors{errcode.Error{}},
ErrNoSupport{Err: errcode.Error{}},
}
var neverContinue = []error{

View file

@ -6,57 +6,27 @@ import (
"github.com/docker/distribution/reference"
"github.com/docker/docker/api"
"github.com/docker/docker/distribution/metadata"
"github.com/docker/docker/pkg/progress"
refstore "github.com/docker/docker/reference"
"github.com/docker/docker/registry"
"github.com/opencontainers/go-digest"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
// Puller is an interface that abstracts pulling for different API versions.
type Puller interface {
// Pull tries to pull the image referenced by `tag`
// Pull returns an error if any, as well as a boolean that determines whether to retry Pull on the next configured endpoint.
//
Pull(ctx context.Context, ref reference.Named) error
}
// newPuller returns a Puller interface that will pull from a v2 registry.
func newPuller(endpoint registry.APIEndpoint, repoInfo *registry.RepositoryInfo, imagePullConfig *ImagePullConfig, local ContentStore) (Puller, error) {
switch endpoint.Version {
case registry.APIVersion2:
return &v2Puller{
V2MetadataService: metadata.NewV2MetadataService(imagePullConfig.MetadataStore),
endpoint: endpoint,
config: imagePullConfig,
repoInfo: repoInfo,
manifestStore: &manifestStore{
local: local,
},
}, nil
case registry.APIVersion1:
return nil, fmt.Errorf("protocol version %d no longer supported. Please contact admins of registry %s", endpoint.Version, endpoint.URL)
}
return nil, fmt.Errorf("unknown version %d for registry %s", endpoint.Version, endpoint.URL)
}
// Pull initiates a pull operation. image is the repository name to pull, and
// tag may be either empty, or indicate a specific tag to pull.
func Pull(ctx context.Context, ref reference.Named, imagePullConfig *ImagePullConfig, local ContentStore) error {
func Pull(ctx context.Context, ref reference.Named, config *ImagePullConfig, local ContentStore) error {
// Resolve the Repository name from fqn to RepositoryInfo
repoInfo, err := imagePullConfig.RegistryService.ResolveRepository(ref)
repoInfo, err := config.RegistryService.ResolveRepository(ref)
if err != nil {
return err
}
// makes sure name is not `scratch`
if err := ValidateRepoName(repoInfo.Name); err != nil {
if err := validateRepoName(repoInfo.Name); err != nil {
return err
}
endpoints, err := imagePullConfig.RegistryService.LookupPullEndpoints(reference.Domain(repoInfo.Name))
endpoints, err := config.RegistryService.LookupPullEndpoints(reference.Domain(repoInfo.Name))
if err != nil {
return err
}
@ -64,15 +34,6 @@ func Pull(ctx context.Context, ref reference.Named, imagePullConfig *ImagePullCo
var (
lastErr error
// discardNoSupportErrors is used to track whether an endpoint encountered an error of type registry.ErrNoSupport
// By default it is false, which means that if an ErrNoSupport error is encountered, it will be saved in lastErr.
// As soon as another kind of error is encountered, discardNoSupportErrors is set to true, avoiding the saving of
// any subsequent ErrNoSupport errors in lastErr.
// It's needed for pull-by-digest on v1 endpoints: if there are only v1 endpoints configured, the error should be
// returned and displayed, but if there was a v2 endpoint which supports pull-by-digest, then the last relevant
// error is the ones from v2 endpoints not v1.
discardNoSupportErrors bool
// confirmedTLSRegistries is a map indicating which registries
// are known to be using TLS. There should never be a plaintext
// retry for any of these.
@ -86,15 +47,9 @@ func Pull(ctx context.Context, ref reference.Named, imagePullConfig *ImagePullCo
}
}
logrus.Debugf("Trying to pull %s from %s %s", reference.FamiliarName(repoInfo.Name), endpoint.URL, endpoint.Version)
logrus.Debugf("Trying to pull %s from %s", reference.FamiliarName(repoInfo.Name), endpoint.URL)
puller, err := newPuller(endpoint, repoInfo, imagePullConfig, local)
if err != nil {
lastErr = err
continue
}
if err := puller.Pull(ctx, ref); err != nil {
if err := newPuller(endpoint, repoInfo, config, local).pull(ctx, ref); err != nil {
// Was this pull cancelled? If so, don't try to fall
// back.
fallback := false
@ -110,25 +65,15 @@ func Pull(ctx context.Context, ref reference.Named, imagePullConfig *ImagePullCo
}
}
if fallback {
if _, ok := err.(ErrNoSupport); !ok {
// Because we found an error that's not ErrNoSupport, discard all subsequent ErrNoSupport errors.
discardNoSupportErrors = true
// append subsequent errors
lastErr = err
} else if !discardNoSupportErrors {
// Save the ErrNoSupport error, because it's either the first error or all encountered errors
// were also ErrNoSupport errors.
// append subsequent errors
lastErr = err
}
lastErr = err
logrus.Infof("Attempting next endpoint for pull after error: %v", err)
continue
}
logrus.Errorf("Not continuing with pull after error: %v", err)
return TranslatePullError(err, ref)
return translatePullError(err, ref)
}
imagePullConfig.ImageEventLogger(reference.FamiliarString(ref), reference.FamiliarName(repoInfo.Name), "pull")
config.ImageEventLogger(reference.FamiliarString(ref), reference.FamiliarName(repoInfo.Name), "pull")
return nil
}
@ -136,23 +81,11 @@ func Pull(ctx context.Context, ref reference.Named, imagePullConfig *ImagePullCo
lastErr = fmt.Errorf("no endpoints found for %s", reference.FamiliarString(ref))
}
return TranslatePullError(lastErr, ref)
return translatePullError(lastErr, ref)
}
// writeStatus writes a status message to out. If layersDownloaded is true, the
// status message indicates that a newer image was downloaded. Otherwise, it
// indicates that the image is up to date. requestedTag is the tag the message
// will refer to.
func writeStatus(requestedTag string, out progress.Output, layersDownloaded bool) {
if layersDownloaded {
progress.Message(out, "", "Status: Downloaded newer image for "+requestedTag)
} else {
progress.Message(out, "", "Status: Image is up to date for "+requestedTag)
}
}
// ValidateRepoName validates the name of a repository.
func ValidateRepoName(name reference.Named) error {
// validateRepoName validates the name of a repository.
func validateRepoName(name reference.Named) error {
if reference.FamiliarName(name) == api.NoBaseImageSpecifier {
return errors.WithStack(reservedNameError(api.NoBaseImageSpecifier))
}

View file

@ -41,29 +41,42 @@ var (
errRootFSInvalid = errors.New("invalid rootfs in image configuration")
)
// ImageConfigPullError is an error pulling the image config blob
// imageConfigPullError is an error pulling the image config blob
// (only applies to schema2).
type ImageConfigPullError struct {
type imageConfigPullError struct {
Err error
}
// Error returns the error string for ImageConfigPullError.
func (e ImageConfigPullError) Error() string {
// Error returns the error string for imageConfigPullError.
func (e imageConfigPullError) Error() string {
return "error pulling image configuration: " + e.Err.Error()
}
type v2Puller struct {
V2MetadataService metadata.V2MetadataService
endpoint registry.APIEndpoint
config *ImagePullConfig
repoInfo *registry.RepositoryInfo
repo distribution.Repository
manifestStore *manifestStore
// newPuller returns a puller to pull from a v2 registry.
func newPuller(endpoint registry.APIEndpoint, repoInfo *registry.RepositoryInfo, config *ImagePullConfig, local ContentStore) *puller {
return &puller{
metadataService: metadata.NewV2MetadataService(config.MetadataStore),
endpoint: endpoint,
config: config,
repoInfo: repoInfo,
manifestStore: &manifestStore{
local: local,
},
}
}
func (p *v2Puller) Pull(ctx context.Context, ref reference.Named) (err error) {
type puller struct {
metadataService metadata.V2MetadataService
endpoint registry.APIEndpoint
config *ImagePullConfig
repoInfo *registry.RepositoryInfo
repo distribution.Repository
manifestStore *manifestStore
}
func (p *puller) pull(ctx context.Context, ref reference.Named) (err error) {
// TODO(tiborvass): was ReceiveTimeout
p.repo, err = NewV2Repository(ctx, p.repoInfo, p.endpoint, p.config.MetaHeaders, p.config.AuthConfig, "pull")
p.repo, err = newRepository(ctx, p.repoInfo, p.endpoint, p.config.MetaHeaders, p.config.AuthConfig, "pull")
if err != nil {
logrus.Warnf("Error getting v2 registry: %v", err)
return err
@ -74,7 +87,7 @@ func (p *v2Puller) Pull(ctx context.Context, ref reference.Named) (err error) {
return err
}
if err = p.pullV2Repository(ctx, ref); err != nil {
if err = p.pullRepository(ctx, ref); err != nil {
if _, ok := err.(fallbackError); ok {
return err
}
@ -88,10 +101,10 @@ func (p *v2Puller) Pull(ctx context.Context, ref reference.Named) (err error) {
return err
}
func (p *v2Puller) pullV2Repository(ctx context.Context, ref reference.Named) (err error) {
func (p *puller) pullRepository(ctx context.Context, ref reference.Named) (err error) {
var layersDownloaded bool
if !reference.IsNameOnly(ref) {
layersDownloaded, err = p.pullV2Tag(ctx, ref, p.config.Platform)
layersDownloaded, err = p.pullTag(ctx, ref, p.config.Platform)
if err != nil {
return err
}
@ -106,7 +119,7 @@ func (p *v2Puller) pullV2Repository(ctx context.Context, ref reference.Named) (e
if err != nil {
return err
}
pulledNew, err := p.pullV2Tag(ctx, tagRef, p.config.Platform)
pulledNew, err := p.pullTag(ctx, tagRef, p.config.Platform)
if err != nil {
// Since this is the pull-all-tags case, don't
// allow an error pulling a particular tag to
@ -122,38 +135,50 @@ func (p *v2Puller) pullV2Repository(ctx context.Context, ref reference.Named) (e
}
}
writeStatus(reference.FamiliarString(ref), p.config.ProgressOutput, layersDownloaded)
p.writeStatus(reference.FamiliarString(ref), layersDownloaded)
return nil
}
type v2LayerDescriptor struct {
digest digest.Digest
diffID layer.DiffID
repoInfo *registry.RepositoryInfo
repo distribution.Repository
V2MetadataService metadata.V2MetadataService
tmpFile *os.File
verifier digest.Verifier
src distribution.Descriptor
// writeStatus writes a status message to out. If layersDownloaded is true, the
// status message indicates that a newer image was downloaded. Otherwise, it
// indicates that the image is up to date. requestedTag is the tag the message
// will refer to.
func (p *puller) writeStatus(requestedTag string, layersDownloaded bool) {
if layersDownloaded {
progress.Message(p.config.ProgressOutput, "", "Status: Downloaded newer image for "+requestedTag)
} else {
progress.Message(p.config.ProgressOutput, "", "Status: Image is up to date for "+requestedTag)
}
}
func (ld *v2LayerDescriptor) Key() string {
type layerDescriptor struct {
digest digest.Digest
diffID layer.DiffID
repoInfo *registry.RepositoryInfo
repo distribution.Repository
metadataService metadata.V2MetadataService
tmpFile *os.File
verifier digest.Verifier
src distribution.Descriptor
}
func (ld *layerDescriptor) Key() string {
return "v2:" + ld.digest.String()
}
func (ld *v2LayerDescriptor) ID() string {
func (ld *layerDescriptor) ID() string {
return stringid.TruncateID(ld.digest.String())
}
func (ld *v2LayerDescriptor) DiffID() (layer.DiffID, error) {
func (ld *layerDescriptor) DiffID() (layer.DiffID, error) {
if ld.diffID != "" {
return ld.diffID, nil
}
return ld.V2MetadataService.GetDiffID(ld.digest)
return ld.metadataService.GetDiffID(ld.digest)
}
func (ld *v2LayerDescriptor) Download(ctx context.Context, progressOutput progress.Output) (io.ReadCloser, int64, error) {
func (ld *layerDescriptor) Download(ctx context.Context, progressOutput progress.Output) (io.ReadCloser, int64, error) {
logrus.Debugf("pulling blob %q", ld.digest)
var (
@ -291,7 +316,7 @@ func (ld *v2LayerDescriptor) Download(ctx context.Context, progressOutput progre
}), size, nil
}
func (ld *v2LayerDescriptor) Close() {
func (ld *layerDescriptor) Close() {
if ld.tmpFile != nil {
ld.tmpFile.Close()
if err := os.RemoveAll(ld.tmpFile.Name()); err != nil {
@ -300,7 +325,7 @@ func (ld *v2LayerDescriptor) Close() {
}
}
func (ld *v2LayerDescriptor) truncateDownloadFile() error {
func (ld *layerDescriptor) truncateDownloadFile() error {
// Need a new hash context since we will be redoing the download
ld.verifier = nil
@ -317,13 +342,12 @@ func (ld *v2LayerDescriptor) truncateDownloadFile() error {
return nil
}
func (ld *v2LayerDescriptor) Registered(diffID layer.DiffID) {
func (ld *layerDescriptor) Registered(diffID layer.DiffID) {
// Cache mapping from this layer's DiffID to the blobsum
ld.V2MetadataService.Add(diffID, metadata.V2Metadata{Digest: ld.digest, SourceRepository: ld.repoInfo.Name.Name()})
_ = ld.metadataService.Add(diffID, metadata.V2Metadata{Digest: ld.digest, SourceRepository: ld.repoInfo.Name.Name()})
}
func (p *v2Puller) pullV2Tag(ctx context.Context, ref reference.Named, platform *specs.Platform) (tagUpdated bool, err error) {
func (p *puller) pullTag(ctx context.Context, ref reference.Named, platform *specs.Platform) (tagUpdated bool, err error) {
var (
tagOrDigest string // Used for logging/progress only
dgst digest.Digest
@ -397,19 +421,8 @@ func (p *v2Puller) pullV2Tag(ctx context.Context, ref reference.Named, platform
}
if m, ok := manifest.(*schema2.DeserializedManifest); ok {
var allowedMediatype bool
for _, t := range p.config.Schema2Types {
if m.Manifest.Config.MediaType == t {
allowedMediatype = true
break
}
}
if !allowedMediatype {
configClass := mediaTypeClasses[m.Manifest.Config.MediaType]
if configClass == "" {
configClass = "unknown"
}
return false, invalidManifestClassError{m.Manifest.Config.MediaType, configClass}
if err := p.validateMediaType(m.Manifest.Config.MediaType); err != nil {
return false, err
}
}
@ -486,7 +499,29 @@ func (p *v2Puller) pullV2Tag(ctx context.Context, ref reference.Named, platform
return true, nil
}
func (p *v2Puller) pullSchema1(ctx context.Context, ref reference.Reference, unverifiedManifest *schema1.SignedManifest, platform *specs.Platform) (id digest.Digest, manifestDigest digest.Digest, err error) {
// validateMediaType validates if the given mediaType is accepted by the puller's
// configuration.
func (p *puller) validateMediaType(mediaType string) error {
var allowedMediaTypes []string
if len(p.config.Schema2Types) > 0 {
allowedMediaTypes = p.config.Schema2Types
} else {
allowedMediaTypes = defaultImageTypes
}
for _, t := range allowedMediaTypes {
if mediaType == t {
return nil
}
}
configClass := mediaTypeClasses[mediaType]
if configClass == "" {
configClass = "unknown"
}
return invalidManifestClassError{mediaType, configClass}
}
func (p *puller) pullSchema1(ctx context.Context, ref reference.Reference, unverifiedManifest *schema1.SignedManifest, platform *specs.Platform) (id digest.Digest, manifestDigest digest.Digest, err error) {
if platform != nil {
// Early bath if the requested OS doesn't match that of the configuration.
// This avoids doing the download, only to potentially fail later.
@ -539,11 +574,11 @@ func (p *v2Puller) pullSchema1(ctx context.Context, ref reference.Reference, unv
continue
}
layerDescriptor := &v2LayerDescriptor{
digest: blobSum,
repoInfo: p.repoInfo,
repo: p.repo,
V2MetadataService: p.V2MetadataService,
layerDescriptor := &layerDescriptor{
digest: blobSum,
repoInfo: p.repoInfo,
repo: p.repo,
metadataService: p.metadataService,
}
descriptors = append(descriptors, layerDescriptor)
@ -570,7 +605,7 @@ func (p *v2Puller) pullSchema1(ctx context.Context, ref reference.Reference, unv
return imageID, manifestDigest, nil
}
func (p *v2Puller) pullSchema2Layers(ctx context.Context, target distribution.Descriptor, layers []distribution.Descriptor, platform *specs.Platform) (id digest.Digest, err error) {
func (p *puller) pullSchema2Layers(ctx context.Context, target distribution.Descriptor, layers []distribution.Descriptor, platform *specs.Platform) (id digest.Digest, err error) {
if _, err := p.config.ImageStore.Get(ctx, target.Digest); err == nil {
// If the image already exists locally, no need to pull
// anything.
@ -585,12 +620,12 @@ func (p *v2Puller) pullSchema2Layers(ctx context.Context, target distribution.De
if err := d.Digest.Validate(); err != nil {
return "", errors.Wrapf(err, "could not validate layer digest %q", d.Digest)
}
layerDescriptor := &v2LayerDescriptor{
digest: d.Digest,
repo: p.repo,
repoInfo: p.repoInfo,
V2MetadataService: p.V2MetadataService,
src: d,
layerDescriptor := &layerDescriptor{
digest: d.Digest,
repo: p.repo,
repoInfo: p.repoInfo,
metadataService: p.metadataService,
src: d,
}
descriptors = append(descriptors, layerDescriptor)
@ -608,7 +643,7 @@ func (p *v2Puller) pullSchema2Layers(ctx context.Context, target distribution.De
go func() {
configJSON, err := p.pullSchema2Config(ctx, target.Digest)
if err != nil {
configErrChan <- ImageConfigPullError{Err: err}
configErrChan <- imageConfigPullError{Err: err}
cancel()
return
}
@ -637,7 +672,7 @@ func (p *v2Puller) pullSchema2Layers(ctx context.Context, target distribution.De
// check to block Windows images being pulled on Linux is implemented, it
// may be necessary to perform the same type of serialisation.
if runtime.GOOS == "windows" {
configJSON, configRootFS, configPlatform, err = receiveConfig(p.config.ImageStore, configChan, configErrChan)
configJSON, configRootFS, configPlatform, err = receiveConfig(configChan, configErrChan)
if err != nil {
return "", err
}
@ -663,7 +698,7 @@ func (p *v2Puller) pullSchema2Layers(ctx context.Context, target distribution.De
// Populate diff ids in descriptors to avoid downloading foreign layers
// which have been side loaded
for i := range descriptors {
descriptors[i].(*v2LayerDescriptor).diffID = configRootFS.DiffIDs[i]
descriptors[i].(*layerDescriptor).diffID = configRootFS.DiffIDs[i]
}
}
@ -698,7 +733,7 @@ func (p *v2Puller) pullSchema2Layers(ctx context.Context, target distribution.De
}
if configJSON == nil {
configJSON, configRootFS, _, err = receiveConfig(p.config.ImageStore, configChan, configErrChan)
configJSON, configRootFS, _, err = receiveConfig(configChan, configErrChan)
if err == nil && configRootFS == nil {
err = errRootFSInvalid
}
@ -745,7 +780,7 @@ func (p *v2Puller) pullSchema2Layers(ctx context.Context, target distribution.De
return imageID, nil
}
func (p *v2Puller) pullSchema2(ctx context.Context, ref reference.Named, mfst *schema2.DeserializedManifest, platform *specs.Platform) (id digest.Digest, manifestDigest digest.Digest, err error) {
func (p *puller) pullSchema2(ctx context.Context, ref reference.Named, mfst *schema2.DeserializedManifest, platform *specs.Platform) (id digest.Digest, manifestDigest digest.Digest, err error) {
manifestDigest, err = schema2ManifestDigest(ref, mfst)
if err != nil {
return "", "", err
@ -754,7 +789,7 @@ func (p *v2Puller) pullSchema2(ctx context.Context, ref reference.Named, mfst *s
return id, manifestDigest, err
}
func (p *v2Puller) pullOCI(ctx context.Context, ref reference.Named, mfst *ocischema.DeserializedManifest, platform *specs.Platform) (id digest.Digest, manifestDigest digest.Digest, err error) {
func (p *puller) pullOCI(ctx context.Context, ref reference.Named, mfst *ocischema.DeserializedManifest, platform *specs.Platform) (id digest.Digest, manifestDigest digest.Digest, err error) {
manifestDigest, err = schema2ManifestDigest(ref, mfst)
if err != nil {
return "", "", err
@ -763,14 +798,14 @@ func (p *v2Puller) pullOCI(ctx context.Context, ref reference.Named, mfst *ocisc
return id, manifestDigest, err
}
func receiveConfig(s ImageConfigStore, configChan <-chan []byte, errChan <-chan error) ([]byte, *image.RootFS, *specs.Platform, error) {
func receiveConfig(configChan <-chan []byte, errChan <-chan error) ([]byte, *image.RootFS, *specs.Platform, error) {
select {
case configJSON := <-configChan:
rootfs, err := s.RootFSFromConfig(configJSON)
rootfs, err := rootFSFromConfig(configJSON)
if err != nil {
return nil, nil, nil, err
}
platform, err := s.PlatformFromConfig(configJSON)
platform, err := platformFromConfig(configJSON)
if err != nil {
return nil, nil, nil, err
}
@ -784,7 +819,7 @@ func receiveConfig(s ImageConfigStore, configChan <-chan []byte, errChan <-chan
// pullManifestList handles "manifest lists" which point to various
// platform-specific manifests.
func (p *v2Puller) pullManifestList(ctx context.Context, ref reference.Named, mfstList *manifestlist.DeserializedManifestList, pp *specs.Platform) (id digest.Digest, manifestListDigest digest.Digest, err error) {
func (p *puller) pullManifestList(ctx context.Context, ref reference.Named, mfstList *manifestlist.DeserializedManifestList, pp *specs.Platform) (id digest.Digest, manifestListDigest digest.Digest, err error) {
manifestListDigest, err = schema2ManifestDigest(ref, mfstList)
if err != nil {
return "", "", err
@ -863,7 +898,7 @@ const (
defaultMaxSchemaPullAttempts = 5
)
func (p *v2Puller) pullSchema2Config(ctx context.Context, dgst digest.Digest) (configJSON []byte, err error) {
func (p *puller) pullSchema2Config(ctx context.Context, dgst digest.Digest) (configJSON []byte, err error) {
blobs := p.repo.Blobs(ctx)
err = retry(ctx, defaultMaxSchemaPullAttempts, defaultSchemaPullBackoff, func(ctx context.Context) (err error) {
configJSON, err = blobs.Get(ctx, dgst)

View file

@ -322,7 +322,7 @@ func TestPullSchema2Config(t *testing.T) {
}
}
func testNewPuller(t *testing.T, rawurl string) *v2Puller {
func testNewPuller(t *testing.T, rawurl string) *puller {
t.Helper()
uri, err := url.Parse(rawurl)
@ -356,16 +356,10 @@ func testNewPuller(t *testing.T, rawurl string) *v2Puller {
RegistryToken: secretRegistryToken,
},
},
Schema2Types: ImageTypes,
}
puller, err := newPuller(endpoint, repoInfo, imagePullConfig, nil)
if err != nil {
t.Fatal(err)
}
p := puller.(*v2Puller)
p.repo, err = NewV2Repository(context.Background(), p.repoInfo, p.endpoint, p.config.MetaHeaders, p.config.AuthConfig, "pull")
p := newPuller(endpoint, repoInfo, imagePullConfig, nil)
p.repo, err = newRepository(context.Background(), p.repoInfo, p.endpoint, p.config.MetaHeaders, p.config.AuthConfig, "pull")
if err != nil {
t.Fatal(err)
}

View file

@ -14,7 +14,7 @@ import (
"github.com/sirupsen/logrus"
)
func (ld *v2LayerDescriptor) open(ctx context.Context) (distribution.ReadSeekCloser, error) {
func (ld *layerDescriptor) open(ctx context.Context) (distribution.ReadSeekCloser, error) {
blobs := ld.repo.Blobs(ctx)
return blobs.Open(ctx, ld.digest)
}

View file

@ -22,16 +22,16 @@ import (
"github.com/sirupsen/logrus"
)
var _ distribution.Describable = &v2LayerDescriptor{}
var _ distribution.Describable = &layerDescriptor{}
func (ld *v2LayerDescriptor) Descriptor() distribution.Descriptor {
func (ld *layerDescriptor) Descriptor() distribution.Descriptor {
if ld.src.MediaType == schema2.MediaTypeForeignLayer && len(ld.src.URLs) > 0 {
return ld.src
}
return distribution.Descriptor{}
}
func (ld *v2LayerDescriptor) open(ctx context.Context) (distribution.ReadSeekCloser, error) {
func (ld *layerDescriptor) open(ctx context.Context) (distribution.ReadSeekCloser, error) {
blobs := ld.repo.Blobs(ctx)
rsc, err := blobs.Open(ctx, ld.digest)

View file

@ -8,64 +8,31 @@ import (
"io"
"github.com/docker/distribution/reference"
"github.com/docker/docker/distribution/metadata"
"github.com/docker/docker/pkg/progress"
"github.com/docker/docker/registry"
"github.com/sirupsen/logrus"
)
// Pusher is an interface that abstracts pushing for different API versions.
type Pusher interface {
// Push tries to push the image configured at the creation of Pusher.
// Push returns an error if any, as well as a boolean that determines whether to retry Push on the next configured endpoint.
//
// TODO(tiborvass): have Push() take a reference to repository + tag, so that the pusher itself is repository-agnostic.
Push(ctx context.Context) error
}
const compressionBufSize = 32768
// NewPusher creates a new Pusher interface that will push to either a v1 or v2
// registry. The endpoint argument contains a Version field that determines
// whether a v1 or v2 pusher will be created. The other parameters are passed
// through to the underlying pusher implementation for use during the actual
// push operation.
func NewPusher(ref reference.Named, endpoint registry.APIEndpoint, repoInfo *registry.RepositoryInfo, imagePushConfig *ImagePushConfig) (Pusher, error) {
switch endpoint.Version {
case registry.APIVersion2:
return &v2Pusher{
v2MetadataService: metadata.NewV2MetadataService(imagePushConfig.MetadataStore),
ref: ref,
endpoint: endpoint,
repoInfo: repoInfo,
config: imagePushConfig,
}, nil
case registry.APIVersion1:
return nil, fmt.Errorf("protocol version %d no longer supported. Please contact admins of registry %s", endpoint.Version, endpoint.URL)
}
return nil, fmt.Errorf("unknown version %d for registry %s", endpoint.Version, endpoint.URL)
}
// Push initiates a push operation on ref.
// ref is the specific variant of the image to be pushed.
// If no tag is provided, all tags will be pushed.
func Push(ctx context.Context, ref reference.Named, imagePushConfig *ImagePushConfig) error {
// Push initiates a push operation on ref. ref is the specific variant of the
// image to push. If no tag is provided, all tags are pushed.
func Push(ctx context.Context, ref reference.Named, config *ImagePushConfig) error {
// FIXME: Allow to interrupt current push when new push of same image is done.
// Resolve the Repository name from fqn to RepositoryInfo
repoInfo, err := imagePushConfig.RegistryService.ResolveRepository(ref)
repoInfo, err := config.RegistryService.ResolveRepository(ref)
if err != nil {
return err
}
endpoints, err := imagePushConfig.RegistryService.LookupPushEndpoints(reference.Domain(repoInfo.Name))
endpoints, err := config.RegistryService.LookupPushEndpoints(reference.Domain(repoInfo.Name))
if err != nil {
return err
}
progress.Messagef(imagePushConfig.ProgressOutput, "", "The push refers to repository [%s]", repoInfo.Name.Name())
progress.Messagef(config.ProgressOutput, "", "The push refers to repository [%s]", repoInfo.Name.Name())
associations := imagePushConfig.ReferenceStore.ReferencesByName(repoInfo.Name)
associations := config.ReferenceStore.ReferencesByName(repoInfo.Name)
if len(associations) == 0 {
return fmt.Errorf("An image does not exist locally with the tag: %s", reference.FamiliarName(repoInfo.Name))
}
@ -87,14 +54,9 @@ func Push(ctx context.Context, ref reference.Named, imagePushConfig *ImagePushCo
}
}
logrus.Debugf("Trying to push %s to %s %s", repoInfo.Name.Name(), endpoint.URL, endpoint.Version)
logrus.Debugf("Trying to push %s to %s", repoInfo.Name.Name(), endpoint.URL)
pusher, err := NewPusher(ref, endpoint, repoInfo, imagePushConfig)
if err != nil {
lastErr = err
continue
}
if err := pusher.Push(ctx); err != nil {
if err := newPusher(ref, endpoint, repoInfo, config).push(ctx); err != nil {
// Was this push cancelled? If so, don't try to fall
// back.
select {
@ -115,7 +77,7 @@ func Push(ctx context.Context, ref reference.Named, imagePushConfig *ImagePushCo
return err
}
imagePushConfig.ImageEventLogger(reference.FamiliarString(ref), reference.FamiliarName(repoInfo.Name), "push")
config.ImageEventLogger(reference.FamiliarString(ref), reference.FamiliarName(repoInfo.Name), "push")
return nil
}

View file

@ -34,13 +34,26 @@ const (
middleLayerMaximumSize = 10 * (1 << 20) // 10MB
)
type v2Pusher struct {
v2MetadataService metadata.V2MetadataService
ref reference.Named
endpoint registry.APIEndpoint
repoInfo *registry.RepositoryInfo
config *ImagePushConfig
repo distribution.Repository
// newPusher creates a new pusher for pushing to a v2 registry.
// The parameters are passed through to the underlying pusher implementation for
// use during the actual push operation.
func newPusher(ref reference.Named, endpoint registry.APIEndpoint, repoInfo *registry.RepositoryInfo, config *ImagePushConfig) *pusher {
return &pusher{
metadataService: metadata.NewV2MetadataService(config.MetadataStore),
ref: ref,
endpoint: endpoint,
repoInfo: repoInfo,
config: config,
}
}
type pusher struct {
metadataService metadata.V2MetadataService
ref reference.Named
endpoint registry.APIEndpoint
repoInfo *registry.RepositoryInfo
config *ImagePushConfig
repo distribution.Repository
// pushState is state built by the Upload functions.
pushState pushState
@ -56,17 +69,18 @@ type pushState struct {
hasAuthInfo bool
}
func (p *v2Pusher) Push(ctx context.Context) (err error) {
// TODO(tiborvass): have push() take a reference to repository + tag, so that the pusher itself is repository-agnostic.
func (p *pusher) push(ctx context.Context) (err error) {
p.pushState.remoteLayers = make(map[layer.DiffID]distribution.Descriptor)
p.repo, err = NewV2Repository(ctx, p.repoInfo, p.endpoint, p.config.MetaHeaders, p.config.AuthConfig, "push", "pull")
p.repo, err = newRepository(ctx, p.repoInfo, p.endpoint, p.config.MetaHeaders, p.config.AuthConfig, "push", "pull")
p.pushState.hasAuthInfo = p.config.AuthConfig.RegistryToken != "" || (p.config.AuthConfig.Username != "" && p.config.AuthConfig.Password != "")
if err != nil {
logrus.Debugf("Error getting v2 registry: %v", err)
return err
}
if err = p.pushV2Repository(ctx); err != nil {
if err = p.pushRepository(ctx); err != nil {
if continueOnError(err, p.endpoint.Mirror) {
return fallbackError{
err: err,
@ -77,14 +91,14 @@ func (p *v2Pusher) Push(ctx context.Context) (err error) {
return err
}
func (p *v2Pusher) pushV2Repository(ctx context.Context) (err error) {
func (p *pusher) pushRepository(ctx context.Context) (err error) {
if namedTagged, isNamedTagged := p.ref.(reference.NamedTagged); isNamedTagged {
imageID, err := p.config.ReferenceStore.Get(p.ref)
if err != nil {
return fmt.Errorf("tag does not exist: %s", reference.FamiliarString(p.ref))
}
return p.pushV2Tag(ctx, namedTagged, imageID)
return p.pushTag(ctx, namedTagged, imageID)
}
if !reference.IsNameOnly(p.ref) {
@ -96,7 +110,7 @@ func (p *v2Pusher) pushV2Repository(ctx context.Context) (err error) {
for _, association := range p.config.ReferenceStore.ReferencesByName(p.ref) {
if namedTagged, isNamedTagged := association.Ref.(reference.NamedTagged); isNamedTagged {
pushed++
if err := p.pushV2Tag(ctx, namedTagged, association.ID); err != nil {
if err := p.pushTag(ctx, namedTagged, association.ID); err != nil {
return err
}
}
@ -109,7 +123,7 @@ func (p *v2Pusher) pushV2Repository(ctx context.Context) (err error) {
return nil
}
func (p *v2Pusher) pushV2Tag(ctx context.Context, ref reference.NamedTagged, id digest.Digest) error {
func (p *pusher) pushTag(ctx context.Context, ref reference.NamedTagged, id digest.Digest) error {
logrus.Debugf("Pushing repository: %s", reference.FamiliarString(ref))
imgConfig, err := p.config.ImageStore.Get(ctx, id)
@ -117,7 +131,7 @@ func (p *v2Pusher) pushV2Tag(ctx context.Context, ref reference.NamedTagged, id
return fmt.Errorf("could not find image from tag %s: %v", reference.FamiliarString(ref), err)
}
rootfs, err := p.config.ImageStore.RootFSFromConfig(imgConfig)
rootfs, err := rootFSFromConfig(imgConfig)
if err != nil {
return fmt.Errorf("unable to get rootfs for image %s: %s", reference.FamiliarString(ref), err)
}
@ -135,14 +149,14 @@ func (p *v2Pusher) pushV2Tag(ctx context.Context, ref reference.NamedTagged, id
var descriptors []xfer.UploadDescriptor
descriptorTemplate := v2PushDescriptor{
v2MetadataService: p.v2MetadataService,
hmacKey: hmacKey,
repoInfo: p.repoInfo.Name,
ref: p.ref,
endpoint: p.endpoint,
repo: p.repo,
pushState: &p.pushState,
descriptorTemplate := pushDescriptor{
metadataService: p.metadataService,
hmacKey: hmacKey,
repoInfo: p.repoInfo.Name,
ref: p.ref,
endpoint: p.endpoint,
repo: p.repo,
pushState: &p.pushState,
}
// Loop bounds condition is to avoid pushing the base layer on Windows.
@ -243,7 +257,7 @@ func manifestFromBuilder(ctx context.Context, builder distribution.ManifestBuild
// descriptors is in reverse order; iterate backwards to get references
// appended in the right order.
for i := len(descriptors) - 1; i >= 0; i-- {
if err := builder.AppendReference(descriptors[i].(*v2PushDescriptor)); err != nil {
if err := builder.AppendReference(descriptors[i].(*pushDescriptor)); err != nil {
return nil, err
}
}
@ -251,33 +265,33 @@ func manifestFromBuilder(ctx context.Context, builder distribution.ManifestBuild
return builder.Build(ctx)
}
type v2PushDescriptor struct {
layer PushLayer
v2MetadataService metadata.V2MetadataService
hmacKey []byte
repoInfo reference.Named
ref reference.Named
endpoint registry.APIEndpoint
repo distribution.Repository
pushState *pushState
remoteDescriptor distribution.Descriptor
type pushDescriptor struct {
layer PushLayer
metadataService metadata.V2MetadataService
hmacKey []byte
repoInfo reference.Named
ref reference.Named
endpoint registry.APIEndpoint
repo distribution.Repository
pushState *pushState
remoteDescriptor distribution.Descriptor
// a set of digests whose presence has been checked in a target repository
checkedDigests map[digest.Digest]struct{}
}
func (pd *v2PushDescriptor) Key() string {
func (pd *pushDescriptor) Key() string {
return "v2push:" + pd.ref.Name() + " " + pd.layer.DiffID().String()
}
func (pd *v2PushDescriptor) ID() string {
func (pd *pushDescriptor) ID() string {
return stringid.TruncateID(pd.layer.DiffID().String())
}
func (pd *v2PushDescriptor) DiffID() layer.DiffID {
func (pd *pushDescriptor) DiffID() layer.DiffID {
return pd.layer.DiffID()
}
func (pd *v2PushDescriptor) Upload(ctx context.Context, progressOutput progress.Output) (distribution.Descriptor, error) {
func (pd *pushDescriptor) Upload(ctx context.Context, progressOutput progress.Output) (distribution.Descriptor, error) {
// Skip foreign layers unless this registry allows nondistributable artifacts.
if !pd.endpoint.AllowNondistributableArtifacts {
if fs, ok := pd.layer.(distribution.Describable); ok {
@ -303,10 +317,10 @@ func (pd *v2PushDescriptor) Upload(ctx context.Context, progressOutput progress.
maxMountAttempts, maxExistenceChecks, checkOtherRepositories := getMaxMountAndExistenceCheckAttempts(pd.layer)
// Do we have any metadata associated with this layer's DiffID?
v2Metadata, err := pd.v2MetadataService.GetMetadata(diffID)
metaData, err := pd.metadataService.GetMetadata(diffID)
if err == nil {
// check for blob existence in the target repository
descriptor, exists, err := pd.layerAlreadyExists(ctx, progressOutput, diffID, true, 1, v2Metadata)
descriptor, exists, err := pd.layerAlreadyExists(ctx, progressOutput, diffID, true, 1, metaData)
if exists || err != nil {
return descriptor, err
}
@ -319,7 +333,7 @@ func (pd *v2PushDescriptor) Upload(ctx context.Context, progressOutput progress.
var layerUpload distribution.BlobWriter
// Attempt to find another repository in the same registry to mount the layer from to avoid an unnecessary upload
candidates := getRepositoryMountCandidates(pd.repoInfo, pd.hmacKey, maxMountAttempts, v2Metadata)
candidates := getRepositoryMountCandidates(pd.repoInfo, pd.hmacKey, maxMountAttempts, metaData)
isUnauthorizedError := false
for _, mc := range candidates {
mountCandidate := mc
@ -329,8 +343,8 @@ func (pd *v2PushDescriptor) Upload(ctx context.Context, progressOutput progress.
if len(mountCandidate.SourceRepository) > 0 {
namedRef, err := reference.ParseNormalizedNamed(mountCandidate.SourceRepository)
if err != nil {
logrus.Errorf("failed to parse source repository reference %v: %v", reference.FamiliarString(namedRef), err)
pd.v2MetadataService.Remove(mountCandidate)
logrus.WithError(err).Errorf("failed to parse source repository reference %v", reference.FamiliarString(namedRef))
_ = pd.metadataService.Remove(mountCandidate)
continue
}
@ -338,13 +352,13 @@ func (pd *v2PushDescriptor) Upload(ctx context.Context, progressOutput progress.
// with only path to set mount from with
remoteRef, err := reference.WithName(reference.Path(namedRef))
if err != nil {
logrus.Errorf("failed to make remote reference out of %q: %v", reference.Path(namedRef), err)
logrus.WithError(err).Errorf("failed to make remote reference out of %q", reference.Path(namedRef))
continue
}
canonicalRef, err := reference.WithDigest(reference.TrimNamed(remoteRef), mountCandidate.Digest)
if err != nil {
logrus.Errorf("failed to make canonical reference: %v", err)
logrus.WithError(err).Error("failed to make canonical reference")
continue
}
@ -366,7 +380,7 @@ func (pd *v2PushDescriptor) Upload(ctx context.Context, progressOutput progress.
pd.pushState.Unlock()
// Cache mapping from this layer's DiffID to the blobsum
if err := pd.v2MetadataService.TagAndAdd(diffID, pd.hmacKey, metadata.V2Metadata{
if err := pd.metadataService.TagAndAdd(diffID, pd.hmacKey, metadata.V2Metadata{
Digest: err.Descriptor.Digest,
SourceRepository: pd.repoInfo.Name(),
}); err != nil {
@ -400,7 +414,7 @@ func (pd *v2PushDescriptor) Upload(ctx context.Context, progressOutput progress.
cause = fmt.Sprintf("an error: %v", err.Error())
}
logrus.Debugf("removing association between layer %s and %s due to %s", mountCandidate.Digest, mountCandidate.SourceRepository, cause)
pd.v2MetadataService.Remove(mountCandidate)
_ = pd.metadataService.Remove(mountCandidate)
}
if lu != nil {
@ -412,7 +426,7 @@ func (pd *v2PushDescriptor) Upload(ctx context.Context, progressOutput progress.
if maxExistenceChecks-len(pd.checkedDigests) > 0 {
// do additional layer existence checks with other known digests if any
descriptor, exists, err := pd.layerAlreadyExists(ctx, progressOutput, diffID, checkOtherRepositories, maxExistenceChecks-len(pd.checkedDigests), v2Metadata)
descriptor, exists, err := pd.layerAlreadyExists(ctx, progressOutput, diffID, checkOtherRepositories, maxExistenceChecks-len(pd.checkedDigests), metaData)
if exists || err != nil {
return descriptor, err
}
@ -430,15 +444,15 @@ func (pd *v2PushDescriptor) Upload(ctx context.Context, progressOutput progress.
return pd.uploadUsingSession(ctx, progressOutput, diffID, layerUpload)
}
func (pd *v2PushDescriptor) SetRemoteDescriptor(descriptor distribution.Descriptor) {
func (pd *pushDescriptor) SetRemoteDescriptor(descriptor distribution.Descriptor) {
pd.remoteDescriptor = descriptor
}
func (pd *v2PushDescriptor) Descriptor() distribution.Descriptor {
func (pd *pushDescriptor) Descriptor() distribution.Descriptor {
return pd.remoteDescriptor
}
func (pd *v2PushDescriptor) uploadUsingSession(
func (pd *pushDescriptor) uploadUsingSession(
ctx context.Context,
progressOutput progress.Output,
diffID layer.DiffID,
@ -485,7 +499,7 @@ func (pd *v2PushDescriptor) uploadUsingSession(
progress.Update(progressOutput, pd.ID(), "Pushed")
// Cache mapping from this layer's DiffID to the blobsum
if err := pd.v2MetadataService.TagAndAdd(diffID, pd.hmacKey, metadata.V2Metadata{
if err := pd.metadataService.TagAndAdd(diffID, pd.hmacKey, metadata.V2Metadata{
Digest: pushDigest,
SourceRepository: pd.repoInfo.Name(),
}); err != nil {
@ -509,7 +523,7 @@ func (pd *v2PushDescriptor) uploadUsingSession(
// slice. If it finds one that the registry knows about, it returns the known digest and "true". If
// "checkOtherRepositories" is true, stat will be performed also with digests mapped to any other repository
// (not just the target one).
func (pd *v2PushDescriptor) layerAlreadyExists(
func (pd *pushDescriptor) layerAlreadyExists(
ctx context.Context,
progressOutput progress.Output,
diffID layer.DiffID,
@ -558,7 +572,7 @@ attempts:
case nil:
if m, ok := digestToMetadata[desc.Digest]; !ok || m.SourceRepository != pd.repoInfo.Name() || !metadata.CheckV2MetadataHMAC(m, pd.hmacKey) {
// cache mapping from this layer's DiffID to the blobsum
if err := pd.v2MetadataService.TagAndAdd(diffID, pd.hmacKey, metadata.V2Metadata{
if err := pd.metadataService.TagAndAdd(diffID, pd.hmacKey, metadata.V2Metadata{
Digest: desc.Digest,
SourceRepository: pd.repoInfo.Name(),
}); err != nil {
@ -571,7 +585,7 @@ attempts:
case distribution.ErrBlobUnknown:
if meta.SourceRepository == pd.repoInfo.Name() {
// remove the mapping to the target repository
pd.v2MetadataService.Remove(*meta)
pd.metadataService.Remove(*meta)
}
default:
logrus.WithError(err).Debugf("Failed to check for presence of layer %s (%s) in %s", diffID, dgst, pd.repoInfo.Name())

View file

@ -395,16 +395,16 @@ func TestLayerAlreadyExists(t *testing.T) {
}
ctx := context.Background()
ms := &mockV2MetadataService{}
pd := &v2PushDescriptor{
pd := &pushDescriptor{
hmacKey: []byte(tc.hmacKey),
repoInfo: repoInfo,
layer: &storeLayer{
Layer: layer.EmptyLayer,
},
repo: repo,
v2MetadataService: ms,
pushState: &pushState{remoteLayers: make(map[layer.DiffID]distribution.Descriptor)},
checkedDigests: make(map[digest.Digest]struct{}),
repo: repo,
metadataService: ms,
pushState: &pushState{remoteLayers: make(map[layer.DiffID]distribution.Descriptor)},
checkedDigests: make(map[digest.Digest]struct{}),
}
desc, exists, err := pd.layerAlreadyExists(ctx, &progressSink{t}, layer.EmptyLayer.DiffID(), tc.checkOtherRepositories, tc.maxExistenceChecks, tc.metadata)
@ -522,7 +522,7 @@ func TestWhenEmptyAuthConfig(t *testing.T) {
}
imagePushConfig.ReferenceStore = &mockReferenceStore{}
repoInfo, _ := reference.ParseNormalizedNamed("xujihui1985/test.img")
pusher := &v2Pusher{
pusher := &pusher{
config: imagePushConfig,
repoInfo: &registry.RepositoryInfo{
Name: repoInfo,
@ -536,7 +536,7 @@ func TestWhenEmptyAuthConfig(t *testing.T) {
TrimHostname: true,
},
}
pusher.Push(context.Background())
pusher.push(context.Background())
if pusher.pushState.hasAuthInfo != authInfo.expected {
t.Errorf("hasAuthInfo does not match expected: %t != %t", authInfo.expected, pusher.pushState.hasAuthInfo)
}
@ -598,14 +598,14 @@ func TestPushRegistryWhenAuthInfoEmpty(t *testing.T) {
requests: []string{},
},
}
pd := &v2PushDescriptor{
pd := &pushDescriptor{
hmacKey: []byte("abcd"),
repoInfo: repoInfo,
layer: &storeLayer{
Layer: layer.EmptyLayer,
},
repo: repo,
v2MetadataService: ms,
repo: repo,
metadataService: ms,
pushState: &pushState{
remoteLayers: make(map[layer.DiffID]distribution.Descriptor),
hasAuthInfo: false,

View file

@ -19,43 +19,44 @@ import (
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
)
// ImageTypes represents the schema2 config types for images
var ImageTypes = []string{
schema2.MediaTypeImageConfig,
ocispec.MediaTypeImageConfig,
// Handle unexpected values from https://github.com/docker/distribution/issues/1621
// (see also https://github.com/docker/docker/issues/22378,
// https://github.com/docker/docker/issues/30083)
"application/octet-stream",
"application/json",
"text/html",
// Treat defaulted values as images, newer types cannot be implied
"",
}
var (
// defaultImageTypes represents the schema2 config types for images
defaultImageTypes = []string{
schema2.MediaTypeImageConfig,
ocispec.MediaTypeImageConfig,
// Handle unexpected values from https://github.com/docker/distribution/issues/1621
// (see also https://github.com/docker/docker/issues/22378,
// https://github.com/docker/docker/issues/30083)
"application/octet-stream",
"application/json",
"text/html",
// Treat defaulted values as images, newer types cannot be implied
"",
}
// PluginTypes represents the schema2 config types for plugins
var PluginTypes = []string{
schema2.MediaTypePluginConfig,
}
// pluginTypes represents the schema2 config types for plugins
pluginTypes = []string{
schema2.MediaTypePluginConfig,
}
var mediaTypeClasses map[string]string
mediaTypeClasses map[string]string
)
func init() {
// initialize media type classes with all know types for
// plugin
// initialize media type classes with all know types for images and plugins.
mediaTypeClasses = map[string]string{}
for _, t := range ImageTypes {
for _, t := range defaultImageTypes {
mediaTypeClasses[t] = "image"
}
for _, t := range PluginTypes {
for _, t := range pluginTypes {
mediaTypeClasses[t] = "plugin"
}
}
// NewV2Repository returns a repository (v2 only). It creates an HTTP transport
// newRepository returns a repository (v2 only). It creates an HTTP transport
// providing timeout settings and authentication support, and also verifies the
// remote API version.
func NewV2Repository(
func newRepository(
ctx context.Context, repoInfo *registry.RepositoryInfo, endpoint registry.APIEndpoint,
metaHeaders http.Header, authConfig *types.AuthConfig, actions ...string,
) (repo distribution.Repository, err error) {

View file

@ -67,15 +67,10 @@ func testTokenPassThru(t *testing.T, ts *httptest.Server) {
RegistryToken: secretRegistryToken,
},
},
Schema2Types: ImageTypes,
}
puller, err := newPuller(endpoint, repoInfo, imagePullConfig, nil)
if err != nil {
t.Fatal(err)
}
p := puller.(*v2Puller)
p := newPuller(endpoint, repoInfo, imagePullConfig, nil)
ctx := context.Background()
p.repo, err = NewV2Repository(ctx, p.repoInfo, p.endpoint, p.config.MetaHeaders, p.config.AuthConfig, "pull")
p.repo, err = newRepository(ctx, p.repoInfo, p.endpoint, p.config.MetaHeaders, p.config.AuthConfig, "pull")
if err != nil {
t.Fatal(err)
}
@ -83,7 +78,7 @@ func testTokenPassThru(t *testing.T, ts *httptest.Server) {
logrus.Debug("About to pull")
// We expect it to fail, since we haven't mock'd the full registry exchange in our handler above
tag, _ := reference.WithTag(n, "tag_goes_here")
_ = p.pullV2Repository(ctx, tag)
_ = p.pullRepository(ctx, tag)
}
func TestTokenPassThru(t *testing.T) {

View file

@ -0,0 +1,34 @@
package distribution
import (
"context"
"github.com/docker/distribution"
"github.com/docker/distribution/reference"
"github.com/docker/docker/errdefs"
)
// GetRepository returns a repository from the registry.
func GetRepository(ctx context.Context, ref reference.Named, config *ImagePullConfig) (repository distribution.Repository, lastError error) {
repoInfo, err := config.RegistryService.ResolveRepository(ref)
if err != nil {
return nil, errdefs.InvalidParameter(err)
}
// makes sure name is not empty or `scratch`
if err := validateRepoName(repoInfo.Name); err != nil {
return nil, errdefs.InvalidParameter(err)
}
endpoints, err := config.RegistryService.LookupPullEndpoints(reference.Domain(repoInfo.Name))
if err != nil {
return nil, err
}
for _, endpoint := range endpoints {
repository, lastError = newRepository(ctx, repoInfo, endpoint, nil, config.AuthConfig, "pull")
if lastError == nil {
break
}
}
return repository, lastError
}