123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107 |
- package watch
- import (
- "sync"
- "github.com/docker/go-events"
- )
- // dropErrClosed is a sink that suppresses ErrSinkClosed from Write, to avoid
- // debug log messages that may be confusing. It is possible that the queue
- // will try to write an event to its destination channel while the queue is
- // being removed from the broadcaster. Since the channel is closed before the
- // queue, there is a narrow window when this is possible. In some event-based
- // dropping events when a sink is removed from a broadcaster is a problem, but
- // for the usage in this watch package that's the expected behavior.
- type dropErrClosed struct {
- sink events.Sink
- }
- func (s dropErrClosed) Write(event events.Event) error {
- err := s.sink.Write(event)
- if err == events.ErrSinkClosed {
- return nil
- }
- return err
- }
- func (s dropErrClosed) Close() error {
- return s.sink.Close()
- }
- // Queue is the structure used to publish events and watch for them.
- type Queue struct {
- mu sync.Mutex
- broadcast *events.Broadcaster
- cancelFuncs map[*events.Channel]func()
- }
- // NewQueue creates a new publish/subscribe queue which supports watchers.
- // The channels that it will create for subscriptions will have the buffer
- // size specified by buffer.
- func NewQueue() *Queue {
- return &Queue{
- broadcast: events.NewBroadcaster(),
- cancelFuncs: make(map[*events.Channel]func()),
- }
- }
- // Watch returns a channel which will receive all items published to the
- // queue from this point, until cancel is called.
- func (q *Queue) Watch() (eventq chan events.Event, cancel func()) {
- return q.CallbackWatch(nil)
- }
- // CallbackWatch returns a channel which will receive all events published to
- // the queue from this point that pass the check in the provided callback
- // function. The returned cancel function will stop the flow of events and
- // close the channel.
- func (q *Queue) CallbackWatch(matcher events.Matcher) (eventq chan events.Event, cancel func()) {
- ch := events.NewChannel(0)
- sink := events.Sink(events.NewQueue(dropErrClosed{sink: ch}))
- if matcher != nil {
- sink = events.NewFilter(sink, matcher)
- }
- q.broadcast.Add(sink)
- cancelFunc := func() {
- q.broadcast.Remove(sink)
- ch.Close()
- sink.Close()
- }
- q.mu.Lock()
- q.cancelFuncs[ch] = cancelFunc
- q.mu.Unlock()
- return ch.C, func() {
- q.mu.Lock()
- cancelFunc := q.cancelFuncs[ch]
- delete(q.cancelFuncs, ch)
- q.mu.Unlock()
- if cancelFunc != nil {
- cancelFunc()
- }
- }
- }
- // Publish adds an item to the queue.
- func (q *Queue) Publish(item events.Event) {
- q.broadcast.Write(item)
- }
- // Close closes the queue and frees the associated resources.
- func (q *Queue) Close() error {
- // Make sure all watchers have been closed to avoid a deadlock when
- // closing the broadcaster.
- q.mu.Lock()
- for _, cancelFunc := range q.cancelFuncs {
- cancelFunc()
- }
- q.cancelFuncs = make(map[*events.Channel]func())
- q.mu.Unlock()
- return q.broadcast.Close()
- }
|