upload.go 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174
  1. package xfer
  2. import (
  3. "errors"
  4. "time"
  5. "github.com/Sirupsen/logrus"
  6. "github.com/docker/distribution"
  7. "github.com/docker/docker/layer"
  8. "github.com/docker/docker/pkg/progress"
  9. "golang.org/x/net/context"
  10. )
// maxUploadAttempts is the maximum number of times a single layer upload is
// attempted. Once this many attempts have failed, the last error is recorded
// on the transfer instead of scheduling another retry.
const maxUploadAttempts = 5
// LayerUploadManager provides task management and progress reporting for
// uploads.
type LayerUploadManager struct {
	// tm is the TransferManager that every upload is submitted to; the
	// concurrency limit is handed to it.
	tm TransferManager
	// waitDuration is the tick interval of the retry countdown. The countdown
	// is displayed in "seconds", which matches wall-clock time when this is
	// time.Second (the value NewLayerUploadManager starts with).
	waitDuration time.Duration
}
// SetConcurrency sets the max concurrent uploads for each push. The value is
// passed unchanged to the underlying TransferManager.
func (lum *LayerUploadManager) SetConcurrency(concurrency int) {
	lum.tm.SetConcurrency(concurrency)
}
  22. // NewLayerUploadManager returns a new LayerUploadManager.
  23. func NewLayerUploadManager(concurrencyLimit int, options ...func(*LayerUploadManager)) *LayerUploadManager {
  24. manager := LayerUploadManager{
  25. tm: NewTransferManager(concurrencyLimit),
  26. waitDuration: time.Second,
  27. }
  28. for _, option := range options {
  29. option(&manager)
  30. }
  31. return &manager
  32. }
// uploadTransfer is the Transfer created for one layer upload. On top of the
// embedded Transfer it carries the outcome of that upload.
type uploadTransfer struct {
	Transfer
	// remoteDescriptor is the descriptor returned by a successful
	// UploadDescriptor.Upload call; it stays at its zero value on failure.
	remoteDescriptor distribution.Descriptor
	// err is the error that ended the upload; nil when the upload succeeded.
	err error
}
// An UploadDescriptor references a layer that may need to be uploaded.
type UploadDescriptor interface {
	// Key returns the key used to deduplicate uploads. Within a single
	// LayerUploadManager.Upload call, only the first descriptor with a given
	// key is uploaded.
	Key() string
	// ID returns the ID for display purposes. It is used as the identifier
	// in progress updates.
	ID() string
	// DiffID should return the DiffID for this layer.
	DiffID() layer.DiffID
	// Upload is called to perform the upload. It receives the context of the
	// transfer and an output for progress reporting, and returns the
	// descriptor of the uploaded content. After a failure it may be called
	// again, unless the returned error is a DoNotRetry error or the context
	// has been cancelled.
	Upload(ctx context.Context, progressOutput progress.Output) (distribution.Descriptor, error)
	// SetRemoteDescriptor provides the distribution.Descriptor that was
	// returned by Upload. This descriptor is not to be confused with
	// the UploadDescriptor interface, which is used for internally
	// identifying layers that are being uploaded.
	SetRemoteDescriptor(descriptor distribution.Descriptor)
}
  54. // Upload is a blocking function which ensures the listed layers are present on
  55. // the remote registry. It uses the string returned by the Key method to
  56. // deduplicate uploads.
  57. func (lum *LayerUploadManager) Upload(ctx context.Context, layers []UploadDescriptor, progressOutput progress.Output) error {
  58. var (
  59. uploads []*uploadTransfer
  60. dedupDescriptors = make(map[string]*uploadTransfer)
  61. )
  62. for _, descriptor := range layers {
  63. progress.Update(progressOutput, descriptor.ID(), "Preparing")
  64. key := descriptor.Key()
  65. if _, present := dedupDescriptors[key]; present {
  66. continue
  67. }
  68. xferFunc := lum.makeUploadFunc(descriptor)
  69. upload, watcher := lum.tm.Transfer(descriptor.Key(), xferFunc, progressOutput)
  70. defer upload.Release(watcher)
  71. uploads = append(uploads, upload.(*uploadTransfer))
  72. dedupDescriptors[key] = upload.(*uploadTransfer)
  73. }
  74. for _, upload := range uploads {
  75. select {
  76. case <-ctx.Done():
  77. return ctx.Err()
  78. case <-upload.Transfer.Done():
  79. if upload.err != nil {
  80. return upload.err
  81. }
  82. }
  83. }
  84. for _, l := range layers {
  85. l.SetRemoteDescriptor(dedupDescriptors[l.Key()].remoteDescriptor)
  86. }
  87. return nil
  88. }
  89. func (lum *LayerUploadManager) makeUploadFunc(descriptor UploadDescriptor) DoFunc {
  90. return func(progressChan chan<- progress.Progress, start <-chan struct{}, inactive chan<- struct{}) Transfer {
  91. u := &uploadTransfer{
  92. Transfer: NewTransfer(),
  93. }
  94. go func() {
  95. defer func() {
  96. close(progressChan)
  97. }()
  98. progressOutput := progress.ChanOutput(progressChan)
  99. select {
  100. case <-start:
  101. default:
  102. progress.Update(progressOutput, descriptor.ID(), "Waiting")
  103. <-start
  104. }
  105. retries := 0
  106. for {
  107. remoteDescriptor, err := descriptor.Upload(u.Transfer.Context(), progressOutput)
  108. if err == nil {
  109. u.remoteDescriptor = remoteDescriptor
  110. break
  111. }
  112. // If an error was returned because the context
  113. // was cancelled, we shouldn't retry.
  114. select {
  115. case <-u.Transfer.Context().Done():
  116. u.err = err
  117. return
  118. default:
  119. }
  120. retries++
  121. if _, isDNR := err.(DoNotRetry); isDNR || retries == maxUploadAttempts {
  122. logrus.Errorf("Upload failed: %v", err)
  123. u.err = err
  124. return
  125. }
  126. logrus.Errorf("Upload failed, retrying: %v", err)
  127. delay := retries * 5
  128. ticker := time.NewTicker(lum.waitDuration)
  129. selectLoop:
  130. for {
  131. progress.Updatef(progressOutput, descriptor.ID(), "Retrying in %d second%s", delay, (map[bool]string{true: "s"})[delay != 1])
  132. select {
  133. case <-ticker.C:
  134. delay--
  135. if delay == 0 {
  136. ticker.Stop()
  137. break selectLoop
  138. }
  139. case <-u.Transfer.Context().Done():
  140. ticker.Stop()
  141. u.err = errors.New("upload cancelled during retry delay")
  142. return
  143. }
  144. }
  145. }
  146. }()
  147. return u
  148. }
  149. }