123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186 |
- package distribution
- import (
- "bufio"
- "compress/gzip"
- "fmt"
- "io"
- "github.com/Sirupsen/logrus"
- "github.com/docker/distribution/reference"
- "github.com/docker/docker/distribution/metadata"
- "github.com/docker/docker/pkg/progress"
- "github.com/docker/docker/registry"
- "golang.org/x/net/context"
- )
// Pusher is an interface that abstracts pushing for different API versions.
type Pusher interface {
	// Push tries to push the image configured at the creation of Pusher.
	// It returns a non-nil error if the push did not succeed. The method
	// has a single return value: it does not itself report whether the
	// caller should retry on the next configured endpoint. The package-level
	// Push function makes that decision by checking whether the returned
	// error is a fallbackError.
	//
	// TODO(tiborvass): have Push() take a reference to repository + tag, so that the pusher itself is repository-agnostic.
	Push(ctx context.Context) error
}
// compressionBufSize is the size, in bytes (32 KiB), of the buffer placed
// between the gzip compressor and the pipe in compress.
const compressionBufSize = 32 * 1024
- // NewPusher creates a new Pusher interface that will push to either a v1 or v2
- // registry. The endpoint argument contains a Version field that determines
- // whether a v1 or v2 pusher will be created. The other parameters are passed
- // through to the underlying pusher implementation for use during the actual
- // push operation.
- func NewPusher(ref reference.Named, endpoint registry.APIEndpoint, repoInfo *registry.RepositoryInfo, imagePushConfig *ImagePushConfig) (Pusher, error) {
- switch endpoint.Version {
- case registry.APIVersion2:
- return &v2Pusher{
- v2MetadataService: metadata.NewV2MetadataService(imagePushConfig.MetadataStore),
- ref: ref,
- endpoint: endpoint,
- repoInfo: repoInfo,
- config: imagePushConfig,
- }, nil
- case registry.APIVersion1:
- return &v1Pusher{
- v1IDService: metadata.NewV1IDService(imagePushConfig.MetadataStore),
- ref: ref,
- endpoint: endpoint,
- repoInfo: repoInfo,
- config: imagePushConfig,
- }, nil
- }
- return nil, fmt.Errorf("unknown version %d for registry %s", endpoint.Version, endpoint.URL)
- }
- // Push initiates a push operation on ref.
- // ref is the specific variant of the image to be pushed.
- // If no tag is provided, all tags will be pushed.
- func Push(ctx context.Context, ref reference.Named, imagePushConfig *ImagePushConfig) error {
- // FIXME: Allow to interrupt current push when new push of same image is done.
- // Resolve the Repository name from fqn to RepositoryInfo
- repoInfo, err := imagePushConfig.RegistryService.ResolveRepository(ref)
- if err != nil {
- return err
- }
- endpoints, err := imagePushConfig.RegistryService.LookupPushEndpoints(reference.Domain(repoInfo.Name))
- if err != nil {
- return err
- }
- progress.Messagef(imagePushConfig.ProgressOutput, "", "The push refers to a repository [%s]", repoInfo.Name.Name())
- associations := imagePushConfig.ReferenceStore.ReferencesByName(repoInfo.Name)
- if len(associations) == 0 {
- return fmt.Errorf("An image does not exist locally with the tag: %s", reference.FamiliarName(repoInfo.Name))
- }
- var (
- lastErr error
- // confirmedV2 is set to true if a push attempt managed to
- // confirm that it was talking to a v2 registry. This will
- // prevent fallback to the v1 protocol.
- confirmedV2 bool
- // confirmedTLSRegistries is a map indicating which registries
- // are known to be using TLS. There should never be a plaintext
- // retry for any of these.
- confirmedTLSRegistries = make(map[string]struct{})
- )
- for _, endpoint := range endpoints {
- if imagePushConfig.RequireSchema2 && endpoint.Version == registry.APIVersion1 {
- continue
- }
- if confirmedV2 && endpoint.Version == registry.APIVersion1 {
- logrus.Debugf("Skipping v1 endpoint %s because v2 registry was detected", endpoint.URL)
- continue
- }
- if endpoint.URL.Scheme != "https" {
- if _, confirmedTLS := confirmedTLSRegistries[endpoint.URL.Host]; confirmedTLS {
- logrus.Debugf("Skipping non-TLS endpoint %s for host/port that appears to use TLS", endpoint.URL)
- continue
- }
- }
- logrus.Debugf("Trying to push %s to %s %s", repoInfo.Name.Name(), endpoint.URL, endpoint.Version)
- pusher, err := NewPusher(ref, endpoint, repoInfo, imagePushConfig)
- if err != nil {
- lastErr = err
- continue
- }
- if err := pusher.Push(ctx); err != nil {
- // Was this push cancelled? If so, don't try to fall
- // back.
- select {
- case <-ctx.Done():
- default:
- if fallbackErr, ok := err.(fallbackError); ok {
- confirmedV2 = confirmedV2 || fallbackErr.confirmedV2
- if fallbackErr.transportOK && endpoint.URL.Scheme == "https" {
- confirmedTLSRegistries[endpoint.URL.Host] = struct{}{}
- }
- err = fallbackErr.err
- lastErr = err
- logrus.Infof("Attempting next endpoint for push after error: %v", err)
- continue
- }
- }
- logrus.Errorf("Not continuing with push after error: %v", err)
- return err
- }
- imagePushConfig.ImageEventLogger(reference.FamiliarString(ref), reference.FamiliarName(repoInfo.Name), "push")
- return nil
- }
- if lastErr == nil {
- lastErr = fmt.Errorf("no endpoints found for %s", repoInfo.Name.Name())
- }
- return lastErr
- }
- // compress returns an io.ReadCloser which will supply a compressed version of
- // the provided Reader. The caller must close the ReadCloser after reading the
- // compressed data.
- //
- // Note that this function returns a reader instead of taking a writer as an
- // argument so that it can be used with httpBlobWriter's ReadFrom method.
- // Using httpBlobWriter's Write method would send a PATCH request for every
- // Write call.
- //
- // The second return value is a channel that gets closed when the goroutine
- // is finished. This allows the caller to make sure the goroutine finishes
- // before it releases any resources connected with the reader that was
- // passed in.
- func compress(in io.Reader) (io.ReadCloser, chan struct{}) {
- compressionDone := make(chan struct{})
- pipeReader, pipeWriter := io.Pipe()
- // Use a bufio.Writer to avoid excessive chunking in HTTP request.
- bufWriter := bufio.NewWriterSize(pipeWriter, compressionBufSize)
- compressor := gzip.NewWriter(bufWriter)
- go func() {
- _, err := io.Copy(compressor, in)
- if err == nil {
- err = compressor.Close()
- }
- if err == nil {
- err = bufWriter.Flush()
- }
- if err != nil {
- pipeWriter.CloseWithError(err)
- } else {
- pipeWriter.Close()
- }
- close(compressionDone)
- }()
- return pipeReader, compressionDone
- }
|