mq.go 2.1 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192
  1. package queue
  2. import (
  3. "errors"
  4. "sync"
  5. )
  6. var ErrQueueClosed = errors.New("the queue is closed for reading and writing")
  7. // MessageQueue represents a threadsafe message queue to be used to retrieve or
  8. // write messages to.
  9. type MessageQueue struct {
  10. m *sync.RWMutex
  11. c *sync.Cond
  12. messages []interface{}
  13. closed bool
  14. }
  15. // NewMessageQueue returns a new MessageQueue.
  16. func NewMessageQueue() *MessageQueue {
  17. m := &sync.RWMutex{}
  18. return &MessageQueue{
  19. m: m,
  20. c: sync.NewCond(m),
  21. messages: []interface{}{},
  22. }
  23. }
  24. // Enqueue writes `msg` to the queue.
  25. func (mq *MessageQueue) Enqueue(msg interface{}) error {
  26. mq.m.Lock()
  27. defer mq.m.Unlock()
  28. if mq.closed {
  29. return ErrQueueClosed
  30. }
  31. mq.messages = append(mq.messages, msg)
  32. // Signal a waiter that there is now a value available in the queue.
  33. mq.c.Signal()
  34. return nil
  35. }
  36. // Dequeue will read a value from the queue and remove it. If the queue
  37. // is empty, this will block until the queue is closed or a value gets enqueued.
  38. func (mq *MessageQueue) Dequeue() (interface{}, error) {
  39. mq.m.Lock()
  40. defer mq.m.Unlock()
  41. for !mq.closed && mq.size() == 0 {
  42. mq.c.Wait()
  43. }
  44. // We got woken up, check if it's because the queue got closed.
  45. if mq.closed {
  46. return nil, ErrQueueClosed
  47. }
  48. val := mq.messages[0]
  49. mq.messages[0] = nil
  50. mq.messages = mq.messages[1:]
  51. return val, nil
  52. }
  53. // Size returns the size of the queue.
  54. func (mq *MessageQueue) Size() int {
  55. mq.m.RLock()
  56. defer mq.m.RUnlock()
  57. return mq.size()
  58. }
  59. // Nonexported size check to check if the queue is empty inside already locked functions.
  60. func (mq *MessageQueue) size() int {
  61. return len(mq.messages)
  62. }
  63. // Close closes the queue for future writes or reads. Any attempts to read or write from the
  64. // queue after close will return ErrQueueClosed. This is safe to call multiple times.
  65. func (mq *MessageQueue) Close() {
  66. mq.m.Lock()
  67. defer mq.m.Unlock()
  68. // Already closed, noop
  69. if mq.closed {
  70. return
  71. }
  72. mq.messages = nil
  73. mq.closed = true
  74. // If there's anybody currently waiting on a value from Dequeue, we need to
  75. // broadcast so the read(s) can return ErrQueueClosed.
  76. mq.c.Broadcast()
  77. }