Merge pull request #44963 from vvoland/c8d-push-upstream
c8d: Implement push
This commit is contained in:
commit
f537ef5746
7 changed files with 427 additions and 22 deletions
|
@ -38,7 +38,7 @@ type importExportBackend interface {
|
||||||
|
|
||||||
type registryBackend interface {
|
type registryBackend interface {
|
||||||
PullImage(ctx context.Context, image, tag string, platform *specs.Platform, metaHeaders map[string][]string, authConfig *registry.AuthConfig, outStream io.Writer) error
|
PullImage(ctx context.Context, image, tag string, platform *specs.Platform, metaHeaders map[string][]string, authConfig *registry.AuthConfig, outStream io.Writer) error
|
||||||
PushImage(ctx context.Context, image, tag string, metaHeaders map[string][]string, authConfig *registry.AuthConfig, outStream io.Writer) error
|
PushImage(ctx context.Context, ref reference.Named, metaHeaders map[string][]string, authConfig *registry.AuthConfig, outStream io.Writer) error
|
||||||
}
|
}
|
||||||
|
|
||||||
type Searcher interface {
|
type Searcher interface {
|
||||||
|
|
|
@ -154,7 +154,25 @@ func (ir *imageRouter) postImagesPush(ctx context.Context, w http.ResponseWriter
|
||||||
|
|
||||||
img := vars["name"]
|
img := vars["name"]
|
||||||
tag := r.Form.Get("tag")
|
tag := r.Form.Get("tag")
|
||||||
if err := ir.backend.PushImage(ctx, img, tag, metaHeaders, authConfig, output); err != nil {
|
|
||||||
|
var ref reference.Named
|
||||||
|
|
||||||
|
// Tag is empty only in case ImagePushOptions.All is true.
|
||||||
|
if tag != "" {
|
||||||
|
r, err := httputils.RepoTagReference(img, tag)
|
||||||
|
if err != nil {
|
||||||
|
return errdefs.InvalidParameter(err)
|
||||||
|
}
|
||||||
|
ref = r
|
||||||
|
} else {
|
||||||
|
r, err := reference.ParseNormalizedNamed(img)
|
||||||
|
if err != nil {
|
||||||
|
return errdefs.InvalidParameter(err)
|
||||||
|
}
|
||||||
|
ref = r
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := ir.backend.PushImage(ctx, ref, metaHeaders, authConfig, output); err != nil {
|
||||||
if !output.Flushed() {
|
if !output.Flushed() {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
@ -2,14 +2,255 @@ package containerd
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
|
"strings"
|
||||||
|
|
||||||
|
"github.com/containerd/containerd/content"
|
||||||
|
cerrdefs "github.com/containerd/containerd/errdefs"
|
||||||
|
"github.com/containerd/containerd/images"
|
||||||
|
containerdimages "github.com/containerd/containerd/images"
|
||||||
|
"github.com/containerd/containerd/platforms"
|
||||||
|
"github.com/containerd/containerd/remotes"
|
||||||
|
"github.com/containerd/containerd/remotes/docker"
|
||||||
|
"github.com/docker/distribution/reference"
|
||||||
"github.com/docker/docker/api/types/registry"
|
"github.com/docker/docker/api/types/registry"
|
||||||
"github.com/docker/docker/errdefs"
|
"github.com/docker/docker/errdefs"
|
||||||
|
"github.com/docker/docker/pkg/streamformatter"
|
||||||
|
"github.com/opencontainers/go-digest"
|
||||||
|
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
|
||||||
|
"github.com/pkg/errors"
|
||||||
|
"github.com/sirupsen/logrus"
|
||||||
|
"golang.org/x/sync/semaphore"
|
||||||
)
|
)
|
||||||
|
|
||||||
// PushImage initiates a push operation on the repository named localName.
|
// PushImage initiates a push operation of the image pointed to by targetRef.
|
||||||
func (i *ImageService) PushImage(ctx context.Context, image, tag string, metaHeaders map[string][]string, authConfig *registry.AuthConfig, outStream io.Writer) error {
|
// Image manifest (or index) is pushed as is, which will probably fail if you
|
||||||
return errdefs.NotImplemented(errors.New("not implemented"))
|
// don't have all content referenced by the index.
|
||||||
|
// Cross-repo mounts will be attempted for non-existing blobs.
|
||||||
|
//
|
||||||
|
// It will also add distribution source labels to the pushed content
|
||||||
|
// pointing to the new target repository. This will allow subsequent pushes
|
||||||
|
// to perform cross-repo mounts of the shared content when pushing to a different
|
||||||
|
// repository on the same registry.
|
||||||
|
func (i *ImageService) PushImage(ctx context.Context, targetRef reference.Named, metaHeaders map[string][]string, authConfig *registry.AuthConfig, outStream io.Writer) error {
|
||||||
|
if _, tagged := targetRef.(reference.Tagged); !tagged {
|
||||||
|
if _, digested := targetRef.(reference.Digested); !digested {
|
||||||
|
return errdefs.NotImplemented(errors.New("push all tags is not implemented"))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
leasedCtx, release, err := i.client.WithLease(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer release(leasedCtx)
|
||||||
|
out := streamformatter.NewJSONProgressOutput(outStream, false)
|
||||||
|
|
||||||
|
img, err := i.client.ImageService().Get(ctx, targetRef.String())
|
||||||
|
if err != nil {
|
||||||
|
return errdefs.NotFound(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
target := img.Target
|
||||||
|
store := i.client.ContentStore()
|
||||||
|
|
||||||
|
resolver, tracker := i.newResolverFromAuthConfig(authConfig)
|
||||||
|
pushProgress := pushProgress{Tracker: tracker}
|
||||||
|
jobs := newJobs()
|
||||||
|
finishProgress := jobs.showProgress(ctx, out, combinedProgress([]progressUpdater{
|
||||||
|
&pushProgress,
|
||||||
|
pullProgress{ShowExists: false, Store: store},
|
||||||
|
}))
|
||||||
|
defer finishProgress()
|
||||||
|
|
||||||
|
var limiter *semaphore.Weighted = nil // TODO: Respect max concurrent downloads/uploads
|
||||||
|
|
||||||
|
mountableBlobs, err := i.findMissingMountable(ctx, store, jobs, target, targetRef, limiter)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
for dgst := range mountableBlobs {
|
||||||
|
pushProgress.addMountable(dgst)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create a store which fakes the local existence of possibly mountable blobs.
|
||||||
|
// Otherwise they can't be pushed at all.
|
||||||
|
realStore := store
|
||||||
|
wrapped := wrapWithFakeMountableBlobs(store, mountableBlobs)
|
||||||
|
store = wrapped
|
||||||
|
|
||||||
|
pusher, err := resolver.Pusher(ctx, targetRef.String())
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
addChildrenToJobs := containerdimages.HandlerFunc(
|
||||||
|
func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
|
||||||
|
children, err := containerdimages.Children(ctx, store, desc)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
for _, c := range children {
|
||||||
|
jobs.Add(c)
|
||||||
|
}
|
||||||
|
|
||||||
|
jobs.Add(desc)
|
||||||
|
|
||||||
|
return nil, nil
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
appendSource, err := docker.AppendDistributionSourceLabel(realStore, targetRef.String())
|
||||||
|
if err != nil {
|
||||||
|
// This shouldn't happen at this point because the reference would have to be invalid
|
||||||
|
// and if it was, then it would error out earlier.
|
||||||
|
return errdefs.Unknown(errors.Wrap(err, "failed to create an handler that appends distribution source label to pushed content"))
|
||||||
|
}
|
||||||
|
|
||||||
|
handlerWrapper := func(h images.Handler) images.Handler {
|
||||||
|
return containerdimages.Handlers(addChildrenToJobs, h, appendSource)
|
||||||
|
}
|
||||||
|
|
||||||
|
err = remotes.PushContent(ctx, pusher, target, store, limiter, platforms.All, handlerWrapper)
|
||||||
|
if err != nil {
|
||||||
|
if containerdimages.IsIndexType(target.MediaType) {
|
||||||
|
if cerrdefs.IsNotFound(err) {
|
||||||
|
err = errdefs.NotFound(fmt.Errorf(
|
||||||
|
"missing content: %w\n"+
|
||||||
|
"Note: You're trying to push a manifest list/index which "+
|
||||||
|
"references multiple platform specific manifests, but not all of them are available locally "+
|
||||||
|
"or available to the remote repository.\n"+
|
||||||
|
"Make sure you have all the referenced content and try again.",
|
||||||
|
err))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// findMissingMountable will walk the target descriptor recursively and return
|
||||||
|
// missing contents with their distribution source which could potentially
|
||||||
|
// be cross-repo mounted.
|
||||||
|
func (i *ImageService) findMissingMountable(ctx context.Context, store content.Store, jobs *jobs,
|
||||||
|
target ocispec.Descriptor, targetRef reference.Named, limiter *semaphore.Weighted,
|
||||||
|
) (map[digest.Digest]distributionSource, error) {
|
||||||
|
mountableBlobs := map[digest.Digest]distributionSource{}
|
||||||
|
sources, err := getDigestSources(ctx, store, target.Digest)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
if !errdefs.IsNotFound(err) {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
logrus.WithField("target", target).Debug("distribution source label not found")
|
||||||
|
return mountableBlobs, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
handler := func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
|
||||||
|
_, err := store.Info(ctx, desc.Digest)
|
||||||
|
if err != nil {
|
||||||
|
if !cerrdefs.IsNotFound(err) {
|
||||||
|
return nil, errdefs.System(errors.Wrapf(err, "failed to get metadata of content %s", desc.Digest.String()))
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, source := range sources {
|
||||||
|
if canBeMounted(desc.MediaType, targetRef, i.registryService.IsInsecureRegistry, source) {
|
||||||
|
mountableBlobs[desc.Digest] = source
|
||||||
|
jobs.Add(desc)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return containerdimages.Children(ctx, store, desc)
|
||||||
|
}
|
||||||
|
|
||||||
|
err = containerdimages.Dispatch(ctx, containerdimages.HandlerFunc(handler), limiter, target)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return mountableBlobs, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func getDigestSources(ctx context.Context, store content.Manager, digest digest.Digest) ([]distributionSource, error) {
|
||||||
|
info, err := store.Info(ctx, digest)
|
||||||
|
if err != nil {
|
||||||
|
if cerrdefs.IsNotFound(err) {
|
||||||
|
return nil, errdefs.NotFound(err)
|
||||||
|
}
|
||||||
|
return nil, errdefs.System(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
sources := extractDistributionSources(info.Labels)
|
||||||
|
if sources == nil {
|
||||||
|
return nil, errdefs.NotFound(fmt.Errorf("label %q is not attached to %s", labelDistributionSource, digest.String()))
|
||||||
|
}
|
||||||
|
|
||||||
|
return sources, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO(vvoland): Remove and use containerd const in containerd 1.7+
|
||||||
|
// https://github.com/containerd/containerd/pull/8224
|
||||||
|
const labelDistributionSource = "containerd.io/distribution.source."
|
||||||
|
|
||||||
|
func extractDistributionSources(labels map[string]string) []distributionSource {
|
||||||
|
var sources []distributionSource
|
||||||
|
|
||||||
|
// Check if this blob has a distributionSource label
|
||||||
|
// if yes, read it as source
|
||||||
|
for k, v := range labels {
|
||||||
|
registry := strings.TrimPrefix(k, labelDistributionSource)
|
||||||
|
if registry != k {
|
||||||
|
ref, err := reference.ParseNamed(registry + "/" + v)
|
||||||
|
if err != nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
sources = append(sources, distributionSource{
|
||||||
|
registryRef: ref,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return sources
|
||||||
|
}
|
||||||
|
|
||||||
|
type distributionSource struct {
|
||||||
|
registryRef reference.Named
|
||||||
|
}
|
||||||
|
|
||||||
|
// ToAnnotation returns key and value
|
||||||
|
func (source distributionSource) ToAnnotation() (string, string) {
|
||||||
|
domain := reference.Domain(source.registryRef)
|
||||||
|
v := reference.Path(source.registryRef)
|
||||||
|
return labelDistributionSource + domain, v
|
||||||
|
}
|
||||||
|
|
||||||
|
func (source distributionSource) GetReference(dgst digest.Digest) (reference.Named, error) {
|
||||||
|
return reference.WithDigest(source.registryRef, dgst)
|
||||||
|
}
|
||||||
|
|
||||||
|
// canBeMounted returns if the content with given media type can be cross-repo
|
||||||
|
// mounted when pushing it to a remote reference ref.
|
||||||
|
func canBeMounted(mediaType string, targetRef reference.Named, isInsecureFunc func(string) bool, source distributionSource) bool {
|
||||||
|
if containerdimages.IsManifestType(mediaType) {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
if containerdimages.IsIndexType(mediaType) {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
registry := reference.Domain(targetRef)
|
||||||
|
|
||||||
|
// Cross-repo mount doesn't seem to work with insecure registries.
|
||||||
|
isInsecure := isInsecureFunc(registry)
|
||||||
|
if isInsecure {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
// If the source registry is the same as the one we are pushing to
|
||||||
|
// then the cross-repo mount will work.
|
||||||
|
return registry == reference.Domain(source.registryRef)
|
||||||
}
|
}
|
||||||
|
|
|
@ -9,6 +9,7 @@ import (
|
||||||
"github.com/containerd/containerd/content"
|
"github.com/containerd/containerd/content"
|
||||||
cerrdefs "github.com/containerd/containerd/errdefs"
|
cerrdefs "github.com/containerd/containerd/errdefs"
|
||||||
"github.com/containerd/containerd/remotes"
|
"github.com/containerd/containerd/remotes"
|
||||||
|
"github.com/containerd/containerd/remotes/docker"
|
||||||
"github.com/docker/docker/pkg/progress"
|
"github.com/docker/docker/pkg/progress"
|
||||||
"github.com/docker/docker/pkg/stringid"
|
"github.com/docker/docker/pkg/stringid"
|
||||||
"github.com/opencontainers/go-digest"
|
"github.com/opencontainers/go-digest"
|
||||||
|
@ -158,3 +159,76 @@ func (p pullProgress) UpdateProgress(ctx context.Context, ongoing *jobs, out pro
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type pushProgress struct {
|
||||||
|
Tracker docker.StatusTracker
|
||||||
|
mountable map[digest.Digest]struct{}
|
||||||
|
mutex sync.Mutex
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *pushProgress) addMountable(dgst digest.Digest) {
|
||||||
|
p.mutex.Lock()
|
||||||
|
defer p.mutex.Unlock()
|
||||||
|
|
||||||
|
if p.mountable == nil {
|
||||||
|
p.mountable = map[digest.Digest]struct{}{}
|
||||||
|
}
|
||||||
|
p.mountable[dgst] = struct{}{}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *pushProgress) isMountable(dgst digest.Digest) bool {
|
||||||
|
p.mutex.Lock()
|
||||||
|
defer p.mutex.Unlock()
|
||||||
|
|
||||||
|
if p.mountable == nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
_, has := p.mountable[dgst]
|
||||||
|
return has
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *pushProgress) UpdateProgress(ctx context.Context, ongoing *jobs, out progress.Output, start time.Time) error {
|
||||||
|
for _, j := range ongoing.Jobs() {
|
||||||
|
key := remotes.MakeRefKey(ctx, j)
|
||||||
|
id := stringid.TruncateID(j.Digest.Encoded())
|
||||||
|
|
||||||
|
status, err := p.Tracker.GetStatus(key)
|
||||||
|
if err != nil {
|
||||||
|
if cerrdefs.IsNotFound(err) {
|
||||||
|
progress.Update(out, id, "Waiting")
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if status.Committed && status.Offset >= status.Total {
|
||||||
|
if p.isMountable(j.Digest) {
|
||||||
|
progress.Update(out, id, "Mounted")
|
||||||
|
} else {
|
||||||
|
progress.Update(out, id, "Pushed")
|
||||||
|
}
|
||||||
|
ongoing.Remove(j)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
out.WriteProgress(progress.Progress{
|
||||||
|
ID: id,
|
||||||
|
Action: "Pushing",
|
||||||
|
Current: status.Offset,
|
||||||
|
Total: status.Total,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type combinedProgress []progressUpdater
|
||||||
|
|
||||||
|
func (combined combinedProgress) UpdateProgress(ctx context.Context, ongoing *jobs, out progress.Output, start time.Time) error {
|
||||||
|
for _, p := range combined {
|
||||||
|
err := p.UpdateProgress(ctx, ongoing, out, start)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
84
daemon/containerd/store.go
Normal file
84
daemon/containerd/store.go
Normal file
|
@ -0,0 +1,84 @@
|
||||||
|
package containerd
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
"github.com/containerd/containerd/content"
|
||||||
|
cerrdefs "github.com/containerd/containerd/errdefs"
|
||||||
|
"github.com/docker/distribution/reference"
|
||||||
|
"github.com/opencontainers/go-digest"
|
||||||
|
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
|
||||||
|
)
|
||||||
|
|
||||||
|
// fakeStoreWithSources fakes the existence of the specified content.
|
||||||
|
// Only existence is faked - Info function will include the distribution source label
|
||||||
|
// which makes it possible to perform cross-repo mount.
|
||||||
|
// ReaderAt will still fail with ErrNotFound.
|
||||||
|
type fakeStoreWithSources struct {
|
||||||
|
s content.Store
|
||||||
|
sources map[digest.Digest]distributionSource
|
||||||
|
}
|
||||||
|
|
||||||
|
// wrapWithFakeMountableBlobs wraps the provided content store.
|
||||||
|
func wrapWithFakeMountableBlobs(s content.Store, sources map[digest.Digest]distributionSource) fakeStoreWithSources {
|
||||||
|
return fakeStoreWithSources{
|
||||||
|
s: s,
|
||||||
|
sources: sources,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p fakeStoreWithSources) Delete(ctx context.Context, dgst digest.Digest) error {
|
||||||
|
return p.s.Delete(ctx, dgst)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p fakeStoreWithSources) Info(ctx context.Context, dgst digest.Digest) (content.Info, error) {
|
||||||
|
info, err := p.s.Info(ctx, dgst)
|
||||||
|
if err != nil {
|
||||||
|
if !cerrdefs.IsNotFound(err) {
|
||||||
|
return info, err
|
||||||
|
}
|
||||||
|
source, ok := p.sources[dgst]
|
||||||
|
if !ok {
|
||||||
|
return info, err
|
||||||
|
}
|
||||||
|
|
||||||
|
key := labelDistributionSource + reference.Domain(source.registryRef)
|
||||||
|
value := reference.Path(source.registryRef)
|
||||||
|
return content.Info{
|
||||||
|
Digest: dgst,
|
||||||
|
Labels: map[string]string{
|
||||||
|
key: value,
|
||||||
|
},
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return info, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p fakeStoreWithSources) Update(ctx context.Context, info content.Info, fieldpaths ...string) (content.Info, error) {
|
||||||
|
return p.s.Update(ctx, info, fieldpaths...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p fakeStoreWithSources) Walk(ctx context.Context, fn content.WalkFunc, filters ...string) error {
|
||||||
|
return p.s.Walk(ctx, fn, filters...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p fakeStoreWithSources) ReaderAt(ctx context.Context, desc ocispec.Descriptor) (content.ReaderAt, error) {
|
||||||
|
return p.s.ReaderAt(ctx, desc)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p fakeStoreWithSources) Abort(ctx context.Context, ref string) error {
|
||||||
|
return p.s.Abort(ctx, ref)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p fakeStoreWithSources) ListStatuses(ctx context.Context, filters ...string) ([]content.Status, error) {
|
||||||
|
return p.s.ListStatuses(ctx, filters...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p fakeStoreWithSources) Status(ctx context.Context, ref string) (content.Status, error) {
|
||||||
|
return p.s.Status(ctx, ref)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p fakeStoreWithSources) Writer(ctx context.Context, opts ...content.WriterOpt) (content.Writer, error) {
|
||||||
|
return p.s.Writer(ctx, opts...)
|
||||||
|
}
|
|
@ -26,8 +26,8 @@ import (
|
||||||
type ImageService interface {
|
type ImageService interface {
|
||||||
// Images
|
// Images
|
||||||
|
|
||||||
PullImage(ctx context.Context, image, tag string, platform *v1.Platform, metaHeaders map[string][]string, authConfig *registry.AuthConfig, outStream io.Writer) error
|
PullImage(ctx context.Context, name, tag string, platform *v1.Platform, metaHeaders map[string][]string, authConfig *registry.AuthConfig, outStream io.Writer) error
|
||||||
PushImage(ctx context.Context, image, tag string, metaHeaders map[string][]string, authConfig *registry.AuthConfig, outStream io.Writer) error
|
PushImage(ctx context.Context, ref reference.Named, metaHeaders map[string][]string, authConfig *registry.AuthConfig, outStream io.Writer) error
|
||||||
CreateImage(config []byte, parent string) (builder.Image, error)
|
CreateImage(config []byte, parent string) (builder.Image, error)
|
||||||
ImageDelete(ctx context.Context, imageRef string, force, prune bool) ([]types.ImageDeleteResponseItem, error)
|
ImageDelete(ctx context.Context, imageRef string, force, prune bool) ([]types.ImageDeleteResponseItem, error)
|
||||||
ExportImage(ctx context.Context, names []string, outStream io.Writer) error
|
ExportImage(ctx context.Context, names []string, outStream io.Writer) error
|
||||||
|
|
|
@ -14,20 +14,8 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
// PushImage initiates a push operation on the repository named localName.
|
// PushImage initiates a push operation on the repository named localName.
|
||||||
func (i *ImageService) PushImage(ctx context.Context, image, tag string, metaHeaders map[string][]string, authConfig *registry.AuthConfig, outStream io.Writer) error {
|
func (i *ImageService) PushImage(ctx context.Context, ref reference.Named, metaHeaders map[string][]string, authConfig *registry.AuthConfig, outStream io.Writer) error {
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
ref, err := reference.ParseNormalizedNamed(image)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if tag != "" {
|
|
||||||
// Push by digest is not supported, so only tags are supported.
|
|
||||||
ref, err = reference.WithTag(ref, tag)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Include a buffer so that slow client connections don't affect
|
// Include a buffer so that slow client connections don't affect
|
||||||
// transfer performance.
|
// transfer performance.
|
||||||
progressChan := make(chan progress.Progress, 100)
|
progressChan := make(chan progress.Progress, 100)
|
||||||
|
@ -57,7 +45,7 @@ func (i *ImageService) PushImage(ctx context.Context, image, tag string, metaHea
|
||||||
UploadManager: i.uploadManager,
|
UploadManager: i.uploadManager,
|
||||||
}
|
}
|
||||||
|
|
||||||
err = distribution.Push(ctx, ref, imagePushConfig)
|
err := distribution.Push(ctx, ref, imagePushConfig)
|
||||||
close(progressChan)
|
close(progressChan)
|
||||||
<-writesDone
|
<-writesDone
|
||||||
imageActions.WithValues("push").UpdateSince(start)
|
imageActions.WithValues("push").UpdateSince(start)
|
||||||
|
|
Loading…
Reference in a new issue