bytespipe.go 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187
  1. package ioutils // import "github.com/docker/docker/pkg/ioutils"
  2. import (
  3. "errors"
  4. "io"
  5. "sync"
  6. )
  7. // maxCap is the highest capacity to use in byte slices that buffer data.
  8. const maxCap = 1e6
  9. // minCap is the lowest capacity to use in byte slices that buffer data
  10. const minCap = 64
  11. // blockThreshold is the minimum number of bytes in the buffer which will cause
  12. // a write to BytesPipe to block when allocating a new slice.
  13. const blockThreshold = 1e6
  14. var (
  15. // ErrClosed is returned when Write is called on a closed BytesPipe.
  16. ErrClosed = errors.New("write to closed BytesPipe")
  17. bufPools = make(map[int]*sync.Pool)
  18. bufPoolsLock sync.Mutex
  19. )
// BytesPipe is io.ReadWriteCloser which works similarly to pipe(queue).
// All written data may be read at most once. Also, BytesPipe allocates
// and releases new byte slices to adjust to current needs, so the buffer
// won't be overgrown after peak loads.
type BytesPipe struct {
	// mu is held by Write, Read and CloseWithError while they touch the
	// fields below.
	mu sync.Mutex
	// wait is the condition variable readers and writers block on; it is
	// created over mu by NewBytesPipe.
	wait *sync.Cond
	// buf is the queue of buffers: Write fills the last element, Read drains
	// the first one.
	buf []*fixedBuffer
	// bufLen is the number of written bytes that have not been read yet.
	bufLen int
	closeErr error // error to return from next Read. set to nil if not closed.
	readBlock bool // true while Read is blocked in wait.Wait(); a blocked Write checks it to wake the reader
}
  32. // NewBytesPipe creates new BytesPipe, initialized by specified slice.
  33. // If buf is nil, then it will be initialized with slice which cap is 64.
  34. // buf will be adjusted in a way that len(buf) == 0, cap(buf) == cap(buf).
  35. func NewBytesPipe() *BytesPipe {
  36. bp := &BytesPipe{}
  37. bp.buf = append(bp.buf, getBuffer(minCap))
  38. bp.wait = sync.NewCond(&bp.mu)
  39. return bp
  40. }
  41. // Write writes p to BytesPipe.
  42. // It can allocate new []byte slices in a process of writing.
  43. func (bp *BytesPipe) Write(p []byte) (int, error) {
  44. bp.mu.Lock()
  45. defer bp.mu.Unlock()
  46. written := 0
  47. loop0:
  48. for {
  49. if bp.closeErr != nil {
  50. return written, ErrClosed
  51. }
  52. if len(bp.buf) == 0 {
  53. bp.buf = append(bp.buf, getBuffer(64))
  54. }
  55. // get the last buffer
  56. b := bp.buf[len(bp.buf)-1]
  57. n, err := b.Write(p)
  58. written += n
  59. bp.bufLen += n
  60. // errBufferFull is an error we expect to get if the buffer is full
  61. if err != nil && err != errBufferFull {
  62. bp.wait.Broadcast()
  63. return written, err
  64. }
  65. // if there was enough room to write all then break
  66. if len(p) == n {
  67. break
  68. }
  69. // more data: write to the next slice
  70. p = p[n:]
  71. // make sure the buffer doesn't grow too big from this write
  72. for bp.bufLen >= blockThreshold {
  73. if bp.readBlock {
  74. bp.wait.Broadcast()
  75. }
  76. bp.wait.Wait()
  77. if bp.closeErr != nil {
  78. continue loop0
  79. }
  80. }
  81. // add new byte slice to the buffers slice and continue writing
  82. nextCap := b.Cap() * 2
  83. if nextCap > maxCap {
  84. nextCap = maxCap
  85. }
  86. bp.buf = append(bp.buf, getBuffer(nextCap))
  87. }
  88. bp.wait.Broadcast()
  89. return written, nil
  90. }
  91. // CloseWithError causes further reads from a BytesPipe to return immediately.
  92. func (bp *BytesPipe) CloseWithError(err error) error {
  93. bp.mu.Lock()
  94. if err != nil {
  95. bp.closeErr = err
  96. } else {
  97. bp.closeErr = io.EOF
  98. }
  99. bp.wait.Broadcast()
  100. bp.mu.Unlock()
  101. return nil
  102. }
// Close causes further reads from a BytesPipe to return immediately.
// It is shorthand for CloseWithError(nil) and returns that call's result.
func (bp *BytesPipe) Close() error {
	return bp.CloseWithError(nil)
}
  107. // Read reads bytes from BytesPipe.
  108. // Data could be read only once.
  109. func (bp *BytesPipe) Read(p []byte) (n int, err error) {
  110. bp.mu.Lock()
  111. defer bp.mu.Unlock()
  112. if bp.bufLen == 0 {
  113. if bp.closeErr != nil {
  114. return 0, bp.closeErr
  115. }
  116. bp.readBlock = true
  117. bp.wait.Wait()
  118. bp.readBlock = false
  119. if bp.bufLen == 0 && bp.closeErr != nil {
  120. return 0, bp.closeErr
  121. }
  122. }
  123. for bp.bufLen > 0 {
  124. b := bp.buf[0]
  125. read, _ := b.Read(p) // ignore error since fixedBuffer doesn't really return an error
  126. n += read
  127. bp.bufLen -= read
  128. if b.Len() == 0 {
  129. // it's empty so return it to the pool and move to the next one
  130. returnBuffer(b)
  131. bp.buf[0] = nil
  132. bp.buf = bp.buf[1:]
  133. }
  134. if len(p) == read {
  135. break
  136. }
  137. p = p[read:]
  138. }
  139. bp.wait.Broadcast()
  140. return
  141. }
  142. func returnBuffer(b *fixedBuffer) {
  143. b.Reset()
  144. bufPoolsLock.Lock()
  145. pool := bufPools[b.Cap()]
  146. bufPoolsLock.Unlock()
  147. if pool != nil {
  148. pool.Put(b)
  149. }
  150. }
  151. func getBuffer(size int) *fixedBuffer {
  152. bufPoolsLock.Lock()
  153. pool, ok := bufPools[size]
  154. if !ok {
  155. pool = &sync.Pool{New: func() interface{} { return &fixedBuffer{buf: make([]byte, 0, size)} }}
  156. bufPools[size] = pool
  157. }
  158. bufPoolsLock.Unlock()
  159. return pool.Get().(*fixedBuffer)
  160. }