push.go 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129
  1. package distribution // import "github.com/docker/docker/distribution"
  2. import (
  3. "bufio"
  4. "compress/gzip"
  5. "context"
  6. "fmt"
  7. "io"
  8. "github.com/containerd/log"
  9. "github.com/distribution/reference"
  10. "github.com/docker/docker/api/types/events"
  11. "github.com/docker/docker/pkg/progress"
  12. )
  13. const compressionBufSize = 32768
  14. // Push initiates a push operation on ref. ref is the specific variant of the
  15. // image to push. If no tag is provided, all tags are pushed.
  16. func Push(ctx context.Context, ref reference.Named, config *ImagePushConfig) error {
  17. // FIXME: Allow to interrupt current push when new push of same image is done.
  18. // Resolve the Repository name from fqn to RepositoryInfo
  19. repoInfo, err := config.RegistryService.ResolveRepository(ref)
  20. if err != nil {
  21. return err
  22. }
  23. endpoints, err := config.RegistryService.LookupPushEndpoints(reference.Domain(repoInfo.Name))
  24. if err != nil {
  25. return err
  26. }
  27. progress.Messagef(config.ProgressOutput, "", "The push refers to repository [%s]", repoInfo.Name.Name())
  28. associations := config.ReferenceStore.ReferencesByName(repoInfo.Name)
  29. if len(associations) == 0 {
  30. return fmt.Errorf("An image does not exist locally with the tag: %s", reference.FamiliarName(repoInfo.Name))
  31. }
  32. var (
  33. lastErr error
  34. // confirmedTLSRegistries is a map indicating which registries
  35. // are known to be using TLS. There should never be a plaintext
  36. // retry for any of these.
  37. confirmedTLSRegistries = make(map[string]struct{})
  38. )
  39. for _, endpoint := range endpoints {
  40. if endpoint.URL.Scheme != "https" {
  41. if _, confirmedTLS := confirmedTLSRegistries[endpoint.URL.Host]; confirmedTLS {
  42. log.G(ctx).Debugf("Skipping non-TLS endpoint %s for host/port that appears to use TLS", endpoint.URL)
  43. continue
  44. }
  45. }
  46. log.G(ctx).Debugf("Trying to push %s to %s", repoInfo.Name.Name(), endpoint.URL)
  47. if err := newPusher(ref, endpoint, repoInfo, config).push(ctx); err != nil {
  48. // Was this push cancelled? If so, don't try to fall
  49. // back.
  50. select {
  51. case <-ctx.Done():
  52. default:
  53. if fallbackErr, ok := err.(fallbackError); ok {
  54. if fallbackErr.transportOK && endpoint.URL.Scheme == "https" {
  55. confirmedTLSRegistries[endpoint.URL.Host] = struct{}{}
  56. }
  57. err = fallbackErr.err
  58. lastErr = err
  59. log.G(ctx).Infof("Attempting next endpoint for push after error: %v", err)
  60. continue
  61. }
  62. }
  63. log.G(ctx).Errorf("Not continuing with push after error: %v", err)
  64. return err
  65. }
  66. config.ImageEventLogger(reference.FamiliarString(ref), reference.FamiliarName(repoInfo.Name), events.ActionPush)
  67. return nil
  68. }
  69. if lastErr == nil {
  70. lastErr = fmt.Errorf("no endpoints found for %s", repoInfo.Name.Name())
  71. }
  72. return lastErr
  73. }
  74. // compress returns an io.ReadCloser which will supply a compressed version of
  75. // the provided Reader. The caller must close the ReadCloser after reading the
  76. // compressed data.
  77. //
  78. // Note that this function returns a reader instead of taking a writer as an
  79. // argument so that it can be used with httpBlobWriter's ReadFrom method.
  80. // Using httpBlobWriter's Write method would send a PATCH request for every
  81. // Write call.
  82. //
  83. // The second return value is a channel that gets closed when the goroutine
  84. // is finished. This allows the caller to make sure the goroutine finishes
  85. // before it releases any resources connected with the reader that was
  86. // passed in.
  87. func compress(in io.Reader) (io.ReadCloser, chan struct{}) {
  88. compressionDone := make(chan struct{})
  89. pipeReader, pipeWriter := io.Pipe()
  90. // Use a bufio.Writer to avoid excessive chunking in HTTP request.
  91. bufWriter := bufio.NewWriterSize(pipeWriter, compressionBufSize)
  92. compressor := gzip.NewWriter(bufWriter)
  93. go func() {
  94. _, err := io.Copy(compressor, in)
  95. if err == nil {
  96. err = compressor.Close()
  97. }
  98. if err == nil {
  99. err = bufWriter.Flush()
  100. }
  101. if err != nil {
  102. pipeWriter.CloseWithError(err)
  103. } else {
  104. pipeWriter.Close()
  105. }
  106. close(compressionDone)
  107. }()
  108. return pipeReader, compressionDone
  109. }