123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407 |
- // Copyright 2016 Google LLC.
- // Use of this source code is governed by a BSD-style
- // license that can be found in the LICENSE file.
- // Package bundler supports bundling (batching) of items. Bundling amortizes an
- // action with fixed costs over multiple items. For example, if an API provides
- // an RPC that accepts a list of items as input, but clients would prefer
- // adding items one at a time, then a Bundler can accept individual items from
- // the client and bundle many of them into a single RPC.
- //
- // This package is experimental and subject to change without notice.
- package bundler
- import (
- "context"
- "errors"
- "reflect"
- "sync"
- "time"
- "golang.org/x/sync/semaphore"
- )
// Default values that NewBundler assigns to the corresponding Bundler fields.
const (
	// DefaultDelayThreshold is the default value of Bundler.DelayThreshold.
	DefaultDelayThreshold = time.Second
	// DefaultBundleCountThreshold is the default value of
	// Bundler.BundleCountThreshold.
	DefaultBundleCountThreshold = 10
	// DefaultBundleByteThreshold is the default value of
	// Bundler.BundleByteThreshold.
	DefaultBundleByteThreshold = 1e6 // 1M
	// DefaultBufferedByteLimit is the default value of
	// Bundler.BufferedByteLimit.
	DefaultBufferedByteLimit = 1e9 // 1G
)

// mode records which of the two mutually exclusive entry points, Add or
// AddWait, a Bundler has been used with.
type mode int

const (
	none    mode = iota // neither Add nor AddWait has been called yet
	add                 // Add was the first method called
	addWait             // AddWait was the first method called
)
var (
	// ErrOverflow indicates that the Bundler's stored bytes would exceed its
	// BufferedByteLimit.
	ErrOverflow = errors.New("bundler reached buffered byte limit")

	// ErrOversizedItem indicates that an item's size exceeds the maximum
	// bundle size.
	ErrOversizedItem = errors.New("item size exceeds bundle byte limit")

	// errMixedMethods indicates that both Add and AddWait, which are
	// mutually exclusive, have been called on the same Bundler.
	errMixedMethods = errors.New("calls to Add and AddWait cannot be mixed")
)
- // A Bundler collects items added to it into a bundle until the bundle
- // exceeds a given size, then calls a user-provided function to handle the
- // bundle.
- //
- // The exported fields are only safe to modify prior to the first call to Add
- // or AddWait.
- type Bundler struct {
- // Starting from the time that the first message is added to a bundle, once
- // this delay has passed, handle the bundle. The default is DefaultDelayThreshold.
- DelayThreshold time.Duration
- // Once a bundle has this many items, handle the bundle. Since only one
- // item at a time is added to a bundle, no bundle will exceed this
- // threshold, so it also serves as a limit. The default is
- // DefaultBundleCountThreshold.
- BundleCountThreshold int
- // Once the number of bytes in current bundle reaches this threshold, handle
- // the bundle. The default is DefaultBundleByteThreshold. This triggers handling,
- // but does not cap the total size of a bundle.
- BundleByteThreshold int
- // The maximum size of a bundle, in bytes. Zero means unlimited.
- BundleByteLimit int
- // The maximum number of bytes that the Bundler will keep in memory before
- // returning ErrOverflow. The default is DefaultBufferedByteLimit.
- BufferedByteLimit int
- // The maximum number of handler invocations that can be running at once.
- // The default is 1.
- HandlerLimit int
- handler func(interface{}) // called to handle a bundle
- itemSliceZero reflect.Value // nil (zero value) for slice of items
- mu sync.Mutex // guards access to fields below
- flushTimer *time.Timer // implements DelayThreshold
- handlerCount int // # of bundles currently being handled (i.e. handler is invoked on them)
- sem *semaphore.Weighted // enforces BufferedByteLimit
- semOnce sync.Once // guards semaphore initialization
- // The current bundle we're adding items to. Not yet in the queue.
- // Appended to the queue once the flushTimer fires or the bundle
- // thresholds/limits are reached. If curBundle is nil and tail is
- // not, we first try to add items to tail. Once tail is full or handled,
- // we create a new curBundle for the incoming item.
- curBundle *bundle
- // The next bundle in the queue to be handled. Nil if the queue is
- // empty.
- head *bundle
- // The last bundle in the queue to be handled. Nil if the queue is
- // empty. If curBundle is nil and tail isn't, we attempt to add new
- // items to the tail until if becomes full or has been passed to the
- // handler.
- tail *bundle
- curFlush *sync.WaitGroup // counts outstanding bundles since last flush
- prevFlush chan bool // signal used to wait for prior flush
- // The first call to Add or AddWait, mode will be add or addWait respectively.
- // If there wasn't call yet then mode is none.
- mode mode
- // TODO: consider alternative queue implementation for head/tail bundle. see:
- // https://code-review.googlesource.com/c/google-api-go-client/+/47991/4/support/bundler/bundler.go#74
- }
// A bundle is one batch of items that were added individually and that the
// handler receives together as a single slice.
type bundle struct {
	items reflect.Value   // slice of T holding the items added so far
	size  int             // total of the sizes given for those items, in bytes
	next  *bundle         // following bundle; bundles are handled in order as a linked-list queue
	flush *sync.WaitGroup // the counter that tracks flush completion
}

