multireader.go 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228
  1. package multireader
  2. import (
  3. "bytes"
  4. "fmt"
  5. "io"
  6. "os"
  7. )
  8. type pos struct {
  9. idx int
  10. offset int64
  11. }
  12. type multiReadSeeker struct {
  13. readers []io.ReadSeeker
  14. pos *pos
  15. posIdx map[io.ReadSeeker]int
  16. }
  17. func (r *multiReadSeeker) Seek(offset int64, whence int) (int64, error) {
  18. var tmpOffset int64
  19. switch whence {
  20. case os.SEEK_SET:
  21. for i, rdr := range r.readers {
  22. // get size of the current reader
  23. s, err := rdr.Seek(0, os.SEEK_END)
  24. if err != nil {
  25. return -1, err
  26. }
  27. if offset > tmpOffset+s {
  28. if i == len(r.readers)-1 {
  29. rdrOffset := s + (offset - tmpOffset)
  30. if _, err := rdr.Seek(rdrOffset, os.SEEK_SET); err != nil {
  31. return -1, err
  32. }
  33. r.pos = &pos{i, rdrOffset}
  34. return offset, nil
  35. }
  36. tmpOffset += s
  37. continue
  38. }
  39. rdrOffset := offset - tmpOffset
  40. idx := i
  41. if _, err := rdr.Seek(rdrOffset, os.SEEK_SET); err != nil {
  42. return -1, err
  43. }
  44. // make sure all following readers are at 0
  45. for _, rdr := range r.readers[i+1:] {
  46. rdr.Seek(0, os.SEEK_SET)
  47. }
  48. if rdrOffset == s && i != len(r.readers)-1 {
  49. idx++
  50. rdrOffset = 0
  51. }
  52. r.pos = &pos{idx, rdrOffset}
  53. return offset, nil
  54. }
  55. case os.SEEK_END:
  56. for _, rdr := range r.readers {
  57. s, err := rdr.Seek(0, os.SEEK_END)
  58. if err != nil {
  59. return -1, err
  60. }
  61. tmpOffset += s
  62. }
  63. if _, err := r.Seek(tmpOffset+offset, os.SEEK_SET); err != nil {
  64. return -1, err
  65. }
  66. return tmpOffset + offset, nil
  67. case os.SEEK_CUR:
  68. if r.pos == nil {
  69. return r.Seek(offset, os.SEEK_SET)
  70. }
  71. // Just return the current offset
  72. if offset == 0 {
  73. return r.getCurOffset()
  74. }
  75. curOffset, err := r.getCurOffset()
  76. if err != nil {
  77. return -1, err
  78. }
  79. rdr, rdrOffset, err := r.getReaderForOffset(curOffset + offset)
  80. if err != nil {
  81. return -1, err
  82. }
  83. r.pos = &pos{r.posIdx[rdr], rdrOffset}
  84. return curOffset + offset, nil
  85. default:
  86. return -1, fmt.Errorf("Invalid whence: %d", whence)
  87. }
  88. return -1, fmt.Errorf("Error seeking for whence: %d, offset: %d", whence, offset)
  89. }
  90. func (r *multiReadSeeker) getReaderForOffset(offset int64) (io.ReadSeeker, int64, error) {
  91. var offsetTo int64
  92. for _, rdr := range r.readers {
  93. size, err := getReadSeekerSize(rdr)
  94. if err != nil {
  95. return nil, -1, err
  96. }
  97. if offsetTo+size > offset {
  98. return rdr, offset - offsetTo, nil
  99. }
  100. if rdr == r.readers[len(r.readers)-1] {
  101. return rdr, offsetTo + offset, nil
  102. }
  103. offsetTo += size
  104. }
  105. return nil, 0, nil
  106. }
  107. func (r *multiReadSeeker) getCurOffset() (int64, error) {
  108. var totalSize int64
  109. for _, rdr := range r.readers[:r.pos.idx+1] {
  110. if r.posIdx[rdr] == r.pos.idx {
  111. totalSize += r.pos.offset
  112. break
  113. }
  114. size, err := getReadSeekerSize(rdr)
  115. if err != nil {
  116. return -1, fmt.Errorf("error getting seeker size: %v", err)
  117. }
  118. totalSize += size
  119. }
  120. return totalSize, nil
  121. }
  122. func (r *multiReadSeeker) getOffsetToReader(rdr io.ReadSeeker) (int64, error) {
  123. var offset int64
  124. for _, r := range r.readers {
  125. if r == rdr {
  126. break
  127. }
  128. size, err := getReadSeekerSize(rdr)
  129. if err != nil {
  130. return -1, err
  131. }
  132. offset += size
  133. }
  134. return offset, nil
  135. }
  136. func (r *multiReadSeeker) Read(b []byte) (int, error) {
  137. if r.pos == nil {
  138. // make sure all readers are at 0
  139. r.Seek(0, os.SEEK_SET)
  140. }
  141. bLen := int64(len(b))
  142. buf := bytes.NewBuffer(nil)
  143. var rdr io.ReadSeeker
  144. for _, rdr = range r.readers[r.pos.idx:] {
  145. readBytes, err := io.CopyN(buf, rdr, bLen)
  146. if err != nil && err != io.EOF {
  147. return -1, err
  148. }
  149. bLen -= readBytes
  150. if bLen == 0 {
  151. break
  152. }
  153. }
  154. rdrPos, err := rdr.Seek(0, os.SEEK_CUR)
  155. if err != nil {
  156. return -1, err
  157. }
  158. r.pos = &pos{r.posIdx[rdr], rdrPos}
  159. return buf.Read(b)
  160. }
  161. func getReadSeekerSize(rdr io.ReadSeeker) (int64, error) {
  162. // save the current position
  163. pos, err := rdr.Seek(0, os.SEEK_CUR)
  164. if err != nil {
  165. return -1, err
  166. }
  167. // get the size
  168. size, err := rdr.Seek(0, os.SEEK_END)
  169. if err != nil {
  170. return -1, err
  171. }
  172. // reset the position
  173. if _, err := rdr.Seek(pos, os.SEEK_SET); err != nil {
  174. return -1, err
  175. }
  176. return size, nil
  177. }
  178. // MultiReadSeeker returns a ReadSeeker that's the logical concatenation of the provided
  179. // input readseekers. After calling this method the initial position is set to the
  180. // beginning of the first ReadSeeker. At the end of a ReadSeeker, Read always advances
  181. // to the beginning of the next ReadSeeker and returns EOF at the end of the last ReadSeeker.
  182. // Seek can be used over the sum of lengths of all readseekers.
  183. //
  184. // When a MultiReadSeeker is used, no Read and Seek operations should be made on
  185. // its ReadSeeker components. Also, users should make no assumption on the state
  186. // of individual readseekers while the MultiReadSeeker is used.
  187. func MultiReadSeeker(readers ...io.ReadSeeker) io.ReadSeeker {
  188. if len(readers) == 1 {
  189. return readers[0]
  190. }
  191. idx := make(map[io.ReadSeeker]int)
  192. for i, rdr := range readers {
  193. idx[rdr] = i
  194. }
  195. return &multiReadSeeker{
  196. readers: readers,
  197. posIdx: idx,
  198. }
  199. }