package distribution // import "github.com/docker/docker/distribution" import ( "bufio" "compress/gzip" "context" "fmt" "io" "github.com/containerd/log" "github.com/distribution/reference" "github.com/docker/docker/api/types/events" "github.com/docker/docker/pkg/progress" ) const compressionBufSize = 32768 // Push initiates a push operation on ref. ref is the specific variant of the // image to push. If no tag is provided, all tags are pushed. func Push(ctx context.Context, ref reference.Named, config *ImagePushConfig) error { // FIXME: Allow to interrupt current push when new push of same image is done. // Resolve the Repository name from fqn to RepositoryInfo repoInfo, err := config.RegistryService.ResolveRepository(ref) if err != nil { return err } endpoints, err := config.RegistryService.LookupPushEndpoints(reference.Domain(repoInfo.Name)) if err != nil { return err } progress.Messagef(config.ProgressOutput, "", "The push refers to repository [%s]", repoInfo.Name.Name()) associations := config.ReferenceStore.ReferencesByName(repoInfo.Name) if len(associations) == 0 { return fmt.Errorf("An image does not exist locally with the tag: %s", reference.FamiliarName(repoInfo.Name)) } var ( lastErr error // confirmedTLSRegistries is a map indicating which registries // are known to be using TLS. There should never be a plaintext // retry for any of these. confirmedTLSRegistries = make(map[string]struct{}) ) for _, endpoint := range endpoints { if endpoint.URL.Scheme != "https" { if _, confirmedTLS := confirmedTLSRegistries[endpoint.URL.Host]; confirmedTLS { log.G(ctx).Debugf("Skipping non-TLS endpoint %s for host/port that appears to use TLS", endpoint.URL) continue } } log.G(ctx).Debugf("Trying to push %s to %s", repoInfo.Name.Name(), endpoint.URL) if err := newPusher(ref, endpoint, repoInfo, config).push(ctx); err != nil { // Was this push cancelled? If so, don't try to fall // back. select { case <-ctx.Done(): default: if fallbackErr, ok := err.(fallbackError); ok { if fallbackErr.transportOK && endpoint.URL.Scheme == "https" { confirmedTLSRegistries[endpoint.URL.Host] = struct{}{} } err = fallbackErr.err lastErr = err log.G(ctx).Infof("Attempting next endpoint for push after error: %v", err) continue } } log.G(ctx).Errorf("Not continuing with push after error: %v", err) return err } config.ImageEventLogger(reference.FamiliarString(ref), reference.FamiliarName(repoInfo.Name), events.ActionPush) return nil } if lastErr == nil { lastErr = fmt.Errorf("no endpoints found for %s", repoInfo.Name.Name()) } return lastErr } // compress returns an io.ReadCloser which will supply a compressed version of // the provided Reader. The caller must close the ReadCloser after reading the // compressed data. // // Note that this function returns a reader instead of taking a writer as an // argument so that it can be used with httpBlobWriter's ReadFrom method. // Using httpBlobWriter's Write method would send a PATCH request for every // Write call. // // The second return value is a channel that gets closed when the goroutine // is finished. This allows the caller to make sure the goroutine finishes // before it releases any resources connected with the reader that was // passed in. func compress(in io.Reader) (io.ReadCloser, chan struct{}) { compressionDone := make(chan struct{}) pipeReader, pipeWriter := io.Pipe() // Use a bufio.Writer to avoid excessive chunking in HTTP request. bufWriter := bufio.NewWriterSize(pipeWriter, compressionBufSize) compressor := gzip.NewWriter(bufWriter) go func() { _, err := io.Copy(compressor, in) if err == nil { err = compressor.Close() } if err == nil { err = bufWriter.Flush() } if err != nil { pipeWriter.CloseWithError(err) } else { pipeWriter.Close() } close(compressionDone) }() return pipeReader, compressionDone }