singleflight.go 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210
  1. // Copyright 2013 The Go Authors. All rights reserved.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. package singleflight
  5. import (
  6. "bytes"
  7. "errors"
  8. "fmt"
  9. "runtime"
  10. "runtime/debug"
  11. "sync"
  12. )
  13. // errGoexit indicates the runtime.Goexit was called in
  14. // the user given function.
  15. var errGoexit = errors.New("runtime.Goexit was called")
  16. // A panicError is an arbitrary value recovered from a panic
  17. // with the stack trace during the execution of given function.
  18. type panicError struct {
  19. value interface{}
  20. stack []byte
  21. }
  22. // Error implements error interface.
  23. func (p *panicError) Error() string {
  24. return fmt.Sprintf("%v\n\n%s", p.value, p.stack)
  25. }
  26. func newPanicError(v interface{}) error {
  27. stack := debug.Stack()
  28. // The first line of the stack trace is of the form "goroutine N [status]:"
  29. // but by the time the panic reaches Do the goroutine may no longer exist
  30. // and its status will have changed. Trim out the misleading line.
  31. if line := bytes.IndexByte(stack[:], '\n'); line >= 0 {
  32. stack = stack[line+1:]
  33. }
  34. return &panicError{value: v, stack: stack}
  35. }
  36. // call is an in-flight or completed singleflight.Do call
  37. type call struct {
  38. wg sync.WaitGroup
  39. // These fields are written once before the WaitGroup is done
  40. // and are only read after the WaitGroup is done.
  41. val interface{}
  42. err error
  43. // forgotten indicates whether Forget was called with this call's key
  44. // while the call was still in flight.
  45. forgotten bool
  46. // These fields are read and written with the singleflight
  47. // mutex held before the WaitGroup is done, and are read but
  48. // not written after the WaitGroup is done.
  49. dups int
  50. chans []chan<- Result
  51. }
  52. // Group represents a class of work and forms a namespace in
  53. // which units of work can be executed with duplicate suppression.
  54. type Group struct {
  55. mu sync.Mutex // protects m
  56. m map[string]*call // lazily initialized
  57. }
  58. // Result holds the results of Do, so they can be passed
  59. // on a channel.
  60. type Result struct {
  61. Val interface{}
  62. Err error
  63. Shared bool
  64. }
  65. // Do executes and returns the results of the given function, making
  66. // sure that only one execution is in-flight for a given key at a
  67. // time. If a duplicate comes in, the duplicate caller waits for the
  68. // original to complete and receives the same results.
  69. // The return value shared indicates whether v was given to multiple callers.
  70. func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
  71. g.mu.Lock()
  72. if g.m == nil {
  73. g.m = make(map[string]*call)
  74. }
  75. if c, ok := g.m[key]; ok {
  76. c.dups++
  77. g.mu.Unlock()
  78. c.wg.Wait()
  79. if e, ok := c.err.(*panicError); ok {
  80. panic(e)
  81. } else if c.err == errGoexit {
  82. runtime.Goexit()
  83. }
  84. return c.val, c.err, true
  85. }
  86. c := new(call)
  87. c.wg.Add(1)
  88. g.m[key] = c
  89. g.mu.Unlock()
  90. g.doCall(c, key, fn)
  91. return c.val, c.err, c.dups > 0
  92. }
  93. // DoChan is like Do but returns a channel that will receive the
  94. // results when they are ready.
  95. //
  96. // The returned channel will not be closed.
  97. func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result {
  98. ch := make(chan Result, 1)
  99. g.mu.Lock()
  100. if g.m == nil {
  101. g.m = make(map[string]*call)
  102. }
  103. if c, ok := g.m[key]; ok {
  104. c.dups++
  105. c.chans = append(c.chans, ch)
  106. g.mu.Unlock()
  107. return ch
  108. }
  109. c := &call{chans: []chan<- Result{ch}}
  110. c.wg.Add(1)
  111. g.m[key] = c
  112. g.mu.Unlock()
  113. go g.doCall(c, key, fn)
  114. return ch
  115. }
  116. // doCall handles the single call for a key.
  117. func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) {
  118. normalReturn := false
  119. recovered := false
  120. // use double-defer to distinguish panic from runtime.Goexit,
  121. // more details see https://golang.org/cl/134395
  122. defer func() {
  123. // the given function invoked runtime.Goexit
  124. if !normalReturn && !recovered {
  125. c.err = errGoexit
  126. }
  127. c.wg.Done()
  128. g.mu.Lock()
  129. defer g.mu.Unlock()
  130. if !c.forgotten {
  131. delete(g.m, key)
  132. }
  133. if e, ok := c.err.(*panicError); ok {
  134. // In order to prevent the waiting channels from being blocked forever,
  135. // needs to ensure that this panic cannot be recovered.
  136. if len(c.chans) > 0 {
  137. go panic(e)
  138. select {} // Keep this goroutine around so that it will appear in the crash dump.
  139. } else {
  140. panic(e)
  141. }
  142. } else if c.err == errGoexit {
  143. // Already in the process of goexit, no need to call again
  144. } else {
  145. // Normal return
  146. for _, ch := range c.chans {
  147. ch <- Result{c.val, c.err, c.dups > 0}
  148. }
  149. }
  150. }()
  151. func() {
  152. defer func() {
  153. if !normalReturn {
  154. // Ideally, we would wait to take a stack trace until we've determined
  155. // whether this is a panic or a runtime.Goexit.
  156. //
  157. // Unfortunately, the only way we can distinguish the two is to see
  158. // whether the recover stopped the goroutine from terminating, and by
  159. // the time we know that, the part of the stack trace relevant to the
  160. // panic has been discarded.
  161. if r := recover(); r != nil {
  162. c.err = newPanicError(r)
  163. }
  164. }
  165. }()
  166. c.val, c.err = fn()
  167. normalReturn = true
  168. }()
  169. if !normalReturn {
  170. recovered = true
  171. }
  172. }
  173. // Forget tells the singleflight to forget about a key. Future calls
  174. // to Do for this key will call the function rather than waiting for
  175. // an earlier call to complete.
  176. func (g *Group) Forget(key string) {
  177. g.mu.Lock()
  178. if c, ok := g.m[key]; ok {
  179. c.forgotten = true
  180. }
  181. delete(g.m, key)
  182. g.mu.Unlock()
  183. }