retry.go

package events

import (
	"fmt"
	"math/rand"
	"sync"
	"sync/atomic"
	"time"

	"github.com/Sirupsen/logrus"
)

// RetryingSink retries the write until success or an ErrSinkClosed is
// returned. The underlying sink must have a non-zero probability of
// succeeding or the sink will block. Retry is configured with a
// RetryStrategy. Concurrent calls to a retrying sink are serialized through
// the sink, meaning that if one is in-flight, another will not proceed.
type RetryingSink struct {
	sink     Sink
	strategy RetryStrategy
	closed   chan struct{}
	once     sync.Once
}

// NewRetryingSink returns a sink that will retry writes to a sink, backing
// off on failure. The provided strategy controls when and for how long the
// sink backs off, and whether a failing event is eventually dropped.
func NewRetryingSink(sink Sink, strategy RetryStrategy) *RetryingSink {
	rs := &RetryingSink{
		sink:     sink,
		strategy: strategy,
		closed:   make(chan struct{}),
	}

	return rs
}
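
// A minimal usage sketch, assuming a Sink implementation named dest and an
// Event value evt defined elsewhere (both names are illustrative, not part of
// this package):
//
//	rs := NewRetryingSink(dest, NewBreaker(10, time.Second))
//	defer rs.Close()
//	if err := rs.Write(evt); err != nil {
//		// Write returns a non-nil error only once the sink has been
//		// closed (ErrSinkClosed); dropped events are reported as nil.
//	}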

// Write attempts to flush the events to the downstream sink until it succeeds
// or the sink is closed.
func (rs *RetryingSink) Write(event Event) error {
	logger := logrus.WithField("event", event)

retry:
	select {
	case <-rs.closed:
		return ErrSinkClosed
	default:
	}

	if backoff := rs.strategy.Proceed(event); backoff > 0 {
		select {
		case <-time.After(backoff):
			// TODO(stevvooe): This branch holds up the next try. Before, we
			// would simply break to the "retry" label and then possibly wait
			// again. However, this requires all retry strategies to have a
			// large probability of probing the sink for success, rather than
			// just backing off and sending the request.
		case <-rs.closed:
			return ErrSinkClosed
		}
	}

	if err := rs.sink.Write(event); err != nil {
		if err == ErrSinkClosed {
			// terminal!
			return err
		}

		logger := logger.WithError(err) // shadow the outer logger with error context

		if rs.strategy.Failure(event, err) {
			logger.Errorf("retryingsink: dropped event")
			return nil
		}

		logger.Errorf("retryingsink: error writing event, retrying")
		goto retry
	}

	rs.strategy.Success(event)
	return nil
}

// Close closes the sink and the underlying sink.
func (rs *RetryingSink) Close() error {
	rs.once.Do(func() {
		close(rs.closed)
	})

	return nil
}

func (rs *RetryingSink) String() string {
	// Format the fields without the sync.Once, so fmt does not reflect over
	// the Once's internal state and race with Close.
	rs2 := map[string]interface{}{
		"sink":     rs.sink,
		"strategy": rs.strategy,
		"closed":   rs.closed,
	}
	return fmt.Sprint(rs2)
}

// RetryStrategy defines a strategy for retrying event sink writes.
//
// All methods should be goroutine safe.
type RetryStrategy interface {
	// Proceed is called before every event send. If Proceed returns a
	// positive, non-zero duration, the retryer will back off by the provided
	// duration before attempting the send.
	//
	// An event is provided, but may be ignored.
	Proceed(event Event) time.Duration

	// Failure reports a failure to the strategy. If this method returns true,
	// the event should be dropped.
	Failure(event Event, err error) bool

	// Success should be called when an event is sent successfully.
	Success(event Event)
}
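
// As a sketch of the contract above, a hypothetical strategy that retries
// immediately and never drops events could be as small as the following; it
// is for illustration only and is not part of this package:
//
//	type alwaysRetry struct{}
//
//	func (alwaysRetry) Proceed(event Event) time.Duration   { return 0 }     // never back off
//	func (alwaysRetry) Failure(event Event, err error) bool { return false } // never drop
//	func (alwaysRetry) Success(event Event)                 {}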

// Breaker implements a circuit breaker retry strategy.
//
// The current implementation never drops events.
type Breaker struct {
	threshold int
	recent    int
	last      time.Time
	backoff   time.Duration // time after which we retry after failure.

	mu sync.Mutex
}

var _ RetryStrategy = &Breaker{}

// NewBreaker returns a breaker that will back off after the threshold has
// been tripped. A Breaker is thread safe and may be shared by many
// goroutines.
func NewBreaker(threshold int, backoff time.Duration) *Breaker {
	return &Breaker{
		threshold: threshold,
		backoff:   backoff,
	}
}
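
// For illustration, a breaker configured as NewBreaker(10, time.Second) lets
// writes proceed immediately until ten failures have accumulated, then asks
// the retrying sink to wait out the remainder of one second measured from the
// most recent failure; a single success resets the count. A hypothetical
// wiring with a RetryingSink might look like:
//
//	breaker := NewBreaker(10, time.Second)
//	rs := NewRetryingSink(dest, breaker) // dest is any Sink, named here for illustration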

// Proceed checks the failures against the threshold.
func (b *Breaker) Proceed(event Event) time.Duration {
	b.mu.Lock()
	defer b.mu.Unlock()

	if b.recent < b.threshold {
		return 0
	}

	return b.last.Add(b.backoff).Sub(time.Now())
}

// Success resets the breaker.
func (b *Breaker) Success(event Event) {
	b.mu.Lock()
	defer b.mu.Unlock()

	b.recent = 0
	b.last = time.Time{}
}

// Failure records the failure and latest failure time.
func (b *Breaker) Failure(event Event, err error) bool {
	b.mu.Lock()
	defer b.mu.Unlock()

	b.recent++
	b.last = time.Now().UTC()
	return false // never drop events.
}

var (
	// DefaultExponentialBackoffConfig provides a default configuration for
	// exponential backoff.
	DefaultExponentialBackoffConfig = ExponentialBackoffConfig{
		Base:   time.Second,
		Factor: time.Second,
		Max:    20 * time.Second,
	}
)

// ExponentialBackoffConfig configures backoff parameters.
//
// Note that these parameters operate on the upper bound for choosing a random
// value. For example, at Base=1s, a random value in [0,1s) will be chosen for
// the backoff value.
type ExponentialBackoffConfig struct {
	// Base is the minimum bound for backing off after failure.
	Base time.Duration

	// Factor sets the amount of time by which the backoff grows with each
	// failure.
	Factor time.Duration

	// Max is the absolute maximum bound for a single backoff.
	Max time.Duration
}

// ExponentialBackoff implements random backoff with exponentially increasing
// bounds as the number of consecutive failures increases.
type ExponentialBackoff struct {
	config ExponentialBackoffConfig

	failures uint64 // consecutive failure counter.
}

// NewExponentialBackoff returns an exponential backoff strategy with the
// desired config. Zero values for Factor and Max fall back to the
// corresponding fields of DefaultExponentialBackoffConfig.
func NewExponentialBackoff(config ExponentialBackoffConfig) *ExponentialBackoff {
	return &ExponentialBackoff{
		config: config,
	}
}
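
// A hypothetical wiring with a RetryingSink, using the default configuration
// (dest stands in for any Sink implementation):
//
//	backoff := NewExponentialBackoff(DefaultExponentialBackoffConfig)
//	rs := NewRetryingSink(dest, backoff)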

// Proceed returns the next randomly bound exponential backoff time.
func (b *ExponentialBackoff) Proceed(event Event) time.Duration {
	return b.backoff(atomic.LoadUint64(&b.failures))
}

// Success resets the failures counter.
func (b *ExponentialBackoff) Success(event Event) {
	atomic.StoreUint64(&b.failures, 0)
}

// Failure increments the failure counter.
func (b *ExponentialBackoff) Failure(event Event, err error) bool {
	atomic.AddUint64(&b.failures, 1)
	return false
}

// backoff calculates the amount of time to wait based on the number of
// consecutive failures.
func (b *ExponentialBackoff) backoff(failures uint64) time.Duration {
	if failures <= 0 {
		// proceed normally when there are no failures.
		return 0
	}

	factor := b.config.Factor
	if factor <= 0 {
		factor = DefaultExponentialBackoffConfig.Factor
	}

	backoff := b.config.Base + factor*time.Duration(1<<(failures-1))

	max := b.config.Max
	if max <= 0 {
		max = DefaultExponentialBackoffConfig.Max
	}

	if backoff > max || backoff < 0 {
		backoff = max
	}

	// Choose a uniformly distributed value from [0, backoff).
	return time.Duration(rand.Int63n(int64(backoff)))
}
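
// To make the growth concrete: with the default configuration (Base=1s,
// Factor=1s, Max=20s), the upper bound on the wait is
// Base + Factor*2^(failures-1), i.e. 2s, 3s, 5s, 9s, 17s for one through five
// consecutive failures, capped at 20s from the sixth failure onward; the
// actual wait is drawn uniformly from [0, bound).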