upload.go 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168
  1. package xfer
  2. import (
  3. "errors"
  4. "time"
  5. "github.com/Sirupsen/logrus"
  6. "github.com/docker/distribution"
  7. "github.com/docker/docker/layer"
  8. "github.com/docker/docker/pkg/progress"
  9. "golang.org/x/net/context"
  10. )
  11. const maxUploadAttempts = 5
  12. // LayerUploadManager provides task management and progress reporting for
  13. // uploads.
  14. type LayerUploadManager struct {
  15. tm TransferManager
  16. }
  17. // SetConcurrency set the max concurrent uploads for each push
  18. func (lum *LayerUploadManager) SetConcurrency(concurrency int) {
  19. lum.tm.SetConcurrency(concurrency)
  20. }
  21. // NewLayerUploadManager returns a new LayerUploadManager.
  22. func NewLayerUploadManager(concurrencyLimit int) *LayerUploadManager {
  23. return &LayerUploadManager{
  24. tm: NewTransferManager(concurrencyLimit),
  25. }
  26. }
  27. type uploadTransfer struct {
  28. Transfer
  29. remoteDescriptor distribution.Descriptor
  30. err error
  31. }
  32. // An UploadDescriptor references a layer that may need to be uploaded.
  33. type UploadDescriptor interface {
  34. // Key returns the key used to deduplicate uploads.
  35. Key() string
  36. // ID returns the ID for display purposes.
  37. ID() string
  38. // DiffID should return the DiffID for this layer.
  39. DiffID() layer.DiffID
  40. // Upload is called to perform the Upload.
  41. Upload(ctx context.Context, progressOutput progress.Output) (distribution.Descriptor, error)
  42. // SetRemoteDescriptor provides the distribution.Descriptor that was
  43. // returned by Upload. This descriptor is not to be confused with
  44. // the UploadDescriptor interface, which is used for internally
  45. // identifying layers that are being uploaded.
  46. SetRemoteDescriptor(descriptor distribution.Descriptor)
  47. }
  48. // Upload is a blocking function which ensures the listed layers are present on
  49. // the remote registry. It uses the string returned by the Key method to
  50. // deduplicate uploads.
  51. func (lum *LayerUploadManager) Upload(ctx context.Context, layers []UploadDescriptor, progressOutput progress.Output) error {
  52. var (
  53. uploads []*uploadTransfer
  54. dedupDescriptors = make(map[string]*uploadTransfer)
  55. )
  56. for _, descriptor := range layers {
  57. progress.Update(progressOutput, descriptor.ID(), "Preparing")
  58. key := descriptor.Key()
  59. if _, present := dedupDescriptors[key]; present {
  60. continue
  61. }
  62. xferFunc := lum.makeUploadFunc(descriptor)
  63. upload, watcher := lum.tm.Transfer(descriptor.Key(), xferFunc, progressOutput)
  64. defer upload.Release(watcher)
  65. uploads = append(uploads, upload.(*uploadTransfer))
  66. dedupDescriptors[key] = upload.(*uploadTransfer)
  67. }
  68. for _, upload := range uploads {
  69. select {
  70. case <-ctx.Done():
  71. return ctx.Err()
  72. case <-upload.Transfer.Done():
  73. if upload.err != nil {
  74. return upload.err
  75. }
  76. }
  77. }
  78. for _, l := range layers {
  79. l.SetRemoteDescriptor(dedupDescriptors[l.Key()].remoteDescriptor)
  80. }
  81. return nil
  82. }
  83. func (lum *LayerUploadManager) makeUploadFunc(descriptor UploadDescriptor) DoFunc {
  84. return func(progressChan chan<- progress.Progress, start <-chan struct{}, inactive chan<- struct{}) Transfer {
  85. u := &uploadTransfer{
  86. Transfer: NewTransfer(),
  87. }
  88. go func() {
  89. defer func() {
  90. close(progressChan)
  91. }()
  92. progressOutput := progress.ChanOutput(progressChan)
  93. select {
  94. case <-start:
  95. default:
  96. progress.Update(progressOutput, descriptor.ID(), "Waiting")
  97. <-start
  98. }
  99. retries := 0
  100. for {
  101. remoteDescriptor, err := descriptor.Upload(u.Transfer.Context(), progressOutput)
  102. if err == nil {
  103. u.remoteDescriptor = remoteDescriptor
  104. break
  105. }
  106. // If an error was returned because the context
  107. // was cancelled, we shouldn't retry.
  108. select {
  109. case <-u.Transfer.Context().Done():
  110. u.err = err
  111. return
  112. default:
  113. }
  114. retries++
  115. if _, isDNR := err.(DoNotRetry); isDNR || retries == maxUploadAttempts {
  116. logrus.Errorf("Upload failed: %v", err)
  117. u.err = err
  118. return
  119. }
  120. logrus.Errorf("Upload failed, retrying: %v", err)
  121. delay := retries * 5
  122. ticker := time.NewTicker(time.Second)
  123. selectLoop:
  124. for {
  125. progress.Updatef(progressOutput, descriptor.ID(), "Retrying in %d second%s", delay, (map[bool]string{true: "s"})[delay != 1])
  126. select {
  127. case <-ticker.C:
  128. delay--
  129. if delay == 0 {
  130. ticker.Stop()
  131. break selectLoop
  132. }
  133. case <-u.Transfer.Context().Done():
  134. ticker.Stop()
  135. u.err = errors.New("upload cancelled during retry delay")
  136. return
  137. }
  138. }
  139. }
  140. }()
  141. return u
  142. }
  143. }