broadcast.go 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178
  1. package events
  2. import (
  3. "fmt"
  4. "sync"
  5. "github.com/Sirupsen/logrus"
  6. )
  7. // Broadcaster sends events to multiple, reliable Sinks. The goal of this
  8. // component is to dispatch events to configured endpoints. Reliability can be
  9. // provided by wrapping incoming sinks.
  10. type Broadcaster struct {
  11. sinks []Sink
  12. events chan Event
  13. adds chan configureRequest
  14. removes chan configureRequest
  15. shutdown chan struct{}
  16. closed chan struct{}
  17. once sync.Once
  18. }
  19. // NewBroadcaster appends one or more sinks to the list of sinks. The
  20. // broadcaster behavior will be affected by the properties of the sink.
  21. // Generally, the sink should accept all messages and deal with reliability on
  22. // its own. Use of EventQueue and RetryingSink should be used here.
  23. func NewBroadcaster(sinks ...Sink) *Broadcaster {
  24. b := Broadcaster{
  25. sinks: sinks,
  26. events: make(chan Event),
  27. adds: make(chan configureRequest),
  28. removes: make(chan configureRequest),
  29. shutdown: make(chan struct{}),
  30. closed: make(chan struct{}),
  31. }
  32. // Start the broadcaster
  33. go b.run()
  34. return &b
  35. }
  36. // Write accepts an event to be dispatched to all sinks. This method will never
  37. // fail and should never block (hopefully!). The caller cedes the memory to the
  38. // broadcaster and should not modify it after calling write.
  39. func (b *Broadcaster) Write(event Event) error {
  40. select {
  41. case b.events <- event:
  42. case <-b.closed:
  43. return ErrSinkClosed
  44. }
  45. return nil
  46. }
  47. // Add the sink to the broadcaster.
  48. //
  49. // The provided sink must be comparable with equality. Typically, this just
  50. // works with a regular pointer type.
  51. func (b *Broadcaster) Add(sink Sink) error {
  52. return b.configure(b.adds, sink)
  53. }
  54. // Remove the provided sink.
  55. func (b *Broadcaster) Remove(sink Sink) error {
  56. return b.configure(b.removes, sink)
  57. }
  58. type configureRequest struct {
  59. sink Sink
  60. response chan error
  61. }
  62. func (b *Broadcaster) configure(ch chan configureRequest, sink Sink) error {
  63. response := make(chan error, 1)
  64. for {
  65. select {
  66. case ch <- configureRequest{
  67. sink: sink,
  68. response: response}:
  69. ch = nil
  70. case err := <-response:
  71. return err
  72. case <-b.closed:
  73. return ErrSinkClosed
  74. }
  75. }
  76. }
  77. // Close the broadcaster, ensuring that all messages are flushed to the
  78. // underlying sink before returning.
  79. func (b *Broadcaster) Close() error {
  80. b.once.Do(func() {
  81. close(b.shutdown)
  82. })
  83. <-b.closed
  84. return nil
  85. }
  86. // run is the main broadcast loop, started when the broadcaster is created.
  87. // Under normal conditions, it waits for events on the event channel. After
  88. // Close is called, this goroutine will exit.
  89. func (b *Broadcaster) run() {
  90. defer close(b.closed)
  91. remove := func(target Sink) {
  92. for i, sink := range b.sinks {
  93. if sink == target {
  94. b.sinks = append(b.sinks[:i], b.sinks[i+1:]...)
  95. break
  96. }
  97. }
  98. }
  99. for {
  100. select {
  101. case event := <-b.events:
  102. for _, sink := range b.sinks {
  103. if err := sink.Write(event); err != nil {
  104. if err == ErrSinkClosed {
  105. // remove closed sinks
  106. remove(sink)
  107. continue
  108. }
  109. logrus.WithField("event", event).WithField("events.sink", sink).WithError(err).
  110. Errorf("broadcaster: dropping event")
  111. }
  112. }
  113. case request := <-b.adds:
  114. // while we have to iterate for add/remove, common iteration for
  115. // send is faster against slice.
  116. var found bool
  117. for _, sink := range b.sinks {
  118. if request.sink == sink {
  119. found = true
  120. break
  121. }
  122. }
  123. if !found {
  124. b.sinks = append(b.sinks, request.sink)
  125. }
  126. // b.sinks[request.sink] = struct{}{}
  127. request.response <- nil
  128. case request := <-b.removes:
  129. remove(request.sink)
  130. request.response <- nil
  131. case <-b.shutdown:
  132. // close all the underlying sinks
  133. for _, sink := range b.sinks {
  134. if err := sink.Close(); err != nil && err != ErrSinkClosed {
  135. logrus.WithField("events.sink", sink).WithError(err).
  136. Errorf("broadcaster: closing sink failed")
  137. }
  138. }
  139. return
  140. }
  141. }
  142. }
  143. func (b Broadcaster) String() string {
  144. // Serialize copy of this broadcaster without the sync.Once, to avoid
  145. // a data race.
  146. b2 := map[string]interface{}{
  147. "sinks": b.sinks,
  148. "events": b.events,
  149. "adds": b.adds,
  150. "removes": b.removes,
  151. "shutdown": b.shutdown,
  152. "closed": b.closed,
  153. }
  154. return fmt.Sprint(b2)
  155. }