123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144 |
- package memdb
- import (
- "context"
- "time"
- )
- // WatchSet is a collection of watch channels.
- type WatchSet map[<-chan struct{}]struct{}
- // NewWatchSet constructs a new watch set.
- func NewWatchSet() WatchSet {
- return make(map[<-chan struct{}]struct{})
- }
- // Add appends a watchCh to the WatchSet if non-nil.
- func (w WatchSet) Add(watchCh <-chan struct{}) {
- if w == nil {
- return
- }
- if _, ok := w[watchCh]; !ok {
- w[watchCh] = struct{}{}
- }
- }
- // AddWithLimit appends a watchCh to the WatchSet if non-nil, and if the given
- // softLimit hasn't been exceeded. Otherwise, it will watch the given alternate
- // channel. It's expected that the altCh will be the same on many calls to this
- // function, so you will exceed the soft limit a little bit if you hit this, but
- // not by much.
- //
- // This is useful if you want to track individual items up to some limit, after
- // which you watch a higher-level channel (usually a channel from start start of
- // an iterator higher up in the radix tree) that will watch a superset of items.
- func (w WatchSet) AddWithLimit(softLimit int, watchCh <-chan struct{}, altCh <-chan struct{}) {
- // This is safe for a nil WatchSet so we don't need to check that here.
- if len(w) < softLimit {
- w.Add(watchCh)
- } else {
- w.Add(altCh)
- }
- }
- // Watch is used to wait for either the watch set to trigger or a timeout.
- // Returns true on timeout.
- func (w WatchSet) Watch(timeoutCh <-chan time.Time) bool {
- if w == nil {
- return false
- }
- // Create a context that gets cancelled when the timeout is triggered
- ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
- go func() {
- select {
- case <-timeoutCh:
- cancel()
- case <-ctx.Done():
- }
- }()
- return w.WatchCtx(ctx) == context.Canceled
- }
- // WatchCtx is used to wait for either the watch set to trigger or for the
- // context to be cancelled. Watch with a timeout channel can be mimicked by
- // creating a context with a deadline. WatchCtx should be preferred over Watch.
- func (w WatchSet) WatchCtx(ctx context.Context) error {
- if w == nil {
- return nil
- }
- if n := len(w); n <= aFew {
- idx := 0
- chunk := make([]<-chan struct{}, aFew)
- for watchCh := range w {
- chunk[idx] = watchCh
- idx++
- }
- return watchFew(ctx, chunk)
- }
- return w.watchMany(ctx)
- }
- // watchMany is used if there are many watchers.
- func (w WatchSet) watchMany(ctx context.Context) error {
- // Set up a goroutine for each watcher.
- triggerCh := make(chan struct{}, 1)
- watcher := func(chunk []<-chan struct{}) {
- if err := watchFew(ctx, chunk); err == nil {
- select {
- case triggerCh <- struct{}{}:
- default:
- }
- }
- }
- // Apportion the watch channels into chunks we can feed into the
- // watchFew helper.
- idx := 0
- chunk := make([]<-chan struct{}, aFew)
- for watchCh := range w {
- subIdx := idx % aFew
- chunk[subIdx] = watchCh
- idx++
- // Fire off this chunk and start a fresh one.
- if idx%aFew == 0 {
- go watcher(chunk)
- chunk = make([]<-chan struct{}, aFew)
- }
- }
- // Make sure to watch any residual channels in the last chunk.
- if idx%aFew != 0 {
- go watcher(chunk)
- }
- // Wait for a channel to trigger or timeout.
- select {
- case <-triggerCh:
- return nil
- case <-ctx.Done():
- return ctx.Err()
- }
- }
- // WatchCh returns a channel that is used to wait for either the watch set to trigger
- // or for the context to be cancelled. WatchCh creates a new goroutine each call, so
- // callers may need to cache the returned channel to avoid creating extra goroutines.
- func (w WatchSet) WatchCh(ctx context.Context) <-chan error {
- // Create the outgoing channel
- triggerCh := make(chan error, 1)
- // Create a goroutine to collect the error from WatchCtx
- go func() {
- triggerCh <- w.WatchCtx(ctx)
- }()
- return triggerCh
- }
|