12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061 |
- package events
- import (
- "fmt"
- "sync"
- )
- // Channel provides a sink that can be listened on. The writer and channel
- // listener must operate in separate goroutines.
- //
- // Consumers should listen on Channel.C until Closed is closed.
- type Channel struct {
- C chan Event
- closed chan struct{}
- once sync.Once
- }
- // NewChannel returns a channel. If buffer is zero, the channel is
- // unbuffered.
- func NewChannel(buffer int) *Channel {
- return &Channel{
- C: make(chan Event, buffer),
- closed: make(chan struct{}),
- }
- }
- // Done returns a channel that will always proceed once the sink is closed.
- func (ch *Channel) Done() chan struct{} {
- return ch.closed
- }
- // Write the event to the channel. Must be called in a separate goroutine from
- // the listener.
- func (ch *Channel) Write(event Event) error {
- select {
- case ch.C <- event:
- return nil
- case <-ch.closed:
- return ErrSinkClosed
- }
- }
- // Close the channel sink.
- func (ch *Channel) Close() error {
- ch.once.Do(func() {
- close(ch.closed)
- })
- return nil
- }
- func (ch Channel) String() string {
- // Serialize a copy of the Channel that doesn't contain the sync.Once,
- // to avoid a data race.
- ch2 := map[string]interface{}{
- "C": ch.C,
- "closed": ch.closed,
- }
- return fmt.Sprint(ch2)
- }
|