writeflusher.go 2.1 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192
  1. package ioutils
  2. import (
  3. "errors"
  4. "io"
  5. "net/http"
  6. "sync"
  7. )
  8. // WriteFlusher wraps the Write and Flush operation ensuring that every write
  9. // is a flush. In addition, the Close method can be called to intercept
  10. // Read/Write calls if the targets lifecycle has already ended.
  11. type WriteFlusher struct {
  12. mu sync.Mutex
  13. w io.Writer
  14. flusher http.Flusher
  15. flushed bool
  16. closed error
  17. // TODO(stevvooe): Use channel for closed instead, remove mutex. Using a
  18. // channel will allow one to properly order the operations.
  19. }
  20. var errWriteFlusherClosed = errors.New("writeflusher: closed")
  21. func (wf *WriteFlusher) Write(b []byte) (n int, err error) {
  22. wf.mu.Lock()
  23. defer wf.mu.Unlock()
  24. if wf.closed != nil {
  25. return 0, wf.closed
  26. }
  27. n, err = wf.w.Write(b)
  28. wf.flush() // every write is a flush.
  29. return n, err
  30. }
  31. // Flush the stream immediately.
  32. func (wf *WriteFlusher) Flush() {
  33. wf.mu.Lock()
  34. defer wf.mu.Unlock()
  35. wf.flush()
  36. }
  37. // flush the stream immediately without taking a lock. Used internally.
  38. func (wf *WriteFlusher) flush() {
  39. if wf.closed != nil {
  40. return
  41. }
  42. wf.flushed = true
  43. wf.flusher.Flush()
  44. }
  45. // Flushed returns the state of flushed.
  46. // If it's flushed, return true, or else it return false.
  47. func (wf *WriteFlusher) Flushed() bool {
  48. // BUG(stevvooe): Remove this method. Its use is inherently racy. Seems to
  49. // be used to detect whether or a response code has been issued or not.
  50. // Another hook should be used instead.
  51. wf.mu.Lock()
  52. defer wf.mu.Unlock()
  53. return wf.flushed
  54. }
  55. // Close closes the write flusher, disallowing any further writes to the
  56. // target. After the flusher is closed, all calls to write or flush will
  57. // result in an error.
  58. func (wf *WriteFlusher) Close() error {
  59. wf.mu.Lock()
  60. defer wf.mu.Unlock()
  61. if wf.closed != nil {
  62. return wf.closed
  63. }
  64. wf.closed = errWriteFlusherClosed
  65. return nil
  66. }
  67. // NewWriteFlusher returns a new WriteFlusher.
  68. func NewWriteFlusher(w io.Writer) *WriteFlusher {
  69. var flusher http.Flusher
  70. if f, ok := w.(http.Flusher); ok {
  71. flusher = f
  72. } else {
  73. flusher = &NopFlusher{}
  74. }
  75. return &WriteFlusher{w: w, flusher: flusher}
  76. }