12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061 |
- package utils
- import (
- "sync"
- "time"
- )
- func NewJSONMessagePublisher() *JSONMessagePublisher {
- return &JSONMessagePublisher{}
- }
- type JSONMessageListener chan<- JSONMessage
- type JSONMessagePublisher struct {
- m sync.RWMutex
- subscribers []JSONMessageListener
- }
- func (p *JSONMessagePublisher) Subscribe(l JSONMessageListener) {
- p.m.Lock()
- p.subscribers = append(p.subscribers, l)
- p.m.Unlock()
- }
- func (p *JSONMessagePublisher) SubscribersCount() int {
- p.m.RLock()
- count := len(p.subscribers)
- p.m.RUnlock()
- return count
- }
- // Unsubscribe closes and removes the specified listener from the list of
- // previously registed ones.
- // It returns a boolean value indicating if the listener was successfully
- // found, closed and unregistered.
- func (p *JSONMessagePublisher) Unsubscribe(l JSONMessageListener) bool {
- p.m.Lock()
- defer p.m.Unlock()
- for i, subscriber := range p.subscribers {
- if subscriber == l {
- close(l)
- p.subscribers = append(p.subscribers[:i], p.subscribers[i+1:]...)
- return true
- }
- }
- return false
- }
- func (p *JSONMessagePublisher) Publish(m JSONMessage) {
- p.m.RLock()
- for _, subscriber := range p.subscribers {
- // We give each subscriber a 100ms time window to receive the event,
- // after which we move to the next.
- select {
- case subscriber <- m:
- case <-time.After(100 * time.Millisecond):
- }
- }
- p.m.RUnlock()
- }
|