sequential_handler.go 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125
  1. package dbus
  2. import (
  3. "sync"
  4. )
  5. // NewSequentialSignalHandler returns an instance of a new
  6. // signal handler that guarantees sequential processing of signals. It is a
  7. // guarantee of this signal handler that signals will be written to
  8. // channels in the order they are received on the DBus connection.
  9. func NewSequentialSignalHandler() SignalHandler {
  10. return &sequentialSignalHandler{}
  11. }
  12. type sequentialSignalHandler struct {
  13. mu sync.RWMutex
  14. closed bool
  15. signals []*sequentialSignalChannelData
  16. }
  17. func (sh *sequentialSignalHandler) DeliverSignal(intf, name string, signal *Signal) {
  18. sh.mu.RLock()
  19. defer sh.mu.RUnlock()
  20. if sh.closed {
  21. return
  22. }
  23. for _, scd := range sh.signals {
  24. scd.deliver(signal)
  25. }
  26. }
  27. func (sh *sequentialSignalHandler) Terminate() {
  28. sh.mu.Lock()
  29. defer sh.mu.Unlock()
  30. if sh.closed {
  31. return
  32. }
  33. for _, scd := range sh.signals {
  34. scd.close()
  35. close(scd.ch)
  36. }
  37. sh.closed = true
  38. sh.signals = nil
  39. }
  40. func (sh *sequentialSignalHandler) AddSignal(ch chan<- *Signal) {
  41. sh.mu.Lock()
  42. defer sh.mu.Unlock()
  43. if sh.closed {
  44. return
  45. }
  46. sh.signals = append(sh.signals, newSequentialSignalChannelData(ch))
  47. }
  48. func (sh *sequentialSignalHandler) RemoveSignal(ch chan<- *Signal) {
  49. sh.mu.Lock()
  50. defer sh.mu.Unlock()
  51. if sh.closed {
  52. return
  53. }
  54. for i := len(sh.signals) - 1; i >= 0; i-- {
  55. if ch == sh.signals[i].ch {
  56. sh.signals[i].close()
  57. copy(sh.signals[i:], sh.signals[i+1:])
  58. sh.signals[len(sh.signals)-1] = nil
  59. sh.signals = sh.signals[:len(sh.signals)-1]
  60. }
  61. }
  62. }
  63. type sequentialSignalChannelData struct {
  64. ch chan<- *Signal
  65. in chan *Signal
  66. done chan struct{}
  67. }
  68. func newSequentialSignalChannelData(ch chan<- *Signal) *sequentialSignalChannelData {
  69. scd := &sequentialSignalChannelData{
  70. ch: ch,
  71. in: make(chan *Signal),
  72. done: make(chan struct{}),
  73. }
  74. go scd.bufferSignals()
  75. return scd
  76. }
  77. func (scd *sequentialSignalChannelData) bufferSignals() {
  78. defer close(scd.done)
  79. // Ensure that signals are delivered to scd.ch in the same
  80. // order they are received from scd.in.
  81. var queue []*Signal
  82. for {
  83. if len(queue) == 0 {
  84. signal, ok := <- scd.in
  85. if !ok {
  86. return
  87. }
  88. queue = append(queue, signal)
  89. }
  90. select {
  91. case scd.ch <- queue[0]:
  92. copy(queue, queue[1:])
  93. queue[len(queue)-1] = nil
  94. queue = queue[:len(queue)-1]
  95. case signal, ok := <-scd.in:
  96. if !ok {
  97. return
  98. }
  99. queue = append(queue, signal)
  100. }
  101. }
  102. }
  103. func (scd *sequentialSignalChannelData) deliver(signal *Signal) {
  104. scd.in <- signal
  105. }
  106. func (scd *sequentialSignalChannelData) close() {
  107. close(scd.in)
  108. // Ensure that bufferSignals() has exited and won't attempt
  109. // any future sends on scd.ch
  110. <-scd.done
  111. }