writeflusher.go 2.0 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192
  1. package ioutils
  2. import (
  3. "io"
  4. "sync"
  5. )
  6. // WriteFlusher wraps the Write and Flush operation ensuring that every write
  7. // is a flush. In addition, the Close method can be called to intercept
  8. // Read/Write calls if the targets lifecycle has already ended.
  9. type WriteFlusher struct {
  10. w io.Writer
  11. flusher flusher
  12. flushed chan struct{}
  13. flushedOnce sync.Once
  14. closed chan struct{}
  15. closeLock sync.Mutex
  16. }
  17. type flusher interface {
  18. Flush()
  19. }
  20. var errWriteFlusherClosed = io.EOF
  21. func (wf *WriteFlusher) Write(b []byte) (n int, err error) {
  22. select {
  23. case <-wf.closed:
  24. return 0, errWriteFlusherClosed
  25. default:
  26. }
  27. n, err = wf.w.Write(b)
  28. wf.Flush() // every write is a flush.
  29. return n, err
  30. }
  31. // Flush the stream immediately.
  32. func (wf *WriteFlusher) Flush() {
  33. select {
  34. case <-wf.closed:
  35. return
  36. default:
  37. }
  38. wf.flushedOnce.Do(func() {
  39. close(wf.flushed)
  40. })
  41. wf.flusher.Flush()
  42. }
  43. // Flushed returns the state of flushed.
  44. // If it's flushed, return true, or else it return false.
  45. func (wf *WriteFlusher) Flushed() bool {
  46. // BUG(stevvooe): Remove this method. Its use is inherently racy. Seems to
  47. // be used to detect whether or a response code has been issued or not.
  48. // Another hook should be used instead.
  49. var flushed bool
  50. select {
  51. case <-wf.flushed:
  52. flushed = true
  53. default:
  54. }
  55. return flushed
  56. }
  57. // Close closes the write flusher, disallowing any further writes to the
  58. // target. After the flusher is closed, all calls to write or flush will
  59. // result in an error.
  60. func (wf *WriteFlusher) Close() error {
  61. wf.closeLock.Lock()
  62. defer wf.closeLock.Unlock()
  63. select {
  64. case <-wf.closed:
  65. return errWriteFlusherClosed
  66. default:
  67. close(wf.closed)
  68. }
  69. return nil
  70. }
  71. // NewWriteFlusher returns a new WriteFlusher.
  72. func NewWriteFlusher(w io.Writer) *WriteFlusher {
  73. var fl flusher
  74. if f, ok := w.(flusher); ok {
  75. fl = f
  76. } else {
  77. fl = &NopFlusher{}
  78. }
  79. return &WriteFlusher{w: w, flusher: fl, closed: make(chan struct{}), flushed: make(chan struct{})}
  80. }