queue.go 2.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111
  1. package events
  2. import (
  3. "container/list"
  4. "sync"
  5. "github.com/Sirupsen/logrus"
  6. )
  7. // Queue accepts all messages into a queue for asynchronous consumption
  8. // by a sink. It is unbounded and thread safe but the sink must be reliable or
  9. // events will be dropped.
  10. type Queue struct {
  11. dst Sink
  12. events *list.List
  13. cond *sync.Cond
  14. mu sync.Mutex
  15. closed bool
  16. }
  17. // NewQueue returns a queue to the provided Sink dst.
  18. func NewQueue(dst Sink) *Queue {
  19. eq := Queue{
  20. dst: dst,
  21. events: list.New(),
  22. }
  23. eq.cond = sync.NewCond(&eq.mu)
  24. go eq.run()
  25. return &eq
  26. }
  27. // Write accepts the events into the queue, only failing if the queue has
  28. // beend closed.
  29. func (eq *Queue) Write(event Event) error {
  30. eq.mu.Lock()
  31. defer eq.mu.Unlock()
  32. if eq.closed {
  33. return ErrSinkClosed
  34. }
  35. eq.events.PushBack(event)
  36. eq.cond.Signal() // signal waiters
  37. return nil
  38. }
  39. // Close shutsdown the event queue, flushing
  40. func (eq *Queue) Close() error {
  41. eq.mu.Lock()
  42. defer eq.mu.Unlock()
  43. if eq.closed {
  44. return nil
  45. }
  46. // set closed flag
  47. eq.closed = true
  48. eq.cond.Signal() // signal flushes queue
  49. eq.cond.Wait() // wait for signal from last flush
  50. return eq.dst.Close()
  51. }
  52. // run is the main goroutine to flush events to the target sink.
  53. func (eq *Queue) run() {
  54. for {
  55. event := eq.next()
  56. if event == nil {
  57. return // nil block means event queue is closed.
  58. }
  59. if err := eq.dst.Write(event); err != nil {
  60. // TODO(aaronl): Dropping events could be bad depending
  61. // on the application. We should have a way of
  62. // communicating this condition. However, logging
  63. // at a log level above debug may not be appropriate.
  64. // Eventually, go-events should not use logrus at all,
  65. // and should bubble up conditions like this through
  66. // error values.
  67. logrus.WithFields(logrus.Fields{
  68. "event": event,
  69. "sink": eq.dst,
  70. }).WithError(err).Debug("eventqueue: dropped event")
  71. }
  72. }
  73. }
  74. // next encompasses the critical section of the run loop. When the queue is
  75. // empty, it will block on the condition. If new data arrives, it will wake
  76. // and return a block. When closed, a nil slice will be returned.
  77. func (eq *Queue) next() Event {
  78. eq.mu.Lock()
  79. defer eq.mu.Unlock()
  80. for eq.events.Len() < 1 {
  81. if eq.closed {
  82. eq.cond.Broadcast()
  83. return nil
  84. }
  85. eq.cond.Wait()
  86. }
  87. front := eq.events.Front()
  88. block := front.Value.(Event)
  89. eq.events.Remove(front)
  90. return block
  91. }