publisher.go 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111
  1. package pubsub
  2. import (
  3. "sync"
  4. "time"
  5. )
  6. var wgPool = sync.Pool{New: func() interface{} { return new(sync.WaitGroup) }}
  7. // NewPublisher creates a new pub/sub publisher to broadcast messages.
  8. // The duration is used as the send timeout as to not block the publisher publishing
  9. // messages to other clients if one client is slow or unresponsive.
  10. // The buffer is used when creating new channels for subscribers.
  11. func NewPublisher(publishTimeout time.Duration, buffer int) *Publisher {
  12. return &Publisher{
  13. buffer: buffer,
  14. timeout: publishTimeout,
  15. subscribers: make(map[subscriber]topicFunc),
  16. }
  17. }
  18. type subscriber chan interface{}
  19. type topicFunc func(v interface{}) bool
  20. // Publisher is basic pub/sub structure. Allows to send events and subscribe
  21. // to them. Can be safely used from multiple goroutines.
  22. type Publisher struct {
  23. m sync.RWMutex
  24. buffer int
  25. timeout time.Duration
  26. subscribers map[subscriber]topicFunc
  27. }
  28. // Len returns the number of subscribers for the publisher
  29. func (p *Publisher) Len() int {
  30. p.m.RLock()
  31. i := len(p.subscribers)
  32. p.m.RUnlock()
  33. return i
  34. }
  35. // Subscribe adds a new subscriber to the publisher returning the channel.
  36. func (p *Publisher) Subscribe() chan interface{} {
  37. return p.SubscribeTopic(nil)
  38. }
  39. // SubscribeTopic adds a new subscriber that filters messages sent by a topic.
  40. func (p *Publisher) SubscribeTopic(topic topicFunc) chan interface{} {
  41. ch := make(chan interface{}, p.buffer)
  42. p.m.Lock()
  43. p.subscribers[ch] = topic
  44. p.m.Unlock()
  45. return ch
  46. }
  47. // Evict removes the specified subscriber from receiving any more messages.
  48. func (p *Publisher) Evict(sub chan interface{}) {
  49. p.m.Lock()
  50. delete(p.subscribers, sub)
  51. close(sub)
  52. p.m.Unlock()
  53. }
  54. // Publish sends the data in v to all subscribers currently registered with the publisher.
  55. func (p *Publisher) Publish(v interface{}) {
  56. p.m.RLock()
  57. if len(p.subscribers) == 0 {
  58. p.m.RUnlock()
  59. return
  60. }
  61. wg := wgPool.Get().(*sync.WaitGroup)
  62. for sub, topic := range p.subscribers {
  63. wg.Add(1)
  64. go p.sendTopic(sub, topic, v, wg)
  65. }
  66. wg.Wait()
  67. wgPool.Put(wg)
  68. p.m.RUnlock()
  69. }
  70. // Close closes the channels to all subscribers registered with the publisher.
  71. func (p *Publisher) Close() {
  72. p.m.Lock()
  73. for sub := range p.subscribers {
  74. delete(p.subscribers, sub)
  75. close(sub)
  76. }
  77. p.m.Unlock()
  78. }
  79. func (p *Publisher) sendTopic(sub subscriber, topic topicFunc, v interface{}, wg *sync.WaitGroup) {
  80. defer wg.Done()
  81. if topic != nil && !topic(v) {
  82. return
  83. }
  84. // send under a select as to not block if the receiver is unavailable
  85. if p.timeout > 0 {
  86. select {
  87. case sub <- v:
  88. case <-time.After(p.timeout):
  89. }
  90. return
  91. }
  92. select {
  93. case sub <- v:
  94. default:
  95. }
  96. }