2018-02-05 21:05:59 +00:00
package distribution // import "github.com/docker/docker/distribution"
2015-11-18 22:18:44 +00:00
import (
2018-04-19 22:30:59 +00:00
"context"
2015-11-18 22:18:44 +00:00
"fmt"
"io"
2020-07-29 14:33:40 +00:00
"os"
2019-06-18 00:23:44 +00:00
"runtime"
2016-09-18 08:55:28 +00:00
"sort"
"strings"
2015-11-14 00:59:01 +00:00
"sync"
2015-11-18 22:18:44 +00:00
"github.com/docker/distribution"
"github.com/docker/distribution/manifest/schema1"
2015-12-11 00:27:58 +00:00
"github.com/docker/distribution/manifest/schema2"
2017-01-26 00:54:18 +00:00
"github.com/docker/distribution/reference"
2018-03-07 11:30:17 +00:00
"github.com/docker/distribution/registry/api/errcode"
2015-12-11 00:27:58 +00:00
"github.com/docker/distribution/registry/client"
2016-12-22 19:44:09 +00:00
apitypes "github.com/docker/docker/api/types"
2015-11-18 22:18:44 +00:00
"github.com/docker/docker/distribution/metadata"
2015-11-14 00:59:01 +00:00
"github.com/docker/docker/distribution/xfer"
2015-11-18 22:18:44 +00:00
"github.com/docker/docker/layer"
2015-11-14 00:59:01 +00:00
"github.com/docker/docker/pkg/ioutils"
"github.com/docker/docker/pkg/progress"
2015-11-18 22:18:44 +00:00
"github.com/docker/docker/pkg/stringid"
"github.com/docker/docker/registry"
2019-08-05 14:37:47 +00:00
digest "github.com/opencontainers/go-digest"
2020-07-29 14:33:40 +00:00
"github.com/pkg/errors"
2017-07-26 21:42:13 +00:00
"github.com/sirupsen/logrus"
2015-11-18 22:18:44 +00:00
)
2016-09-16 11:58:36 +00:00
const (
smallLayerMaximumSize = 100 * ( 1 << 10 ) // 100KB
middleLayerMaximumSize = 10 * ( 1 << 20 ) // 10MB
2015-11-18 22:18:44 +00:00
)
type v2Pusher struct {
2016-09-21 15:01:09 +00:00
v2MetadataService metadata . V2MetadataService
2016-01-14 03:34:27 +00:00
ref reference . Named
endpoint registry . APIEndpoint
repoInfo * registry . RepositoryInfo
config * ImagePushConfig
repo distribution . Repository
2015-11-18 22:18:44 +00:00
2016-03-01 18:56:05 +00:00
// pushState is state built by the Upload functions.
2015-12-11 00:27:58 +00:00
pushState pushState
2015-11-14 00:59:01 +00:00
}
2015-12-11 00:27:58 +00:00
type pushState struct {
2015-11-14 00:59:01 +00:00
sync . Mutex
2015-12-11 00:27:58 +00:00
// remoteLayers is the set of layers known to exist on the remote side.
// This avoids redundant queries when pushing multiple tags that
// involve the same layers. It is also used to fill in digest and size
// information when building the manifest.
remoteLayers map [ layer . DiffID ] distribution . Descriptor
// confirmedV2 is set to true if we confirm we're talking to a v2
// registry. This is used to limit fallbacks to the v1 protocol.
confirmedV2 bool
2018-03-07 11:30:17 +00:00
hasAuthInfo bool
2015-11-18 22:18:44 +00:00
}
2015-12-04 21:42:33 +00:00
func ( p * v2Pusher ) Push ( ctx context . Context ) ( err error ) {
2015-12-11 00:27:58 +00:00
p . pushState . remoteLayers = make ( map [ layer . DiffID ] distribution . Descriptor )
p . repo , p . pushState . confirmedV2 , err = NewV2Repository ( ctx , p . repoInfo , p . endpoint , p . config . MetaHeaders , p . config . AuthConfig , "push" , "pull" )
2018-03-07 11:30:17 +00:00
p . pushState . hasAuthInfo = p . config . AuthConfig . RegistryToken != "" || ( p . config . AuthConfig . Username != "" && p . config . AuthConfig . Password != "" )
2015-11-18 22:18:44 +00:00
if err != nil {
logrus . Debugf ( "Error getting v2 registry: %v" , err )
2016-02-11 23:45:29 +00:00
return err
2015-11-18 22:18:44 +00:00
}
2015-12-04 21:42:33 +00:00
if err = p . pushV2Repository ( ctx ) ; err != nil {
2017-11-15 00:06:17 +00:00
if continueOnError ( err , p . endpoint . Mirror ) {
2016-02-11 23:45:29 +00:00
return fallbackError {
err : err ,
confirmedV2 : p . pushState . confirmedV2 ,
transportOK : true ,
}
2015-12-04 21:42:33 +00:00
}
}
return err
}
func ( p * v2Pusher ) pushV2Repository ( ctx context . Context ) ( err error ) {
2015-12-14 23:44:45 +00:00
if namedTagged , isNamedTagged := p . ref . ( reference . NamedTagged ) ; isNamedTagged {
2015-12-04 21:55:15 +00:00
imageID , err := p . config . ReferenceStore . Get ( p . ref )
2015-11-18 22:18:44 +00:00
if err != nil {
2017-01-26 00:54:18 +00:00
return fmt . Errorf ( "tag does not exist: %s" , reference . FamiliarString ( p . ref ) )
2015-11-18 22:18:44 +00:00
}
2015-12-14 23:44:45 +00:00
return p . pushV2Tag ( ctx , namedTagged , imageID )
2015-11-18 22:18:44 +00:00
}
2015-12-14 23:44:45 +00:00
if ! reference . IsNameOnly ( p . ref ) {
return errors . New ( "cannot push a digest reference" )
2015-11-18 22:18:44 +00:00
}
2020-06-16 13:05:10 +00:00
// Push all tags
2015-12-14 23:44:45 +00:00
pushed := 0
for _ , association := range p . config . ReferenceStore . ReferencesByName ( p . ref ) {
if namedTagged , isNamedTagged := association . Ref . ( reference . NamedTagged ) ; isNamedTagged {
pushed ++
2016-09-15 23:37:32 +00:00
if err := p . pushV2Tag ( ctx , namedTagged , association . ID ) ; err != nil {
2015-12-14 23:44:45 +00:00
return err
}
2015-11-18 22:18:44 +00:00
}
}
2015-12-14 23:44:45 +00:00
if pushed == 0 {
2017-01-26 00:54:18 +00:00
return fmt . Errorf ( "no tags to push for %s" , reference . FamiliarName ( p . repoInfo . Name ) )
2015-12-14 23:44:45 +00:00
}
2015-12-04 21:42:33 +00:00
return nil
2015-11-18 22:18:44 +00:00
}
2016-09-15 23:37:32 +00:00
func ( p * v2Pusher ) pushV2Tag ( ctx context . Context , ref reference . NamedTagged , id digest . Digest ) error {
2017-01-26 00:54:18 +00:00
logrus . Debugf ( "Pushing repository: %s" , reference . FamiliarString ( ref ) )
2015-11-18 22:18:44 +00:00
2016-12-16 19:19:05 +00:00
imgConfig , err := p . config . ImageStore . Get ( id )
2015-11-18 22:18:44 +00:00
if err != nil {
2017-01-26 00:54:18 +00:00
return fmt . Errorf ( "could not find image from tag %s: %v" , reference . FamiliarString ( ref ) , err )
2015-11-18 22:18:44 +00:00
}
2018-02-15 21:17:27 +00:00
rootfs , err := p . config . ImageStore . RootFSFromConfig ( imgConfig )
2016-12-16 19:19:05 +00:00
if err != nil {
2017-01-26 00:54:18 +00:00
return fmt . Errorf ( "unable to get rootfs for image %s: %s" , reference . FamiliarString ( ref ) , err )
2016-12-16 19:19:05 +00:00
}
2015-11-18 22:18:44 +00:00
2018-02-15 21:17:27 +00:00
platform , err := p . config . ImageStore . PlatformFromConfig ( imgConfig )
if err != nil {
return fmt . Errorf ( "unable to get platform for image %s: %s" , reference . FamiliarString ( ref ) , err )
}
l , err := p . config . LayerStores [ platform . OS ] . Get ( rootfs . ChainID ( ) )
2016-12-16 19:19:05 +00:00
if err != nil {
return fmt . Errorf ( "failed to get top layer from image: %v" , err )
2015-11-18 22:18:44 +00:00
}
2016-12-16 19:19:05 +00:00
defer l . Release ( )
2015-11-18 22:18:44 +00:00
2016-09-18 08:55:28 +00:00
hmacKey , err := metadata . ComputeV2MetadataHMACKey ( p . config . AuthConfig )
if err != nil {
return fmt . Errorf ( "failed to compute hmac key of auth config: %v" , err )
}
2015-11-14 00:59:01 +00:00
var descriptors [ ] xfer . UploadDescriptor
2015-11-18 22:18:44 +00:00
2015-12-04 21:42:33 +00:00
descriptorTemplate := v2PushDescriptor {
2016-01-14 03:34:27 +00:00
v2MetadataService : p . v2MetadataService ,
2016-09-18 08:55:28 +00:00
hmacKey : hmacKey ,
2017-01-26 00:54:18 +00:00
repoInfo : p . repoInfo . Name ,
2016-06-18 05:27:03 +00:00
ref : p . ref ,
2017-05-09 21:00:31 +00:00
endpoint : p . endpoint ,
2016-01-14 03:34:27 +00:00
repo : p . repo ,
pushState : & p . pushState ,
2015-11-18 22:18:44 +00:00
}
2015-11-14 00:59:01 +00:00
// Loop bounds condition is to avoid pushing the base layer on Windows.
2017-06-27 06:47:15 +00:00
for range rootfs . DiffIDs {
2015-12-04 21:42:33 +00:00
descriptor := descriptorTemplate
descriptor . layer = l
2016-09-16 11:58:36 +00:00
descriptor . checkedDigests = make ( map [ digest . Digest ] struct { } )
2015-12-04 21:42:33 +00:00
descriptors = append ( descriptors , & descriptor )
2015-11-18 22:18:44 +00:00
l = l . Parent ( )
}
2015-12-11 00:27:58 +00:00
if err := p . config . UploadManager . Upload ( ctx , descriptors , p . config . ProgressOutput ) ; err != nil {
2015-11-14 00:59:01 +00:00
return err
}
2015-12-14 23:44:45 +00:00
// Try schema2 first
2016-12-16 19:19:05 +00:00
builder := schema2 . NewManifestBuilder ( p . repo . Blobs ( ctx ) , p . config . ConfigMediaType , imgConfig )
2015-12-14 23:44:45 +00:00
manifest , err := manifestFromBuilder ( ctx , builder , descriptors )
2015-11-18 22:18:44 +00:00
if err != nil {
return err
}
2015-12-14 23:44:45 +00:00
manSvc , err := p . repo . Manifests ( ctx )
2015-11-18 22:18:44 +00:00
if err != nil {
return err
}
2015-12-14 23:44:45 +00:00
2016-04-12 00:29:17 +00:00
putOptions := [ ] distribution . ManifestServiceOption { distribution . WithTag ( ref . Tag ( ) ) }
2015-12-14 23:44:45 +00:00
if _ , err = manSvc . Put ( ctx , manifest , putOptions ... ) ; err != nil {
2019-06-18 00:23:44 +00:00
if runtime . GOOS == "windows" || p . config . TrustKey == nil || p . config . RequireSchema2 {
logrus . Warnf ( "failed to upload schema2 manifest: %v" , err )
return err
}
2020-07-29 14:33:40 +00:00
// This is a temporary environment variables used in CI to allow pushing
// manifest v2 schema 1 images to test-registries used for testing *pulling*
// these images.
if os . Getenv ( "DOCKER_ALLOW_SCHEMA1_PUSH_DONOTUSE" ) == "" {
if err . Error ( ) == "tag invalid" {
msg := "[DEPRECATED] support for pushing manifest v2 schema1 images has been removed. More information at https://docs.docker.com/registry/spec/deprecated-schema-v1/"
logrus . WithError ( err ) . Error ( msg )
return errors . Wrap ( err , msg )
}
return err
}
2019-06-18 00:23:44 +00:00
logrus . Warnf ( "failed to upload schema2 manifest: %v - falling back to schema1" , err )
2020-03-11 18:50:58 +00:00
// Note: this fallback is deprecated, see log messages below
2019-06-18 00:23:44 +00:00
manifestRef , err := reference . WithTag ( p . repo . Named ( ) , ref . Tag ( ) )
if err != nil {
return err
}
builder = schema1 . NewConfigManifestBuilder ( p . repo . Blobs ( ctx ) , p . config . TrustKey , manifestRef , imgConfig )
manifest , err = manifestFromBuilder ( ctx , builder , descriptors )
if err != nil {
return err
}
if _ , err = manSvc . Put ( ctx , manifest , putOptions ... ) ; err != nil {
return err
}
2020-03-11 18:50:58 +00:00
// schema2 failed but schema1 succeeded
2020-07-29 14:33:40 +00:00
msg := fmt . Sprintf ( "[DEPRECATION NOTICE] support for pushing manifest v2 schema1 images will be removed in an upcoming release. Please contact admins of the %s registry NOW to avoid future disruption. More information at https://docs.docker.com/registry/spec/deprecated-schema-v1/" , reference . Domain ( ref ) )
2020-03-11 18:50:58 +00:00
logrus . Warn ( msg )
progress . Message ( p . config . ProgressOutput , "" , msg )
2015-11-18 22:18:44 +00:00
}
2015-12-14 23:44:45 +00:00
var canonicalManifest [ ] byte
switch v := manifest . ( type ) {
case * schema1 . SignedManifest :
canonicalManifest = v . Canonical
case * schema2 . DeserializedManifest :
_ , canonicalManifest , err = v . Payload ( )
if err != nil {
return err
}
2015-11-18 22:18:44 +00:00
}
2015-12-11 00:27:58 +00:00
2015-12-14 23:44:45 +00:00
manifestDigest := digest . FromBytes ( canonicalManifest )
progress . Messagef ( p . config . ProgressOutput , "" , "%s: digest: %s size: %d" , ref . Tag ( ) , manifestDigest , len ( canonicalManifest ) )
2016-06-27 17:09:57 +00:00
2016-09-15 23:37:32 +00:00
if err := addDigestReference ( p . config . ReferenceStore , ref , manifestDigest , id ) ; err != nil {
2016-06-27 17:09:57 +00:00
return err
}
2015-12-14 23:44:45 +00:00
// Signal digest to the trust client so it can sign the
// push, if appropriate.
2016-12-22 19:44:09 +00:00
progress . Aux ( p . config . ProgressOutput , apitypes . PushResult { Tag : ref . Tag ( ) , Digest : manifestDigest . String ( ) , Size : len ( canonicalManifest ) } )
2015-12-14 23:44:45 +00:00
return nil
}
func manifestFromBuilder ( ctx context . Context , builder distribution . ManifestBuilder , descriptors [ ] xfer . UploadDescriptor ) ( distribution . Manifest , error ) {
// descriptors is in reverse order; iterate backwards to get references
// appended in the right order.
for i := len ( descriptors ) - 1 ; i >= 0 ; i -- {
if err := builder . AppendReference ( descriptors [ i ] . ( * v2PushDescriptor ) ) ; err != nil {
return nil , err
}
2015-12-11 00:27:58 +00:00
}
2015-12-14 23:44:45 +00:00
return builder . Build ( ctx )
2015-11-18 22:18:44 +00:00
}
2015-11-14 00:59:01 +00:00
type v2PushDescriptor struct {
2016-12-16 19:19:05 +00:00
layer PushLayer
2016-09-21 15:01:09 +00:00
v2MetadataService metadata . V2MetadataService
2016-09-18 08:55:28 +00:00
hmacKey [ ] byte
2016-01-14 03:34:27 +00:00
repoInfo reference . Named
2016-06-18 05:27:03 +00:00
ref reference . Named
2017-05-09 21:00:31 +00:00
endpoint registry . APIEndpoint
2016-01-14 03:34:27 +00:00
repo distribution . Repository
pushState * pushState
2016-03-01 18:56:05 +00:00
remoteDescriptor distribution . Descriptor
2016-09-16 11:58:36 +00:00
// a set of digests whose presence has been checked in a target repository
checkedDigests map [ digest . Digest ] struct { }
2015-11-14 00:59:01 +00:00
}
func ( pd * v2PushDescriptor ) Key ( ) string {
2017-01-26 00:54:18 +00:00
return "v2push:" + pd . ref . Name ( ) + " " + pd . layer . DiffID ( ) . String ( )
2015-11-14 00:59:01 +00:00
}
func ( pd * v2PushDescriptor ) ID ( ) string {
return stringid . TruncateID ( pd . layer . DiffID ( ) . String ( ) )
}
func ( pd * v2PushDescriptor ) DiffID ( ) layer . DiffID {
return pd . layer . DiffID ( )
}
2016-03-01 18:56:05 +00:00
func ( pd * v2PushDescriptor ) Upload ( ctx context . Context , progressOutput progress . Output ) ( distribution . Descriptor , error ) {
2017-05-09 21:00:31 +00:00
// Skip foreign layers unless this registry allows nondistributable artifacts.
if ! pd . endpoint . AllowNondistributableArtifacts {
if fs , ok := pd . layer . ( distribution . Describable ) ; ok {
if d := fs . Descriptor ( ) ; len ( d . URLs ) > 0 {
progress . Update ( progressOutput , pd . ID ( ) , "Skipped foreign layer" )
return d , nil
}
2016-05-26 02:11:51 +00:00
}
}
2015-11-14 00:59:01 +00:00
diffID := pd . DiffID ( )
2015-12-11 00:27:58 +00:00
pd . pushState . Lock ( )
2016-03-01 18:56:05 +00:00
if descriptor , ok := pd . pushState . remoteLayers [ diffID ] ; ok {
2015-12-11 00:27:58 +00:00
// it is already known that the push is not needed and
// therefore doing a stat is unnecessary
pd . pushState . Unlock ( )
progress . Update ( progressOutput , pd . ID ( ) , "Layer already exists" )
2016-03-01 18:56:05 +00:00
return descriptor , nil
2015-12-11 00:27:58 +00:00
}
pd . pushState . Unlock ( )
2015-11-18 22:18:44 +00:00
2016-09-16 11:58:36 +00:00
maxMountAttempts , maxExistenceChecks , checkOtherRepositories := getMaxMountAndExistenceCheckAttempts ( pd . layer )
2016-01-14 03:34:27 +00:00
// Do we have any metadata associated with this layer's DiffID?
v2Metadata , err := pd . v2MetadataService . GetMetadata ( diffID )
2015-11-18 22:18:44 +00:00
if err == nil {
2017-03-08 05:56:46 +00:00
// check for blob existence in the target repository
descriptor , exists , err := pd . layerAlreadyExists ( ctx , progressOutput , diffID , true , 1 , v2Metadata )
2016-09-16 11:58:36 +00:00
if exists || err != nil {
return descriptor , err
2015-11-18 22:18:44 +00:00
}
}
// if digest was empty or not saved, or if blob does not exist on the remote repository,
// then push the blob.
2015-11-14 00:59:01 +00:00
bs := pd . repo . Blobs ( ctx )
2016-02-22 19:27:17 +00:00
var layerUpload distribution . BlobWriter
2016-09-18 08:55:28 +00:00
// Attempt to find another repository in the same registry to mount the layer from to avoid an unnecessary upload
2016-09-16 11:58:36 +00:00
candidates := getRepositoryMountCandidates ( pd . repoInfo , pd . hmacKey , maxMountAttempts , v2Metadata )
2018-03-07 11:30:17 +00:00
isUnauthorizedError := false
2016-09-18 08:55:28 +00:00
for _ , mountCandidate := range candidates {
logrus . Debugf ( "attempting to mount layer %s (%s) from %s" , diffID , mountCandidate . Digest , mountCandidate . SourceRepository )
createOpts := [ ] distribution . BlobCreateOption { }
if len ( mountCandidate . SourceRepository ) > 0 {
2017-01-26 00:54:18 +00:00
namedRef , err := reference . ParseNormalizedNamed ( mountCandidate . SourceRepository )
2016-09-18 08:55:28 +00:00
if err != nil {
2017-01-26 00:54:18 +00:00
logrus . Errorf ( "failed to parse source repository reference %v: %v" , reference . FamiliarString ( namedRef ) , err )
2016-09-18 08:55:28 +00:00
pd . v2MetadataService . Remove ( mountCandidate )
continue
}
2016-01-05 22:17:42 +00:00
2017-01-26 00:54:18 +00:00
// Candidates are always under same domain, create remote reference
// with only path to set mount from with
remoteRef , err := reference . WithName ( reference . Path ( namedRef ) )
2016-09-18 08:55:28 +00:00
if err != nil {
2017-01-26 00:54:18 +00:00
logrus . Errorf ( "failed to make remote reference out of %q: %v" , reference . Path ( namedRef ) , err )
2016-09-18 08:55:28 +00:00
continue
}
2016-01-05 22:17:42 +00:00
2017-01-26 00:54:18 +00:00
canonicalRef , err := reference . WithDigest ( reference . TrimNamed ( remoteRef ) , mountCandidate . Digest )
2016-09-18 08:55:28 +00:00
if err != nil {
logrus . Errorf ( "failed to make canonical reference: %v" , err )
continue
}
2016-01-05 22:17:42 +00:00
2016-09-18 08:55:28 +00:00
createOpts = append ( createOpts , client . WithMountFrom ( canonicalRef ) )
2016-01-05 22:17:42 +00:00
}
2016-01-14 03:34:27 +00:00
2016-09-18 08:55:28 +00:00
// send the layer
lu , err := bs . Create ( ctx , createOpts ... )
2016-02-22 19:27:17 +00:00
switch err := err . ( type ) {
2016-09-18 08:55:28 +00:00
case nil :
// noop
2016-02-22 19:27:17 +00:00
case distribution . ErrBlobMounted :
progress . Updatef ( progressOutput , pd . ID ( ) , "Mounted from %s" , err . From . Name ( ) )
2016-01-14 03:34:27 +00:00
2016-02-22 19:27:17 +00:00
err . Descriptor . MediaType = schema2 . MediaTypeLayer
2016-01-20 19:39:32 +00:00
2016-02-22 19:27:17 +00:00
pd . pushState . Lock ( )
pd . pushState . confirmedV2 = true
pd . pushState . remoteLayers [ diffID ] = err . Descriptor
pd . pushState . Unlock ( )
2016-01-14 03:34:27 +00:00
2016-02-22 19:27:17 +00:00
// Cache mapping from this layer's DiffID to the blobsum
2016-09-18 08:55:28 +00:00
if err := pd . v2MetadataService . TagAndAdd ( diffID , pd . hmacKey , metadata . V2Metadata {
Digest : err . Descriptor . Digest ,
2017-01-26 00:54:18 +00:00
SourceRepository : pd . repoInfo . Name ( ) ,
2016-09-18 08:55:28 +00:00
} ) ; err != nil {
2016-03-01 18:56:05 +00:00
return distribution . Descriptor { } , xfer . DoNotRetry { Err : err }
2016-02-22 19:27:17 +00:00
}
2016-03-01 18:56:05 +00:00
return err . Descriptor , nil
2018-03-07 11:30:17 +00:00
case errcode . Errors :
for _ , e := range err {
switch e := e . ( type ) {
case errcode . Error :
if e . Code == errcode . ErrorCodeUnauthorized {
// when unauthorized error that indicate user don't has right to push layer to register
logrus . Debugln ( "failed to push layer to registry because unauthorized error" )
isUnauthorizedError = true
}
default :
}
}
2016-02-22 19:27:17 +00:00
default :
2016-09-18 08:55:28 +00:00
logrus . Infof ( "failed to mount layer %s (%s) from %s: %v" , diffID , mountCandidate . Digest , mountCandidate . SourceRepository , err )
}
2018-03-07 11:30:17 +00:00
// when error is unauthorizedError and user don't hasAuthInfo that's the case user don't has right to push layer to register
// and he hasn't login either, in this case candidate cache should be removed
2016-09-18 08:55:28 +00:00
if len ( mountCandidate . SourceRepository ) > 0 &&
2018-03-07 11:30:17 +00:00
! ( isUnauthorizedError && ! pd . pushState . hasAuthInfo ) &&
2016-09-18 08:55:28 +00:00
( metadata . CheckV2MetadataHMAC ( & mountCandidate , pd . hmacKey ) ||
len ( mountCandidate . HMAC ) == 0 ) {
cause := "blob mount failure"
if err != nil {
cause = fmt . Sprintf ( "an error: %v" , err . Error ( ) )
}
logrus . Debugf ( "removing association between layer %s and %s due to %s" , mountCandidate . Digest , mountCandidate . SourceRepository , cause )
pd . v2MetadataService . Remove ( mountCandidate )
}
2016-09-16 12:05:51 +00:00
if lu != nil {
// cancel previous upload
cancelLayerUpload ( ctx , mountCandidate . Digest , layerUpload )
layerUpload = lu
2016-01-14 03:34:27 +00:00
}
}
2016-09-16 11:58:36 +00:00
if maxExistenceChecks - len ( pd . checkedDigests ) > 0 {
// do additional layer existence checks with other known digests if any
descriptor , exists , err := pd . layerAlreadyExists ( ctx , progressOutput , diffID , checkOtherRepositories , maxExistenceChecks - len ( pd . checkedDigests ) , v2Metadata )
if exists || err != nil {
return descriptor , err
2016-01-14 03:34:27 +00:00
}
}
2016-09-16 11:58:36 +00:00
logrus . Debugf ( "Pushing layer: %s" , diffID )
2016-02-22 19:27:17 +00:00
if layerUpload == nil {
layerUpload , err = bs . Create ( ctx )
if err != nil {
2016-03-01 18:56:05 +00:00
return distribution . Descriptor { } , retryOnError ( err )
2016-02-22 19:27:17 +00:00
}
2015-11-14 00:59:01 +00:00
}
defer layerUpload . Close ( )
2016-09-18 08:55:28 +00:00
// upload the blob
2017-09-11 18:55:05 +00:00
return pd . uploadUsingSession ( ctx , progressOutput , diffID , layerUpload )
2016-09-18 08:55:28 +00:00
}
func ( pd * v2PushDescriptor ) SetRemoteDescriptor ( descriptor distribution . Descriptor ) {
pd . remoteDescriptor = descriptor
}
func ( pd * v2PushDescriptor ) Descriptor ( ) distribution . Descriptor {
return pd . remoteDescriptor
}
func ( pd * v2PushDescriptor ) uploadUsingSession (
ctx context . Context ,
progressOutput progress . Output ,
diffID layer . DiffID ,
layerUpload distribution . BlobWriter ,
) ( distribution . Descriptor , error ) {
2016-12-16 19:19:05 +00:00
var reader io . ReadCloser
contentReader , err := pd . layer . Open ( )
2017-05-09 11:24:21 +00:00
if err != nil {
return distribution . Descriptor { } , retryOnError ( err )
}
2016-12-16 19:19:05 +00:00
size , _ := pd . layer . Size ( )
reader = progress . NewProgressReader ( ioutils . NewCancelReadCloser ( ctx , contentReader ) , progressOutput , size , pd . ID ( ) , "Pushing" )
switch m := pd . layer . MediaType ( ) ; m {
case schema2 . MediaTypeUncompressedLayer :
compressedReader , compressionDone := compress ( reader )
defer func ( closer io . Closer ) {
closer . Close ( )
<- compressionDone
} ( reader )
reader = compressedReader
case schema2 . MediaTypeLayer :
default :
2016-01-29 22:34:50 +00:00
reader . Close ( )
2016-12-16 19:19:05 +00:00
return distribution . Descriptor { } , fmt . Errorf ( "unsupported layer media type %s" , m )
}
2015-11-14 00:59:01 +00:00
2017-01-07 01:23:18 +00:00
digester := digest . Canonical . Digester ( )
2016-12-16 19:19:05 +00:00
tee := io . TeeReader ( reader , digester . Hash ( ) )
2015-11-14 00:59:01 +00:00
nn , err := layerUpload . ReadFrom ( tee )
2016-12-16 19:19:05 +00:00
reader . Close ( )
2015-11-14 00:59:01 +00:00
if err != nil {
2016-03-01 18:56:05 +00:00
return distribution . Descriptor { } , retryOnError ( err )
2015-11-14 00:59:01 +00:00
}
pushDigest := digester . Digest ( )
if _ , err := layerUpload . Commit ( ctx , distribution . Descriptor { Digest : pushDigest } ) ; err != nil {
2016-03-01 18:56:05 +00:00
return distribution . Descriptor { } , retryOnError ( err )
2015-11-14 00:59:01 +00:00
}
logrus . Debugf ( "uploaded layer %s (%s), %d bytes" , diffID , pushDigest , nn )
progress . Update ( progressOutput , pd . ID ( ) , "Pushed" )
2015-11-18 22:18:44 +00:00
// Cache mapping from this layer's DiffID to the blobsum
2016-09-18 08:55:28 +00:00
if err := pd . v2MetadataService . TagAndAdd ( diffID , pd . hmacKey , metadata . V2Metadata {
Digest : pushDigest ,
2017-01-26 00:54:18 +00:00
SourceRepository : pd . repoInfo . Name ( ) ,
2016-09-18 08:55:28 +00:00
} ) ; err != nil {
2016-03-01 18:56:05 +00:00
return distribution . Descriptor { } , xfer . DoNotRetry { Err : err }
2015-11-18 22:18:44 +00:00
}
2016-09-16 11:58:36 +00:00
desc := distribution . Descriptor {
2015-12-11 00:27:58 +00:00
Digest : pushDigest ,
MediaType : schema2 . MediaTypeLayer ,
Size : nn ,
}
2016-09-16 11:58:36 +00:00
pd . pushState . Lock ( )
// If Commit succeeded, that's an indication that the remote registry speaks the v2 protocol.
pd . pushState . confirmedV2 = true
pd . pushState . remoteLayers [ diffID ] = desc
2015-12-11 00:27:58 +00:00
pd . pushState . Unlock ( )
2016-09-16 11:58:36 +00:00
return desc , nil
2015-11-18 22:18:44 +00:00
}
2016-09-16 11:58:36 +00:00
// layerAlreadyExists checks if the registry already knows about any of the metadata passed in the "metadata"
// slice. If it finds one that the registry knows about, it returns the known digest and "true". If
// "checkOtherRepositories" is true, stat will be performed also with digests mapped to any other repository
// (not just the target one).
func ( pd * v2PushDescriptor ) layerAlreadyExists (
ctx context . Context ,
progressOutput progress . Output ,
diffID layer . DiffID ,
checkOtherRepositories bool ,
maxExistenceCheckAttempts int ,
v2Metadata [ ] metadata . V2Metadata ,
) ( desc distribution . Descriptor , exists bool , err error ) {
// filter the metadata
candidates := [ ] metadata . V2Metadata { }
for _ , meta := range v2Metadata {
2017-01-26 00:54:18 +00:00
if len ( meta . SourceRepository ) > 0 && ! checkOtherRepositories && meta . SourceRepository != pd . repoInfo . Name ( ) {
2016-09-16 11:58:36 +00:00
continue
}
candidates = append ( candidates , meta )
}
// sort the candidates by similarity
sortV2MetadataByLikenessAndAge ( pd . repoInfo , pd . hmacKey , candidates )
digestToMetadata := make ( map [ digest . Digest ] * metadata . V2Metadata )
// an array of unique blob digests ordered from the best mount candidates to worst
layerDigests := [ ] digest . Digest { }
for i := 0 ; i < len ( candidates ) ; i ++ {
if len ( layerDigests ) >= maxExistenceCheckAttempts {
break
}
meta := & candidates [ i ]
if _ , exists := digestToMetadata [ meta . Digest ] ; exists {
// keep reference just to the first mapping (the best mount candidate)
2016-01-05 22:17:42 +00:00
continue
}
2016-09-16 11:58:36 +00:00
if _ , exists := pd . checkedDigests [ meta . Digest ] ; exists {
// existence of this digest has already been tested
2016-01-05 22:17:42 +00:00
continue
}
2016-09-16 11:58:36 +00:00
digestToMetadata [ meta . Digest ] = meta
layerDigests = append ( layerDigests , meta . Digest )
}
2016-11-30 01:06:48 +00:00
attempts :
2016-09-16 11:58:36 +00:00
for _ , dgst := range layerDigests {
meta := digestToMetadata [ dgst ]
2017-01-26 00:54:18 +00:00
logrus . Debugf ( "Checking for presence of layer %s (%s) in %s" , diffID , dgst , pd . repoInfo . Name ( ) )
2016-09-16 11:58:36 +00:00
desc , err = pd . repo . Blobs ( ctx ) . Stat ( ctx , dgst )
pd . checkedDigests [ meta . Digest ] = struct { } { }
2015-11-18 22:18:44 +00:00
switch err {
case nil :
2017-01-26 00:54:18 +00:00
if m , ok := digestToMetadata [ desc . Digest ] ; ! ok || m . SourceRepository != pd . repoInfo . Name ( ) || ! metadata . CheckV2MetadataHMAC ( m , pd . hmacKey ) {
2016-09-16 11:58:36 +00:00
// cache mapping from this layer's DiffID to the blobsum
if err := pd . v2MetadataService . TagAndAdd ( diffID , pd . hmacKey , metadata . V2Metadata {
Digest : desc . Digest ,
2017-01-26 00:54:18 +00:00
SourceRepository : pd . repoInfo . Name ( ) ,
2016-09-16 11:58:36 +00:00
} ) ; err != nil {
return distribution . Descriptor { } , false , xfer . DoNotRetry { Err : err }
}
}
desc . MediaType = schema2 . MediaTypeLayer
exists = true
2016-11-30 01:06:48 +00:00
break attempts
2015-11-18 22:18:44 +00:00
case distribution . ErrBlobUnknown :
2017-01-26 00:54:18 +00:00
if meta . SourceRepository == pd . repoInfo . Name ( ) {
2016-09-16 11:58:36 +00:00
// remove the mapping to the target repository
pd . v2MetadataService . Remove ( * meta )
}
2015-11-18 22:18:44 +00:00
default :
2017-01-26 00:54:18 +00:00
logrus . WithError ( err ) . Debugf ( "Failed to check for presence of layer %s (%s) in %s" , diffID , dgst , pd . repoInfo . Name ( ) )
2015-11-18 22:18:44 +00:00
}
}
2016-09-16 11:58:36 +00:00
if exists {
progress . Update ( progressOutput , pd . ID ( ) , "Layer already exists" )
pd . pushState . Lock ( )
pd . pushState . remoteLayers [ diffID ] = desc
pd . pushState . Unlock ( )
}
return desc , exists , nil
}
// getMaxMountAndExistenceCheckAttempts returns a maximum number of cross repository mount attempts from
// source repositories of target registry, maximum number of layer existence checks performed on the target
// repository and whether the check shall be done also with digests mapped to different repositories. The
// decision is based on layer size. The smaller the layer, the fewer attempts shall be made because the cost
// of upload does not outweigh a latency.
2016-12-16 19:19:05 +00:00
func getMaxMountAndExistenceCheckAttempts ( layer PushLayer ) ( maxMountAttempts , maxExistenceCheckAttempts int , checkOtherRepositories bool ) {
size , err := layer . Size ( )
2016-09-16 11:58:36 +00:00
switch {
// big blob
case size > middleLayerMaximumSize :
// 1st attempt to mount the blob few times
// 2nd few existence checks with digests associated to any repository
// then fallback to upload
return 4 , 3 , true
// middle sized blobs; if we could not get the size, assume we deal with middle sized blob
case size > smallLayerMaximumSize , err != nil :
// 1st attempt to mount blobs of average size few times
// 2nd try at most 1 existence check if there's an existing mapping to the target repository
// then fallback to upload
return 3 , 1 , false
// small blobs, do a minimum number of checks
default :
return 1 , 1 , false
}
2015-11-18 22:18:44 +00:00
}
2016-09-18 08:55:28 +00:00
// getRepositoryMountCandidates returns an array of v2 metadata items belonging to the given registry. The
2017-02-16 12:08:57 +00:00
// array is sorted from youngest to oldest. If requireRegistryMatch is true, the resulting array will contain
2016-09-18 08:55:28 +00:00
// only metadata entries having registry part of SourceRepository matching the part of repoInfo.
func getRepositoryMountCandidates (
repoInfo reference . Named ,
hmacKey [ ] byte ,
max int ,
v2Metadata [ ] metadata . V2Metadata ,
) [ ] metadata . V2Metadata {
candidates := [ ] metadata . V2Metadata { }
for _ , meta := range v2Metadata {
sourceRepo , err := reference . ParseNamed ( meta . SourceRepository )
2017-01-26 00:54:18 +00:00
if err != nil || reference . Domain ( repoInfo ) != reference . Domain ( sourceRepo ) {
2016-09-18 08:55:28 +00:00
continue
}
// target repository is not a viable candidate
2017-01-26 00:54:18 +00:00
if meta . SourceRepository == repoInfo . Name ( ) {
2016-09-18 08:55:28 +00:00
continue
}
candidates = append ( candidates , meta )
}
sortV2MetadataByLikenessAndAge ( repoInfo , hmacKey , candidates )
if max >= 0 && len ( candidates ) > max {
// select the youngest metadata
candidates = candidates [ : max ]
}
return candidates
}
// byLikeness is a sorting container for v2 metadata candidates for cross repository mount. The
// candidate "a" is preferred over "b":
//
// 1. if it was hashed using the same AuthConfig as the one used to authenticate to target repository and the
// "b" was not
// 2. if a number of its repository path components exactly matching path components of target repository is higher
type byLikeness struct {
arr [ ] metadata . V2Metadata
hmacKey [ ] byte
pathComponents [ ] string
}
func ( bla byLikeness ) Less ( i , j int ) bool {
aMacMatch := metadata . CheckV2MetadataHMAC ( & bla . arr [ i ] , bla . hmacKey )
bMacMatch := metadata . CheckV2MetadataHMAC ( & bla . arr [ j ] , bla . hmacKey )
if aMacMatch != bMacMatch {
return aMacMatch
}
aMatch := numOfMatchingPathComponents ( bla . arr [ i ] . SourceRepository , bla . pathComponents )
bMatch := numOfMatchingPathComponents ( bla . arr [ j ] . SourceRepository , bla . pathComponents )
return aMatch > bMatch
}
func ( bla byLikeness ) Swap ( i , j int ) {
bla . arr [ i ] , bla . arr [ j ] = bla . arr [ j ] , bla . arr [ i ]
}
func ( bla byLikeness ) Len ( ) int { return len ( bla . arr ) }
func sortV2MetadataByLikenessAndAge ( repoInfo reference . Named , hmacKey [ ] byte , marr [ ] metadata . V2Metadata ) {
// reverse the metadata array to shift the newest entries to the beginning
for i := 0 ; i < len ( marr ) / 2 ; i ++ {
marr [ i ] , marr [ len ( marr ) - i - 1 ] = marr [ len ( marr ) - i - 1 ] , marr [ i ]
}
// keep equal entries ordered from the youngest to the oldest
sort . Stable ( byLikeness {
arr : marr ,
hmacKey : hmacKey ,
2017-01-26 00:54:18 +00:00
pathComponents : getPathComponents ( repoInfo . Name ( ) ) ,
2016-09-18 08:55:28 +00:00
} )
}
// numOfMatchingPathComponents returns a number of path components in "pth" that exactly match "matchComponents".
func numOfMatchingPathComponents ( pth string , matchComponents [ ] string ) int {
pthComponents := getPathComponents ( pth )
i := 0
for ; i < len ( pthComponents ) && i < len ( matchComponents ) ; i ++ {
if matchComponents [ i ] != pthComponents [ i ] {
return i
}
}
return i
}
func getPathComponents ( path string ) [ ] string {
return strings . Split ( path , "/" )
}
2016-09-16 12:05:51 +00:00
func cancelLayerUpload ( ctx context . Context , dgst digest . Digest , layerUpload distribution . BlobWriter ) {
if layerUpload != nil {
logrus . Debugf ( "cancelling upload of blob %s" , dgst )
err := layerUpload . Cancel ( ctx )
if err != nil {
logrus . Warnf ( "failed to cancel upload: %v" , err )
2015-11-18 22:18:44 +00:00
}
}
}