progress.go 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231
  1. package containerd
  2. import (
  3. "context"
  4. "errors"
  5. "sync"
  6. "time"
  7. "github.com/containerd/containerd/content"
  8. cerrdefs "github.com/containerd/containerd/errdefs"
  9. "github.com/containerd/containerd/images"
  10. "github.com/containerd/containerd/remotes"
  11. "github.com/containerd/containerd/remotes/docker"
  12. "github.com/containerd/log"
  13. "github.com/distribution/reference"
  14. "github.com/docker/docker/internal/compatcontext"
  15. "github.com/docker/docker/pkg/progress"
  16. "github.com/docker/docker/pkg/stringid"
  17. "github.com/opencontainers/go-digest"
  18. ocispec "github.com/opencontainers/image-spec/specs-go/v1"
  19. )
  20. type progressUpdater interface {
  21. UpdateProgress(context.Context, *jobs, progress.Output, time.Time) error
  22. }
  23. type jobs struct {
  24. descs map[digest.Digest]ocispec.Descriptor
  25. mu sync.Mutex
  26. }
  27. // newJobs creates a new instance of the job status tracker
  28. func newJobs() *jobs {
  29. return &jobs{
  30. descs: map[digest.Digest]ocispec.Descriptor{},
  31. }
  32. }
  33. func (j *jobs) showProgress(ctx context.Context, out progress.Output, updater progressUpdater) func() {
  34. ctx, cancelProgress := context.WithCancel(ctx)
  35. start := time.Now()
  36. lastUpdate := make(chan struct{})
  37. go func() {
  38. ticker := time.NewTicker(100 * time.Millisecond)
  39. defer ticker.Stop()
  40. for {
  41. select {
  42. case <-ticker.C:
  43. if err := updater.UpdateProgress(ctx, j, out, start); err != nil {
  44. if !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) {
  45. log.G(ctx).WithError(err).Error("Updating progress failed")
  46. }
  47. }
  48. case <-ctx.Done():
  49. ctx, cancel := context.WithTimeout(compatcontext.WithoutCancel(ctx), time.Millisecond*500)
  50. defer cancel()
  51. updater.UpdateProgress(ctx, j, out, start)
  52. close(lastUpdate)
  53. return
  54. }
  55. }
  56. }()
  57. return func() {
  58. cancelProgress()
  59. // Wait for the last update to finish.
  60. // UpdateProgress may still write progress to output and we need
  61. // to keep the caller from closing it before we finish.
  62. <-lastUpdate
  63. }
  64. }
  65. // Add adds a descriptor to be tracked
  66. func (j *jobs) Add(desc ...ocispec.Descriptor) {
  67. j.mu.Lock()
  68. defer j.mu.Unlock()
  69. for _, d := range desc {
  70. if _, ok := j.descs[d.Digest]; ok {
  71. continue
  72. }
  73. j.descs[d.Digest] = d
  74. }
  75. }
  76. // Remove removes a descriptor
  77. func (j *jobs) Remove(desc ocispec.Descriptor) {
  78. j.mu.Lock()
  79. defer j.mu.Unlock()
  80. delete(j.descs, desc.Digest)
  81. }
  82. // Jobs returns a list of all tracked descriptors
  83. func (j *jobs) Jobs() []ocispec.Descriptor {
  84. j.mu.Lock()
  85. defer j.mu.Unlock()
  86. descs := make([]ocispec.Descriptor, 0, len(j.descs))
  87. for _, d := range j.descs {
  88. descs = append(descs, d)
  89. }
  90. return descs
  91. }
  92. type pullProgress struct {
  93. store content.Store
  94. showExists bool
  95. hideLayers bool
  96. }
  97. func (p pullProgress) UpdateProgress(ctx context.Context, ongoing *jobs, out progress.Output, start time.Time) error {
  98. actives, err := p.store.ListStatuses(ctx, "")
  99. if err != nil {
  100. if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
  101. return err
  102. }
  103. log.G(ctx).WithError(err).Error("status check failed")
  104. return nil
  105. }
  106. pulling := make(map[string]content.Status, len(actives))
  107. // update status of status entries!
  108. for _, status := range actives {
  109. pulling[status.Ref] = status
  110. }
  111. for _, j := range ongoing.Jobs() {
  112. if p.hideLayers {
  113. ongoing.Remove(j)
  114. continue
  115. }
  116. key := remotes.MakeRefKey(ctx, j)
  117. if info, ok := pulling[key]; ok {
  118. out.WriteProgress(progress.Progress{
  119. ID: stringid.TruncateID(j.Digest.Encoded()),
  120. Action: "Downloading",
  121. Current: info.Offset,
  122. Total: info.Total,
  123. })
  124. continue
  125. }
  126. info, err := p.store.Info(ctx, j.Digest)
  127. if err != nil {
  128. if !cerrdefs.IsNotFound(err) {
  129. return err
  130. }
  131. } else if info.CreatedAt.After(start) {
  132. out.WriteProgress(progress.Progress{
  133. ID: stringid.TruncateID(j.Digest.Encoded()),
  134. Action: "Download complete",
  135. HideCounts: true,
  136. LastUpdate: true,
  137. })
  138. ongoing.Remove(j)
  139. } else if p.showExists {
  140. out.WriteProgress(progress.Progress{
  141. ID: stringid.TruncateID(j.Digest.Encoded()),
  142. Action: "Already exists",
  143. HideCounts: true,
  144. LastUpdate: true,
  145. })
  146. ongoing.Remove(j)
  147. }
  148. }
  149. return nil
  150. }
  151. type pushProgress struct {
  152. Tracker docker.StatusTracker
  153. }
  154. func (p *pushProgress) UpdateProgress(ctx context.Context, ongoing *jobs, out progress.Output, start time.Time) error {
  155. for _, j := range ongoing.Jobs() {
  156. key := remotes.MakeRefKey(ctx, j)
  157. id := stringid.TruncateID(j.Digest.Encoded())
  158. status, err := p.Tracker.GetStatus(key)
  159. if err != nil {
  160. if cerrdefs.IsNotFound(err) {
  161. progress.Update(out, id, "Waiting")
  162. continue
  163. }
  164. }
  165. if status.Committed && status.Offset >= status.Total {
  166. if status.MountedFrom != "" {
  167. from := status.MountedFrom
  168. if ref, err := reference.ParseNormalizedNamed(from); err == nil {
  169. from = reference.Path(ref)
  170. }
  171. progress.Update(out, id, "Mounted from "+from)
  172. } else if status.Exists {
  173. if images.IsLayerType(j.MediaType) {
  174. progress.Update(out, id, "Layer already exists")
  175. } else {
  176. progress.Update(out, id, "Already exists")
  177. }
  178. } else {
  179. progress.Update(out, id, "Pushed")
  180. }
  181. ongoing.Remove(j)
  182. continue
  183. }
  184. out.WriteProgress(progress.Progress{
  185. ID: id,
  186. Action: "Pushing",
  187. Current: status.Offset,
  188. Total: status.Total,
  189. })
  190. }
  191. return nil
  192. }
  193. type combinedProgress []progressUpdater
  194. func (combined combinedProgress) UpdateProgress(ctx context.Context, ongoing *jobs, out progress.Output, start time.Time) error {
  195. for _, p := range combined {
  196. err := p.UpdateProgress(ctx, ongoing, out, start)
  197. if err != nil {
  198. return err
  199. }
  200. }
  201. return nil
  202. }