// add appends item to the bundle's slice and adds size to the bundle's byte
// total. The caller must hold the owning Bundler's mutex.
func (bu *bundle) add(item interface{}, size int) {
	elem := reflect.ValueOf(item)
	bu.items = reflect.Append(bu.items, elem)
	bu.size += size
}
- // NewBundler creates a new Bundler.
- //
- // itemExample is a value of the type that will be bundled. For example, if you
- // want to create bundles of *Entry, you could pass &Entry{} for itemExample.
- //
- // handler is a function that will be called on each bundle. If itemExample is
- // of type T, the argument to handler is of type []T. handler is always called
- // sequentially for each bundle, and never in parallel.
- //
- // Configure the Bundler by setting its thresholds and limits before calling
- // any of its methods.
- func NewBundler(itemExample interface{}, handler func(interface{})) *Bundler {
- b := &Bundler{
- DelayThreshold: DefaultDelayThreshold,
- BundleCountThreshold: DefaultBundleCountThreshold,
- BundleByteThreshold: DefaultBundleByteThreshold,
- BufferedByteLimit: DefaultBufferedByteLimit,
- HandlerLimit: 1,
- handler: handler,
- itemSliceZero: reflect.Zero(reflect.SliceOf(reflect.TypeOf(itemExample))),
- curFlush: &sync.WaitGroup{},
- }
- return b
- }
- func (b *Bundler) initSemaphores() {
- // Create the semaphores lazily, because the user may set limits
- // after NewBundler.
- b.semOnce.Do(func() {
- b.sem = semaphore.NewWeighted(int64(b.BufferedByteLimit))
- })
- }
- // enqueueCurBundle moves curBundle to the end of the queue. The bundle may be
- // handled immediately if we are below HandlerLimit. It requires that b.mu is
- // locked.
- func (b *Bundler) enqueueCurBundle() {
- // We don't require callers to check if there is a pending bundle. It
- // may have already been appended to the queue. If so, return early.
- if b.curBundle == nil {
- return
- }
- // If we are below the HandlerLimit, the queue must be empty. Handle
- // immediately with a new goroutine.
- if b.handlerCount < b.HandlerLimit {
- b.handlerCount++
- go b.handle(b.curBundle)
- } else if b.tail != nil {
- // There are bundles on the queue, so append to the end
- b.tail.next = b.curBundle
- b.tail = b.curBundle
- } else {
- // The queue is empty, so initialize the queue
- b.head = b.curBundle
- b.tail = b.curBundle
- }
- b.curBundle = nil
- if b.flushTimer != nil {
- b.flushTimer.Stop()
- b.flushTimer = nil
- }
- }
- // setMode sets the state of Bundler's mode. If mode was defined before
- // and passed state is different from it then return an error.
- func (b *Bundler) setMode(m mode) error {
- b.mu.Lock()
- defer b.mu.Unlock()
- if b.mode == m || b.mode == none {
- b.mode = m
- return nil
- }
- return errMixedMethods
- }
- // canFit returns true if bu can fit an additional item of size bytes based
- // on the limits of Bundler b.
- func (b *Bundler) canFit(bu *bundle, size int) bool {
- return (b.BundleByteLimit <= 0 || bu.size+size <= b.BundleByteLimit) &&
- (b.BundleCountThreshold <= 0 || bu.items.Len() < b.BundleCountThreshold)
- }
- // Add adds item to the current bundle. It marks the bundle for handling and
- // starts a new one if any of the thresholds or limits are exceeded.
- // The type of item must be assignable to the itemExample parameter of the NewBundler
- // method, otherwise there will be a panic.
- //
- // If the item's size exceeds the maximum bundle size (Bundler.BundleByteLimit), then
- // the item can never be handled. Add returns ErrOversizedItem in this case.
- //
- // If adding the item would exceed the maximum memory allowed
- // (Bundler.BufferedByteLimit) or an AddWait call is blocked waiting for
- // memory, Add returns ErrOverflow.
- //
- // Add never blocks.
- func (b *Bundler) Add(item interface{}, size int) error {
- if err := b.setMode(add); err != nil {
- return err
- }
- // If this item exceeds the maximum size of a bundle,
- // we can never send it.
- if b.BundleByteLimit > 0 && size > b.BundleByteLimit {
- return ErrOversizedItem
- }
- // If adding this item would exceed our allotted memory
- // footprint, we can't accept it.
- // (TryAcquire also returns false if anything is waiting on the semaphore,
- // so calls to Add and AddWait shouldn't be mixed.)
- b.initSemaphores()
- if !b.sem.TryAcquire(int64(size)) {
- return ErrOverflow
- }
- b.mu.Lock()
- defer b.mu.Unlock()
- return b.add(item, size)
- }
- // add adds item to the tail of the bundle queue or curBundle depending on space
- // and nil-ness (see inline comments). It marks curBundle for handling (by
- // appending it to the queue) if any of the thresholds or limits are exceeded.
- // curBundle is lazily initialized. It requires that b.mu is locked.
- func (b *Bundler) add(item interface{}, size int) error {
- // If we don't have a curBundle, see if we can add to the queue tail.
- if b.tail != nil && b.curBundle == nil && b.canFit(b.tail, size) {
- b.tail.add(item, size)
- return nil
- }
- // If we can't fit in the existing curBundle, move it onto the queue.
- if b.curBundle != nil && !b.canFit(b.curBundle, size) {
- b.enqueueCurBundle()
- }
- // Create a curBundle if we don't have one.
- if b.curBundle == nil {
- b.curFlush.Add(1)
- b.curBundle = &bundle{
- items: b.itemSliceZero,
- flush: b.curFlush,
- }
- }
- // Add the item.
- b.curBundle.add(item, size)
- // If curBundle is ready for handling, move it to the queue.
- if b.curBundle.size >= b.BundleByteThreshold ||
- b.curBundle.items.Len() == b.BundleCountThreshold {
- b.enqueueCurBundle()
- }
- // If we created a new bundle and it wasn't immediately handled, set a timer
- if b.curBundle != nil && b.flushTimer == nil {
- b.flushTimer = time.AfterFunc(b.DelayThreshold, b.tryHandleBundles)
- }
- return nil
- }
- // tryHandleBundles is the timer callback that handles or queues any current
- // bundle after DelayThreshold time, even if the bundle isn't completely full.
- func (b *Bundler) tryHandleBundles() {
- b.mu.Lock()
- b.enqueueCurBundle()
- b.mu.Unlock()
- }
- // next returns the next bundle that is ready for handling and removes it from
- // the internal queue. It requires that b.mu is locked.
- func (b *Bundler) next() *bundle {
- if b.head == nil {
- return nil
- }
- out := b.head
- b.head = b.head.next
- if b.head == nil {
- b.tail = nil
- }
- out.next = nil
- return out
- }
- // handle calls the user-specified handler on the given bundle. handle is
- // intended to be run as a goroutine. After the handler returns, we update the
- // byte total. handle continues processing additional bundles that are ready.
- // If no more bundles are ready, the handler count is decremented and the
- // goroutine ends.
- func (b *Bundler) handle(bu *bundle) {
- for bu != nil {
- b.handler(bu.items.Interface())
- bu = b.postHandle(bu)
- }
- }
- func (b *Bundler) postHandle(bu *bundle) *bundle {
- b.mu.Lock()
- defer b.mu.Unlock()
- b.sem.Release(int64(bu.size))
- bu.flush.Done()
- bu = b.next()
- if bu == nil {
- b.handlerCount--
- }
- return bu
- }
- // AddWait adds item to the current bundle. It marks the bundle for handling and
- // starts a new one if any of the thresholds or limits are exceeded.
- //
- // If the item's size exceeds the maximum bundle size (Bundler.BundleByteLimit), then
- // the item can never be handled. AddWait returns ErrOversizedItem in this case.
- //
- // If adding the item would exceed the maximum memory allowed (Bundler.BufferedByteLimit),
- // AddWait blocks until space is available or ctx is done.
- //
- // Calls to Add and AddWait should not be mixed on the same Bundler.
- func (b *Bundler) AddWait(ctx context.Context, item interface{}, size int) error {
- if err := b.setMode(addWait); err != nil {
- return err
- }
- // If this item exceeds the maximum size of a bundle,
- // we can never send it.
- if b.BundleByteLimit > 0 && size > b.BundleByteLimit {
- return ErrOversizedItem
- }
- // If adding this item would exceed our allotted memory footprint, block
- // until space is available. The semaphore is FIFO, so there will be no
- // starvation.
- b.initSemaphores()
- if err := b.sem.Acquire(ctx, int64(size)); err != nil {
- return err
- }
- b.mu.Lock()
- defer b.mu.Unlock()
- return b.add(item, size)
- }
- // Flush invokes the handler for all remaining items in the Bundler and waits
- // for it to return.
- func (b *Bundler) Flush() {
- b.mu.Lock()
- // If a curBundle is pending, move it to the queue.
- b.enqueueCurBundle()
- // Store a pointer to the WaitGroup that counts outstanding bundles
- // in the current flush and create a new one to track the next flush.
- wg := b.curFlush
- b.curFlush = &sync.WaitGroup{}
- // Flush must wait for all prior, outstanding flushes to complete.
- // We use a channel to communicate completion between each flush in
- // the sequence.
- prev := b.prevFlush
- next := make(chan bool)
- b.prevFlush = next
- b.mu.Unlock()
- // Wait until the previous flush is finished.
- if prev != nil {
- <-prev
- }
- // Wait until this flush is finished.
- wg.Wait()
- // Allow the next flush to finish.
- close(next)
- }
|