readers.go 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154
  1. package ioutils
  2. import (
  3. "crypto/sha256"
  4. "encoding/hex"
  5. "io"
  6. "golang.org/x/net/context"
  7. )
  8. type readCloserWrapper struct {
  9. io.Reader
  10. closer func() error
  11. }
  12. func (r *readCloserWrapper) Close() error {
  13. return r.closer()
  14. }
  15. // NewReadCloserWrapper returns a new io.ReadCloser.
  16. func NewReadCloserWrapper(r io.Reader, closer func() error) io.ReadCloser {
  17. return &readCloserWrapper{
  18. Reader: r,
  19. closer: closer,
  20. }
  21. }
  22. type readerErrWrapper struct {
  23. reader io.Reader
  24. closer func()
  25. }
  26. func (r *readerErrWrapper) Read(p []byte) (int, error) {
  27. n, err := r.reader.Read(p)
  28. if err != nil {
  29. r.closer()
  30. }
  31. return n, err
  32. }
  33. // NewReaderErrWrapper returns a new io.Reader.
  34. func NewReaderErrWrapper(r io.Reader, closer func()) io.Reader {
  35. return &readerErrWrapper{
  36. reader: r,
  37. closer: closer,
  38. }
  39. }
  40. // HashData returns the sha256 sum of src.
  41. func HashData(src io.Reader) (string, error) {
  42. h := sha256.New()
  43. if _, err := io.Copy(h, src); err != nil {
  44. return "", err
  45. }
  46. return "sha256:" + hex.EncodeToString(h.Sum(nil)), nil
  47. }
  48. // OnEOFReader wraps an io.ReadCloser and a function
  49. // the function will run at the end of file or close the file.
  50. type OnEOFReader struct {
  51. Rc io.ReadCloser
  52. Fn func()
  53. }
  54. func (r *OnEOFReader) Read(p []byte) (n int, err error) {
  55. n, err = r.Rc.Read(p)
  56. if err == io.EOF {
  57. r.runFunc()
  58. }
  59. return
  60. }
  61. // Close closes the file and run the function.
  62. func (r *OnEOFReader) Close() error {
  63. err := r.Rc.Close()
  64. r.runFunc()
  65. return err
  66. }
  67. func (r *OnEOFReader) runFunc() {
  68. if fn := r.Fn; fn != nil {
  69. fn()
  70. r.Fn = nil
  71. }
  72. }
  73. // cancelReadCloser wraps an io.ReadCloser with a context for cancelling read
  74. // operations.
  75. type cancelReadCloser struct {
  76. cancel func()
  77. pR *io.PipeReader // Stream to read from
  78. pW *io.PipeWriter
  79. }
  80. // NewCancelReadCloser creates a wrapper that closes the ReadCloser when the
  81. // context is cancelled. The returned io.ReadCloser must be closed when it is
  82. // no longer needed.
  83. func NewCancelReadCloser(ctx context.Context, in io.ReadCloser) io.ReadCloser {
  84. pR, pW := io.Pipe()
  85. // Create a context used to signal when the pipe is closed
  86. doneCtx, cancel := context.WithCancel(context.Background())
  87. p := &cancelReadCloser{
  88. cancel: cancel,
  89. pR: pR,
  90. pW: pW,
  91. }
  92. go func() {
  93. _, err := io.Copy(pW, in)
  94. select {
  95. case <-ctx.Done():
  96. // If the context was closed, p.closeWithError
  97. // was already called. Calling it again would
  98. // change the error that Read returns.
  99. default:
  100. p.closeWithError(err)
  101. }
  102. in.Close()
  103. }()
  104. go func() {
  105. for {
  106. select {
  107. case <-ctx.Done():
  108. p.closeWithError(ctx.Err())
  109. case <-doneCtx.Done():
  110. return
  111. }
  112. }
  113. }()
  114. return p
  115. }
  116. // Read wraps the Read method of the pipe that provides data from the wrapped
  117. // ReadCloser.
  118. func (p *cancelReadCloser) Read(buf []byte) (n int, err error) {
  119. return p.pR.Read(buf)
  120. }
  121. // closeWithError closes the wrapper and its underlying reader. It will
  122. // cause future calls to Read to return err.
  123. func (p *cancelReadCloser) closeWithError(err error) {
  124. p.pW.CloseWithError(err)
  125. p.cancel()
  126. }
  127. // Close closes the wrapper its underlying reader. It will cause
  128. // future calls to Read to return io.EOF.
  129. func (p *cancelReadCloser) Close() error {
  130. p.closeWithError(io.EOF)
  131. return nil
  132. }