readers.go 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151
  1. package ioutils // import "github.com/docker/docker/pkg/ioutils"
  2. import (
  3. "context"
  4. "io"
  5. // make sure crypto.SHA256, crypto.sha512 and crypto.SHA384 are registered
  6. // TODO remove once https://github.com/opencontainers/go-digest/pull/64 is merged.
  7. _ "crypto/sha256"
  8. _ "crypto/sha512"
  9. )
  10. // ReadCloserWrapper wraps an io.Reader, and implements an io.ReadCloser
  11. // It calls the given callback function when closed. It should be constructed
  12. // with NewReadCloserWrapper
  13. type ReadCloserWrapper struct {
  14. io.Reader
  15. closer func() error
  16. }
  17. // Close calls back the passed closer function
  18. func (r *ReadCloserWrapper) Close() error {
  19. return r.closer()
  20. }
  21. // NewReadCloserWrapper returns a new io.ReadCloser.
  22. func NewReadCloserWrapper(r io.Reader, closer func() error) io.ReadCloser {
  23. return &ReadCloserWrapper{
  24. Reader: r,
  25. closer: closer,
  26. }
  27. }
  28. type readerErrWrapper struct {
  29. reader io.Reader
  30. closer func()
  31. }
  32. func (r *readerErrWrapper) Read(p []byte) (int, error) {
  33. n, err := r.reader.Read(p)
  34. if err != nil {
  35. r.closer()
  36. }
  37. return n, err
  38. }
  39. // NewReaderErrWrapper returns a new io.Reader.
  40. func NewReaderErrWrapper(r io.Reader, closer func()) io.Reader {
  41. return &readerErrWrapper{
  42. reader: r,
  43. closer: closer,
  44. }
  45. }
  46. // OnEOFReader wraps an io.ReadCloser and a function
  47. // the function will run at the end of file or close the file.
  48. type OnEOFReader struct {
  49. Rc io.ReadCloser
  50. Fn func()
  51. }
  52. func (r *OnEOFReader) Read(p []byte) (n int, err error) {
  53. n, err = r.Rc.Read(p)
  54. if err == io.EOF {
  55. r.runFunc()
  56. }
  57. return
  58. }
  59. // Close closes the file and run the function.
  60. func (r *OnEOFReader) Close() error {
  61. err := r.Rc.Close()
  62. r.runFunc()
  63. return err
  64. }
  65. func (r *OnEOFReader) runFunc() {
  66. if fn := r.Fn; fn != nil {
  67. fn()
  68. r.Fn = nil
  69. }
  70. }
  71. // cancelReadCloser wraps an io.ReadCloser with a context for cancelling read
  72. // operations.
  73. type cancelReadCloser struct {
  74. cancel func()
  75. pR *io.PipeReader // Stream to read from
  76. pW *io.PipeWriter
  77. }
  78. // NewCancelReadCloser creates a wrapper that closes the ReadCloser when the
  79. // context is cancelled. The returned io.ReadCloser must be closed when it is
  80. // no longer needed.
  81. func NewCancelReadCloser(ctx context.Context, in io.ReadCloser) io.ReadCloser {
  82. pR, pW := io.Pipe()
  83. // Create a context used to signal when the pipe is closed
  84. doneCtx, cancel := context.WithCancel(context.Background())
  85. p := &cancelReadCloser{
  86. cancel: cancel,
  87. pR: pR,
  88. pW: pW,
  89. }
  90. go func() {
  91. _, err := io.Copy(pW, in)
  92. select {
  93. case <-ctx.Done():
  94. // If the context was closed, p.closeWithError
  95. // was already called. Calling it again would
  96. // change the error that Read returns.
  97. default:
  98. p.closeWithError(err)
  99. }
  100. in.Close()
  101. }()
  102. go func() {
  103. for {
  104. select {
  105. case <-ctx.Done():
  106. p.closeWithError(ctx.Err())
  107. case <-doneCtx.Done():
  108. return
  109. }
  110. }
  111. }()
  112. return p
  113. }
  114. // Read wraps the Read method of the pipe that provides data from the wrapped
  115. // ReadCloser.
  116. func (p *cancelReadCloser) Read(buf []byte) (n int, err error) {
  117. return p.pR.Read(buf)
  118. }
  119. // closeWithError closes the wrapper and its underlying reader. It will
  120. // cause future calls to Read to return err.
  121. func (p *cancelReadCloser) closeWithError(err error) {
  122. p.pW.CloseWithError(err)
  123. p.cancel()
  124. }
  125. // Close closes the wrapper its underlying reader. It will cause
  126. // future calls to Read to return io.EOF.
  127. func (p *cancelReadCloser) Close() error {
  128. p.closeWithError(io.EOF)
  129. return nil
  130. }