watch.go 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107
  1. package watch
  2. import (
  3. "sync"
  4. "github.com/docker/go-events"
  5. )
  6. // dropErrClosed is a sink that suppresses ErrSinkClosed from Write, to avoid
  7. // debug log messages that may be confusing. It is possible that the queue
  8. // will try to write an event to its destination channel while the queue is
  9. // being removed from the broadcaster. Since the channel is closed before the
  10. // queue, there is a narrow window when this is possible. In some event-based
  11. // dropping events when a sink is removed from a broadcaster is a problem, but
  12. // for the usage in this watch package that's the expected behavior.
  13. type dropErrClosed struct {
  14. sink events.Sink
  15. }
  16. func (s dropErrClosed) Write(event events.Event) error {
  17. err := s.sink.Write(event)
  18. if err == events.ErrSinkClosed {
  19. return nil
  20. }
  21. return err
  22. }
  23. func (s dropErrClosed) Close() error {
  24. return s.sink.Close()
  25. }
  26. // Queue is the structure used to publish events and watch for them.
  27. type Queue struct {
  28. mu sync.Mutex
  29. broadcast *events.Broadcaster
  30. cancelFuncs map[*events.Channel]func()
  31. }
  32. // NewQueue creates a new publish/subscribe queue which supports watchers.
  33. // The channels that it will create for subscriptions will have the buffer
  34. // size specified by buffer.
  35. func NewQueue() *Queue {
  36. return &Queue{
  37. broadcast: events.NewBroadcaster(),
  38. cancelFuncs: make(map[*events.Channel]func()),
  39. }
  40. }
  41. // Watch returns a channel which will receive all items published to the
  42. // queue from this point, until cancel is called.
  43. func (q *Queue) Watch() (eventq chan events.Event, cancel func()) {
  44. return q.CallbackWatch(nil)
  45. }
  46. // CallbackWatch returns a channel which will receive all events published to
  47. // the queue from this point that pass the check in the provided callback
  48. // function. The returned cancel function will stop the flow of events and
  49. // close the channel.
  50. func (q *Queue) CallbackWatch(matcher events.Matcher) (eventq chan events.Event, cancel func()) {
  51. ch := events.NewChannel(0)
  52. sink := events.Sink(events.NewQueue(dropErrClosed{sink: ch}))
  53. if matcher != nil {
  54. sink = events.NewFilter(sink, matcher)
  55. }
  56. q.broadcast.Add(sink)
  57. cancelFunc := func() {
  58. q.broadcast.Remove(sink)
  59. ch.Close()
  60. sink.Close()
  61. }
  62. q.mu.Lock()
  63. q.cancelFuncs[ch] = cancelFunc
  64. q.mu.Unlock()
  65. return ch.C, func() {
  66. q.mu.Lock()
  67. cancelFunc := q.cancelFuncs[ch]
  68. delete(q.cancelFuncs, ch)
  69. q.mu.Unlock()
  70. if cancelFunc != nil {
  71. cancelFunc()
  72. }
  73. }
  74. }
  75. // Publish adds an item to the queue.
  76. func (q *Queue) Publish(item events.Event) {
  77. q.broadcast.Write(item)
  78. }
  79. // Close closes the queue and frees the associated resources.
  80. func (q *Queue) Close() error {
  81. // Make sure all watchers have been closed to avoid a deadlock when
  82. // closing the broadcaster.
  83. q.mu.Lock()
  84. for _, cancelFunc := range q.cancelFuncs {
  85. cancelFunc()
  86. }
  87. q.cancelFuncs = make(map[*events.Channel]func())
  88. q.mu.Unlock()
  89. return q.broadcast.Close()
  90. }