123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186 |
- package ioutils
- import (
- "errors"
- "io"
- "sync"
- )
- // maxCap is the highest capacity to use in byte slices that buffer data.
- const maxCap = 1e6
- // minCap is the lowest capacity to use in byte slices that buffer data
- const minCap = 64
- // blockThreshold is the minimum number of bytes in the buffer which will cause
- // a write to BytesPipe to block when allocating a new slice.
- const blockThreshold = 1e6
- var (
- // ErrClosed is returned when Write is called on a closed BytesPipe.
- ErrClosed = errors.New("write to closed BytesPipe")
- bufPools = make(map[int]*sync.Pool)
- bufPoolsLock sync.Mutex
- )
- // BytesPipe is io.ReadWriteCloser which works similarly to pipe(queue).
- // All written data may be read at most once. Also, BytesPipe allocates
- // and releases new byte slices to adjust to current needs, so the buffer
- // won't be overgrown after peak loads.
- type BytesPipe struct {
- mu sync.Mutex
- wait *sync.Cond
- buf []*fixedBuffer
- bufLen int
- closeErr error // error to return from next Read. set to nil if not closed.
- }
- // NewBytesPipe creates new BytesPipe, initialized by specified slice.
- // If buf is nil, then it will be initialized with slice which cap is 64.
- // buf will be adjusted in a way that len(buf) == 0, cap(buf) == cap(buf).
- func NewBytesPipe() *BytesPipe {
- bp := &BytesPipe{}
- bp.buf = append(bp.buf, getBuffer(minCap))
- bp.wait = sync.NewCond(&bp.mu)
- return bp
- }
- // Write writes p to BytesPipe.
- // It can allocate new []byte slices in a process of writing.
- func (bp *BytesPipe) Write(p []byte) (int, error) {
- bp.mu.Lock()
- written := 0
- loop0:
- for {
- if bp.closeErr != nil {
- bp.mu.Unlock()
- return written, ErrClosed
- }
- if len(bp.buf) == 0 {
- bp.buf = append(bp.buf, getBuffer(64))
- }
- // get the last buffer
- b := bp.buf[len(bp.buf)-1]
- n, err := b.Write(p)
- written += n
- bp.bufLen += n
- // errBufferFull is an error we expect to get if the buffer is full
- if err != nil && err != errBufferFull {
- bp.wait.Broadcast()
- bp.mu.Unlock()
- return written, err
- }
- // if there was enough room to write all then break
- if len(p) == n {
- break
- }
- // more data: write to the next slice
- p = p[n:]
- // make sure the buffer doesn't grow too big from this write
- for bp.bufLen >= blockThreshold {
- bp.wait.Wait()
- if bp.closeErr != nil {
- continue loop0
- }
- }
- // add new byte slice to the buffers slice and continue writing
- nextCap := b.Cap() * 2
- if nextCap > maxCap {
- nextCap = maxCap
- }
- bp.buf = append(bp.buf, getBuffer(nextCap))
- }
- bp.wait.Broadcast()
- bp.mu.Unlock()
- return written, nil
- }
- // CloseWithError causes further reads from a BytesPipe to return immediately.
- func (bp *BytesPipe) CloseWithError(err error) error {
- bp.mu.Lock()
- if err != nil {
- bp.closeErr = err
- } else {
- bp.closeErr = io.EOF
- }
- bp.wait.Broadcast()
- bp.mu.Unlock()
- return nil
- }
- // Close causes further reads from a BytesPipe to return immediately.
- func (bp *BytesPipe) Close() error {
- return bp.CloseWithError(nil)
- }
- // Read reads bytes from BytesPipe.
- // Data could be read only once.
- func (bp *BytesPipe) Read(p []byte) (n int, err error) {
- bp.mu.Lock()
- if bp.bufLen == 0 {
- if bp.closeErr != nil {
- bp.mu.Unlock()
- return 0, bp.closeErr
- }
- bp.wait.Wait()
- if bp.bufLen == 0 && bp.closeErr != nil {
- err := bp.closeErr
- bp.mu.Unlock()
- return 0, err
- }
- }
- for bp.bufLen > 0 {
- b := bp.buf[0]
- read, _ := b.Read(p) // ignore error since fixedBuffer doesn't really return an error
- n += read
- bp.bufLen -= read
- if b.Len() == 0 {
- // it's empty so return it to the pool and move to the next one
- returnBuffer(b)
- bp.buf[0] = nil
- bp.buf = bp.buf[1:]
- }
- if len(p) == read {
- break
- }
- p = p[read:]
- }
- bp.wait.Broadcast()
- bp.mu.Unlock()
- return
- }
- func returnBuffer(b *fixedBuffer) {
- b.Reset()
- bufPoolsLock.Lock()
- pool := bufPools[b.Cap()]
- bufPoolsLock.Unlock()
- if pool != nil {
- pool.Put(b)
- }
- }
- func getBuffer(size int) *fixedBuffer {
- bufPoolsLock.Lock()
- pool, ok := bufPools[size]
- if !ok {
- pool = &sync.Pool{New: func() interface{} { return &fixedBuffer{buf: make([]byte, 0, size)} }}
- bufPools[size] = pool
- }
- bufPoolsLock.Unlock()
- return pool.Get().(*fixedBuffer)
- }
|