bundler.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407
  1. // Copyright 2016 Google LLC.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. // Package bundler supports bundling (batching) of items. Bundling amortizes an
  5. // action with fixed costs over multiple items. For example, if an API provides
  6. // an RPC that accepts a list of items as input, but clients would prefer
  7. // adding items one at a time, then a Bundler can accept individual items from
  8. // the client and bundle many of them into a single RPC.
  9. //
  10. // This package is experimental and subject to change without notice.
  11. package bundler
  12. import (
  13. "context"
  14. "errors"
  15. "reflect"
  16. "sync"
  17. "time"
  18. "golang.org/x/sync/semaphore"
  19. )
// mode records which of the two mutually exclusive entry points, Add or
// AddWait, a Bundler has been used through.
type mode int

// Default values that NewBundler assigns to the corresponding Bundler fields.
const (
	DefaultDelayThreshold       = time.Second
	DefaultBundleCountThreshold = 10
	DefaultBundleByteThreshold  = 1e6 // 1M
	DefaultBufferedByteLimit    = 1e9 // 1G
)

// Values of mode. none is the zero value, so a freshly created Bundler starts
// in it.
const (
	none    mode = iota // neither Add nor AddWait has been called yet
	add                 // the Bundler is being used through Add
	addWait             // the Bundler is being used through AddWait
)
var (
	// ErrOverflow indicates that the Bundler's stored bytes exceed its
	// BufferedByteLimit.
	ErrOverflow = errors.New("bundler reached buffered byte limit")

	// ErrOversizedItem indicates that an item's size exceeds the maximum
	// bundle size.
	ErrOversizedItem = errors.New("item size exceeds bundle byte limit")

	// errMixedMethods indicates that the mutually exclusive methods Add and
	// AddWait have both been called on the same Bundler.
	errMixedMethods = errors.New("calls to Add and AddWait cannot be mixed")
)
// A Bundler collects items added to it into a bundle until the bundle
// exceeds a given size, then calls a user-provided function to handle the
// bundle.
//
// The exported fields are only safe to modify prior to the first call to Add
// or AddWait.
type Bundler struct {
	// Starting from the time that the first message is added to a bundle, once
	// this delay has passed, handle the bundle. The default is DefaultDelayThreshold.
	DelayThreshold time.Duration

	// Once a bundle has this many items, handle the bundle. Since only one
	// item at a time is added to a bundle, no bundle will exceed this
	// threshold, so it also serves as a limit. The default is
	// DefaultBundleCountThreshold.
	BundleCountThreshold int

	// Once the number of bytes in the current bundle reaches this threshold,
	// handle the bundle. The default is DefaultBundleByteThreshold. This
	// triggers handling, but does not cap the total size of a bundle.
	BundleByteThreshold int

	// The maximum size of a bundle, in bytes. Zero means unlimited.
	BundleByteLimit int

	// The maximum number of bytes that the Bundler will keep in memory before
	// returning ErrOverflow. The default is DefaultBufferedByteLimit.
	BufferedByteLimit int

	// The maximum number of handler invocations that can be running at once.
	// The default is 1.
	HandlerLimit int

	handler       func(interface{}) // called to handle a bundle
	itemSliceZero reflect.Value     // nil (zero value) for slice of items

	mu           sync.Mutex          // guards access to fields below
	flushTimer   *time.Timer         // implements DelayThreshold
	handlerCount int                 // # of bundles currently being handled (i.e. handler is invoked on them)
	sem          *semaphore.Weighted // enforces BufferedByteLimit
	semOnce      sync.Once           // guards semaphore initialization

	// The current bundle we're adding items to. Not yet in the queue.
	// Appended to the queue once the flushTimer fires or the bundle
	// thresholds/limits are reached. If curBundle is nil and tail is
	// not, we first try to add items to tail. Once tail is full or handled,
	// we create a new curBundle for the incoming item.
	curBundle *bundle

	// The next bundle in the queue to be handled. Nil if the queue is
	// empty.
	head *bundle

	// The last bundle in the queue to be handled. Nil if the queue is
	// empty. If curBundle is nil and tail isn't, we attempt to add new
	// items to the tail until it becomes full or has been passed to the
	// handler.
	tail *bundle

	curFlush  *sync.WaitGroup // counts outstanding bundles since last flush
	prevFlush chan bool       // signal used to wait for prior flush

	// mode is none until the first call to Add or AddWait, which sets it to
	// add or addWait respectively.
	mode mode

	// TODO: consider alternative queue implementation for head/tail bundle. see:
	// https://code-review.googlesource.com/c/google-api-go-client/+/47991/4/support/bundler/bundler.go#74
}
  97. // A bundle is a group of items that were added individually and will be passed
  98. // to a handler as a slice.
  99. type bundle struct {
  100. items reflect.Value // slice of T
  101. size int // size in bytes of all items
  102. next *bundle // bundles are handled in order as a linked list queue
  103. flush *sync.WaitGroup // the counter that tracks flush completion
  104. }
  105. // add appends item to this bundle and increments the total size. It requires
  106. // that b.mu is locked.
  107. func (bu *bundle) add(item interface{}, size int) {
  108. bu.items = reflect.Append(bu.items, reflect.ValueOf(item))
  109. bu.size += size
  110. }
  111. // NewBundler creates a new Bundler.
  112. //
  113. // itemExample is a value of the type that will be bundled. For example, if you
  114. // want to create bundles of *Entry, you could pass &Entry{} for itemExample.
  115. //
  116. // handler is a function that will be called on each bundle. If itemExample is
  117. // of type T, the argument to handler is of type []T. handler is always called
  118. // sequentially for each bundle, and never in parallel.
  119. //
  120. // Configure the Bundler by setting its thresholds and limits before calling
  121. // any of its methods.
  122. func NewBundler(itemExample interface{}, handler func(interface{})) *Bundler {
  123. b := &Bundler{
  124. DelayThreshold: DefaultDelayThreshold,
  125. BundleCountThreshold: DefaultBundleCountThreshold,
  126. BundleByteThreshold: DefaultBundleByteThreshold,
  127. BufferedByteLimit: DefaultBufferedByteLimit,
  128. HandlerLimit: 1,
  129. handler: handler,
  130. itemSliceZero: reflect.Zero(reflect.SliceOf(reflect.TypeOf(itemExample))),
  131. curFlush: &sync.WaitGroup{},
  132. }
  133. return b
  134. }
  135. func (b *Bundler) initSemaphores() {
  136. // Create the semaphores lazily, because the user may set limits
  137. // after NewBundler.
  138. b.semOnce.Do(func() {
  139. b.sem = semaphore.NewWeighted(int64(b.BufferedByteLimit))
  140. })
  141. }
  142. // enqueueCurBundle moves curBundle to the end of the queue. The bundle may be
  143. // handled immediately if we are below HandlerLimit. It requires that b.mu is
  144. // locked.
  145. func (b *Bundler) enqueueCurBundle() {
  146. // We don't require callers to check if there is a pending bundle. It
  147. // may have already been appended to the queue. If so, return early.
  148. if b.curBundle == nil {
  149. return
  150. }
  151. // If we are below the HandlerLimit, the queue must be empty. Handle
  152. // immediately with a new goroutine.
  153. if b.handlerCount < b.HandlerLimit {
  154. b.handlerCount++
  155. go b.handle(b.curBundle)
  156. } else if b.tail != nil {
  157. // There are bundles on the queue, so append to the end
  158. b.tail.next = b.curBundle
  159. b.tail = b.curBundle
  160. } else {
  161. // The queue is empty, so initialize the queue
  162. b.head = b.curBundle
  163. b.tail = b.curBundle
  164. }
  165. b.curBundle = nil
  166. if b.flushTimer != nil {
  167. b.flushTimer.Stop()
  168. b.flushTimer = nil
  169. }
  170. }
  171. // setMode sets the state of Bundler's mode. If mode was defined before
  172. // and passed state is different from it then return an error.
  173. func (b *Bundler) setMode(m mode) error {
  174. b.mu.Lock()
  175. defer b.mu.Unlock()
  176. if b.mode == m || b.mode == none {
  177. b.mode = m
  178. return nil
  179. }
  180. return errMixedMethods
  181. }
  182. // canFit returns true if bu can fit an additional item of size bytes based
  183. // on the limits of Bundler b.
  184. func (b *Bundler) canFit(bu *bundle, size int) bool {
  185. return (b.BundleByteLimit <= 0 || bu.size+size <= b.BundleByteLimit) &&
  186. (b.BundleCountThreshold <= 0 || bu.items.Len() < b.BundleCountThreshold)
  187. }
  188. // Add adds item to the current bundle. It marks the bundle for handling and
  189. // starts a new one if any of the thresholds or limits are exceeded.
  190. // The type of item must be assignable to the itemExample parameter of the NewBundler
  191. // method, otherwise there will be a panic.
  192. //
  193. // If the item's size exceeds the maximum bundle size (Bundler.BundleByteLimit), then
  194. // the item can never be handled. Add returns ErrOversizedItem in this case.
  195. //
  196. // If adding the item would exceed the maximum memory allowed
  197. // (Bundler.BufferedByteLimit) or an AddWait call is blocked waiting for
  198. // memory, Add returns ErrOverflow.
  199. //
  200. // Add never blocks.
  201. func (b *Bundler) Add(item interface{}, size int) error {
  202. if err := b.setMode(add); err != nil {
  203. return err
  204. }
  205. // If this item exceeds the maximum size of a bundle,
  206. // we can never send it.
  207. if b.BundleByteLimit > 0 && size > b.BundleByteLimit {
  208. return ErrOversizedItem
  209. }
  210. // If adding this item would exceed our allotted memory
  211. // footprint, we can't accept it.
  212. // (TryAcquire also returns false if anything is waiting on the semaphore,
  213. // so calls to Add and AddWait shouldn't be mixed.)
  214. b.initSemaphores()
  215. if !b.sem.TryAcquire(int64(size)) {
  216. return ErrOverflow
  217. }
  218. b.mu.Lock()
  219. defer b.mu.Unlock()
  220. return b.add(item, size)
  221. }
  222. // add adds item to the tail of the bundle queue or curBundle depending on space
  223. // and nil-ness (see inline comments). It marks curBundle for handling (by
  224. // appending it to the queue) if any of the thresholds or limits are exceeded.
  225. // curBundle is lazily initialized. It requires that b.mu is locked.
  226. func (b *Bundler) add(item interface{}, size int) error {
  227. // If we don't have a curBundle, see if we can add to the queue tail.
  228. if b.tail != nil && b.curBundle == nil && b.canFit(b.tail, size) {
  229. b.tail.add(item, size)
  230. return nil
  231. }
  232. // If we can't fit in the existing curBundle, move it onto the queue.
  233. if b.curBundle != nil && !b.canFit(b.curBundle, size) {
  234. b.enqueueCurBundle()
  235. }
  236. // Create a curBundle if we don't have one.
  237. if b.curBundle == nil {
  238. b.curFlush.Add(1)
  239. b.curBundle = &bundle{
  240. items: b.itemSliceZero,
  241. flush: b.curFlush,
  242. }
  243. }
  244. // Add the item.
  245. b.curBundle.add(item, size)
  246. // If curBundle is ready for handling, move it to the queue.
  247. if b.curBundle.size >= b.BundleByteThreshold ||
  248. b.curBundle.items.Len() == b.BundleCountThreshold {
  249. b.enqueueCurBundle()
  250. }
  251. // If we created a new bundle and it wasn't immediately handled, set a timer
  252. if b.curBundle != nil && b.flushTimer == nil {
  253. b.flushTimer = time.AfterFunc(b.DelayThreshold, b.tryHandleBundles)
  254. }
  255. return nil
  256. }
  257. // tryHandleBundles is the timer callback that handles or queues any current
  258. // bundle after DelayThreshold time, even if the bundle isn't completely full.
  259. func (b *Bundler) tryHandleBundles() {
  260. b.mu.Lock()
  261. b.enqueueCurBundle()
  262. b.mu.Unlock()
  263. }
  264. // next returns the next bundle that is ready for handling and removes it from
  265. // the internal queue. It requires that b.mu is locked.
  266. func (b *Bundler) next() *bundle {
  267. if b.head == nil {
  268. return nil
  269. }
  270. out := b.head
  271. b.head = b.head.next
  272. if b.head == nil {
  273. b.tail = nil
  274. }
  275. out.next = nil
  276. return out
  277. }
  278. // handle calls the user-specified handler on the given bundle. handle is
  279. // intended to be run as a goroutine. After the handler returns, we update the
  280. // byte total. handle continues processing additional bundles that are ready.
  281. // If no more bundles are ready, the handler count is decremented and the
  282. // goroutine ends.
  283. func (b *Bundler) handle(bu *bundle) {
  284. for bu != nil {
  285. b.handler(bu.items.Interface())
  286. bu = b.postHandle(bu)
  287. }
  288. }
  289. func (b *Bundler) postHandle(bu *bundle) *bundle {
  290. b.mu.Lock()
  291. defer b.mu.Unlock()
  292. b.sem.Release(int64(bu.size))
  293. bu.flush.Done()
  294. bu = b.next()
  295. if bu == nil {
  296. b.handlerCount--
  297. }
  298. return bu
  299. }
  300. // AddWait adds item to the current bundle. It marks the bundle for handling and
  301. // starts a new one if any of the thresholds or limits are exceeded.
  302. //
  303. // If the item's size exceeds the maximum bundle size (Bundler.BundleByteLimit), then
  304. // the item can never be handled. AddWait returns ErrOversizedItem in this case.
  305. //
  306. // If adding the item would exceed the maximum memory allowed (Bundler.BufferedByteLimit),
  307. // AddWait blocks until space is available or ctx is done.
  308. //
  309. // Calls to Add and AddWait should not be mixed on the same Bundler.
  310. func (b *Bundler) AddWait(ctx context.Context, item interface{}, size int) error {
  311. if err := b.setMode(addWait); err != nil {
  312. return err
  313. }
  314. // If this item exceeds the maximum size of a bundle,
  315. // we can never send it.
  316. if b.BundleByteLimit > 0 && size > b.BundleByteLimit {
  317. return ErrOversizedItem
  318. }
  319. // If adding this item would exceed our allotted memory footprint, block
  320. // until space is available. The semaphore is FIFO, so there will be no
  321. // starvation.
  322. b.initSemaphores()
  323. if err := b.sem.Acquire(ctx, int64(size)); err != nil {
  324. return err
  325. }
  326. b.mu.Lock()
  327. defer b.mu.Unlock()
  328. return b.add(item, size)
  329. }
  330. // Flush invokes the handler for all remaining items in the Bundler and waits
  331. // for it to return.
  332. func (b *Bundler) Flush() {
  333. b.mu.Lock()
  334. // If a curBundle is pending, move it to the queue.
  335. b.enqueueCurBundle()
  336. // Store a pointer to the WaitGroup that counts outstanding bundles
  337. // in the current flush and create a new one to track the next flush.
  338. wg := b.curFlush
  339. b.curFlush = &sync.WaitGroup{}
  340. // Flush must wait for all prior, outstanding flushes to complete.
  341. // We use a channel to communicate completion between each flush in
  342. // the sequence.
  343. prev := b.prevFlush
  344. next := make(chan bool)
  345. b.prevFlush = next
  346. b.mu.Unlock()
  347. // Wait until the previous flush is finished.
  348. if prev != nil {
  349. <-prev
  350. }
  351. // Wait until this flush is finished.
  352. wg.Wait()
  353. // Allow the next flush to finish.
  354. close(next)
  355. }