// File: progressstatus.go
  1. package progressreader
import (
	"bytes"
	"errors"
	"io"
	"sync"

	"github.com/docker/docker/vendor/src/github.com/Sirupsen/logrus"
)
  8. type ProgressStatus struct {
  9. sync.Mutex
  10. c chan struct{}
  11. observers []io.Writer
  12. history bytes.Buffer
  13. }
  14. func NewProgressStatus() *ProgressStatus {
  15. return &ProgressStatus{
  16. c: make(chan struct{}),
  17. observers: []io.Writer{},
  18. }
  19. }
  20. func (ps *ProgressStatus) Write(p []byte) (n int, err error) {
  21. ps.Lock()
  22. defer ps.Unlock()
  23. ps.history.Write(p)
  24. for _, w := range ps.observers {
  25. // copy paste from MultiWriter, replaced return with continue
  26. n, err = w.Write(p)
  27. if err != nil {
  28. continue
  29. }
  30. if n != len(p) {
  31. err = io.ErrShortWrite
  32. continue
  33. }
  34. }
  35. return len(p), nil
  36. }
  37. func (ps *ProgressStatus) AddObserver(w io.Writer) {
  38. ps.Lock()
  39. defer ps.Unlock()
  40. w.Write(ps.history.Bytes())
  41. ps.observers = append(ps.observers, w)
  42. }
  43. func (ps *ProgressStatus) Done() {
  44. ps.Lock()
  45. close(ps.c)
  46. ps.history.Reset()
  47. ps.Unlock()
  48. }
  49. func (ps *ProgressStatus) Wait(w io.Writer, msg []byte) error {
  50. ps.Lock()
  51. channel := ps.c
  52. ps.Unlock()
  53. if channel == nil {
  54. // defensive
  55. logrus.Debugf("Channel is nil ")
  56. }
  57. if w != nil {
  58. w.Write(msg)
  59. ps.AddObserver(w)
  60. }
  61. <-channel
  62. return nil
  63. }