bytespipe.go 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152
  1. package ioutils
  2. import (
  3. "errors"
  4. "io"
  5. "sync"
  6. )
  // Sizing limits for the byte slices that back a BytesPipe.
  const (
  	// maxCap is the largest capacity Write gives to a newly allocated
  	// buffer slice.
  	maxCap = 1e6

  	// blockThreshold is the amount of buffered data, in bytes, at or above
  	// which Write blocks rather than allocating another slice.
  	blockThreshold = 1e6
  )
  var (
  	// ErrClosed is the error Write returns once the BytesPipe has been
  	// closed.
  	ErrClosed = errors.New("write to closed BytesPipe")
  )
  // BytesPipe is an in-memory, queue-like io.ReadWriteCloser: bytes passed to
  // Write are handed out by Read in the same order, and each byte can be read
  // at most once. Storage is a chain of byte slices that grows while data
  // accumulates and whose slices are dropped again once drained, so a burst
  // of writes does not leave a permanently oversized buffer behind.
  //
  // Create instances with NewBytesPipe; the methods rely on the condition
  // variable it installs.
  type BytesPipe struct {
  	// mu guards all fields below.
  	mu sync.Mutex
  	// wait is signalled whenever data is written, data is read, or the
  	// pipe is closed.
  	wait *sync.Cond
  	// buf holds the buffered data as a slice of byte slices; reads consume
  	// from buf[0], writes append to the last element.
  	buf [][]byte
  	// lastRead is the offset in buf[0] of the next unread byte.
  	lastRead int
  	// bufLen is the total length of the slices in buf, including the
  	// already-consumed prefix of buf[0].
  	bufLen int
  	// closeErr is the error Read returns once the buffer is empty; it is
  	// nil while the pipe is open.
  	closeErr error
  }
  26. // NewBytesPipe creates new BytesPipe, initialized by specified slice.
  27. // If buf is nil, then it will be initialized with slice which cap is 64.
  28. // buf will be adjusted in a way that len(buf) == 0, cap(buf) == cap(buf).
  29. func NewBytesPipe(buf []byte) *BytesPipe {
  30. if cap(buf) == 0 {
  31. buf = make([]byte, 0, 64)
  32. }
  33. bp := &BytesPipe{
  34. buf: [][]byte{buf[:0]},
  35. }
  36. bp.wait = sync.NewCond(&bp.mu)
  37. return bp
  38. }
  39. // Write writes p to BytesPipe.
  40. // It can allocate new []byte slices in a process of writing.
  41. func (bp *BytesPipe) Write(p []byte) (int, error) {
  42. bp.mu.Lock()
  43. defer bp.mu.Unlock()
  44. written := 0
  45. for {
  46. if bp.closeErr != nil {
  47. return written, ErrClosed
  48. }
  49. // write data to the last buffer
  50. b := bp.buf[len(bp.buf)-1]
  51. // copy data to the current empty allocated area
  52. n := copy(b[len(b):cap(b)], p)
  53. // increment buffered data length
  54. bp.bufLen += n
  55. // include written data in last buffer
  56. bp.buf[len(bp.buf)-1] = b[:len(b)+n]
  57. written += n
  58. // if there was enough room to write all then break
  59. if len(p) == n {
  60. break
  61. }
  62. // more data: write to the next slice
  63. p = p[n:]
  64. // block if too much data is still in the buffer
  65. for bp.bufLen >= blockThreshold {
  66. bp.wait.Wait()
  67. }
  68. // allocate slice that has twice the size of the last unless maximum reached
  69. nextCap := 2 * cap(bp.buf[len(bp.buf)-1])
  70. if nextCap > maxCap {
  71. nextCap = maxCap
  72. }
  73. // add new byte slice to the buffers slice and continue writing
  74. bp.buf = append(bp.buf, make([]byte, 0, nextCap))
  75. }
  76. bp.wait.Broadcast()
  77. return written, nil
  78. }
  79. // CloseWithError causes further reads from a BytesPipe to return immediately.
  80. func (bp *BytesPipe) CloseWithError(err error) error {
  81. bp.mu.Lock()
  82. if err != nil {
  83. bp.closeErr = err
  84. } else {
  85. bp.closeErr = io.EOF
  86. }
  87. bp.wait.Broadcast()
  88. bp.mu.Unlock()
  89. return nil
  90. }
  91. // Close causes further reads from a BytesPipe to return immediately.
  92. func (bp *BytesPipe) Close() error {
  93. return bp.CloseWithError(nil)
  94. }
  95. func (bp *BytesPipe) len() int {
  96. return bp.bufLen - bp.lastRead
  97. }
  98. // Read reads bytes from BytesPipe.
  99. // Data could be read only once.
  100. func (bp *BytesPipe) Read(p []byte) (n int, err error) {
  101. bp.mu.Lock()
  102. defer bp.mu.Unlock()
  103. if bp.len() == 0 {
  104. if bp.closeErr != nil {
  105. return 0, bp.closeErr
  106. }
  107. bp.wait.Wait()
  108. if bp.len() == 0 && bp.closeErr != nil {
  109. return 0, bp.closeErr
  110. }
  111. }
  112. for {
  113. read := copy(p, bp.buf[0][bp.lastRead:])
  114. n += read
  115. bp.lastRead += read
  116. if bp.len() == 0 {
  117. // we have read everything. reset to the beginning.
  118. bp.lastRead = 0
  119. bp.bufLen -= len(bp.buf[0])
  120. bp.buf[0] = bp.buf[0][:0]
  121. break
  122. }
  123. // break if everything was read
  124. if len(p) == read {
  125. break
  126. }
  127. // more buffered data and more asked. read from next slice.
  128. p = p[read:]
  129. bp.lastRead = 0
  130. bp.bufLen -= len(bp.buf[0])
  131. bp.buf[0] = nil // throw away old slice
  132. bp.buf = bp.buf[1:] // switch to next
  133. }
  134. bp.wait.Broadcast()
  135. return
  136. }