123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458 |
- package distribution
- import (
- "errors"
- "fmt"
- "io"
- "runtime"
- "sync"
- "github.com/Sirupsen/logrus"
- "github.com/docker/distribution"
- "github.com/docker/distribution/digest"
- "github.com/docker/distribution/manifest/schema1"
- "github.com/docker/distribution/manifest/schema2"
- distreference "github.com/docker/distribution/reference"
- "github.com/docker/distribution/registry/client"
- "github.com/docker/docker/distribution/metadata"
- "github.com/docker/docker/distribution/xfer"
- "github.com/docker/docker/image"
- "github.com/docker/docker/layer"
- "github.com/docker/docker/pkg/ioutils"
- "github.com/docker/docker/pkg/progress"
- "github.com/docker/docker/pkg/stringid"
- "github.com/docker/docker/reference"
- "github.com/docker/docker/registry"
- "golang.org/x/net/context"
- )
- // PushResult contains the tag, manifest digest, and manifest size from the
- // push. It's used to signal this information to the trust code in the client
- // so it can sign the manifest if necessary.
- type PushResult struct {
- Tag string
- Digest digest.Digest
- Size int
- }
- type v2Pusher struct {
- v2MetadataService *metadata.V2MetadataService
- ref reference.Named
- endpoint registry.APIEndpoint
- repoInfo *registry.RepositoryInfo
- config *ImagePushConfig
- repo distribution.Repository
- // pushState is state built by the Upload functions.
- pushState pushState
- }
- type pushState struct {
- sync.Mutex
- // remoteLayers is the set of layers known to exist on the remote side.
- // This avoids redundant queries when pushing multiple tags that
- // involve the same layers. It is also used to fill in digest and size
- // information when building the manifest.
- remoteLayers map[layer.DiffID]distribution.Descriptor
- // confirmedV2 is set to true if we confirm we're talking to a v2
- // registry. This is used to limit fallbacks to the v1 protocol.
- confirmedV2 bool
- }
- func (p *v2Pusher) Push(ctx context.Context) (err error) {
- p.pushState.remoteLayers = make(map[layer.DiffID]distribution.Descriptor)
- p.repo, p.pushState.confirmedV2, err = NewV2Repository(ctx, p.repoInfo, p.endpoint, p.config.MetaHeaders, p.config.AuthConfig, "push", "pull")
- if err != nil {
- logrus.Debugf("Error getting v2 registry: %v", err)
- return err
- }
- if err = p.pushV2Repository(ctx); err != nil {
- if continueOnError(err) {
- return fallbackError{
- err: err,
- confirmedV2: p.pushState.confirmedV2,
- transportOK: true,
- }
- }
- }
- return err
- }
- func (p *v2Pusher) pushV2Repository(ctx context.Context) (err error) {
- if namedTagged, isNamedTagged := p.ref.(reference.NamedTagged); isNamedTagged {
- imageID, err := p.config.ReferenceStore.Get(p.ref)
- if err != nil {
- return fmt.Errorf("tag does not exist: %s", p.ref.String())
- }
- return p.pushV2Tag(ctx, namedTagged, imageID)
- }
- if !reference.IsNameOnly(p.ref) {
- return errors.New("cannot push a digest reference")
- }
- // Pull all tags
- pushed := 0
- for _, association := range p.config.ReferenceStore.ReferencesByName(p.ref) {
- if namedTagged, isNamedTagged := association.Ref.(reference.NamedTagged); isNamedTagged {
- pushed++
- if err := p.pushV2Tag(ctx, namedTagged, association.ImageID); err != nil {
- return err
- }
- }
- }
- if pushed == 0 {
- return fmt.Errorf("no tags to push for %s", p.repoInfo.Name())
- }
- return nil
- }
- func (p *v2Pusher) pushV2Tag(ctx context.Context, ref reference.NamedTagged, imageID image.ID) error {
- logrus.Debugf("Pushing repository: %s", ref.String())
- img, err := p.config.ImageStore.Get(imageID)
- if err != nil {
- return fmt.Errorf("could not find image from tag %s: %v", ref.String(), err)
- }
- var l layer.Layer
- topLayerID := img.RootFS.ChainID()
- if topLayerID == "" {
- l = layer.EmptyLayer
- } else {
- l, err = p.config.LayerStore.Get(topLayerID)
- if err != nil {
- return fmt.Errorf("failed to get top layer from image: %v", err)
- }
- defer layer.ReleaseAndLog(p.config.LayerStore, l)
- }
- var descriptors []xfer.UploadDescriptor
- descriptorTemplate := v2PushDescriptor{
- v2MetadataService: p.v2MetadataService,
- repoInfo: p.repoInfo,
- ref: p.ref,
- repo: p.repo,
- pushState: &p.pushState,
- }
- // Loop bounds condition is to avoid pushing the base layer on Windows.
- for i := 0; i < len(img.RootFS.DiffIDs); i++ {
- descriptor := descriptorTemplate
- descriptor.layer = l
- descriptors = append(descriptors, &descriptor)
- l = l.Parent()
- }
- if err := p.config.UploadManager.Upload(ctx, descriptors, p.config.ProgressOutput); err != nil {
- return err
- }
- // Try schema2 first
- builder := schema2.NewManifestBuilder(p.repo.Blobs(ctx), img.RawJSON())
- manifest, err := manifestFromBuilder(ctx, builder, descriptors)
- if err != nil {
- return err
- }
- manSvc, err := p.repo.Manifests(ctx)
- if err != nil {
- return err
- }
- putOptions := []distribution.ManifestServiceOption{distribution.WithTag(ref.Tag())}
- if _, err = manSvc.Put(ctx, manifest, putOptions...); err != nil {
- if runtime.GOOS == "windows" {
- logrus.Warnf("failed to upload schema2 manifest: %v", err)
- return err
- }
- logrus.Warnf("failed to upload schema2 manifest: %v - falling back to schema1", err)
- manifestRef, err := distreference.WithTag(p.repo.Named(), ref.Tag())
- if err != nil {
- return err
- }
- builder = schema1.NewConfigManifestBuilder(p.repo.Blobs(ctx), p.config.TrustKey, manifestRef, img.RawJSON())
- manifest, err = manifestFromBuilder(ctx, builder, descriptors)
- if err != nil {
- return err
- }
- if _, err = manSvc.Put(ctx, manifest, putOptions...); err != nil {
- return err
- }
- }
- var canonicalManifest []byte
- switch v := manifest.(type) {
- case *schema1.SignedManifest:
- canonicalManifest = v.Canonical
- case *schema2.DeserializedManifest:
- _, canonicalManifest, err = v.Payload()
- if err != nil {
- return err
- }
- }
- manifestDigest := digest.FromBytes(canonicalManifest)
- progress.Messagef(p.config.ProgressOutput, "", "%s: digest: %s size: %d", ref.Tag(), manifestDigest, len(canonicalManifest))
- if err := addDigestReference(p.config.ReferenceStore, ref, manifestDigest, imageID); err != nil {
- return err
- }
- // Signal digest to the trust client so it can sign the
- // push, if appropriate.
- progress.Aux(p.config.ProgressOutput, PushResult{Tag: ref.Tag(), Digest: manifestDigest, Size: len(canonicalManifest)})
- return nil
- }
- func manifestFromBuilder(ctx context.Context, builder distribution.ManifestBuilder, descriptors []xfer.UploadDescriptor) (distribution.Manifest, error) {
- // descriptors is in reverse order; iterate backwards to get references
- // appended in the right order.
- for i := len(descriptors) - 1; i >= 0; i-- {
- if err := builder.AppendReference(descriptors[i].(*v2PushDescriptor)); err != nil {
- return nil, err
- }
- }
- return builder.Build(ctx)
- }
- type v2PushDescriptor struct {
- layer layer.Layer
- v2MetadataService *metadata.V2MetadataService
- repoInfo reference.Named
- ref reference.Named
- repo distribution.Repository
- pushState *pushState
- remoteDescriptor distribution.Descriptor
- }
- func (pd *v2PushDescriptor) Key() string {
- return "v2push:" + pd.ref.FullName() + " " + pd.layer.DiffID().String()
- }
- func (pd *v2PushDescriptor) ID() string {
- return stringid.TruncateID(pd.layer.DiffID().String())
- }
- func (pd *v2PushDescriptor) DiffID() layer.DiffID {
- return pd.layer.DiffID()
- }
- func (pd *v2PushDescriptor) Upload(ctx context.Context, progressOutput progress.Output) (distribution.Descriptor, error) {
- if fs, ok := pd.layer.(distribution.Describable); ok {
- if d := fs.Descriptor(); len(d.URLs) > 0 {
- progress.Update(progressOutput, pd.ID(), "Skipped foreign layer")
- return d, nil
- }
- }
- diffID := pd.DiffID()
- pd.pushState.Lock()
- if descriptor, ok := pd.pushState.remoteLayers[diffID]; ok {
- // it is already known that the push is not needed and
- // therefore doing a stat is unnecessary
- pd.pushState.Unlock()
- progress.Update(progressOutput, pd.ID(), "Layer already exists")
- return descriptor, nil
- }
- pd.pushState.Unlock()
- // Do we have any metadata associated with this layer's DiffID?
- v2Metadata, err := pd.v2MetadataService.GetMetadata(diffID)
- if err == nil {
- descriptor, exists, err := layerAlreadyExists(ctx, v2Metadata, pd.repoInfo, pd.repo, pd.pushState)
- if err != nil {
- progress.Update(progressOutput, pd.ID(), "Image push failed")
- return distribution.Descriptor{}, retryOnError(err)
- }
- if exists {
- progress.Update(progressOutput, pd.ID(), "Layer already exists")
- pd.pushState.Lock()
- pd.pushState.remoteLayers[diffID] = descriptor
- pd.pushState.Unlock()
- return descriptor, nil
- }
- }
- logrus.Debugf("Pushing layer: %s", diffID)
- // if digest was empty or not saved, or if blob does not exist on the remote repository,
- // then push the blob.
- bs := pd.repo.Blobs(ctx)
- var layerUpload distribution.BlobWriter
- mountAttemptsRemaining := 3
- // Attempt to find another repository in the same registry to mount the layer
- // from to avoid an unnecessary upload.
- // Note: metadata is stored from oldest to newest, so we iterate through this
- // slice in reverse to maximize our chances of the blob still existing in the
- // remote repository.
- for i := len(v2Metadata) - 1; i >= 0 && mountAttemptsRemaining > 0; i-- {
- mountFrom := v2Metadata[i]
- sourceRepo, err := reference.ParseNamed(mountFrom.SourceRepository)
- if err != nil {
- continue
- }
- if pd.repoInfo.Hostname() != sourceRepo.Hostname() {
- // don't mount blobs from another registry
- continue
- }
- namedRef, err := reference.WithName(mountFrom.SourceRepository)
- if err != nil {
- continue
- }
- // TODO (brianbland): We need to construct a reference where the Name is
- // only the full remote name, so clean this up when distribution has a
- // richer reference package
- remoteRef, err := distreference.WithName(namedRef.RemoteName())
- if err != nil {
- continue
- }
- canonicalRef, err := distreference.WithDigest(remoteRef, mountFrom.Digest)
- if err != nil {
- continue
- }
- logrus.Debugf("attempting to mount layer %s (%s) from %s", diffID, mountFrom.Digest, sourceRepo.FullName())
- layerUpload, err = bs.Create(ctx, client.WithMountFrom(canonicalRef))
- switch err := err.(type) {
- case distribution.ErrBlobMounted:
- progress.Updatef(progressOutput, pd.ID(), "Mounted from %s", err.From.Name())
- err.Descriptor.MediaType = schema2.MediaTypeLayer
- pd.pushState.Lock()
- pd.pushState.confirmedV2 = true
- pd.pushState.remoteLayers[diffID] = err.Descriptor
- pd.pushState.Unlock()
- // Cache mapping from this layer's DiffID to the blobsum
- if err := pd.v2MetadataService.Add(diffID, metadata.V2Metadata{Digest: mountFrom.Digest, SourceRepository: pd.repoInfo.FullName()}); err != nil {
- return distribution.Descriptor{}, xfer.DoNotRetry{Err: err}
- }
- return err.Descriptor, nil
- case nil:
- // blob upload session created successfully, so begin the upload
- mountAttemptsRemaining = 0
- default:
- // unable to mount layer from this repository, so this source mapping is no longer valid
- logrus.Debugf("unassociating layer %s (%s) with %s", diffID, mountFrom.Digest, mountFrom.SourceRepository)
- pd.v2MetadataService.Remove(mountFrom)
- mountAttemptsRemaining--
- }
- }
- if layerUpload == nil {
- layerUpload, err = bs.Create(ctx)
- if err != nil {
- return distribution.Descriptor{}, retryOnError(err)
- }
- }
- defer layerUpload.Close()
- arch, err := pd.layer.TarStream()
- if err != nil {
- return distribution.Descriptor{}, xfer.DoNotRetry{Err: err}
- }
- // don't care if this fails; best effort
- size, _ := pd.layer.DiffSize()
- reader := progress.NewProgressReader(ioutils.NewCancelReadCloser(ctx, arch), progressOutput, size, pd.ID(), "Pushing")
- compressedReader, compressionDone := compress(reader)
- defer func() {
- reader.Close()
- <-compressionDone
- }()
- digester := digest.Canonical.New()
- tee := io.TeeReader(compressedReader, digester.Hash())
- nn, err := layerUpload.ReadFrom(tee)
- compressedReader.Close()
- if err != nil {
- return distribution.Descriptor{}, retryOnError(err)
- }
- pushDigest := digester.Digest()
- if _, err := layerUpload.Commit(ctx, distribution.Descriptor{Digest: pushDigest}); err != nil {
- return distribution.Descriptor{}, retryOnError(err)
- }
- logrus.Debugf("uploaded layer %s (%s), %d bytes", diffID, pushDigest, nn)
- progress.Update(progressOutput, pd.ID(), "Pushed")
- // Cache mapping from this layer's DiffID to the blobsum
- if err := pd.v2MetadataService.Add(diffID, metadata.V2Metadata{Digest: pushDigest, SourceRepository: pd.repoInfo.FullName()}); err != nil {
- return distribution.Descriptor{}, xfer.DoNotRetry{Err: err}
- }
- pd.pushState.Lock()
- // If Commit succeeded, that's an indication that the remote registry
- // speaks the v2 protocol.
- pd.pushState.confirmedV2 = true
- descriptor := distribution.Descriptor{
- Digest: pushDigest,
- MediaType: schema2.MediaTypeLayer,
- Size: nn,
- }
- pd.pushState.remoteLayers[diffID] = descriptor
- pd.pushState.Unlock()
- return descriptor, nil
- }
- func (pd *v2PushDescriptor) SetRemoteDescriptor(descriptor distribution.Descriptor) {
- pd.remoteDescriptor = descriptor
- }
- func (pd *v2PushDescriptor) Descriptor() distribution.Descriptor {
- return pd.remoteDescriptor
- }
- // layerAlreadyExists checks if the registry already know about any of the
- // metadata passed in the "metadata" slice. If it finds one that the registry
- // knows about, it returns the known digest and "true".
- func layerAlreadyExists(ctx context.Context, metadata []metadata.V2Metadata, repoInfo reference.Named, repo distribution.Repository, pushState *pushState) (distribution.Descriptor, bool, error) {
- for _, meta := range metadata {
- // Only check blobsums that are known to this repository or have an unknown source
- if meta.SourceRepository != "" && meta.SourceRepository != repoInfo.FullName() {
- continue
- }
- descriptor, err := repo.Blobs(ctx).Stat(ctx, meta.Digest)
- switch err {
- case nil:
- descriptor.MediaType = schema2.MediaTypeLayer
- return descriptor, true, nil
- case distribution.ErrBlobUnknown:
- // nop
- default:
- return distribution.Descriptor{}, false, err
- }
- }
- return distribution.Descriptor{}, false, nil
- }
|