progress.go 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160
  1. package containerd
  2. import (
  3. "context"
  4. "errors"
  5. "sync"
  6. "time"
  7. "github.com/containerd/containerd/content"
  8. cerrdefs "github.com/containerd/containerd/errdefs"
  9. "github.com/containerd/containerd/remotes"
  10. "github.com/docker/docker/pkg/progress"
  11. "github.com/docker/docker/pkg/stringid"
  12. "github.com/opencontainers/go-digest"
  13. ocispec "github.com/opencontainers/image-spec/specs-go/v1"
  14. "github.com/sirupsen/logrus"
  15. )
  16. type progressUpdater interface {
  17. UpdateProgress(context.Context, *jobs, progress.Output, time.Time) error
  18. }
  19. type jobs struct {
  20. descs map[digest.Digest]ocispec.Descriptor
  21. mu sync.Mutex
  22. }
  23. // newJobs creates a new instance of the job status tracker
  24. func newJobs() *jobs {
  25. return &jobs{
  26. descs: map[digest.Digest]ocispec.Descriptor{},
  27. }
  28. }
  29. func (j *jobs) showProgress(ctx context.Context, out progress.Output, updater progressUpdater) func() {
  30. ctx, cancelProgress := context.WithCancel(ctx)
  31. start := time.Now()
  32. lastUpdate := make(chan struct{})
  33. go func() {
  34. ticker := time.NewTicker(100 * time.Millisecond)
  35. defer ticker.Stop()
  36. for {
  37. select {
  38. case <-ticker.C:
  39. if err := updater.UpdateProgress(ctx, j, out, start); err != nil {
  40. if !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) {
  41. logrus.WithError(err).Error("Updating progress failed")
  42. }
  43. }
  44. case <-ctx.Done():
  45. ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*500)
  46. defer cancel()
  47. updater.UpdateProgress(ctx, j, out, start)
  48. close(lastUpdate)
  49. return
  50. }
  51. }
  52. }()
  53. return func() {
  54. cancelProgress()
  55. // Wait for the last update to finish.
  56. // UpdateProgress may still write progress to output and we need
  57. // to keep the caller from closing it before we finish.
  58. <-lastUpdate
  59. }
  60. }
  61. // Add adds a descriptor to be tracked
  62. func (j *jobs) Add(desc ocispec.Descriptor) {
  63. j.mu.Lock()
  64. defer j.mu.Unlock()
  65. if _, ok := j.descs[desc.Digest]; ok {
  66. return
  67. }
  68. j.descs[desc.Digest] = desc
  69. }
  70. // Remove removes a descriptor
  71. func (j *jobs) Remove(desc ocispec.Descriptor) {
  72. j.mu.Lock()
  73. defer j.mu.Unlock()
  74. delete(j.descs, desc.Digest)
  75. }
  76. // Jobs returns a list of all tracked descriptors
  77. func (j *jobs) Jobs() []ocispec.Descriptor {
  78. j.mu.Lock()
  79. defer j.mu.Unlock()
  80. descs := make([]ocispec.Descriptor, 0, len(j.descs))
  81. for _, d := range j.descs {
  82. descs = append(descs, d)
  83. }
  84. return descs
  85. }
  86. type pullProgress struct {
  87. Store content.Store
  88. ShowExists bool
  89. }
  90. func (p pullProgress) UpdateProgress(ctx context.Context, ongoing *jobs, out progress.Output, start time.Time) error {
  91. actives, err := p.Store.ListStatuses(ctx, "")
  92. if err != nil {
  93. if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
  94. return err
  95. }
  96. logrus.WithError(err).Error("status check failed")
  97. return nil
  98. }
  99. pulling := make(map[string]content.Status, len(actives))
  100. // update status of status entries!
  101. for _, status := range actives {
  102. pulling[status.Ref] = status
  103. }
  104. for _, j := range ongoing.Jobs() {
  105. key := remotes.MakeRefKey(ctx, j)
  106. if info, ok := pulling[key]; ok {
  107. out.WriteProgress(progress.Progress{
  108. ID: stringid.TruncateID(j.Digest.Encoded()),
  109. Action: "Downloading",
  110. Current: info.Offset,
  111. Total: info.Total,
  112. })
  113. continue
  114. }
  115. info, err := p.Store.Info(ctx, j.Digest)
  116. if err != nil {
  117. if !cerrdefs.IsNotFound(err) {
  118. return err
  119. }
  120. } else if info.CreatedAt.After(start) {
  121. out.WriteProgress(progress.Progress{
  122. ID: stringid.TruncateID(j.Digest.Encoded()),
  123. Action: "Download complete",
  124. HideCounts: true,
  125. LastUpdate: true,
  126. })
  127. ongoing.Remove(j)
  128. } else if p.ShowExists {
  129. out.WriteProgress(progress.Progress{
  130. ID: stringid.TruncateID(j.Digest.Encoded()),
  131. Action: "Exists",
  132. HideCounts: true,
  133. LastUpdate: true,
  134. })
  135. ongoing.Remove(j)
  136. }
  137. }
  138. return nil
  139. }