push.go 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219
  1. package distribution
  2. import (
  3. "bufio"
  4. "compress/gzip"
  5. "fmt"
  6. "io"
  7. "github.com/Sirupsen/logrus"
  8. "github.com/docker/docker/distribution/metadata"
  9. "github.com/docker/docker/distribution/xfer"
  10. "github.com/docker/docker/image"
  11. "github.com/docker/docker/layer"
  12. "github.com/docker/docker/pkg/progress"
  13. "github.com/docker/docker/reference"
  14. "github.com/docker/docker/registry"
  15. "github.com/docker/engine-api/types"
  16. "github.com/docker/libtrust"
  17. "golang.org/x/net/context"
  18. )
  19. // ImagePushConfig stores push configuration.
  20. type ImagePushConfig struct {
  21. // MetaHeaders store HTTP headers with metadata about the image
  22. MetaHeaders map[string][]string
  23. // AuthConfig holds authentication credentials for authenticating with
  24. // the registry.
  25. AuthConfig *types.AuthConfig
  26. // ProgressOutput is the interface for showing the status of the push
  27. // operation.
  28. ProgressOutput progress.Output
  29. // RegistryService is the registry service to use for TLS configuration
  30. // and endpoint lookup.
  31. RegistryService registry.Service
  32. // ImageEventLogger notifies events for a given image
  33. ImageEventLogger func(id, name, action string)
  34. // MetadataStore is the storage backend for distribution-specific
  35. // metadata.
  36. MetadataStore metadata.Store
  37. // LayerStore manages layers.
  38. LayerStore layer.Store
  39. // ImageStore manages images.
  40. ImageStore image.Store
  41. // ReferenceStore manages tags.
  42. ReferenceStore reference.Store
  43. // TrustKey is the private key for legacy signatures. This is typically
  44. // an ephemeral key, since these signatures are no longer verified.
  45. TrustKey libtrust.PrivateKey
  46. // UploadManager dispatches uploads.
  47. UploadManager *xfer.LayerUploadManager
  48. }
  49. // Pusher is an interface that abstracts pushing for different API versions.
  50. type Pusher interface {
  51. // Push tries to push the image configured at the creation of Pusher.
  52. // Push returns an error if any, as well as a boolean that determines whether to retry Push on the next configured endpoint.
  53. //
  54. // TODO(tiborvass): have Push() take a reference to repository + tag, so that the pusher itself is repository-agnostic.
  55. Push(ctx context.Context) error
  56. }
  57. const compressionBufSize = 32768
  58. // NewPusher creates a new Pusher interface that will push to either a v1 or v2
  59. // registry. The endpoint argument contains a Version field that determines
  60. // whether a v1 or v2 pusher will be created. The other parameters are passed
  61. // through to the underlying pusher implementation for use during the actual
  62. // push operation.
  63. func NewPusher(ref reference.Named, endpoint registry.APIEndpoint, repoInfo *registry.RepositoryInfo, imagePushConfig *ImagePushConfig) (Pusher, error) {
  64. switch endpoint.Version {
  65. case registry.APIVersion2:
  66. return &v2Pusher{
  67. v2MetadataService: metadata.NewV2MetadataService(imagePushConfig.MetadataStore),
  68. ref: ref,
  69. endpoint: endpoint,
  70. repoInfo: repoInfo,
  71. config: imagePushConfig,
  72. }, nil
  73. case registry.APIVersion1:
  74. return &v1Pusher{
  75. v1IDService: metadata.NewV1IDService(imagePushConfig.MetadataStore),
  76. ref: ref,
  77. endpoint: endpoint,
  78. repoInfo: repoInfo,
  79. config: imagePushConfig,
  80. }, nil
  81. }
  82. return nil, fmt.Errorf("unknown version %d for registry %s", endpoint.Version, endpoint.URL)
  83. }
  84. // Push initiates a push operation on ref.
  85. // ref is the specific variant of the image to be pushed.
  86. // If no tag is provided, all tags will be pushed.
  87. func Push(ctx context.Context, ref reference.Named, imagePushConfig *ImagePushConfig) error {
  88. // FIXME: Allow to interrupt current push when new push of same image is done.
  89. // Resolve the Repository name from fqn to RepositoryInfo
  90. repoInfo, err := imagePushConfig.RegistryService.ResolveRepository(ref)
  91. if err != nil {
  92. return err
  93. }
  94. endpoints, err := imagePushConfig.RegistryService.LookupPushEndpoints(repoInfo.Hostname())
  95. if err != nil {
  96. return err
  97. }
  98. progress.Messagef(imagePushConfig.ProgressOutput, "", "The push refers to a repository [%s]", repoInfo.FullName())
  99. associations := imagePushConfig.ReferenceStore.ReferencesByName(repoInfo)
  100. if len(associations) == 0 {
  101. return fmt.Errorf("An image does not exist locally with the tag: %s", repoInfo.Name())
  102. }
  103. var (
  104. lastErr error
  105. // confirmedV2 is set to true if a push attempt managed to
  106. // confirm that it was talking to a v2 registry. This will
  107. // prevent fallback to the v1 protocol.
  108. confirmedV2 bool
  109. // confirmedTLSRegistries is a map indicating which registries
  110. // are known to be using TLS. There should never be a plaintext
  111. // retry for any of these.
  112. confirmedTLSRegistries = make(map[string]struct{})
  113. )
  114. for _, endpoint := range endpoints {
  115. if confirmedV2 && endpoint.Version == registry.APIVersion1 {
  116. logrus.Debugf("Skipping v1 endpoint %s because v2 registry was detected", endpoint.URL)
  117. continue
  118. }
  119. if endpoint.URL.Scheme != "https" {
  120. if _, confirmedTLS := confirmedTLSRegistries[endpoint.URL.Host]; confirmedTLS {
  121. logrus.Debugf("Skipping non-TLS endpoint %s for host/port that appears to use TLS", endpoint.URL)
  122. continue
  123. }
  124. }
  125. logrus.Debugf("Trying to push %s to %s %s", repoInfo.FullName(), endpoint.URL, endpoint.Version)
  126. pusher, err := NewPusher(ref, endpoint, repoInfo, imagePushConfig)
  127. if err != nil {
  128. lastErr = err
  129. continue
  130. }
  131. if err := pusher.Push(ctx); err != nil {
  132. // Was this push cancelled? If so, don't try to fall
  133. // back.
  134. select {
  135. case <-ctx.Done():
  136. default:
  137. if fallbackErr, ok := err.(fallbackError); ok {
  138. confirmedV2 = confirmedV2 || fallbackErr.confirmedV2
  139. if fallbackErr.transportOK && endpoint.URL.Scheme == "https" {
  140. confirmedTLSRegistries[endpoint.URL.Host] = struct{}{}
  141. }
  142. err = fallbackErr.err
  143. lastErr = err
  144. logrus.Errorf("Attempting next endpoint for push after error: %v", err)
  145. continue
  146. }
  147. }
  148. logrus.Errorf("Not continuing with push after error: %v", err)
  149. return err
  150. }
  151. imagePushConfig.ImageEventLogger(ref.String(), repoInfo.Name(), "push")
  152. return nil
  153. }
  154. if lastErr == nil {
  155. lastErr = fmt.Errorf("no endpoints found for %s", repoInfo.FullName())
  156. }
  157. return lastErr
  158. }
  159. // compress returns an io.ReadCloser which will supply a compressed version of
  160. // the provided Reader. The caller must close the ReadCloser after reading the
  161. // compressed data.
  162. //
  163. // Note that this function returns a reader instead of taking a writer as an
  164. // argument so that it can be used with httpBlobWriter's ReadFrom method.
  165. // Using httpBlobWriter's Write method would send a PATCH request for every
  166. // Write call.
  167. //
  168. // The second return value is a channel that gets closed when the goroutine
  169. // is finished. This allows the caller to make sure the goroutine finishes
  170. // before it releases any resources connected with the reader that was
  171. // passed in.
  172. func compress(in io.Reader) (io.ReadCloser, chan struct{}) {
  173. compressionDone := make(chan struct{})
  174. pipeReader, pipeWriter := io.Pipe()
  175. // Use a bufio.Writer to avoid excessive chunking in HTTP request.
  176. bufWriter := bufio.NewWriterSize(pipeWriter, compressionBufSize)
  177. compressor := gzip.NewWriter(bufWriter)
  178. go func() {
  179. _, err := io.Copy(compressor, in)
  180. if err == nil {
  181. err = compressor.Close()
  182. }
  183. if err == nil {
  184. err = bufWriter.Flush()
  185. }
  186. if err != nil {
  187. pipeWriter.CloseWithError(err)
  188. } else {
  189. pipeWriter.Close()
  190. }
  191. close(compressionDone)
  192. }()
  193. return pipeReader, compressionDone
  194. }