writeflusher.go 2.0 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889
  1. package writeflusher
  2. import (
  3. "io"
  4. "net/http"
  5. "sync"
  6. )
  7. type flusher interface {
  8. Flush()
  9. }
  10. var errWriteFlusherClosed = io.EOF
  11. // NopFlusher represents a type which flush operation is nop.
  12. type NopFlusher struct{}
  13. // Flush is a nop operation.
  14. func (f *NopFlusher) Flush() {}
  15. // WriteFlusher wraps the Write and Flush operation ensuring that every write
  16. // is a flush. In addition, the Close method can be called to intercept
  17. // Read/Write calls if the targets lifecycle has already ended.
  18. type WriteFlusher struct {
  19. w io.Writer
  20. flusher flusher
  21. closed chan struct{}
  22. closeLock sync.Mutex
  23. firstFlush sync.Once
  24. }
  25. // NewWriteFlusher returns a new WriteFlusher.
  26. func NewWriteFlusher(w io.Writer) *WriteFlusher {
  27. var fl flusher
  28. if f, ok := w.(flusher); ok {
  29. fl = f
  30. } else {
  31. fl = &NopFlusher{}
  32. }
  33. return &WriteFlusher{w: w, flusher: fl, closed: make(chan struct{})}
  34. }
  35. func (wf *WriteFlusher) Write(b []byte) (n int, err error) {
  36. select {
  37. case <-wf.closed:
  38. return 0, errWriteFlusherClosed
  39. default:
  40. }
  41. n, err = wf.w.Write(b)
  42. wf.Flush() // every write is a flush.
  43. return n, err
  44. }
  45. // Flush the stream immediately.
  46. func (wf *WriteFlusher) Flush() {
  47. select {
  48. case <-wf.closed:
  49. return
  50. default:
  51. }
  52. // Here we call WriteHeader() if the io.Writer is an http.ResponseWriter to ensure that we don't try
  53. // to send headers multiple times if the writer has already been wrapped by OTEL instrumentation
  54. // (which doesn't wrap the Flush() func. See https://github.com/moby/moby/issues/47448)
  55. wf.firstFlush.Do(func() {
  56. if rw, ok := wf.w.(http.ResponseWriter); ok {
  57. rw.WriteHeader(http.StatusOK)
  58. }
  59. })
  60. wf.flusher.Flush()
  61. }
  62. // Close closes the write flusher, disallowing any further writes to the
  63. // target. After the flusher is closed, all calls to write or flush will
  64. // result in an error.
  65. func (wf *WriteFlusher) Close() error {
  66. wf.closeLock.Lock()
  67. defer wf.closeLock.Unlock()
  68. select {
  69. case <-wf.closed:
  70. return errWriteFlusherClosed
  71. default:
  72. close(wf.closed)
  73. }
  74. return nil
  75. }