queue.go 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215
  1. package volumequeue
  2. import (
  3. "sync"
  4. "time"
  5. )
  // Timing bounds for retrying volume operations.
  const (
  	// baseRetryInterval is the starting interval for retrying volume
  	// operations. Each subsequent attempt grows exponentially from this value.
  	baseRetryInterval = 100 * time.Millisecond

  	// maxRetryInterval is the longest we will ever wait between two retries
  	// of a volume operation.
  	maxRetryInterval = 10 * time.Minute
  )
  12. // vqTimerSource is an interface for creating timers for the volumeQueue
  13. type vqTimerSource interface {
  14. // NewTimer takes an attempt number and returns a vqClockTrigger which will
  15. // trigger after a set period based on that attempt number.
  16. NewTimer(attempt uint) vqTimer
  17. }
  // vqTimer is the timer abstraction used by the volumeQueue. Rather than
  // exposing a trigger channel field such as C, it hands the channel out through
  // the Done method, so that tests can substitute a different object for the
  // timer.
  type vqTimer interface {
  	// Done returns the channel on which the timer signals that it triggered.
  	Done() <-chan time.Time
  	// Stop stops the timer and reports a boolean result.
  	Stop() bool
  }
  // timerSource is the default vqTimerSource. It carries no state, which is why
  // it is an empty struct; its timers are backed by time.Timer.
  type timerSource struct{}
  28. // NewTimer creates a new timer.
  29. func (timerSource) NewTimer(attempt uint) vqTimer {
  30. var waitFor time.Duration
  31. if attempt == 0 {
  32. waitFor = 0
  33. } else {
  34. // bit-shifting the base retry interval will raise it by 2 to the power
  35. // of attempt. this is an easy way to do an exponent solely with
  36. // integers
  37. waitFor = baseRetryInterval << attempt
  38. if waitFor > maxRetryInterval {
  39. waitFor = maxRetryInterval
  40. }
  41. }
  42. return timer{Timer: time.NewTimer(waitFor)}
  43. }
  // timer embeds a *time.Timer so that a Done method can be added on top of it.
  type timer struct {
  	*time.Timer
  }
  48. // Done returns the timer's C channel, which triggers in response to the timer
  49. // expiring.
  50. func (t timer) Done() <-chan time.Time {
  51. return t.C
  52. }
  53. // VolumeQueue manages the exponential backoff of retrying volumes. it behaves
  54. // somewhat like a priority queue. however, the key difference is that volumes
  55. // which are ready to process or reprocess are read off of an unbuffered
  56. // channel, meaning the order in which ready volumes are processed is at the
  57. // mercy of the golang scheduler. in practice, this does not matter.
  58. type VolumeQueue struct {
  59. sync.Mutex
  60. // next returns the next volumeQueueEntry when it is ready.
  61. next chan *volumeQueueEntry
  62. // outstanding is the set of all pending volumeQueueEntries, mapped by
  63. // volume ID.
  64. outstanding map[string]*volumeQueueEntry
  65. // stopChan stops the volumeQueue and cancels all entries.
  66. stopChan chan struct{}
  67. // timerSource is an object which is used to create the timer for a
  68. // volumeQueueEntry. it exists so that in testing, the timer can be
  69. // substituted for an object that we control.
  70. timerSource vqTimerSource
  71. }
  // volumeQueueEntry is a single pending item in the volumeQueue.
  type volumeQueueEntry struct {
  	// id identifies the volume this entry stands for. only the ID is kept,
  	// because the CSI manager will look up the latest revision of the volume
  	// before doing any work on it.
  	id string

  	// attempt is the retry attempt number this entry was enqueued with.
  	attempt uint

  	// cancel aborts the retry attempt when called.
  	cancel func()
  }
  83. // NewVolumeQueue returns a new VolumeQueue with the default timerSource.
  84. func NewVolumeQueue() *VolumeQueue {
  85. return &VolumeQueue{
  86. next: make(chan *volumeQueueEntry),
  87. outstanding: make(map[string]*volumeQueueEntry),
  88. stopChan: make(chan struct{}),
  89. timerSource: timerSource{},
  90. }
  91. }
  92. // Enqueue adds an entry to the VolumeQueue with the specified retry attempt.
  93. // if an entry for the specified id already exists, enqueue will remove it and
  94. // create a new entry.
  95. func (vq *VolumeQueue) Enqueue(id string, attempt uint) {
  96. // we must lock the volumeQueue when we add entries, because we will be
  97. // accessing the outstanding map
  98. vq.Lock()
  99. defer vq.Unlock()
  100. if entry, ok := vq.outstanding[id]; ok {
  101. entry.cancel()
  102. delete(vq.outstanding, id)
  103. }
  104. cancelChan := make(chan struct{})
  105. v := &volumeQueueEntry{
  106. id: id,
  107. attempt: attempt,
  108. cancel: func() {
  109. close(cancelChan)
  110. },
  111. }
  112. t := vq.timerSource.NewTimer(attempt)
  113. // this goroutine is the meat of the volumeQueue. when the timer triggers,
  114. // the volume queue entry is written out to the next channel.
  115. //
  116. // the nature of the select statement, and of goroutines and of
  117. // ansynchronous operations means that this is not actually strictly
  118. // ordered. if several entries are ready, then the one that actually gets
  119. // dequeued is at the mercy of the golang scheduler.
  120. //
  121. // however, the flip side of this is that canceling an entry truly cancels
  122. // it. because we're blocking on a write attempt, if we cancel, we don't
  123. // do that write attempt, and there's no need to try to remove from the
  124. // queue a ready-but-now-canceled entry before it is processed.
  125. go func() {
  126. select {
  127. case <-t.Done():
  128. // once the timer fires, we will try to write this entry to the
  129. // next channel. however, because next is unbuffered, if we ended
  130. // up in a situation where no read occurred, we would be
  131. // deadlocked. to avoid this, we select on both a vq.next write and
  132. // on a read from cancelChan, which allows us to abort our write
  133. // attempt.
  134. select {
  135. case vq.next <- v:
  136. case <-cancelChan:
  137. }
  138. case <-cancelChan:
  139. // the documentation for timer recommends draining the channel like
  140. // this.
  141. if !t.Stop() {
  142. <-t.Done()
  143. }
  144. }
  145. }()
  146. vq.outstanding[id] = v
  147. }
  148. // Wait returns the ID and attempt number of the next Volume ready to process.
  149. // If no volume is ready, wait blocks until one is ready. if the volumeQueue
  150. // is stopped, wait returns "", 0
  151. func (vq *VolumeQueue) Wait() (string, uint) {
  152. select {
  153. case v := <-vq.next:
  154. vq.Lock()
  155. defer vq.Unlock()
  156. // we need to be certain that this entry is the same entry that we
  157. // read, because otherwise there may be a race.
  158. //
  159. // it would be possible for the read from next to succeed, but before
  160. // the lock is acquired, a new attempt is enqueued. enqueuing the new
  161. // attempt deletes the old entry before replacing it with the new entry
  162. // and releasing the lock. then, this routine may acquire the lock, and
  163. // delete a new entry.
  164. //
  165. // in practice, it is unclear if this race could happen or would matter
  166. // if it did, but always better safe than sorry.
  167. e, ok := vq.outstanding[v.id]
  168. if ok && e == v {
  169. delete(vq.outstanding, v.id)
  170. }
  171. return v.id, v.attempt
  172. case <-vq.stopChan:
  173. // if the volumeQueue is stopped, then there may be no more writes, so
  174. // we should return an empty result from wait
  175. return "", 0
  176. }
  177. }
  178. // Outstanding returns the number of items outstanding in this queue
  179. func (vq *VolumeQueue) Outstanding() int {
  180. return len(vq.outstanding)
  181. }
  182. // Stop stops the volumeQueue and cancels all outstanding entries. stop may
  183. // only be called once.
  184. func (vq *VolumeQueue) Stop() {
  185. vq.Lock()
  186. defer vq.Unlock()
  187. close(vq.stopChan)
  188. for _, entry := range vq.outstanding {
  189. entry.cancel()
  190. }
  191. return
  192. }