wait.go 1.3 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677
  1. package raft
  2. import (
  3. "fmt"
  4. "sync"
  5. )
  6. type waitItem struct {
  7. // channel to wait up the waiter
  8. ch chan interface{}
  9. // callback which is called synchronously when the wait is triggered
  10. cb func()
  11. // callback which is called to cancel a waiter
  12. cancel func()
  13. }
  14. type wait struct {
  15. l sync.Mutex
  16. m map[uint64]waitItem
  17. }
  18. func newWait() *wait {
  19. return &wait{m: make(map[uint64]waitItem)}
  20. }
  21. func (w *wait) register(id uint64, cb func(), cancel func()) <-chan interface{} {
  22. w.l.Lock()
  23. defer w.l.Unlock()
  24. _, ok := w.m[id]
  25. if !ok {
  26. ch := make(chan interface{}, 1)
  27. w.m[id] = waitItem{ch: ch, cb: cb, cancel: cancel}
  28. return ch
  29. }
  30. panic(fmt.Sprintf("duplicate id %x", id))
  31. }
  32. func (w *wait) trigger(id uint64, x interface{}) bool {
  33. w.l.Lock()
  34. waitItem, ok := w.m[id]
  35. delete(w.m, id)
  36. w.l.Unlock()
  37. if ok {
  38. if waitItem.cb != nil {
  39. waitItem.cb()
  40. }
  41. waitItem.ch <- x
  42. return true
  43. }
  44. return false
  45. }
  46. func (w *wait) cancel(id uint64) {
  47. w.l.Lock()
  48. waitItem, ok := w.m[id]
  49. delete(w.m, id)
  50. w.l.Unlock()
  51. if ok {
  52. if waitItem.cancel != nil {
  53. waitItem.cancel()
  54. }
  55. close(waitItem.ch)
  56. }
  57. }
  58. func (w *wait) cancelAll() {
  59. w.l.Lock()
  60. defer w.l.Unlock()
  61. for id, waitItem := range w.m {
  62. delete(w.m, id)
  63. if waitItem.cancel != nil {
  64. waitItem.cancel()
  65. }
  66. close(waitItem.ch)
  67. }
  68. }