channel.go 1.2 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061
  1. package events
  2. import (
  3. "fmt"
  4. "sync"
  5. )
  6. // Channel provides a sink that can be listened on. The writer and channel
  7. // listener must operate in separate goroutines.
  8. //
  9. // Consumers should listen on Channel.C until Closed is closed.
  10. type Channel struct {
  11. C chan Event
  12. closed chan struct{}
  13. once sync.Once
  14. }
  15. // NewChannel returns a channel. If buffer is zero, the channel is
  16. // unbuffered.
  17. func NewChannel(buffer int) *Channel {
  18. return &Channel{
  19. C: make(chan Event, buffer),
  20. closed: make(chan struct{}),
  21. }
  22. }
  23. // Done returns a channel that will always proceed once the sink is closed.
  24. func (ch *Channel) Done() chan struct{} {
  25. return ch.closed
  26. }
  27. // Write the event to the channel. Must be called in a separate goroutine from
  28. // the listener.
  29. func (ch *Channel) Write(event Event) error {
  30. select {
  31. case ch.C <- event:
  32. return nil
  33. case <-ch.closed:
  34. return ErrSinkClosed
  35. }
  36. }
  37. // Close the channel sink.
  38. func (ch *Channel) Close() error {
  39. ch.once.Do(func() {
  40. close(ch.closed)
  41. })
  42. return nil
  43. }
  44. func (ch Channel) String() string {
  45. // Serialize a copy of the Channel that doesn't contain the sync.Once,
  46. // to avoid a data race.
  47. ch2 := map[string]interface{}{
  48. "C": ch.C,
  49. "closed": ch.closed,
  50. }
  51. return fmt.Sprint(ch2)
  52. }