bundler.go 8.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261
  1. // Copyright 2016 Google Inc. All Rights Reserved.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. // Package bundler supports bundling (batching) of items. Bundling amortizes an
  15. // action with fixed costs over multiple items. For example, if an API provides
  16. // an RPC that accepts a list of items as input, but clients would prefer
  17. // adding items one at a time, then a Bundler can accept individual items from
  18. // the client and bundle many of them into a single RPC.
  19. //
  20. // This package is experimental and subject to change without notice.
  21. package bundler
  22. import (
  23. "errors"
  24. "reflect"
  25. "sync"
  26. "time"
  27. )
// Default values for the corresponding Bundler fields; NewBundler installs
// them on every Bundler it creates.
const (
	// DefaultDelayThreshold is the default value of Bundler.DelayThreshold.
	DefaultDelayThreshold = time.Second
	// DefaultBundleCountThreshold is the default value of
	// Bundler.BundleCountThreshold.
	DefaultBundleCountThreshold = 10
	// DefaultBundleByteThreshold is the default value of
	// Bundler.BundleByteThreshold.
	DefaultBundleByteThreshold = 1e6 // 1M
	// DefaultBufferedByteLimit is the default value of
	// Bundler.BufferedByteLimit.
	DefaultBufferedByteLimit = 1e9 // 1G
)
var (
	// ErrOverflow is returned by Add when accepting the item would push the
	// Bundler's buffered (not yet handled) bytes past its BufferedByteLimit.
	ErrOverflow = errors.New("bundler reached buffered byte limit")

	// ErrOversizedItem is returned by Add when an item's size exceeds the
	// maximum bundle size (BundleByteLimit), so the item could never be
	// placed in any bundle.
	ErrOversizedItem = errors.New("item size exceeds bundle byte limit")
)
// A Bundler collects items added to it into a bundle until the bundle
// exceeds a given size, then calls a user-provided function to handle the bundle.
//
// The zero value is not usable (its timer and channels are only created by
// NewBundler); always construct a Bundler with NewBundler. The exported
// threshold and limit fields are plain fields with no synchronization against
// the caller, so configure them after NewBundler and before the first Add.
type Bundler struct {
	// Starting from the time that the first message is added to a bundle, once
	// this delay has passed, handle the bundle. The default is DefaultDelayThreshold.
	DelayThreshold time.Duration

	// Once a bundle has this many items, handle the bundle. Since only one
	// item at a time is added to a bundle, no bundle will exceed this
	// threshold, so it also serves as a limit. The default is
	// DefaultBundleCountThreshold. A value <= 0 never triggers handling by
	// item count.
	BundleCountThreshold int

	// Once the number of bytes in current bundle reaches this threshold, handle
	// the bundle. The default is DefaultBundleByteThreshold. This triggers handling,
	// but does not cap the total size of a bundle.
	BundleByteThreshold int

	// The maximum size of a bundle, in bytes. Zero (or any value <= 0) means
	// unlimited.
	BundleByteLimit int

	// The maximum number of bytes that the Bundler will keep in memory before
	// returning ErrOverflow. The default is DefaultBufferedByteLimit.
	BufferedByteLimit int

	// The fields below are set once by NewBundler.
	handler       func(interface{}) // called to handle a bundle
	itemSliceZero reflect.Value     // nil (zero value) for slice of items
	donec         chan struct{}     // closed when the Bundler is closed
	handlec       chan int          // sent to (non-blocking, buffer of 1) when a bundle is ready for handling
	timer         *time.Timer       // implements DelayThreshold; Reset/Stop are called with mu held

	// mu guards the fields that follow it.
	mu            sync.Mutex
	bufferedSize  int           // total bytes buffered
	closedBundles []bundle      // bundles waiting to be handled
	curBundle     bundle        // incoming items added to this bundle
	calledc       chan struct{} // closed and re-created after handler is called
}
// bundle is a group of items passed together to a single call of the
// Bundler's handler.
type bundle struct {
	items reflect.Value // slice of item type
	size  int           // size in bytes of all items, as reported to Add
}
  75. // NewBundler creates a new Bundler. When you are finished with a Bundler, call
  76. // its Close method.
  77. //
  78. // itemExample is a value of the type that will be bundled. For example, if you
  79. // want to create bundles of *Entry, you could pass &Entry{} for itemExample.
  80. //
  81. // handler is a function that will be called on each bundle. If itemExample is
  82. // of type T, the argument to handler is of type []T. handler is always called
  83. // sequentially for each bundle, and never in parallel.
  84. func NewBundler(itemExample interface{}, handler func(interface{})) *Bundler {
  85. b := &Bundler{
  86. DelayThreshold: DefaultDelayThreshold,
  87. BundleCountThreshold: DefaultBundleCountThreshold,
  88. BundleByteThreshold: DefaultBundleByteThreshold,
  89. BufferedByteLimit: DefaultBufferedByteLimit,
  90. handler: handler,
  91. itemSliceZero: reflect.Zero(reflect.SliceOf(reflect.TypeOf(itemExample))),
  92. donec: make(chan struct{}),
  93. handlec: make(chan int, 1),
  94. calledc: make(chan struct{}),
  95. timer: time.NewTimer(1000 * time.Hour), // harmless initial timeout
  96. }
  97. b.curBundle.items = b.itemSliceZero
  98. go b.background()
  99. return b
  100. }
  101. // Add adds item to the current bundle. It marks the bundle for handling and
  102. // starts a new one if any of the thresholds or limits are exceeded.
  103. //
  104. // If the item's size exceeds the maximum bundle size (Bundler.BundleByteLimit), then
  105. // the item can never be handled. Add returns ErrOversizedItem in this case.
  106. //
  107. // If adding the item would exceed the maximum memory allowed (Bundler.BufferedByteLimit),
  108. // Add returns ErrOverflow.
  109. //
  110. // Add never blocks.
  111. func (b *Bundler) Add(item interface{}, size int) error {
  112. // If this item exceeds the maximum size of a bundle,
  113. // we can never send it.
  114. if b.BundleByteLimit > 0 && size > b.BundleByteLimit {
  115. return ErrOversizedItem
  116. }
  117. b.mu.Lock()
  118. defer b.mu.Unlock()
  119. // If adding this item would exceed our allotted memory
  120. // footprint, we can't accept it.
  121. if b.bufferedSize+size > b.BufferedByteLimit {
  122. return ErrOverflow
  123. }
  124. // If adding this item to the current bundle would cause it to exceed the
  125. // maximum bundle size, close the current bundle and start a new one.
  126. if b.BundleByteLimit > 0 && b.curBundle.size+size > b.BundleByteLimit {
  127. b.closeAndHandleBundle()
  128. }
  129. // Add the item.
  130. b.curBundle.items = reflect.Append(b.curBundle.items, reflect.ValueOf(item))
  131. b.curBundle.size += size
  132. b.bufferedSize += size
  133. // If this is the first item in the bundle, restart the timer.
  134. if b.curBundle.items.Len() == 1 {
  135. b.timer.Reset(b.DelayThreshold)
  136. }
  137. // If the current bundle equals the count threshold, close it.
  138. if b.curBundle.items.Len() == b.BundleCountThreshold {
  139. b.closeAndHandleBundle()
  140. }
  141. // If the current bundle equals or exceeds the byte threshold, close it.
  142. if b.curBundle.size >= b.BundleByteThreshold {
  143. b.closeAndHandleBundle()
  144. }
  145. return nil
  146. }
  147. // Flush waits until all items in the Bundler have been handled (that is,
  148. // until the last invocation of handler has returned).
  149. func (b *Bundler) Flush() {
  150. b.mu.Lock()
  151. b.closeBundle()
  152. // Unconditionally trigger the handling goroutine, to ensure calledc is closed
  153. // even if there are no outstanding bundles.
  154. select {
  155. case b.handlec <- 1:
  156. default:
  157. }
  158. calledc := b.calledc // remember locally, because it may change
  159. b.mu.Unlock()
  160. <-calledc
  161. }
  162. // Close calls Flush, then shuts down the Bundler. Close should always be
  163. // called on a Bundler when it is no longer needed. You must wait for all calls
  164. // to Add to complete before calling Close. Calling Add concurrently with Close
  165. // may result in the added items being ignored.
  166. func (b *Bundler) Close() {
  167. b.Flush()
  168. b.mu.Lock()
  169. b.timer.Stop()
  170. b.mu.Unlock()
  171. close(b.donec)
  172. }
  173. func (b *Bundler) closeAndHandleBundle() {
  174. if b.closeBundle() {
  175. // We have created a closed bundle.
  176. // Send to handlec without blocking.
  177. select {
  178. case b.handlec <- 1:
  179. default:
  180. }
  181. }
  182. }
  183. // closeBundle finishes the current bundle, adds it to the list of closed
  184. // bundles and informs the background goroutine that there are bundles ready
  185. // for processing.
  186. //
  187. // This should always be called with b.mu held.
  188. func (b *Bundler) closeBundle() bool {
  189. if b.curBundle.items.Len() == 0 {
  190. return false
  191. }
  192. b.closedBundles = append(b.closedBundles, b.curBundle)
  193. b.curBundle.items = b.itemSliceZero
  194. b.curBundle.size = 0
  195. return true
  196. }
