progress.go 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150
  1. package containerd
  2. import (
  3. "context"
  4. "errors"
  5. "sync"
  6. "time"
  7. "github.com/containerd/containerd/content"
  8. cerrdefs "github.com/containerd/containerd/errdefs"
  9. "github.com/containerd/containerd/remotes"
  10. "github.com/docker/docker/pkg/progress"
  11. "github.com/docker/docker/pkg/stringid"
  12. "github.com/opencontainers/go-digest"
  13. ocispec "github.com/opencontainers/image-spec/specs-go/v1"
  14. "github.com/sirupsen/logrus"
  15. )
  16. type progressUpdater interface {
  17. UpdateProgress(context.Context, *jobs, progress.Output, time.Time) error
  18. }
  19. type jobs struct {
  20. descs map[digest.Digest]ocispec.Descriptor
  21. mu sync.Mutex
  22. }
  23. // newJobs creates a new instance of the job status tracker
  24. func newJobs() *jobs {
  25. return &jobs{
  26. descs: map[digest.Digest]ocispec.Descriptor{},
  27. }
  28. }
  29. func (j *jobs) showProgress(ctx context.Context, out progress.Output, updater progressUpdater) func() {
  30. ctx, cancelProgress := context.WithCancel(ctx)
  31. start := time.Now()
  32. go func() {
  33. ticker := time.NewTicker(100 * time.Millisecond)
  34. defer ticker.Stop()
  35. for {
  36. select {
  37. case <-ticker.C:
  38. if err := updater.UpdateProgress(ctx, j, out, start); err != nil {
  39. if !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) {
  40. logrus.WithError(err).Error("Updating progress failed")
  41. }
  42. return
  43. }
  44. case <-ctx.Done():
  45. return
  46. }
  47. }
  48. }()
  49. return cancelProgress
  50. }
  51. // Add adds a descriptor to be tracked
  52. func (j *jobs) Add(desc ocispec.Descriptor) {
  53. j.mu.Lock()
  54. defer j.mu.Unlock()
  55. if _, ok := j.descs[desc.Digest]; ok {
  56. return
  57. }
  58. j.descs[desc.Digest] = desc
  59. }
  60. // Remove removes a descriptor
  61. func (j *jobs) Remove(desc ocispec.Descriptor) {
  62. j.mu.Lock()
  63. defer j.mu.Unlock()
  64. delete(j.descs, desc.Digest)
  65. }
  66. // Jobs returns a list of all tracked descriptors
  67. func (j *jobs) Jobs() []ocispec.Descriptor {
  68. j.mu.Lock()
  69. defer j.mu.Unlock()
  70. descs := make([]ocispec.Descriptor, 0, len(j.descs))
  71. for _, d := range j.descs {
  72. descs = append(descs, d)
  73. }
  74. return descs
  75. }
  76. type pullProgress struct {
  77. Store content.Store
  78. ShowExists bool
  79. }
  80. func (p pullProgress) UpdateProgress(ctx context.Context, ongoing *jobs, out progress.Output, start time.Time) error {
  81. actives, err := p.Store.ListStatuses(ctx, "")
  82. if err != nil {
  83. if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
  84. return err
  85. }
  86. logrus.WithError(err).Error("status check failed")
  87. return nil
  88. }
  89. pulling := make(map[string]content.Status, len(actives))
  90. // update status of status entries!
  91. for _, status := range actives {
  92. pulling[status.Ref] = status
  93. }
  94. for _, j := range ongoing.Jobs() {
  95. key := remotes.MakeRefKey(ctx, j)
  96. if info, ok := pulling[key]; ok {
  97. out.WriteProgress(progress.Progress{
  98. ID: stringid.TruncateID(j.Digest.Encoded()),
  99. Action: "Downloading",
  100. Current: info.Offset,
  101. Total: info.Total,
  102. })
  103. continue
  104. }
  105. info, err := p.Store.Info(ctx, j.Digest)
  106. if err != nil {
  107. if !cerrdefs.IsNotFound(err) {
  108. return err
  109. }
  110. } else if info.CreatedAt.After(start) {
  111. out.WriteProgress(progress.Progress{
  112. ID: stringid.TruncateID(j.Digest.Encoded()),
  113. Action: "Download complete",
  114. HideCounts: true,
  115. LastUpdate: true,
  116. })
  117. ongoing.Remove(j)
  118. } else if p.ShowExists {
  119. out.WriteProgress(progress.Progress{
  120. ID: stringid.TruncateID(j.Digest.Encoded()),
  121. Action: "Exists",
  122. HideCounts: true,
  123. LastUpdate: true,
  124. })
  125. ongoing.Remove(j)
  126. }
  127. }
  128. return nil
  129. }