diff --git a/internal/writeflusher/legacywriteflusher.go b/internal/writeflusher/legacywriteflusher.go new file mode 100644 index 0000000000..8f736c7ea2 --- /dev/null +++ b/internal/writeflusher/legacywriteflusher.go @@ -0,0 +1,102 @@ +package writeflusher + +import ( + "io" + "sync" +) + +// Deprecated: use the internal WriteFlusher instead. +// This is the old implementation that lived in ioutils. + +// This struct and all funcs below used to live in the pkg/ioutils package +// +// LegacyWriteFlusher wraps the Write and Flush operation ensuring that every write +// is a flush. In addition, the Close method can be called to intercept +// Read/Write calls if the targets lifecycle has already ended. +// +// Deprecated: use the internal writeflusher.WriteFlusher instead +type LegacyWriteFlusher struct { + w io.Writer + flusher flusher + flushed chan struct{} + flushedOnce sync.Once + closed chan struct{} + closeLock sync.Mutex +} + +// NewLegacyWriteFlusher returns a new LegacyWriteFlusher. +// +// Deprecated: use the internal writeflusher.NewWriteFlusher() instead +func NewLegacyWriteFlusher(w io.Writer) *LegacyWriteFlusher { + var fl flusher + if f, ok := w.(flusher); ok { + fl = f + } else { + fl = &NopFlusher{} + } + return &LegacyWriteFlusher{w: w, flusher: fl, closed: make(chan struct{}), flushed: make(chan struct{})} +} + +// Deprecated: use the internal writeflusher.Write() instead +func (wf *LegacyWriteFlusher) Write(b []byte) (n int, err error) { + select { + case <-wf.closed: + return 0, errWriteFlusherClosed + default: + } + + n, err = wf.w.Write(b) + wf.Flush() // every write is a flush. + return n, err +} + +// Flush the stream immediately. +// +// Deprecated: use the internal writeflusher.Flush() instead +func (wf *LegacyWriteFlusher) Flush() { + select { + case <-wf.closed: + return + default: + } + + wf.flushedOnce.Do(func() { + close(wf.flushed) + }) + wf.flusher.Flush() +} + +// Flushed returns the state of flushed. +// If it's flushed, return true, or else it return false. +// +// Deprecated: use the internal writeflusher.WriteFlusher instead +func (wf *LegacyWriteFlusher) Flushed() bool { + // BUG(stevvooe): Remove this method. Its use is inherently racy. Seems to + // be used to detect whether or a response code has been issued or not. + // Another hook should be used instead. + var flushed bool + select { + case <-wf.flushed: + flushed = true + default: + } + return flushed +} + +// Close closes the write flusher, disallowing any further writes to the +// target. After the flusher is closed, all calls to write or flush will +// result in an error. +// +// Deprecated: use the internal writeflusher.Close() instead +func (wf *LegacyWriteFlusher) Close() error { + wf.closeLock.Lock() + defer wf.closeLock.Unlock() + + select { + case <-wf.closed: + return errWriteFlusherClosed + default: + close(wf.closed) + } + return nil +} diff --git a/internal/writeflusher/writeflusher.go b/internal/writeflusher/writeflusher.go new file mode 100644 index 0000000000..9ec7ad9a49 --- /dev/null +++ b/internal/writeflusher/writeflusher.go @@ -0,0 +1,89 @@ +package writeflusher + +import ( + "io" + "net/http" + "sync" +) + +type flusher interface { + Flush() +} + +var errWriteFlusherClosed = io.EOF + +// NopFlusher represents a type which flush operation is nop. +type NopFlusher struct{} + +// Flush is a nop operation. +func (f *NopFlusher) Flush() {} + +// WriteFlusher wraps the Write and Flush operation ensuring that every write +// is a flush. In addition, the Close method can be called to intercept +// Read/Write calls if the targets lifecycle has already ended. +type WriteFlusher struct { + w io.Writer + flusher flusher + closed chan struct{} + closeLock sync.Mutex + firstFlush sync.Once +} + +// NewWriteFlusher returns a new WriteFlusher. +func NewWriteFlusher(w io.Writer) *WriteFlusher { + var fl flusher + if f, ok := w.(flusher); ok { + fl = f + } else { + fl = &NopFlusher{} + } + return &WriteFlusher{w: w, flusher: fl, closed: make(chan struct{})} +} + +func (wf *WriteFlusher) Write(b []byte) (n int, err error) { + select { + case <-wf.closed: + return 0, errWriteFlusherClosed + default: + } + + n, err = wf.w.Write(b) + wf.Flush() // every write is a flush. + return n, err +} + +// Flush the stream immediately. +func (wf *WriteFlusher) Flush() { + select { + case <-wf.closed: + return + default: + } + + // Here we call WriteHeader() if the io.Writer is an http.ResponseWriter to ensure that we don't try + // to send headers multiple times if the writer has already been wrapped by OTEL instrumentation + // (which doesn't wrap the Flush() func. See https://github.com/moby/moby/issues/47448) + wf.firstFlush.Do(func() { + if rw, ok := wf.w.(http.ResponseWriter); ok { + rw.WriteHeader(http.StatusOK) + } + }) + + wf.flusher.Flush() +} + +// Close closes the write flusher, disallowing any further writes to the +// target. After the flusher is closed, all calls to write or flush will +// result in an error. +func (wf *WriteFlusher) Close() error { + wf.closeLock.Lock() + defer wf.closeLock.Unlock() + + select { + case <-wf.closed: + return errWriteFlusherClosed + default: + close(wf.closed) + } + return nil +} diff --git a/pkg/ioutils/writeflusher.go b/pkg/ioutils/writeflusher.go index 91b8d18266..05e5a6e33f 100644 --- a/pkg/ioutils/writeflusher.go +++ b/pkg/ioutils/writeflusher.go @@ -2,91 +2,13 @@ package ioutils // import "github.com/docker/docker/pkg/ioutils" import ( "io" - "sync" + + "github.com/docker/docker/internal/writeflusher" ) -// WriteFlusher wraps the Write and Flush operation ensuring that every write -// is a flush. In addition, the Close method can be called to intercept -// Read/Write calls if the targets lifecycle has already ended. -type WriteFlusher struct { - w io.Writer - flusher flusher - flushed chan struct{} - flushedOnce sync.Once - closed chan struct{} - closeLock sync.Mutex -} - -type flusher interface { - Flush() -} - -var errWriteFlusherClosed = io.EOF - -func (wf *WriteFlusher) Write(b []byte) (n int, err error) { - select { - case <-wf.closed: - return 0, errWriteFlusherClosed - default: - } - - n, err = wf.w.Write(b) - wf.Flush() // every write is a flush. - return n, err -} - -// Flush the stream immediately. -func (wf *WriteFlusher) Flush() { - select { - case <-wf.closed: - return - default: - } - - wf.flushedOnce.Do(func() { - close(wf.flushed) - }) - wf.flusher.Flush() -} - -// Flushed returns the state of flushed. -// If it's flushed, return true, or else it return false. -func (wf *WriteFlusher) Flushed() bool { - // BUG(stevvooe): Remove this method. Its use is inherently racy. Seems to - // be used to detect whether or a response code has been issued or not. - // Another hook should be used instead. - var flushed bool - select { - case <-wf.flushed: - flushed = true - default: - } - return flushed -} - -// Close closes the write flusher, disallowing any further writes to the -// target. After the flusher is closed, all calls to write or flush will -// result in an error. -func (wf *WriteFlusher) Close() error { - wf.closeLock.Lock() - defer wf.closeLock.Unlock() - - select { - case <-wf.closed: - return errWriteFlusherClosed - default: - close(wf.closed) - } - return nil -} - // NewWriteFlusher returns a new WriteFlusher. -func NewWriteFlusher(w io.Writer) *WriteFlusher { - var fl flusher - if f, ok := w.(flusher); ok { - fl = f - } else { - fl = &NopFlusher{} - } - return &WriteFlusher{w: w, flusher: fl, closed: make(chan struct{}), flushed: make(chan struct{})} +// +// Deprecated: use the internal/writeflusher WriteFlusher instead. +func NewWriteFlusher(w io.Writer) *writeflusher.LegacyWriteFlusher { + return writeflusher.NewLegacyWriteFlusher(w) }