watch.go 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144
  1. package memdb
  2. import (
  3. "context"
  4. "time"
  5. )
// WatchSet is a collection of watch channels. The channels are stored as map
// keys, so a given channel appears in the set at most once. Add, Watch and
// WatchCtx all check for a nil WatchSet and return immediately in that case.
type WatchSet map[<-chan struct{}]struct{}
  8. // NewWatchSet constructs a new watch set.
  9. func NewWatchSet() WatchSet {
  10. return make(map[<-chan struct{}]struct{})
  11. }
  12. // Add appends a watchCh to the WatchSet if non-nil.
  13. func (w WatchSet) Add(watchCh <-chan struct{}) {
  14. if w == nil {
  15. return
  16. }
  17. if _, ok := w[watchCh]; !ok {
  18. w[watchCh] = struct{}{}
  19. }
  20. }
  21. // AddWithLimit appends a watchCh to the WatchSet if non-nil, and if the given
  22. // softLimit hasn't been exceeded. Otherwise, it will watch the given alternate
  23. // channel. It's expected that the altCh will be the same on many calls to this
  24. // function, so you will exceed the soft limit a little bit if you hit this, but
  25. // not by much.
  26. //
  27. // This is useful if you want to track individual items up to some limit, after
  28. // which you watch a higher-level channel (usually a channel from start start of
  29. // an iterator higher up in the radix tree) that will watch a superset of items.
  30. func (w WatchSet) AddWithLimit(softLimit int, watchCh <-chan struct{}, altCh <-chan struct{}) {
  31. // This is safe for a nil WatchSet so we don't need to check that here.
  32. if len(w) < softLimit {
  33. w.Add(watchCh)
  34. } else {
  35. w.Add(altCh)
  36. }
  37. }
  38. // Watch is used to wait for either the watch set to trigger or a timeout.
  39. // Returns true on timeout.
  40. func (w WatchSet) Watch(timeoutCh <-chan time.Time) bool {
  41. if w == nil {
  42. return false
  43. }
  44. // Create a context that gets cancelled when the timeout is triggered
  45. ctx, cancel := context.WithCancel(context.Background())
  46. defer cancel()
  47. go func() {
  48. select {
  49. case <-timeoutCh:
  50. cancel()
  51. case <-ctx.Done():
  52. }
  53. }()
  54. return w.WatchCtx(ctx) == context.Canceled
  55. }
  56. // WatchCtx is used to wait for either the watch set to trigger or for the
  57. // context to be cancelled. Watch with a timeout channel can be mimicked by
  58. // creating a context with a deadline. WatchCtx should be preferred over Watch.
  59. func (w WatchSet) WatchCtx(ctx context.Context) error {
  60. if w == nil {
  61. return nil
  62. }
  63. if n := len(w); n <= aFew {
  64. idx := 0
  65. chunk := make([]<-chan struct{}, aFew)
  66. for watchCh := range w {
  67. chunk[idx] = watchCh
  68. idx++
  69. }
  70. return watchFew(ctx, chunk)
  71. }
  72. return w.watchMany(ctx)
  73. }
  74. // watchMany is used if there are many watchers.
  75. func (w WatchSet) watchMany(ctx context.Context) error {
  76. // Set up a goroutine for each watcher.
  77. triggerCh := make(chan struct{}, 1)
  78. watcher := func(chunk []<-chan struct{}) {
  79. if err := watchFew(ctx, chunk); err == nil {
  80. select {
  81. case triggerCh <- struct{}{}:
  82. default:
  83. }
  84. }
  85. }
  86. // Apportion the watch channels into chunks we can feed into the
  87. // watchFew helper.
  88. idx := 0
  89. chunk := make([]<-chan struct{}, aFew)
  90. for watchCh := range w {
  91. subIdx := idx % aFew
  92. chunk[subIdx] = watchCh
  93. idx++
  94. // Fire off this chunk and start a fresh one.
  95. if idx%aFew == 0 {
  96. go watcher(chunk)
  97. chunk = make([]<-chan struct{}, aFew)
  98. }
  99. }
  100. // Make sure to watch any residual channels in the last chunk.
  101. if idx%aFew != 0 {
  102. go watcher(chunk)
  103. }
  104. // Wait for a channel to trigger or timeout.
  105. select {
  106. case <-triggerCh:
  107. return nil
  108. case <-ctx.Done():
  109. return ctx.Err()
  110. }
  111. }
  112. // WatchCh returns a channel that is used to wait for either the watch set to trigger
  113. // or for the context to be cancelled. WatchCh creates a new goroutine each call, so
  114. // callers may need to cache the returned channel to avoid creating extra goroutines.
  115. func (w WatchSet) WatchCh(ctx context.Context) <-chan error {
  116. // Create the outgoing channel
  117. triggerCh := make(chan error, 1)
  118. // Create a goroutine to collect the error from WatchCtx
  119. go func() {
  120. triggerCh <- w.WatchCtx(ctx)
  121. }()
  122. return triggerCh
  123. }