moby/distribution/push.go

130 lines
3.9 KiB
Go
Raw Normal View History

package distribution // import "github.com/docker/docker/distribution"
import (
"bufio"
"compress/gzip"
"context"
"fmt"
"io"
"github.com/containerd/log"
"github.com/distribution/reference"
"github.com/docker/docker/api/types/events"
"github.com/docker/docker/pkg/progress"
)
const compressionBufSize = 32768
// Push initiates a push operation on ref. ref is the specific variant of the
// image to push. If no tag is provided, all tags are pushed.
func Push(ctx context.Context, ref reference.Named, config *ImagePushConfig) error {
// FIXME: Allow to interrupt current push when new push of same image is done.
// Resolve the Repository name from fqn to RepositoryInfo
repoInfo, err := config.RegistryService.ResolveRepository(ref)
if err != nil {
return err
}
endpoints, err := config.RegistryService.LookupPushEndpoints(reference.Domain(repoInfo.Name))
if err != nil {
return err
}
progress.Messagef(config.ProgressOutput, "", "The push refers to repository [%s]", repoInfo.Name.Name())
associations := config.ReferenceStore.ReferencesByName(repoInfo.Name)
if len(associations) == 0 {
return fmt.Errorf("An image does not exist locally with the tag: %s", reference.FamiliarName(repoInfo.Name))
}
var (
lastErr error
// confirmedTLSRegistries is a map indicating which registries
// are known to be using TLS. There should never be a plaintext
// retry for any of these.
confirmedTLSRegistries = make(map[string]struct{})
)
for _, endpoint := range endpoints {
if endpoint.URL.Scheme != "https" {
if _, confirmedTLS := confirmedTLSRegistries[endpoint.URL.Host]; confirmedTLS {
log.G(ctx).Debugf("Skipping non-TLS endpoint %s for host/port that appears to use TLS", endpoint.URL)
continue
}
}
log.G(ctx).Debugf("Trying to push %s to %s", repoInfo.Name.Name(), endpoint.URL)
if err := newPusher(ref, endpoint, repoInfo, config).push(ctx); err != nil {
// Was this push cancelled? If so, don't try to fall
// back.
select {
case <-ctx.Done():
default:
if fallbackErr, ok := err.(fallbackError); ok {
if fallbackErr.transportOK && endpoint.URL.Scheme == "https" {
confirmedTLSRegistries[endpoint.URL.Host] = struct{}{}
}
err = fallbackErr.err
lastErr = err
log.G(ctx).Infof("Attempting next endpoint for push after error: %v", err)
continue
}
}
log.G(ctx).Errorf("Not continuing with push after error: %v", err)
return err
}
config.ImageEventLogger(reference.FamiliarString(ref), reference.FamiliarName(repoInfo.Name), events.ActionPush)
return nil
}
if lastErr == nil {
lastErr = fmt.Errorf("no endpoints found for %s", repoInfo.Name.Name())
}
return lastErr
}
// compress returns an io.ReadCloser which will supply a compressed version of
// the provided Reader. The caller must close the ReadCloser after reading the
// compressed data.
//
// Note that this function returns a reader instead of taking a writer as an
// argument so that it can be used with httpBlobWriter's ReadFrom method.
// Using httpBlobWriter's Write method would send a PATCH request for every
// Write call.
//
// The second return value is a channel that gets closed when the goroutine
// is finished. This allows the caller to make sure the goroutine finishes
// before it releases any resources connected with the reader that was
// passed in.
func compress(in io.Reader) (io.ReadCloser, chan struct{}) {
compressionDone := make(chan struct{})
pipeReader, pipeWriter := io.Pipe()
// Use a bufio.Writer to avoid excessive chunking in HTTP request.
bufWriter := bufio.NewWriterSize(pipeWriter, compressionBufSize)
compressor := gzip.NewWriter(bufWriter)
go func() {
_, err := io.Copy(compressor, in)
if err == nil {
err = compressor.Close()
}
if err == nil {
err = bufWriter.Flush()
}
if err != nil {
pipeWriter.CloseWithError(err)
} else {
pipeWriter.Close()
}
close(compressionDone)
}()
return pipeReader, compressionDone
}