push.go 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186
  1. package distribution
  2. import (
  3. "bufio"
  4. "compress/gzip"
  5. "fmt"
  6. "io"
  7. "github.com/Sirupsen/logrus"
  8. "github.com/docker/distribution/reference"
  9. "github.com/docker/docker/distribution/metadata"
  10. "github.com/docker/docker/pkg/progress"
  11. "github.com/docker/docker/registry"
  12. "golang.org/x/net/context"
  13. )
  14. // Pusher is an interface that abstracts pushing for different API versions.
  15. type Pusher interface {
  16. // Push tries to push the image configured at the creation of Pusher.
  17. // Push returns an error if any, as well as a boolean that determines whether to retry Push on the next configured endpoint.
  18. //
  19. // TODO(tiborvass): have Push() take a reference to repository + tag, so that the pusher itself is repository-agnostic.
  20. Push(ctx context.Context) error
  21. }
  22. const compressionBufSize = 32768
  23. // NewPusher creates a new Pusher interface that will push to either a v1 or v2
  24. // registry. The endpoint argument contains a Version field that determines
  25. // whether a v1 or v2 pusher will be created. The other parameters are passed
  26. // through to the underlying pusher implementation for use during the actual
  27. // push operation.
  28. func NewPusher(ref reference.Named, endpoint registry.APIEndpoint, repoInfo *registry.RepositoryInfo, imagePushConfig *ImagePushConfig) (Pusher, error) {
  29. switch endpoint.Version {
  30. case registry.APIVersion2:
  31. return &v2Pusher{
  32. v2MetadataService: metadata.NewV2MetadataService(imagePushConfig.MetadataStore),
  33. ref: ref,
  34. endpoint: endpoint,
  35. repoInfo: repoInfo,
  36. config: imagePushConfig,
  37. }, nil
  38. case registry.APIVersion1:
  39. return &v1Pusher{
  40. v1IDService: metadata.NewV1IDService(imagePushConfig.MetadataStore),
  41. ref: ref,
  42. endpoint: endpoint,
  43. repoInfo: repoInfo,
  44. config: imagePushConfig,
  45. }, nil
  46. }
  47. return nil, fmt.Errorf("unknown version %d for registry %s", endpoint.Version, endpoint.URL)
  48. }
  49. // Push initiates a push operation on ref.
  50. // ref is the specific variant of the image to be pushed.
  51. // If no tag is provided, all tags will be pushed.
  52. func Push(ctx context.Context, ref reference.Named, imagePushConfig *ImagePushConfig) error {
  53. // FIXME: Allow to interrupt current push when new push of same image is done.
  54. // Resolve the Repository name from fqn to RepositoryInfo
  55. repoInfo, err := imagePushConfig.RegistryService.ResolveRepository(ref)
  56. if err != nil {
  57. return err
  58. }
  59. endpoints, err := imagePushConfig.RegistryService.LookupPushEndpoints(reference.Domain(repoInfo.Name))
  60. if err != nil {
  61. return err
  62. }
  63. progress.Messagef(imagePushConfig.ProgressOutput, "", "The push refers to a repository [%s]", repoInfo.Name.Name())
  64. associations := imagePushConfig.ReferenceStore.ReferencesByName(repoInfo.Name)
  65. if len(associations) == 0 {
  66. return fmt.Errorf("An image does not exist locally with the tag: %s", reference.FamiliarName(repoInfo.Name))
  67. }
  68. var (
  69. lastErr error
  70. // confirmedV2 is set to true if a push attempt managed to
  71. // confirm that it was talking to a v2 registry. This will
  72. // prevent fallback to the v1 protocol.
  73. confirmedV2 bool
  74. // confirmedTLSRegistries is a map indicating which registries
  75. // are known to be using TLS. There should never be a plaintext
  76. // retry for any of these.
  77. confirmedTLSRegistries = make(map[string]struct{})
  78. )
  79. for _, endpoint := range endpoints {
  80. if imagePushConfig.RequireSchema2 && endpoint.Version == registry.APIVersion1 {
  81. continue
  82. }
  83. if confirmedV2 && endpoint.Version == registry.APIVersion1 {
  84. logrus.Debugf("Skipping v1 endpoint %s because v2 registry was detected", endpoint.URL)
  85. continue
  86. }
  87. if endpoint.URL.Scheme != "https" {
  88. if _, confirmedTLS := confirmedTLSRegistries[endpoint.URL.Host]; confirmedTLS {
  89. logrus.Debugf("Skipping non-TLS endpoint %s for host/port that appears to use TLS", endpoint.URL)
  90. continue
  91. }
  92. }
  93. logrus.Debugf("Trying to push %s to %s %s", repoInfo.Name.Name(), endpoint.URL, endpoint.Version)
  94. pusher, err := NewPusher(ref, endpoint, repoInfo, imagePushConfig)
  95. if err != nil {
  96. lastErr = err
  97. continue
  98. }
  99. if err := pusher.Push(ctx); err != nil {
  100. // Was this push cancelled? If so, don't try to fall
  101. // back.
  102. select {
  103. case <-ctx.Done():
  104. default:
  105. if fallbackErr, ok := err.(fallbackError); ok {
  106. confirmedV2 = confirmedV2 || fallbackErr.confirmedV2
  107. if fallbackErr.transportOK && endpoint.URL.Scheme == "https" {
  108. confirmedTLSRegistries[endpoint.URL.Host] = struct{}{}
  109. }
  110. err = fallbackErr.err
  111. lastErr = err
  112. logrus.Infof("Attempting next endpoint for push after error: %v", err)
  113. continue
  114. }
  115. }
  116. logrus.Errorf("Not continuing with push after error: %v", err)
  117. return err
  118. }
  119. imagePushConfig.ImageEventLogger(reference.FamiliarString(ref), reference.FamiliarName(repoInfo.Name), "push")
  120. return nil
  121. }
  122. if lastErr == nil {
  123. lastErr = fmt.Errorf("no endpoints found for %s", repoInfo.Name.Name())
  124. }
  125. return lastErr
  126. }
  127. // compress returns an io.ReadCloser which will supply a compressed version of
  128. // the provided Reader. The caller must close the ReadCloser after reading the
  129. // compressed data.
  130. //
  131. // Note that this function returns a reader instead of taking a writer as an
  132. // argument so that it can be used with httpBlobWriter's ReadFrom method.
  133. // Using httpBlobWriter's Write method would send a PATCH request for every
  134. // Write call.
  135. //
  136. // The second return value is a channel that gets closed when the goroutine
  137. // is finished. This allows the caller to make sure the goroutine finishes
  138. // before it releases any resources connected with the reader that was
  139. // passed in.
  140. func compress(in io.Reader) (io.ReadCloser, chan struct{}) {
  141. compressionDone := make(chan struct{})
  142. pipeReader, pipeWriter := io.Pipe()
  143. // Use a bufio.Writer to avoid excessive chunking in HTTP request.
  144. bufWriter := bufio.NewWriterSize(pipeWriter, compressionBufSize)
  145. compressor := gzip.NewWriter(bufWriter)
  146. go func() {
  147. _, err := io.Copy(compressor, in)
  148. if err == nil {
  149. err = compressor.Close()
  150. }
  151. if err == nil {
  152. err = bufWriter.Flush()
  153. }
  154. if err != nil {
  155. pipeWriter.CloseWithError(err)
  156. } else {
  157. pipeWriter.Close()
  158. }
  159. close(compressionDone)
  160. }()
  161. return pipeReader, compressionDone
  162. }