bytespipe.go 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186
  1. package ioutils
  2. import (
  3. "errors"
  4. "io"
  5. "sync"
  6. )
  7. // maxCap is the highest capacity to use in byte slices that buffer data.
  8. const maxCap = 1e6
  9. // minCap is the lowest capacity to use in byte slices that buffer data
  10. const minCap = 64
  11. // blockThreshold is the minimum number of bytes in the buffer which will cause
  12. // a write to BytesPipe to block when allocating a new slice.
  13. const blockThreshold = 1e6
  14. var (
  15. // ErrClosed is returned when Write is called on a closed BytesPipe.
  16. ErrClosed = errors.New("write to closed BytesPipe")
  17. bufPools = make(map[int]*sync.Pool)
  18. bufPoolsLock sync.Mutex
  19. )
  20. // BytesPipe is io.ReadWriteCloser which works similarly to pipe(queue).
  21. // All written data may be read at most once. Also, BytesPipe allocates
  22. // and releases new byte slices to adjust to current needs, so the buffer
  23. // won't be overgrown after peak loads.
  24. type BytesPipe struct {
  25. mu sync.Mutex
  26. wait *sync.Cond
  27. buf []*fixedBuffer
  28. bufLen int
  29. closeErr error // error to return from next Read. set to nil if not closed.
  30. }
  31. // NewBytesPipe creates new BytesPipe, initialized by specified slice.
  32. // If buf is nil, then it will be initialized with slice which cap is 64.
  33. // buf will be adjusted in a way that len(buf) == 0, cap(buf) == cap(buf).
  34. func NewBytesPipe() *BytesPipe {
  35. bp := &BytesPipe{}
  36. bp.buf = append(bp.buf, getBuffer(minCap))
  37. bp.wait = sync.NewCond(&bp.mu)
  38. return bp
  39. }
  40. // Write writes p to BytesPipe.
  41. // It can allocate new []byte slices in a process of writing.
  42. func (bp *BytesPipe) Write(p []byte) (int, error) {
  43. bp.mu.Lock()
  44. written := 0
  45. loop0:
  46. for {
  47. if bp.closeErr != nil {
  48. bp.mu.Unlock()
  49. return written, ErrClosed
  50. }
  51. if len(bp.buf) == 0 {
  52. bp.buf = append(bp.buf, getBuffer(64))
  53. }
  54. // get the last buffer
  55. b := bp.buf[len(bp.buf)-1]
  56. n, err := b.Write(p)
  57. written += n
  58. bp.bufLen += n
  59. // errBufferFull is an error we expect to get if the buffer is full
  60. if err != nil && err != errBufferFull {
  61. bp.wait.Broadcast()
  62. bp.mu.Unlock()
  63. return written, err
  64. }
  65. // if there was enough room to write all then break
  66. if len(p) == n {
  67. break
  68. }
  69. // more data: write to the next slice
  70. p = p[n:]
  71. // make sure the buffer doesn't grow too big from this write
  72. for bp.bufLen >= blockThreshold {
  73. bp.wait.Wait()
  74. if bp.closeErr != nil {
  75. continue loop0
  76. }
  77. }
  78. // add new byte slice to the buffers slice and continue writing
  79. nextCap := b.Cap() * 2
  80. if nextCap > maxCap {
  81. nextCap = maxCap
  82. }
  83. bp.buf = append(bp.buf, getBuffer(nextCap))
  84. }
  85. bp.wait.Broadcast()
  86. bp.mu.Unlock()
  87. return written, nil
  88. }
  89. // CloseWithError causes further reads from a BytesPipe to return immediately.
  90. func (bp *BytesPipe) CloseWithError(err error) error {
  91. bp.mu.Lock()
  92. if err != nil {
  93. bp.closeErr = err
  94. } else {
  95. bp.closeErr = io.EOF
  96. }
  97. bp.wait.Broadcast()
  98. bp.mu.Unlock()
  99. return nil
  100. }
  101. // Close causes further reads from a BytesPipe to return immediately.
  102. func (bp *BytesPipe) Close() error {
  103. return bp.CloseWithError(nil)
  104. }
  105. // Read reads bytes from BytesPipe.
  106. // Data could be read only once.
  107. func (bp *BytesPipe) Read(p []byte) (n int, err error) {
  108. bp.mu.Lock()
  109. if bp.bufLen == 0 {
  110. if bp.closeErr != nil {
  111. bp.mu.Unlock()
  112. return 0, bp.closeErr
  113. }
  114. bp.wait.Wait()
  115. if bp.bufLen == 0 && bp.closeErr != nil {
  116. err := bp.closeErr
  117. bp.mu.Unlock()
  118. return 0, err
  119. }
  120. }
  121. for bp.bufLen > 0 {
  122. b := bp.buf[0]
  123. read, _ := b.Read(p) // ignore error since fixedBuffer doesn't really return an error
  124. n += read
  125. bp.bufLen -= read
  126. if b.Len() == 0 {
  127. // it's empty so return it to the pool and move to the next one
  128. returnBuffer(b)
  129. bp.buf[0] = nil
  130. bp.buf = bp.buf[1:]
  131. }
  132. if len(p) == read {
  133. break
  134. }
  135. p = p[read:]
  136. }
  137. bp.wait.Broadcast()
  138. bp.mu.Unlock()
  139. return
  140. }
  141. func returnBuffer(b *fixedBuffer) {
  142. b.Reset()
  143. bufPoolsLock.Lock()
  144. pool := bufPools[b.Cap()]
  145. bufPoolsLock.Unlock()
  146. if pool != nil {
  147. pool.Put(b)
  148. }
  149. }
  150. func getBuffer(size int) *fixedBuffer {
  151. bufPoolsLock.Lock()
  152. pool, ok := bufPools[size]
  153. if !ok {
  154. pool = &sync.Pool{New: func() interface{} { return &fixedBuffer{buf: make([]byte, 0, size)} }}
  155. bufPools[size] = pool
  156. }
  157. bufPoolsLock.Unlock()
  158. return pool.Get().(*fixedBuffer)
  159. }