1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889 |
- package writeflusher
- import (
- "io"
- "net/http"
- "sync"
- )
- type flusher interface {
- Flush()
- }
- var errWriteFlusherClosed = io.EOF
- // NopFlusher represents a type which flush operation is nop.
- type NopFlusher struct{}
- // Flush is a nop operation.
- func (f *NopFlusher) Flush() {}
- // WriteFlusher wraps the Write and Flush operation ensuring that every write
- // is a flush. In addition, the Close method can be called to intercept
- // Read/Write calls if the targets lifecycle has already ended.
- type WriteFlusher struct {
- w io.Writer
- flusher flusher
- closed chan struct{}
- closeLock sync.Mutex
- firstFlush sync.Once
- }
- // NewWriteFlusher returns a new WriteFlusher.
- func NewWriteFlusher(w io.Writer) *WriteFlusher {
- var fl flusher
- if f, ok := w.(flusher); ok {
- fl = f
- } else {
- fl = &NopFlusher{}
- }
- return &WriteFlusher{w: w, flusher: fl, closed: make(chan struct{})}
- }
- func (wf *WriteFlusher) Write(b []byte) (n int, err error) {
- select {
- case <-wf.closed:
- return 0, errWriteFlusherClosed
- default:
- }
- n, err = wf.w.Write(b)
- wf.Flush() // every write is a flush.
- return n, err
- }
- // Flush the stream immediately.
- func (wf *WriteFlusher) Flush() {
- select {
- case <-wf.closed:
- return
- default:
- }
- // Here we call WriteHeader() if the io.Writer is an http.ResponseWriter to ensure that we don't try
- // to send headers multiple times if the writer has already been wrapped by OTEL instrumentation
- // (which doesn't wrap the Flush() func. See https://github.com/moby/moby/issues/47448)
- wf.firstFlush.Do(func() {
- if rw, ok := wf.w.(http.ResponseWriter); ok {
- rw.WriteHeader(http.StatusOK)
- }
- })
- wf.flusher.Flush()
- }
- // Close closes the write flusher, disallowing any further writes to the
- // target. After the flusher is closed, all calls to write or flush will
- // result in an error.
- func (wf *WriteFlusher) Close() error {
- wf.closeLock.Lock()
- defer wf.closeLock.Unlock()
- select {
- case <-wf.closed:
- return errWriteFlusherClosed
- default:
- close(wf.closed)
- }
- return nil
- }
|