jsonmessagepublisher.go 1.3 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061
  1. package utils
  2. import (
  3. "sync"
  4. "time"
  5. )
  6. func NewJSONMessagePublisher() *JSONMessagePublisher {
  7. return &JSONMessagePublisher{}
  8. }
  9. type JSONMessageListener chan<- JSONMessage
  10. type JSONMessagePublisher struct {
  11. m sync.RWMutex
  12. subscribers []JSONMessageListener
  13. }
  14. func (p *JSONMessagePublisher) Subscribe(l JSONMessageListener) {
  15. p.m.Lock()
  16. p.subscribers = append(p.subscribers, l)
  17. p.m.Unlock()
  18. }
  19. func (p *JSONMessagePublisher) SubscribersCount() int {
  20. p.m.RLock()
  21. count := len(p.subscribers)
  22. p.m.RUnlock()
  23. return count
  24. }
  25. // Unsubscribe closes and removes the specified listener from the list of
  26. // previously registed ones.
  27. // It returns a boolean value indicating if the listener was successfully
  28. // found, closed and unregistered.
  29. func (p *JSONMessagePublisher) Unsubscribe(l JSONMessageListener) bool {
  30. p.m.Lock()
  31. defer p.m.Unlock()
  32. for i, subscriber := range p.subscribers {
  33. if subscriber == l {
  34. close(l)
  35. p.subscribers = append(p.subscribers[:i], p.subscribers[i+1:]...)
  36. return true
  37. }
  38. }
  39. return false
  40. }
  41. func (p *JSONMessagePublisher) Publish(m JSONMessage) {
  42. p.m.RLock()
  43. for _, subscriber := range p.subscribers {
  44. // We give each subscriber a 100ms time window to receive the event,
  45. // after which we move to the next.
  46. select {
  47. case subscriber <- m:
  48. case <-time.After(100 * time.Millisecond):
  49. }
  50. }
  51. p.m.RUnlock()
  52. }