// background runs in a separate goroutine, waiting for events and handling
// bundles.
//
// NewBundler starts it, and it is the only caller of b.handler, which is why
// bundles are handled sequentially. Each iteration waits for one of three
// events: a wakeup on handlec (sent by closeAndHandleBundle or Flush), the
// closing of donec (by Close), or the delay timer firing. It then hands every
// closed bundle to the handler, one at a time, and closes the calledc that was
// current when the pass began. Once the donec case is selected it performs one
// last pass and returns.
func (b *Bundler) background() {
	done := false
	for {
		timedOut := false
		// Wait for something to happen.
		select {
		case <-b.handlec:
			// Closed bundles may be waiting, or Flush wants a pass.
		case <-b.donec:
			// Close was called: do one final pass, then exit.
			done = true
		case <-b.timer.C:
			// The delay timer fired (Add re-arms it when a bundle gets its
			// first item), so the current bundle must be closed too.
			timedOut = true
		}
		// Handle closed bundles.
		b.mu.Lock()
		if timedOut {
			b.closeBundle()
		}
		// Take ownership of the queued bundles so the handler runs without
		// holding b.mu.
		buns := b.closedBundles
		b.closedBundles = nil
		// Closing calledc means we've sent all bundles. We need
		// a new channel for the next set of bundles, which may start
		// accumulating as soon as we release the lock.
		calledc := b.calledc
		b.calledc = make(chan struct{})
		b.mu.Unlock()
		for i, bun := range buns {
			b.handler(bun.items.Interface())
			// Drop the bundle's items, reducing our memory footprint.
			buns[i].items = reflect.Value{} // buns[i] because bun is a copy
			// Note immediately that we have more space, so Adds that occur
			// during this loop will have a chance of succeeding.
			b.mu.Lock()
			b.bufferedSize -= bun.size
			b.mu.Unlock()
		}
		// Signal that we've sent all outstanding bundles. Any Flush that
		// captured this calledc (under b.mu, before the swap above) unblocks.
		close(calledc)
		if done {
			// Not inside a select/switch here, so this leaves the for loop.
			break
		}
	}
}