reporter.go 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129
  1. package agent
  2. import (
  3. "reflect"
  4. "sync"
  5. "github.com/docker/swarmkit/api"
  6. "github.com/docker/swarmkit/log"
  7. "golang.org/x/net/context"
  8. )
// StatusReporter receives updates to task status. Methods may be called
// concurrently, so implementations should be goroutine-safe.
type StatusReporter interface {
	// UpdateTaskStatus records a new status for the task identified by
	// taskID. It must be safe to call from multiple goroutines.
	UpdateTaskStatus(ctx context.Context, taskID string, status *api.TaskStatus) error
}
// statusReporterFunc adapts a plain function to the StatusReporter
// interface, in the style of http.HandlerFunc.
type statusReporterFunc func(ctx context.Context, taskID string, status *api.TaskStatus) error

// UpdateTaskStatus calls fn(ctx, taskID, status).
func (fn statusReporterFunc) UpdateTaskStatus(ctx context.Context, taskID string, status *api.TaskStatus) error {
	return fn(ctx, taskID, status)
}
// statusReporter creates a reliable StatusReporter that will always succeed.
// It handles several tasks at once, ensuring all statuses are reported.
//
// The reporter will continue reporting the current status until it succeeds.
type statusReporter struct {
	reporter StatusReporter             // upstream destination for status reports
	statuses map[string]*api.TaskStatus // pending statuses keyed by task ID; guarded by mu
	mu       sync.Mutex                 // guards statuses and closed
	cond     sync.Cond                  // signaled when statuses gains an entry or closed is set; cond.L is &mu
	closed   bool                       // set once by Close; checked by the run loop
}
  29. func newStatusReporter(ctx context.Context, upstream StatusReporter) *statusReporter {
  30. r := &statusReporter{
  31. reporter: upstream,
  32. statuses: make(map[string]*api.TaskStatus),
  33. }
  34. r.cond.L = &r.mu
  35. go r.run(ctx)
  36. return r
  37. }
  38. func (sr *statusReporter) UpdateTaskStatus(ctx context.Context, taskID string, status *api.TaskStatus) error {
  39. sr.mu.Lock()
  40. defer sr.mu.Unlock()
  41. current, ok := sr.statuses[taskID]
  42. if ok {
  43. if reflect.DeepEqual(current, status) {
  44. return nil
  45. }
  46. if current.State > status.State {
  47. return nil // ignore old updates
  48. }
  49. }
  50. sr.statuses[taskID] = status
  51. sr.cond.Signal()
  52. return nil
  53. }
  54. func (sr *statusReporter) Close() error {
  55. sr.mu.Lock()
  56. defer sr.mu.Unlock()
  57. sr.closed = true
  58. sr.cond.Signal()
  59. return nil
  60. }
// run is the reporter's background loop. It drains sr.statuses, sending each
// pending status upstream; a failed send is re-queued so it is retried on a
// later pass. The loop exits when ctx is canceled or Close is called.
//
// Locking protocol: sr.mu is held throughout, released only around the
// upstream UpdateTaskStatus call. Every access to sr.statuses — including
// the range loop's iterator advancement, which happens after the lock is
// re-acquired — therefore occurs under the lock, so concurrent inserts from
// UpdateTaskStatus are safe. Entries inserted mid-iteration may or may not
// be produced by this pass of the range; either way they are handled (now,
// or on the next pass once the map is re-checked).
func (sr *statusReporter) run(ctx context.Context) {
	done := make(chan struct{})
	defer close(done)

	sr.mu.Lock() // released during wait, below.
	defer sr.mu.Unlock()

	// Watcher goroutine: translate context cancellation into Close so the
	// main loop has a single shutdown signal (sr.closed plus a cond signal).
	// The done channel stops the watcher once run returns.
	go func() {
		select {
		case <-ctx.Done():
			sr.Close()
		case <-done:
			return
		}
	}()

	for {
		// Sleep until there is work or we are woken by Close. The enclosing
		// for re-checks both conditions after every wakeup.
		if len(sr.statuses) == 0 {
			sr.cond.Wait()
		}

		if sr.closed {
			// TODO(stevvooe): Add support here for waiting until all
			// statuses are flushed before shutting down.
			return
		}

		for taskID, status := range sr.statuses {
			delete(sr.statuses, taskID) // delete the entry, while trying to send.

			// Drop the lock for the (potentially slow/blocking) upstream
			// call so UpdateTaskStatus callers are not stalled.
			sr.mu.Unlock()
			err := sr.reporter.UpdateTaskStatus(ctx, taskID, status)
			sr.mu.Lock()

			// reporter might be closed during UpdateTaskStatus call
			if sr.closed {
				return
			}

			if err != nil {
				log.G(ctx).WithError(err).Error("status reporter failed to report status to agent")

				// place it back in the map, if not there, allowing us to pick
				// the value if a new one came in when we were sending the last
				// update.
				if _, ok := sr.statuses[taskID]; !ok {
					sr.statuses[taskID] = status
				}
			}
		}
	}
}