Przeglądaj źródła

Make the broadcaster write messages to the observers in the same units they were written to the broadcaster

This means the writing to a WriteFlusher will flush in the same places
as it would if the broadcaster wasn't sitting in front of it.

Signed-off-by: Aaron Lehmann <aaron.lehmann@docker.com>
Aaron Lehmann 10 lat temu
rodzic
commit
317a5462e4
1 zmienionych plików z 16 dodań i 10 usunięć
  1. 16 10
      pkg/progressreader/broadcaster.go

+ 16 - 10
pkg/progressreader/broadcaster.go

@@ -1,7 +1,6 @@
 package progressreader
 package progressreader
 
 
 import (
 import (
-	"bytes"
 	"errors"
 	"errors"
 	"io"
 	"io"
 	"sync"
 	"sync"
@@ -19,8 +18,10 @@ type Broadcaster struct {
 	// new data available.
 	// new data available.
 	cond *sync.Cond
 	cond *sync.Cond
 	// history is a buffer of the progress output so far, so a new observer
 	// history is a buffer of the progress output so far, so a new observer
-	// can catch up.
-	history bytes.Buffer
+	// can catch up. The history is stored as a slice of separate byte
+	// slices, so that if the writer is a WriteFlusher, the flushes will
+	// happen in the right places.
+	history [][]byte
 	// wg is a WaitGroup used to wait for all writes to finish on Close
 	// wg is a WaitGroup used to wait for all writes to finish on Close
 	wg sync.WaitGroup
 	wg sync.WaitGroup
 	// isClosed is set to true when Close is called to avoid closing c
 	// isClosed is set to true when Close is called to avoid closing c
@@ -58,19 +59,20 @@ func (broadcaster *Broadcaster) receiveWrites(observer io.Writer) {
 	// The condition variable wait is at the end of this loop, so that the
 	// The condition variable wait is at the end of this loop, so that the
 	// first iteration will write the history so far.
 	// first iteration will write the history so far.
 	for {
 	for {
-		newData := broadcaster.history.Bytes()[n:]
+		newData := broadcaster.history[n:]
 		// Make a copy of newData so we can release the lock
 		// Make a copy of newData so we can release the lock
-		sendData := make([]byte, len(newData), len(newData))
+		sendData := make([][]byte, len(newData), len(newData))
 		copy(sendData, newData)
 		copy(sendData, newData)
 		broadcaster.Unlock()
 		broadcaster.Unlock()
 
 
-		if len(sendData) > 0 {
-			written, err := observer.Write(sendData)
+		for len(sendData) > 0 {
+			_, err := observer.Write(sendData[0])
 			if err != nil {
 			if err != nil {
 				broadcaster.wg.Done()
 				broadcaster.wg.Done()
 				return
 				return
 			}
 			}
-			n += written
+			n++
+			sendData = sendData[1:]
 		}
 		}
 
 
 		broadcaster.Lock()
 		broadcaster.Lock()
@@ -82,7 +84,7 @@ func (broadcaster *Broadcaster) receiveWrites(observer io.Writer) {
 			return
 			return
 		}
 		}
 
 
-		if broadcaster.history.Len() == n {
+		if len(broadcaster.history) == n {
 			broadcaster.cond.Wait()
 			broadcaster.cond.Wait()
 		}
 		}
 
 
@@ -101,7 +103,11 @@ func (broadcaster *Broadcaster) Write(p []byte) (n int, err error) {
 		return 0, errors.New("attempted write to closed progressreader Broadcaster")
 		return 0, errors.New("attempted write to closed progressreader Broadcaster")
 	}
 	}
 
 
-	broadcaster.history.Write(p)
+	// Add message in p to the history slice
+	newEntry := make([]byte, len(p), len(p))
+	copy(newEntry, p)
+	broadcaster.history = append(broadcaster.history, newEntry)
+
 	broadcaster.cond.Broadcast()
 	broadcaster.cond.Broadcast()
 
 
 	return len(p), nil
 	return len(p), nil