Browse Source

Refacator pkg/streamformatter

StreamFormatter suffered was two distinct structs mixed into a single struct
without any overlap.

Signed-off-by: Daniel Nephin <dnephin@docker.com>
Daniel Nephin 8 years ago
parent
commit
c87d67b0ad

+ 12 - 19
api/server/router/build/build_routes.go

@@ -138,7 +138,6 @@ func (br *buildRouter) postBuild(ctx context.Context, w http.ResponseWriter, r *
 
 	output := ioutils.NewWriteFlusher(w)
 	defer output.Close()
-	sf := streamformatter.NewJSONStreamFormatter()
 	errf := func(err error) error {
 		if httputils.BoolValue(r, "q") && notVerboseBuffer.Len() > 0 {
 			output.Write(notVerboseBuffer.Bytes())
@@ -148,7 +147,7 @@ func (br *buildRouter) postBuild(ctx context.Context, w http.ResponseWriter, r *
 		if !output.Flushed() {
 			return err
 		}
-		_, err = w.Write(sf.FormatError(err))
+		_, err = w.Write(streamformatter.FormatError(err))
 		if err != nil {
 			logrus.Warnf("could not write error response: %v", err)
 		}
@@ -166,25 +165,22 @@ func (br *buildRouter) postBuild(ctx context.Context, w http.ResponseWriter, r *
 			errors.New("squash is only supported with experimental mode"))
 	}
 
+	out := io.Writer(output)
+	if buildOptions.SuppressOutput {
+		out = notVerboseBuffer
+	}
+
 	// Currently, only used if context is from a remote url.
 	// Look at code in DetectContextFromRemoteURL for more information.
 	createProgressReader := func(in io.ReadCloser) io.ReadCloser {
-		progressOutput := sf.NewProgressOutput(output, true)
-		if buildOptions.SuppressOutput {
-			progressOutput = sf.NewProgressOutput(notVerboseBuffer, true)
-		}
+		progressOutput := streamformatter.NewJSONProgressOutput(out, true)
 		return progress.NewProgressReader(in, progressOutput, r.ContentLength, "Downloading context", buildOptions.RemoteContext)
 	}
 
-	out := io.Writer(output)
-	if buildOptions.SuppressOutput {
-		out = notVerboseBuffer
-	}
-
 	imgID, err := br.backend.Build(ctx, backend.BuildConfig{
 		Source:         r.Body,
 		Options:        buildOptions,
-		ProgressWriter: buildProgressWriter(out, sf, createProgressReader),
+		ProgressWriter: buildProgressWriter(out, createProgressReader),
 	})
 	if err != nil {
 		return errf(err)
@@ -193,8 +189,7 @@ func (br *buildRouter) postBuild(ctx context.Context, w http.ResponseWriter, r *
 	// Everything worked so if -q was provided the output from the daemon
 	// should be just the image ID and we'll print that to stdout.
 	if buildOptions.SuppressOutput {
-		stdout := &streamformatter.StdoutFormatter{Writer: output, StreamFormatter: sf}
-		fmt.Fprintln(stdout, imgID)
+		fmt.Fprintln(streamformatter.NewStdoutWriter(output), imgID)
 	}
 	return nil
 }
@@ -226,15 +221,13 @@ func (s *syncWriter) Write(b []byte) (count int, err error) {
 	return
 }
 
-func buildProgressWriter(out io.Writer, sf *streamformatter.StreamFormatter, createProgressReader func(io.ReadCloser) io.ReadCloser) backend.ProgressWriter {
+func buildProgressWriter(out io.Writer, createProgressReader func(io.ReadCloser) io.ReadCloser) backend.ProgressWriter {
 	out = &syncWriter{w: out}
-	stdout := &streamformatter.StdoutFormatter{Writer: out, StreamFormatter: sf}
-	stderr := &streamformatter.StderrFormatter{Writer: out, StreamFormatter: sf}
 
 	return backend.ProgressWriter{
 		Output:             out,
-		StdoutFormatter:    stdout,
-		StderrFormatter:    stderr,
+		StdoutFormatter:    streamformatter.NewStdoutWriter(out),
+		StderrFormatter:    streamformatter.NewStderrWriter(out),
 		ProgressReaderFunc: createProgressReader,
 	}
 }

+ 4 - 7
api/server/router/image/image_routes.go

@@ -118,8 +118,7 @@ func (s *imageRouter) postImagesCreate(ctx context.Context, w http.ResponseWrite
 		if !output.Flushed() {
 			return err
 		}
-		sf := streamformatter.NewJSONStreamFormatter()
-		output.Write(sf.FormatError(err))
+		output.Write(streamformatter.FormatError(err))
 	}
 
 	return nil
@@ -164,8 +163,7 @@ func (s *imageRouter) postImagesPush(ctx context.Context, w http.ResponseWriter,
 		if !output.Flushed() {
 			return err
 		}
-		sf := streamformatter.NewJSONStreamFormatter()
-		output.Write(sf.FormatError(err))
+		output.Write(streamformatter.FormatError(err))
 	}
 	return nil
 }
@@ -190,8 +188,7 @@ func (s *imageRouter) getImagesGet(ctx context.Context, w http.ResponseWriter, r
 		if !output.Flushed() {
 			return err
 		}
-		sf := streamformatter.NewJSONStreamFormatter()
-		output.Write(sf.FormatError(err))
+		output.Write(streamformatter.FormatError(err))
 	}
 	return nil
 }
@@ -207,7 +204,7 @@ func (s *imageRouter) postImagesLoad(ctx context.Context, w http.ResponseWriter,
 	output := ioutils.NewWriteFlusher(w)
 	defer output.Close()
 	if err := s.backend.LoadImage(r.Body, output, quiet); err != nil {
-		output.Write(streamformatter.NewJSONStreamFormatter().FormatError(err))
+		output.Write(streamformatter.FormatError(err))
 	}
 	return nil
 }

+ 3 - 3
api/server/router/plugin/plugin_routes.go

@@ -121,7 +121,7 @@ func (pr *pluginRouter) upgradePlugin(ctx context.Context, w http.ResponseWriter
 		if !output.Flushed() {
 			return err
 		}
-		output.Write(streamformatter.NewJSONStreamFormatter().FormatError(err))
+		output.Write(streamformatter.FormatError(err))
 	}
 
 	return nil
@@ -160,7 +160,7 @@ func (pr *pluginRouter) pullPlugin(ctx context.Context, w http.ResponseWriter, r
 		if !output.Flushed() {
 			return err
 		}
-		output.Write(streamformatter.NewJSONStreamFormatter().FormatError(err))
+		output.Write(streamformatter.FormatError(err))
 	}
 
 	return nil
@@ -268,7 +268,7 @@ func (pr *pluginRouter) pushPlugin(ctx context.Context, w http.ResponseWriter, r
 		if !output.Flushed() {
 			return err
 		}
-		output.Write(streamformatter.NewJSONStreamFormatter().FormatError(err))
+		output.Write(streamformatter.FormatError(err))
 	}
 	return nil
 }

+ 2 - 3
api/types/backend/build.go

@@ -4,14 +4,13 @@ import (
 	"io"
 
 	"github.com/docker/docker/api/types"
-	"github.com/docker/docker/pkg/streamformatter"
 )
 
 // ProgressWriter is a data object to transport progress streams to the client
 type ProgressWriter struct {
 	Output             io.Writer
-	StdoutFormatter    *streamformatter.StdoutFormatter
-	StderrFormatter    *streamformatter.StderrFormatter
+	StdoutFormatter    io.Writer
+	StderrFormatter    io.Writer
 	ProgressReaderFunc func(io.ReadCloser) io.ReadCloser
 }
 

+ 1 - 2
builder/dockerfile/internals.go

@@ -275,8 +275,7 @@ func (b *Builder) download(srcURL string) (remote builder.Source, p string, err
 		return
 	}
 
-	stdoutFormatter := b.Stdout.(*streamformatter.StdoutFormatter)
-	progressOutput := stdoutFormatter.StreamFormatter.NewProgressOutput(stdoutFormatter.Writer, true)
+	progressOutput := streamformatter.NewJSONProgressOutput(b.Output, true)
 	progressReader := progress.NewProgressReader(resp.Body, progressOutput, resp.ContentLength, "", "Downloading")
 	// Download and dump result to tmp file
 	// TODO: add filehash directly

+ 1 - 1
cli/command/image/build.go

@@ -269,7 +269,7 @@ func runBuild(dockerCli *command.DockerCli, options buildOptions) error {
 	}
 
 	// Setup an upload progress bar
-	progressOutput := streamformatter.NewStreamFormatter().NewProgressOutput(progBuff, true)
+	progressOutput := streamformatter.NewProgressOutput(progBuff)
 	if !dockerCli.Out().IsTerminal() {
 		progressOutput = &lastProgressOutput{output: progressOutput}
 	}

+ 1 - 1
cli/command/image/build/context.go

@@ -154,7 +154,7 @@ func GetContextFromURL(out io.Writer, remoteURL, dockerfileName string) (io.Read
 	if err != nil {
 		return nil, "", errors.Errorf("unable to download remote context %s: %v", remoteURL, err)
 	}
-	progressOutput := streamformatter.NewStreamFormatter().NewProgressOutput(out, true)
+	progressOutput := streamformatter.NewProgressOutput(out)
 
 	// Pass the response body through a progress reader.
 	progReader := progress.NewProgressReader(response.Body, progressOutput, response.ContentLength, "", fmt.Sprintf("Downloading build context from remote url: %s", remoteURL))

+ 1 - 1
cli/command/service/progress/progress.go

@@ -62,7 +62,7 @@ func stateToProgress(state swarm.TaskState, rollback bool) int64 {
 func ServiceProgress(ctx context.Context, client client.APIClient, serviceID string, progressWriter io.WriteCloser) error {
 	defer progressWriter.Close()
 
-	progressOut := streamformatter.NewJSONStreamFormatter().NewProgressOutput(progressWriter, false)
+	progressOut := streamformatter.NewJSONProgressOutput(progressWriter, false)
 
 	sigint := make(chan os.Signal, 1)
 	signal.Notify(sigint, os.Interrupt)

+ 3 - 4
daemon/import.go

@@ -28,7 +28,6 @@ import (
 // the repo and tag arguments, respectively.
 func (daemon *Daemon) ImportImage(src string, repository, tag string, msg string, inConfig io.ReadCloser, outStream io.Writer, changes []string) error {
 	var (
-		sf     = streamformatter.NewJSONStreamFormatter()
 		rc     io.ReadCloser
 		resp   *http.Response
 		newRef reference.Named
@@ -72,8 +71,8 @@ func (daemon *Daemon) ImportImage(src string, repository, tag string, msg string
 		if err != nil {
 			return err
 		}
-		outStream.Write(sf.FormatStatus("", "Downloading from %s", u))
-		progressOutput := sf.NewProgressOutput(outStream, true)
+		outStream.Write(streamformatter.FormatStatus("", "Downloading from %s", u))
+		progressOutput := streamformatter.NewJSONProgressOutput(outStream, true)
 		rc = progress.NewProgressReader(resp.Body, progressOutput, resp.ContentLength, "", "Importing")
 	}
 
@@ -129,6 +128,6 @@ func (daemon *Daemon) ImportImage(src string, repository, tag string, msg string
 	}
 
 	daemon.LogImageEvent(id.String(), id.String(), "import")
-	outStream.Write(sf.FormatStatus("", id.String()))
+	outStream.Write(streamformatter.FormatStatus("", id.String()))
 	return nil
 }

+ 1 - 1
distribution/utils/progress.go

@@ -14,7 +14,7 @@ import (
 // WriteDistributionProgress is a helper for writing progress from chan to JSON
 // stream with an optional cancel function.
 func WriteDistributionProgress(cancelFunc func(), outStream io.Writer, progressChan <-chan progress.Progress) {
-	progressOutput := streamformatter.NewJSONStreamFormatter().NewProgressOutput(outStream, false)
+	progressOutput := streamformatter.NewJSONProgressOutput(outStream, false)
 	operationCancelled := false
 
 	for prog := range progressChan {

+ 3 - 6
image/tarexport/load.go

@@ -26,14 +26,11 @@ import (
 )
 
 func (l *tarexporter) Load(inTar io.ReadCloser, outStream io.Writer, quiet bool) error {
-	var (
-		sf             = streamformatter.NewJSONStreamFormatter()
-		progressOutput progress.Output
-	)
+	var progressOutput progress.Output
 	if !quiet {
-		progressOutput = sf.NewProgressOutput(outStream, false)
+		progressOutput = streamformatter.NewJSONProgressOutput(outStream, false)
 	}
-	outStream = &streamformatter.StdoutFormatter{Writer: outStream, StreamFormatter: streamformatter.NewJSONStreamFormatter()}
+	outStream = streamformatter.NewStdoutWriter(outStream)
 
 	tmpDir, err := ioutil.TempDir("", "docker-import-")
 	if err != nil {

+ 68 - 106
pkg/streamformatter/streamformatter.go

@@ -10,91 +10,76 @@ import (
 	"github.com/docker/docker/pkg/progress"
 )
 
-// StreamFormatter formats a stream, optionally using JSON.
-type StreamFormatter struct {
-	json bool
-}
-
-// NewStreamFormatter returns a simple StreamFormatter
-func NewStreamFormatter() *StreamFormatter {
-	return &StreamFormatter{}
-}
-
-// NewJSONStreamFormatter returns a StreamFormatter configured to stream json
-func NewJSONStreamFormatter() *StreamFormatter {
-	return &StreamFormatter{true}
-}
-
 const streamNewline = "\r\n"
 
-var streamNewlineBytes = []byte(streamNewline)
+type jsonProgressFormatter struct{}
 
-// FormatStream formats the specified stream.
-func (sf *StreamFormatter) FormatStream(str string) []byte {
-	if sf.json {
-		b, err := json.Marshal(&jsonmessage.JSONMessage{Stream: str})
-		if err != nil {
-			return sf.FormatError(err)
-		}
-		return append(b, streamNewlineBytes...)
-	}
-	return []byte(str + "\r")
+func appendNewline(source []byte) []byte {
+	return append(source, []byte(streamNewline)...)
 }
 
 // FormatStatus formats the specified objects according to the specified format (and id).
-func (sf *StreamFormatter) FormatStatus(id, format string, a ...interface{}) []byte {
+func FormatStatus(id, format string, a ...interface{}) []byte {
 	str := fmt.Sprintf(format, a...)
-	if sf.json {
-		b, err := json.Marshal(&jsonmessage.JSONMessage{ID: id, Status: str})
-		if err != nil {
-			return sf.FormatError(err)
-		}
-		return append(b, streamNewlineBytes...)
+	b, err := json.Marshal(&jsonmessage.JSONMessage{ID: id, Status: str})
+	if err != nil {
+		return FormatError(err)
 	}
-	return []byte(str + streamNewline)
+	return appendNewline(b)
 }
 
-// FormatError formats the specified error.
-func (sf *StreamFormatter) FormatError(err error) []byte {
-	if sf.json {
-		jsonError, ok := err.(*jsonmessage.JSONError)
-		if !ok {
-			jsonError = &jsonmessage.JSONError{Message: err.Error()}
-		}
-		if b, err := json.Marshal(&jsonmessage.JSONMessage{Error: jsonError, ErrorMessage: err.Error()}); err == nil {
-			return append(b, streamNewlineBytes...)
-		}
-		return []byte("{\"error\":\"format error\"}" + streamNewline)
+// FormatError formats the error as a JSON object
+func FormatError(err error) []byte {
+	jsonError, ok := err.(*jsonmessage.JSONError)
+	if !ok {
+		jsonError = &jsonmessage.JSONError{Message: err.Error()}
 	}
-	return []byte("Error: " + err.Error() + streamNewline)
+	if b, err := json.Marshal(&jsonmessage.JSONMessage{Error: jsonError, ErrorMessage: err.Error()}); err == nil {
+		return appendNewline(b)
+	}
+	return []byte(`{"error":"format error"}` + streamNewline)
+}
+
+func (sf *jsonProgressFormatter) formatStatus(id, format string, a ...interface{}) []byte {
+	return FormatStatus(id, format, a...)
 }
 
-// FormatProgress formats the progress information for a specified action.
-func (sf *StreamFormatter) FormatProgress(id, action string, progress *jsonmessage.JSONProgress, aux interface{}) []byte {
+// formatProgress formats the progress information for a specified action.
+func (sf *jsonProgressFormatter) formatProgress(id, action string, progress *jsonmessage.JSONProgress, aux interface{}) []byte {
 	if progress == nil {
 		progress = &jsonmessage.JSONProgress{}
 	}
-	if sf.json {
-		var auxJSON *json.RawMessage
-		if aux != nil {
-			auxJSONBytes, err := json.Marshal(aux)
-			if err != nil {
-				return nil
-			}
-			auxJSON = new(json.RawMessage)
-			*auxJSON = auxJSONBytes
-		}
-		b, err := json.Marshal(&jsonmessage.JSONMessage{
-			Status:          action,
-			ProgressMessage: progress.String(),
-			Progress:        progress,
-			ID:              id,
-			Aux:             auxJSON,
-		})
+	var auxJSON *json.RawMessage
+	if aux != nil {
+		auxJSONBytes, err := json.Marshal(aux)
 		if err != nil {
 			return nil
 		}
-		return append(b, streamNewlineBytes...)
+		auxJSON = new(json.RawMessage)
+		*auxJSON = auxJSONBytes
+	}
+	b, err := json.Marshal(&jsonmessage.JSONMessage{
+		Status:          action,
+		ProgressMessage: progress.String(),
+		Progress:        progress,
+		ID:              id,
+		Aux:             auxJSON,
+	})
+	if err != nil {
+		return nil
+	}
+	return appendNewline(b)
+}
+
+type rawProgressFormatter struct{}
+
+func (sf *rawProgressFormatter) formatStatus(id, format string, a ...interface{}) []byte {
+	return []byte(fmt.Sprintf(format, a...) + streamNewline)
+}
+
+func (sf *rawProgressFormatter) formatProgress(id, action string, progress *jsonmessage.JSONProgress, aux interface{}) []byte {
+	if progress == nil {
+		progress = &jsonmessage.JSONProgress{}
 	}
 	endl := "\r"
 	if progress.String() == "" {
@@ -105,16 +90,23 @@ func (sf *StreamFormatter) FormatProgress(id, action string, progress *jsonmessa
 
 // NewProgressOutput returns a progress.Output object that can be passed to
 // progress.NewProgressReader.
-func (sf *StreamFormatter) NewProgressOutput(out io.Writer, newLines bool) progress.Output {
-	return &progressOutput{
-		sf:       sf,
-		out:      out,
-		newLines: newLines,
-	}
+func NewProgressOutput(out io.Writer) progress.Output {
+	return &progressOutput{sf: &rawProgressFormatter{}, out: out, newLines: true}
+}
+
+// NewJSONProgressOutput returns a progress.Output that that formats output
+// using JSON objects
+func NewJSONProgressOutput(out io.Writer, newLines bool) progress.Output {
+	return &progressOutput{sf: &jsonProgressFormatter{}, out: out, newLines: newLines}
+}
+
+type formatProgress interface {
+	formatStatus(id, format string, a ...interface{}) []byte
+	formatProgress(id, action string, progress *jsonmessage.JSONProgress, aux interface{}) []byte
 }
 
 type progressOutput struct {
-	sf       *StreamFormatter
+	sf       formatProgress
 	out      io.Writer
 	newLines bool
 }
@@ -123,10 +115,10 @@ type progressOutput struct {
 func (out *progressOutput) WriteProgress(prog progress.Progress) error {
 	var formatted []byte
 	if prog.Message != "" {
-		formatted = out.sf.FormatStatus(prog.ID, prog.Message)
+		formatted = out.sf.formatStatus(prog.ID, prog.Message)
 	} else {
 		jsonProgress := jsonmessage.JSONProgress{Current: prog.Current, Total: prog.Total, HideCounts: prog.HideCounts}
-		formatted = out.sf.FormatProgress(prog.ID, prog.Action, &jsonProgress, prog.Aux)
+		formatted = out.sf.formatProgress(prog.ID, prog.Action, &jsonProgress, prog.Aux)
 	}
 	_, err := out.out.Write(formatted)
 	if err != nil {
@@ -134,39 +126,9 @@ func (out *progressOutput) WriteProgress(prog progress.Progress) error {
 	}
 
 	if out.newLines && prog.LastUpdate {
-		_, err = out.out.Write(out.sf.FormatStatus("", ""))
+		_, err = out.out.Write(out.sf.formatStatus("", ""))
 		return err
 	}
 
 	return nil
 }
-
-// StdoutFormatter is a streamFormatter that writes to the standard output.
-type StdoutFormatter struct {
-	io.Writer
-	*StreamFormatter
-}
-
-func (sf *StdoutFormatter) Write(buf []byte) (int, error) {
-	formattedBuf := sf.StreamFormatter.FormatStream(string(buf))
-	n, err := sf.Writer.Write(formattedBuf)
-	if n != len(formattedBuf) {
-		return n, io.ErrShortWrite
-	}
-	return len(buf), err
-}
-
-// StderrFormatter is a streamFormatter that writes to the standard error.
-type StderrFormatter struct {
-	io.Writer
-	*StreamFormatter
-}
-
-func (sf *StderrFormatter) Write(buf []byte) (int, error) {
-	formattedBuf := sf.StreamFormatter.FormatStream("\033[91m" + string(buf) + "\033[0m")
-	n, err := sf.Writer.Write(formattedBuf)
-	if n != len(formattedBuf) {
-		return n, io.ErrShortWrite
-	}
-	return len(buf), err
-}

+ 36 - 61
pkg/streamformatter/streamformatter_test.go

@@ -3,88 +3,65 @@ package streamformatter
 import (
 	"encoding/json"
 	"errors"
-	"reflect"
 	"strings"
 	"testing"
 
 	"github.com/docker/docker/pkg/jsonmessage"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
 )
 
-func TestFormatStream(t *testing.T) {
-	sf := NewStreamFormatter()
-	res := sf.FormatStream("stream")
-	if string(res) != "stream"+"\r" {
-		t.Fatalf("%q", res)
-	}
+func TestRawProgressFormatterFormatStatus(t *testing.T) {
+	sf := rawProgressFormatter{}
+	res := sf.formatStatus("ID", "%s%d", "a", 1)
+	assert.Equal(t, "a1\r\n", string(res))
 }
 
-func TestFormatJSONStatus(t *testing.T) {
-	sf := NewStreamFormatter()
-	res := sf.FormatStatus("ID", "%s%d", "a", 1)
-	if string(res) != "a1\r\n" {
-		t.Fatalf("%q", res)
-	}
-}
-
-func TestFormatSimpleError(t *testing.T) {
-	sf := NewStreamFormatter()
-	res := sf.FormatError(errors.New("Error for formatter"))
-	if string(res) != "Error: Error for formatter\r\n" {
-		t.Fatalf("%q", res)
-	}
-}
-
-func TestJSONFormatStream(t *testing.T) {
-	sf := NewJSONStreamFormatter()
-	res := sf.FormatStream("stream")
-	if string(res) != `{"stream":"stream"}`+"\r\n" {
-		t.Fatalf("%q", res)
+func TestRawProgressFormatterFormatProgress(t *testing.T) {
+	sf := rawProgressFormatter{}
+	progress := &jsonmessage.JSONProgress{
+		Current: 15,
+		Total:   30,
+		Start:   1,
 	}
+	res := sf.formatProgress("id", "action", progress, nil)
+	out := string(res)
+	assert.True(t, strings.HasPrefix(out, "action [===="))
+	assert.Contains(t, out, "15B/30B")
+	assert.True(t, strings.HasSuffix(out, "\r"))
 }
 
-func TestJSONFormatStatus(t *testing.T) {
-	sf := NewJSONStreamFormatter()
-	res := sf.FormatStatus("ID", "%s%d", "a", 1)
-	if string(res) != `{"status":"a1","id":"ID"}`+"\r\n" {
-		t.Fatalf("%q", res)
-	}
+func TestFormatStatus(t *testing.T) {
+	res := FormatStatus("ID", "%s%d", "a", 1)
+	expected := `{"status":"a1","id":"ID"}` + streamNewline
+	assert.Equal(t, expected, string(res))
 }
 
-func TestJSONFormatSimpleError(t *testing.T) {
-	sf := NewJSONStreamFormatter()
-	res := sf.FormatError(errors.New("Error for formatter"))
-	if string(res) != `{"errorDetail":{"message":"Error for formatter"},"error":"Error for formatter"}`+"\r\n" {
-		t.Fatalf("%q", res)
-	}
+func TestFormatError(t *testing.T) {
+	res := FormatError(errors.New("Error for formatter"))
+	expected := `{"errorDetail":{"message":"Error for formatter"},"error":"Error for formatter"}` + "\r\n"
+	assert.Equal(t, expected, string(res))
 }
 
-func TestJSONFormatJSONError(t *testing.T) {
-	sf := NewJSONStreamFormatter()
+func TestFormatJSONError(t *testing.T) {
 	err := &jsonmessage.JSONError{Code: 50, Message: "Json error"}
-	res := sf.FormatError(err)
-	if string(res) != `{"errorDetail":{"code":50,"message":"Json error"},"error":"Json error"}`+"\r\n" {
-		t.Fatalf("%q", res)
-	}
+	res := FormatError(err)
+	expected := `{"errorDetail":{"code":50,"message":"Json error"},"error":"Json error"}` + streamNewline
+	assert.Equal(t, expected, string(res))
 }
 
-func TestJSONFormatProgress(t *testing.T) {
-	sf := NewJSONStreamFormatter()
+func TestJsonProgressFormatterFormatProgress(t *testing.T) {
+	sf := &jsonProgressFormatter{}
 	progress := &jsonmessage.JSONProgress{
 		Current: 15,
 		Total:   30,
 		Start:   1,
 	}
-	res := sf.FormatProgress("id", "action", progress, nil)
+	res := sf.formatProgress("id", "action", progress, nil)
 	msg := &jsonmessage.JSONMessage{}
-	if err := json.Unmarshal(res, msg); err != nil {
-		t.Fatal(err)
-	}
-	if msg.ID != "id" {
-		t.Fatalf("ID must be 'id', got: %s", msg.ID)
-	}
-	if msg.Status != "action" {
-		t.Fatalf("Status must be 'action', got: %s", msg.Status)
-	}
+	require.NoError(t, json.Unmarshal(res, msg))
+	assert.Equal(t, "id", msg.ID)
+	assert.Equal(t, "action", msg.Status)
 
 	// The progress will always be in the format of:
 	// [=========================>                         ]      15B/30B 412910h51m30s
@@ -102,7 +79,5 @@ func TestJSONFormatProgress(t *testing.T) {
 			expectedProgress, expectedProgressShort, msg.ProgressMessage)
 	}
 
-	if !reflect.DeepEqual(msg.Progress, progress) {
-		t.Fatal("Original progress not equals progress from FormatProgress")
-	}
+	assert.Equal(t, progress, msg.Progress)
 }

+ 47 - 0
pkg/streamformatter/streamwriter.go

@@ -0,0 +1,47 @@
+package streamformatter
+
+import (
+	"encoding/json"
+	"io"
+
+	"github.com/docker/docker/pkg/jsonmessage"
+)
+
+type streamWriter struct {
+	io.Writer
+	lineFormat func([]byte) string
+}
+
+func (sw *streamWriter) Write(buf []byte) (int, error) {
+	formattedBuf := sw.format(buf)
+	n, err := sw.Writer.Write(formattedBuf)
+	if n != len(formattedBuf) {
+		return n, io.ErrShortWrite
+	}
+	return len(buf), err
+}
+
+func (sw *streamWriter) format(buf []byte) []byte {
+	msg := &jsonmessage.JSONMessage{Stream: sw.lineFormat(buf)}
+	b, err := json.Marshal(msg)
+	if err != nil {
+		return FormatError(err)
+	}
+	return appendNewline(b)
+}
+
+// NewStdoutWriter returns a writer which formats the output as json message
+// representing stdout lines
+func NewStdoutWriter(out io.Writer) io.Writer {
+	return &streamWriter{Writer: out, lineFormat: func(buf []byte) string {
+		return string(buf)
+	}}
+}
+
+// NewStderrWriter returns a writer which formats the output as json message
+// representing stderr lines
+func NewStderrWriter(out io.Writer) io.Writer {
+	return &streamWriter{Writer: out, lineFormat: func(buf []byte) string {
+		return "\033[91m" + string(buf) + "\033[0m"
+	}}
+}

+ 35 - 0
pkg/streamformatter/streamwriter_test.go

@@ -0,0 +1,35 @@
+package streamformatter
+
+import (
+	"testing"
+
+	"bytes"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+)
+
+func TestStreamWriterStdout(t *testing.T) {
+	buffer := &bytes.Buffer{}
+	content := "content"
+	sw := NewStdoutWriter(buffer)
+	size, err := sw.Write([]byte(content))
+
+	require.NoError(t, err)
+	assert.Equal(t, len(content), size)
+
+	expected := `{"stream":"content"}` + streamNewline
+	assert.Equal(t, expected, buffer.String())
+}
+
+func TestStreamWriterStderr(t *testing.T) {
+	buffer := &bytes.Buffer{}
+	content := "content"
+	sw := NewStderrWriter(buffer)
+	size, err := sw.Write([]byte(content))
+
+	require.NoError(t, err)
+	assert.Equal(t, len(content), size)
+
+	expected := `{"stream":"\u001b[91mcontent\u001b[0m"}` + streamNewline
+	assert.Equal(t, expected, buffer.String())
+}