Browse Source

Created new WriteFlusher in /internal with otel related fixes (call WriteHeader() on first Flush())

Signed-off-by: Christopher Petito <47751006+krissetto@users.noreply.github.com>
Christopher Petito 1 year ago
parent
commit
d020e95563
1 changed files with 89 additions and 0 deletions
  1. 89 0
      internal/writeflusher/writeflusher.go

+ 89 - 0
internal/writeflusher/writeflusher.go

@@ -0,0 +1,89 @@
+package writeflusher
+
+import (
+	"io"
+	"net/http"
+	"sync"
+)
+
+type flusher interface {
+	Flush()
+}
+
+var errWriteFlusherClosed = io.EOF
+
+// NopFlusher represents a type which flush operation is nop.
+type NopFlusher struct{}
+
+// Flush is a nop operation.
+func (f *NopFlusher) Flush() {}
+
+// WriteFlusher wraps the Write and Flush operation ensuring that every write
+// is a flush. In addition, the Close method can be called to intercept
+// Read/Write calls if the targets lifecycle has already ended.
+type WriteFlusher struct {
+	w          io.Writer
+	flusher    flusher
+	closed     chan struct{}
+	closeLock  sync.Mutex
+	firstFlush sync.Once
+}
+
+// NewWriteFlusher returns a new WriteFlusher.
+func NewWriteFlusher(w io.Writer) *WriteFlusher {
+	var fl flusher
+	if f, ok := w.(flusher); ok {
+		fl = f
+	} else {
+		fl = &NopFlusher{}
+	}
+	return &WriteFlusher{w: w, flusher: fl, closed: make(chan struct{})}
+}
+
+func (wf *WriteFlusher) Write(b []byte) (n int, err error) {
+	select {
+	case <-wf.closed:
+		return 0, errWriteFlusherClosed
+	default:
+	}
+
+	n, err = wf.w.Write(b)
+	wf.Flush() // every write is a flush.
+	return n, err
+}
+
+// Flush the stream immediately.
+func (wf *WriteFlusher) Flush() {
+	select {
+	case <-wf.closed:
+		return
+	default:
+	}
+
+	// Here we call WriteHeader() if the io.Writer is an http.ResponseWriter to ensure that we don't try
+	// to send headers multiple times if the writer has already been wrapped by OTEL instrumentation
+	// (which doesn't wrap the Flush() func. See https://github.com/moby/moby/issues/47448)
+	wf.firstFlush.Do(func() {
+		if rw, ok := wf.w.(http.ResponseWriter); ok {
+			rw.WriteHeader(http.StatusOK)
+		}
+	})
+
+	wf.flusher.Flush()
+}
+
+// Close closes the write flusher, disallowing any further writes to the
+// target. After the flusher is closed, all calls to write or flush will
+// result in an error.
+func (wf *WriteFlusher) Close() error {
+	wf.closeLock.Lock()
+	defer wf.closeLock.Unlock()
+
+	select {
+	case <-wf.closed:
+		return errWriteFlusherClosed
+	default:
+		close(wf.closed)
+	}
+	return nil
+}