123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215 |
- package volumequeue
- import (
- "sync"
- "time"
- )
// Timing parameters for retrying volume operations.
const (
	// baseRetryInterval is the unit the retry backoff is built from. The wait
	// before a retry is this interval doubled once per attempt.
	baseRetryInterval = 100 * time.Millisecond

	// maxRetryInterval is the longest we will ever wait between retries of a
	// volume operation.
	maxRetryInterval = 10 * time.Minute
)
- // vqTimerSource is an interface for creating timers for the volumeQueue
- type vqTimerSource interface {
- // NewTimer takes an attempt number and returns a vqClockTrigger which will
- // trigger after a set period based on that attempt number.
- NewTimer(attempt uint) vqTimer
- }
// vqTimer is the timer behavior the VolumeQueue relies on. Rather than
// exposing the trigger channel as a field named C, the channel is returned
// from the Done method, which lets tests substitute a different object for
// the timer.
type vqTimer interface {
	// Done returns the channel on which the timer's trigger is delivered.
	Done() <-chan time.Time
	// Stop cancels the timer. The default implementation gets this method
	// from the time.Timer it embeds.
	Stop() bool
}
- // timerSource is an empty struct type which is used to represent the default
- // vqTimerSource, which uses time.Timer.
- type timerSource struct{}
- // NewTimer creates a new timer.
- func (timerSource) NewTimer(attempt uint) vqTimer {
- var waitFor time.Duration
- if attempt == 0 {
- waitFor = 0
- } else {
- // bit-shifting the base retry interval will raise it by 2 to the power
- // of attempt. this is an easy way to do an exponent solely with
- // integers
- waitFor = baseRetryInterval << attempt
- if waitFor > maxRetryInterval {
- waitFor = maxRetryInterval
- }
- }
- return timer{Timer: time.NewTimer(waitFor)}
- }
// timer embeds a time.Timer and adds a Done method on top of it. Stop is
// promoted from the embedded time.Timer.
type timer struct {
	*time.Timer
}

// Done exposes the C channel of the embedded timer, which receives a value
// when the timer expires.
func (t timer) Done() <-chan time.Time {
	return t.Timer.C
}
- // VolumeQueue manages the exponential backoff of retrying volumes. it behaves
- // somewhat like a priority queue. however, the key difference is that volumes
- // which are ready to process or reprocess are read off of an unbuffered
- // channel, meaning the order in which ready volumes are processed is at the
- // mercy of the golang scheduler. in practice, this does not matter.
- type VolumeQueue struct {
- sync.Mutex
- // next returns the next volumeQueueEntry when it is ready.
- next chan *volumeQueueEntry
- // outstanding is the set of all pending volumeQueueEntries, mapped by
- // volume ID.
- outstanding map[string]*volumeQueueEntry
- // stopChan stops the volumeQueue and cancels all entries.
- stopChan chan struct{}
- // timerSource is an object which is used to create the timer for a
- // volumeQueueEntry. it exists so that in testing, the timer can be
- // substituted for an object that we control.
- timerSource vqTimerSource
- }
// volumeQueueEntry is a single pending item in the volumeQueue.
type volumeQueueEntry struct {
	// id identifies the volume this entry stands for. Only the ID is kept,
	// because the CSI manager will look up the latest revision of the volume
	// before doing any work on it.
	id string

	// attempt is the retry attempt this entry was enqueued with.
	attempt uint

	// cancel aborts the retry attempt when called.
	cancel func()
}
- // NewVolumeQueue returns a new VolumeQueue with the default timerSource.
- func NewVolumeQueue() *VolumeQueue {
- return &VolumeQueue{
- next: make(chan *volumeQueueEntry),
- outstanding: make(map[string]*volumeQueueEntry),
- stopChan: make(chan struct{}),
- timerSource: timerSource{},
- }
- }
- // Enqueue adds an entry to the VolumeQueue with the specified retry attempt.
- // if an entry for the specified id already exists, enqueue will remove it and
- // create a new entry.
- func (vq *VolumeQueue) Enqueue(id string, attempt uint) {
- // we must lock the volumeQueue when we add entries, because we will be
- // accessing the outstanding map
- vq.Lock()
- defer vq.Unlock()
- if entry, ok := vq.outstanding[id]; ok {
- entry.cancel()
- delete(vq.outstanding, id)
- }
- cancelChan := make(chan struct{})
- v := &volumeQueueEntry{
- id: id,
- attempt: attempt,
- cancel: func() {
- close(cancelChan)
- },
- }
- t := vq.timerSource.NewTimer(attempt)
- // this goroutine is the meat of the volumeQueue. when the timer triggers,
- // the volume queue entry is written out to the next channel.
- //
- // the nature of the select statement, and of goroutines and of
- // ansynchronous operations means that this is not actually strictly
- // ordered. if several entries are ready, then the one that actually gets
- // dequeued is at the mercy of the golang scheduler.
- //
- // however, the flip side of this is that canceling an entry truly cancels
- // it. because we're blocking on a write attempt, if we cancel, we don't
- // do that write attempt, and there's no need to try to remove from the
- // queue a ready-but-now-canceled entry before it is processed.
- go func() {
- select {
- case <-t.Done():
- // once the timer fires, we will try to write this entry to the
- // next channel. however, because next is unbuffered, if we ended
- // up in a situation where no read occurred, we would be
- // deadlocked. to avoid this, we select on both a vq.next write and
- // on a read from cancelChan, which allows us to abort our write
- // attempt.
- select {
- case vq.next <- v:
- case <-cancelChan:
- }
- case <-cancelChan:
- // the documentation for timer recommends draining the channel like
- // this.
- if !t.Stop() {
- <-t.Done()
- }
- }
- }()
- vq.outstanding[id] = v
- }
- // Wait returns the ID and attempt number of the next Volume ready to process.
- // If no volume is ready, wait blocks until one is ready. if the volumeQueue
- // is stopped, wait returns "", 0
- func (vq *VolumeQueue) Wait() (string, uint) {
- select {
- case v := <-vq.next:
- vq.Lock()
- defer vq.Unlock()
- // we need to be certain that this entry is the same entry that we
- // read, because otherwise there may be a race.
- //
- // it would be possible for the read from next to succeed, but before
- // the lock is acquired, a new attempt is enqueued. enqueuing the new
- // attempt deletes the old entry before replacing it with the new entry
- // and releasing the lock. then, this routine may acquire the lock, and
- // delete a new entry.
- //
- // in practice, it is unclear if this race could happen or would matter
- // if it did, but always better safe than sorry.
- e, ok := vq.outstanding[v.id]
- if ok && e == v {
- delete(vq.outstanding, v.id)
- }
- return v.id, v.attempt
- case <-vq.stopChan:
- // if the volumeQueue is stopped, then there may be no more writes, so
- // we should return an empty result from wait
- return "", 0
- }
- }
- // Outstanding returns the number of items outstanding in this queue
- func (vq *VolumeQueue) Outstanding() int {
- return len(vq.outstanding)
- }
- // Stop stops the volumeQueue and cancels all outstanding entries. stop may
- // only be called once.
- func (vq *VolumeQueue) Stop() {
- vq.Lock()
- defer vq.Unlock()
- close(vq.stopChan)
- for _, entry := range vq.outstanding {
- entry.cancel()
- }
- return
- }
|