From c87d67b0ad788a6a80d1af89488e2d1f22726c34 Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Mon, 1 May 2017 14:54:56 -0400 Subject: [PATCH] Refacator pkg/streamformatter StreamFormatter suffered was two distinct structs mixed into a single struct without any overlap. Signed-off-by: Daniel Nephin --- api/server/router/build/build_routes.go | 33 ++-- api/server/router/image/image_routes.go | 11 +- api/server/router/plugin/plugin_routes.go | 6 +- api/types/backend/build.go | 5 +- builder/dockerfile/internals.go | 3 +- cli/command/image/build.go | 2 +- cli/command/image/build/context.go | 2 +- cli/command/service/progress/progress.go | 2 +- daemon/import.go | 7 +- distribution/utils/progress.go | 2 +- image/tarexport/load.go | 9 +- pkg/streamformatter/streamformatter.go | 174 ++++++++------------ pkg/streamformatter/streamformatter_test.go | 117 ++++++------- pkg/streamformatter/streamwriter.go | 47 ++++++ pkg/streamformatter/streamwriter_test.go | 35 ++++ 15 files changed, 229 insertions(+), 226 deletions(-) create mode 100644 pkg/streamformatter/streamwriter.go create mode 100644 pkg/streamformatter/streamwriter_test.go diff --git a/api/server/router/build/build_routes.go b/api/server/router/build/build_routes.go index eea5af4a18..d927674436 100644 --- a/api/server/router/build/build_routes.go +++ b/api/server/router/build/build_routes.go @@ -138,7 +138,6 @@ func (br *buildRouter) postBuild(ctx context.Context, w http.ResponseWriter, r * output := ioutils.NewWriteFlusher(w) defer output.Close() - sf := streamformatter.NewJSONStreamFormatter() errf := func(err error) error { if httputils.BoolValue(r, "q") && notVerboseBuffer.Len() > 0 { output.Write(notVerboseBuffer.Bytes()) @@ -148,7 +147,7 @@ func (br *buildRouter) postBuild(ctx context.Context, w http.ResponseWriter, r * if !output.Flushed() { return err } - _, err = w.Write(sf.FormatError(err)) + _, err = w.Write(streamformatter.FormatError(err)) if err != nil { logrus.Warnf("could not write error response: %v", err) } @@ -166,25 +165,22 @@ func (br *buildRouter) postBuild(ctx context.Context, w http.ResponseWriter, r * errors.New("squash is only supported with experimental mode")) } - // Currently, only used if context is from a remote url. - // Look at code in DetectContextFromRemoteURL for more information. - createProgressReader := func(in io.ReadCloser) io.ReadCloser { - progressOutput := sf.NewProgressOutput(output, true) - if buildOptions.SuppressOutput { - progressOutput = sf.NewProgressOutput(notVerboseBuffer, true) - } - return progress.NewProgressReader(in, progressOutput, r.ContentLength, "Downloading context", buildOptions.RemoteContext) - } - out := io.Writer(output) if buildOptions.SuppressOutput { out = notVerboseBuffer } + // Currently, only used if context is from a remote url. + // Look at code in DetectContextFromRemoteURL for more information. + createProgressReader := func(in io.ReadCloser) io.ReadCloser { + progressOutput := streamformatter.NewJSONProgressOutput(out, true) + return progress.NewProgressReader(in, progressOutput, r.ContentLength, "Downloading context", buildOptions.RemoteContext) + } + imgID, err := br.backend.Build(ctx, backend.BuildConfig{ Source: r.Body, Options: buildOptions, - ProgressWriter: buildProgressWriter(out, sf, createProgressReader), + ProgressWriter: buildProgressWriter(out, createProgressReader), }) if err != nil { return errf(err) @@ -193,8 +189,7 @@ func (br *buildRouter) postBuild(ctx context.Context, w http.ResponseWriter, r * // Everything worked so if -q was provided the output from the daemon // should be just the image ID and we'll print that to stdout. if buildOptions.SuppressOutput { - stdout := &streamformatter.StdoutFormatter{Writer: output, StreamFormatter: sf} - fmt.Fprintln(stdout, imgID) + fmt.Fprintln(streamformatter.NewStdoutWriter(output), imgID) } return nil } @@ -226,15 +221,13 @@ func (s *syncWriter) Write(b []byte) (count int, err error) { return } -func buildProgressWriter(out io.Writer, sf *streamformatter.StreamFormatter, createProgressReader func(io.ReadCloser) io.ReadCloser) backend.ProgressWriter { +func buildProgressWriter(out io.Writer, createProgressReader func(io.ReadCloser) io.ReadCloser) backend.ProgressWriter { out = &syncWriter{w: out} - stdout := &streamformatter.StdoutFormatter{Writer: out, StreamFormatter: sf} - stderr := &streamformatter.StderrFormatter{Writer: out, StreamFormatter: sf} return backend.ProgressWriter{ Output: out, - StdoutFormatter: stdout, - StderrFormatter: stderr, + StdoutFormatter: streamformatter.NewStdoutWriter(out), + StderrFormatter: streamformatter.NewStderrWriter(out), ProgressReaderFunc: createProgressReader, } } diff --git a/api/server/router/image/image_routes.go b/api/server/router/image/image_routes.go index 75db6d441c..465182caa1 100644 --- a/api/server/router/image/image_routes.go +++ b/api/server/router/image/image_routes.go @@ -118,8 +118,7 @@ func (s *imageRouter) postImagesCreate(ctx context.Context, w http.ResponseWrite if !output.Flushed() { return err } - sf := streamformatter.NewJSONStreamFormatter() - output.Write(sf.FormatError(err)) + output.Write(streamformatter.FormatError(err)) } return nil @@ -164,8 +163,7 @@ func (s *imageRouter) postImagesPush(ctx context.Context, w http.ResponseWriter, if !output.Flushed() { return err } - sf := streamformatter.NewJSONStreamFormatter() - output.Write(sf.FormatError(err)) + output.Write(streamformatter.FormatError(err)) } return nil } @@ -190,8 +188,7 @@ func (s *imageRouter) getImagesGet(ctx context.Context, w http.ResponseWriter, r if !output.Flushed() { return err } - sf := streamformatter.NewJSONStreamFormatter() - output.Write(sf.FormatError(err)) + output.Write(streamformatter.FormatError(err)) } return nil } @@ -207,7 +204,7 @@ func (s *imageRouter) postImagesLoad(ctx context.Context, w http.ResponseWriter, output := ioutils.NewWriteFlusher(w) defer output.Close() if err := s.backend.LoadImage(r.Body, output, quiet); err != nil { - output.Write(streamformatter.NewJSONStreamFormatter().FormatError(err)) + output.Write(streamformatter.FormatError(err)) } return nil } diff --git a/api/server/router/plugin/plugin_routes.go b/api/server/router/plugin/plugin_routes.go index 0d743a4a95..79e3cf5de8 100644 --- a/api/server/router/plugin/plugin_routes.go +++ b/api/server/router/plugin/plugin_routes.go @@ -121,7 +121,7 @@ func (pr *pluginRouter) upgradePlugin(ctx context.Context, w http.ResponseWriter if !output.Flushed() { return err } - output.Write(streamformatter.NewJSONStreamFormatter().FormatError(err)) + output.Write(streamformatter.FormatError(err)) } return nil @@ -160,7 +160,7 @@ func (pr *pluginRouter) pullPlugin(ctx context.Context, w http.ResponseWriter, r if !output.Flushed() { return err } - output.Write(streamformatter.NewJSONStreamFormatter().FormatError(err)) + output.Write(streamformatter.FormatError(err)) } return nil @@ -268,7 +268,7 @@ func (pr *pluginRouter) pushPlugin(ctx context.Context, w http.ResponseWriter, r if !output.Flushed() { return err } - output.Write(streamformatter.NewJSONStreamFormatter().FormatError(err)) + output.Write(streamformatter.FormatError(err)) } return nil } diff --git a/api/types/backend/build.go b/api/types/backend/build.go index 78f955b8fc..adbd6b344a 100644 --- a/api/types/backend/build.go +++ b/api/types/backend/build.go @@ -4,14 +4,13 @@ import ( "io" "github.com/docker/docker/api/types" - "github.com/docker/docker/pkg/streamformatter" ) // ProgressWriter is a data object to transport progress streams to the client type ProgressWriter struct { Output io.Writer - StdoutFormatter *streamformatter.StdoutFormatter - StderrFormatter *streamformatter.StderrFormatter + StdoutFormatter io.Writer + StderrFormatter io.Writer ProgressReaderFunc func(io.ReadCloser) io.ReadCloser } diff --git a/builder/dockerfile/internals.go b/builder/dockerfile/internals.go index 1cb2eeef16..e353710a5f 100644 --- a/builder/dockerfile/internals.go +++ b/builder/dockerfile/internals.go @@ -275,8 +275,7 @@ func (b *Builder) download(srcURL string) (remote builder.Source, p string, err return } - stdoutFormatter := b.Stdout.(*streamformatter.StdoutFormatter) - progressOutput := stdoutFormatter.StreamFormatter.NewProgressOutput(stdoutFormatter.Writer, true) + progressOutput := streamformatter.NewJSONProgressOutput(b.Output, true) progressReader := progress.NewProgressReader(resp.Body, progressOutput, resp.ContentLength, "", "Downloading") // Download and dump result to tmp file // TODO: add filehash directly diff --git a/cli/command/image/build.go b/cli/command/image/build.go index 27fe83c524..cdb116f169 100644 --- a/cli/command/image/build.go +++ b/cli/command/image/build.go @@ -269,7 +269,7 @@ func runBuild(dockerCli *command.DockerCli, options buildOptions) error { } // Setup an upload progress bar - progressOutput := streamformatter.NewStreamFormatter().NewProgressOutput(progBuff, true) + progressOutput := streamformatter.NewProgressOutput(progBuff) if !dockerCli.Out().IsTerminal() { progressOutput = &lastProgressOutput{output: progressOutput} } diff --git a/cli/command/image/build/context.go b/cli/command/image/build/context.go index 348c721931..5e66717a04 100644 --- a/cli/command/image/build/context.go +++ b/cli/command/image/build/context.go @@ -154,7 +154,7 @@ func GetContextFromURL(out io.Writer, remoteURL, dockerfileName string) (io.Read if err != nil { return nil, "", errors.Errorf("unable to download remote context %s: %v", remoteURL, err) } - progressOutput := streamformatter.NewStreamFormatter().NewProgressOutput(out, true) + progressOutput := streamformatter.NewProgressOutput(out) // Pass the response body through a progress reader. progReader := progress.NewProgressReader(response.Body, progressOutput, response.ContentLength, "", fmt.Sprintf("Downloading build context from remote url: %s", remoteURL)) diff --git a/cli/command/service/progress/progress.go b/cli/command/service/progress/progress.go index bfeaa314a4..d68fc6c1af 100644 --- a/cli/command/service/progress/progress.go +++ b/cli/command/service/progress/progress.go @@ -62,7 +62,7 @@ func stateToProgress(state swarm.TaskState, rollback bool) int64 { func ServiceProgress(ctx context.Context, client client.APIClient, serviceID string, progressWriter io.WriteCloser) error { defer progressWriter.Close() - progressOut := streamformatter.NewJSONStreamFormatter().NewProgressOutput(progressWriter, false) + progressOut := streamformatter.NewJSONProgressOutput(progressWriter, false) sigint := make(chan os.Signal, 1) signal.Notify(sigint, os.Interrupt) diff --git a/daemon/import.go b/daemon/import.go index fc9f2682c5..17b9d870a3 100644 --- a/daemon/import.go +++ b/daemon/import.go @@ -28,7 +28,6 @@ import ( // the repo and tag arguments, respectively. func (daemon *Daemon) ImportImage(src string, repository, tag string, msg string, inConfig io.ReadCloser, outStream io.Writer, changes []string) error { var ( - sf = streamformatter.NewJSONStreamFormatter() rc io.ReadCloser resp *http.Response newRef reference.Named @@ -72,8 +71,8 @@ func (daemon *Daemon) ImportImage(src string, repository, tag string, msg string if err != nil { return err } - outStream.Write(sf.FormatStatus("", "Downloading from %s", u)) - progressOutput := sf.NewProgressOutput(outStream, true) + outStream.Write(streamformatter.FormatStatus("", "Downloading from %s", u)) + progressOutput := streamformatter.NewJSONProgressOutput(outStream, true) rc = progress.NewProgressReader(resp.Body, progressOutput, resp.ContentLength, "", "Importing") } @@ -129,6 +128,6 @@ func (daemon *Daemon) ImportImage(src string, repository, tag string, msg string } daemon.LogImageEvent(id.String(), id.String(), "import") - outStream.Write(sf.FormatStatus("", id.String())) + outStream.Write(streamformatter.FormatStatus("", id.String())) return nil } diff --git a/distribution/utils/progress.go b/distribution/utils/progress.go index ef8ecc89f6..cc3632a534 100644 --- a/distribution/utils/progress.go +++ b/distribution/utils/progress.go @@ -14,7 +14,7 @@ import ( // WriteDistributionProgress is a helper for writing progress from chan to JSON // stream with an optional cancel function. func WriteDistributionProgress(cancelFunc func(), outStream io.Writer, progressChan <-chan progress.Progress) { - progressOutput := streamformatter.NewJSONStreamFormatter().NewProgressOutput(outStream, false) + progressOutput := streamformatter.NewJSONProgressOutput(outStream, false) operationCancelled := false for prog := range progressChan { diff --git a/image/tarexport/load.go b/image/tarexport/load.go index cdd377ab90..14b7c1af0b 100644 --- a/image/tarexport/load.go +++ b/image/tarexport/load.go @@ -26,14 +26,11 @@ import ( ) func (l *tarexporter) Load(inTar io.ReadCloser, outStream io.Writer, quiet bool) error { - var ( - sf = streamformatter.NewJSONStreamFormatter() - progressOutput progress.Output - ) + var progressOutput progress.Output if !quiet { - progressOutput = sf.NewProgressOutput(outStream, false) + progressOutput = streamformatter.NewJSONProgressOutput(outStream, false) } - outStream = &streamformatter.StdoutFormatter{Writer: outStream, StreamFormatter: streamformatter.NewJSONStreamFormatter()} + outStream = streamformatter.NewStdoutWriter(outStream) tmpDir, err := ioutil.TempDir("", "docker-import-") if err != nil { diff --git a/pkg/streamformatter/streamformatter.go b/pkg/streamformatter/streamformatter.go index f2868441ee..fa79828172 100644 --- a/pkg/streamformatter/streamformatter.go +++ b/pkg/streamformatter/streamformatter.go @@ -10,91 +10,76 @@ import ( "github.com/docker/docker/pkg/progress" ) -// StreamFormatter formats a stream, optionally using JSON. -type StreamFormatter struct { - json bool -} - -// NewStreamFormatter returns a simple StreamFormatter -func NewStreamFormatter() *StreamFormatter { - return &StreamFormatter{} -} - -// NewJSONStreamFormatter returns a StreamFormatter configured to stream json -func NewJSONStreamFormatter() *StreamFormatter { - return &StreamFormatter{true} -} - const streamNewline = "\r\n" -var streamNewlineBytes = []byte(streamNewline) +type jsonProgressFormatter struct{} -// FormatStream formats the specified stream. -func (sf *StreamFormatter) FormatStream(str string) []byte { - if sf.json { - b, err := json.Marshal(&jsonmessage.JSONMessage{Stream: str}) - if err != nil { - return sf.FormatError(err) - } - return append(b, streamNewlineBytes...) - } - return []byte(str + "\r") +func appendNewline(source []byte) []byte { + return append(source, []byte(streamNewline)...) } // FormatStatus formats the specified objects according to the specified format (and id). -func (sf *StreamFormatter) FormatStatus(id, format string, a ...interface{}) []byte { +func FormatStatus(id, format string, a ...interface{}) []byte { str := fmt.Sprintf(format, a...) - if sf.json { - b, err := json.Marshal(&jsonmessage.JSONMessage{ID: id, Status: str}) - if err != nil { - return sf.FormatError(err) - } - return append(b, streamNewlineBytes...) + b, err := json.Marshal(&jsonmessage.JSONMessage{ID: id, Status: str}) + if err != nil { + return FormatError(err) } - return []byte(str + streamNewline) + return appendNewline(b) } -// FormatError formats the specified error. -func (sf *StreamFormatter) FormatError(err error) []byte { - if sf.json { - jsonError, ok := err.(*jsonmessage.JSONError) - if !ok { - jsonError = &jsonmessage.JSONError{Message: err.Error()} - } - if b, err := json.Marshal(&jsonmessage.JSONMessage{Error: jsonError, ErrorMessage: err.Error()}); err == nil { - return append(b, streamNewlineBytes...) - } - return []byte("{\"error\":\"format error\"}" + streamNewline) +// FormatError formats the error as a JSON object +func FormatError(err error) []byte { + jsonError, ok := err.(*jsonmessage.JSONError) + if !ok { + jsonError = &jsonmessage.JSONError{Message: err.Error()} } - return []byte("Error: " + err.Error() + streamNewline) + if b, err := json.Marshal(&jsonmessage.JSONMessage{Error: jsonError, ErrorMessage: err.Error()}); err == nil { + return appendNewline(b) + } + return []byte(`{"error":"format error"}` + streamNewline) } -// FormatProgress formats the progress information for a specified action. -func (sf *StreamFormatter) FormatProgress(id, action string, progress *jsonmessage.JSONProgress, aux interface{}) []byte { +func (sf *jsonProgressFormatter) formatStatus(id, format string, a ...interface{}) []byte { + return FormatStatus(id, format, a...) +} + +// formatProgress formats the progress information for a specified action. +func (sf *jsonProgressFormatter) formatProgress(id, action string, progress *jsonmessage.JSONProgress, aux interface{}) []byte { if progress == nil { progress = &jsonmessage.JSONProgress{} } - if sf.json { - var auxJSON *json.RawMessage - if aux != nil { - auxJSONBytes, err := json.Marshal(aux) - if err != nil { - return nil - } - auxJSON = new(json.RawMessage) - *auxJSON = auxJSONBytes - } - b, err := json.Marshal(&jsonmessage.JSONMessage{ - Status: action, - ProgressMessage: progress.String(), - Progress: progress, - ID: id, - Aux: auxJSON, - }) + var auxJSON *json.RawMessage + if aux != nil { + auxJSONBytes, err := json.Marshal(aux) if err != nil { return nil } - return append(b, streamNewlineBytes...) + auxJSON = new(json.RawMessage) + *auxJSON = auxJSONBytes + } + b, err := json.Marshal(&jsonmessage.JSONMessage{ + Status: action, + ProgressMessage: progress.String(), + Progress: progress, + ID: id, + Aux: auxJSON, + }) + if err != nil { + return nil + } + return appendNewline(b) +} + +type rawProgressFormatter struct{} + +func (sf *rawProgressFormatter) formatStatus(id, format string, a ...interface{}) []byte { + return []byte(fmt.Sprintf(format, a...) + streamNewline) +} + +func (sf *rawProgressFormatter) formatProgress(id, action string, progress *jsonmessage.JSONProgress, aux interface{}) []byte { + if progress == nil { + progress = &jsonmessage.JSONProgress{} } endl := "\r" if progress.String() == "" { @@ -105,16 +90,23 @@ func (sf *StreamFormatter) FormatProgress(id, action string, progress *jsonmessa // NewProgressOutput returns a progress.Output object that can be passed to // progress.NewProgressReader. -func (sf *StreamFormatter) NewProgressOutput(out io.Writer, newLines bool) progress.Output { - return &progressOutput{ - sf: sf, - out: out, - newLines: newLines, - } +func NewProgressOutput(out io.Writer) progress.Output { + return &progressOutput{sf: &rawProgressFormatter{}, out: out, newLines: true} +} + +// NewJSONProgressOutput returns a progress.Output that that formats output +// using JSON objects +func NewJSONProgressOutput(out io.Writer, newLines bool) progress.Output { + return &progressOutput{sf: &jsonProgressFormatter{}, out: out, newLines: newLines} +} + +type formatProgress interface { + formatStatus(id, format string, a ...interface{}) []byte + formatProgress(id, action string, progress *jsonmessage.JSONProgress, aux interface{}) []byte } type progressOutput struct { - sf *StreamFormatter + sf formatProgress out io.Writer newLines bool } @@ -123,10 +115,10 @@ type progressOutput struct { func (out *progressOutput) WriteProgress(prog progress.Progress) error { var formatted []byte if prog.Message != "" { - formatted = out.sf.FormatStatus(prog.ID, prog.Message) + formatted = out.sf.formatStatus(prog.ID, prog.Message) } else { jsonProgress := jsonmessage.JSONProgress{Current: prog.Current, Total: prog.Total, HideCounts: prog.HideCounts} - formatted = out.sf.FormatProgress(prog.ID, prog.Action, &jsonProgress, prog.Aux) + formatted = out.sf.formatProgress(prog.ID, prog.Action, &jsonProgress, prog.Aux) } _, err := out.out.Write(formatted) if err != nil { @@ -134,39 +126,9 @@ func (out *progressOutput) WriteProgress(prog progress.Progress) error { } if out.newLines && prog.LastUpdate { - _, err = out.out.Write(out.sf.FormatStatus("", "")) + _, err = out.out.Write(out.sf.formatStatus("", "")) return err } return nil } - -// StdoutFormatter is a streamFormatter that writes to the standard output. -type StdoutFormatter struct { - io.Writer - *StreamFormatter -} - -func (sf *StdoutFormatter) Write(buf []byte) (int, error) { - formattedBuf := sf.StreamFormatter.FormatStream(string(buf)) - n, err := sf.Writer.Write(formattedBuf) - if n != len(formattedBuf) { - return n, io.ErrShortWrite - } - return len(buf), err -} - -// StderrFormatter is a streamFormatter that writes to the standard error. -type StderrFormatter struct { - io.Writer - *StreamFormatter -} - -func (sf *StderrFormatter) Write(buf []byte) (int, error) { - formattedBuf := sf.StreamFormatter.FormatStream("\033[91m" + string(buf) + "\033[0m") - n, err := sf.Writer.Write(formattedBuf) - if n != len(formattedBuf) { - return n, io.ErrShortWrite - } - return len(buf), err -} diff --git a/pkg/streamformatter/streamformatter_test.go b/pkg/streamformatter/streamformatter_test.go index f087b92d8f..8ebcc74ee5 100644 --- a/pkg/streamformatter/streamformatter_test.go +++ b/pkg/streamformatter/streamformatter_test.go @@ -3,88 +3,65 @@ package streamformatter import ( "encoding/json" "errors" - "reflect" "strings" "testing" "github.com/docker/docker/pkg/jsonmessage" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) -func TestFormatStream(t *testing.T) { - sf := NewStreamFormatter() - res := sf.FormatStream("stream") - if string(res) != "stream"+"\r" { - t.Fatalf("%q", res) - } +func TestRawProgressFormatterFormatStatus(t *testing.T) { + sf := rawProgressFormatter{} + res := sf.formatStatus("ID", "%s%d", "a", 1) + assert.Equal(t, "a1\r\n", string(res)) } -func TestFormatJSONStatus(t *testing.T) { - sf := NewStreamFormatter() - res := sf.FormatStatus("ID", "%s%d", "a", 1) - if string(res) != "a1\r\n" { - t.Fatalf("%q", res) - } -} - -func TestFormatSimpleError(t *testing.T) { - sf := NewStreamFormatter() - res := sf.FormatError(errors.New("Error for formatter")) - if string(res) != "Error: Error for formatter\r\n" { - t.Fatalf("%q", res) - } -} - -func TestJSONFormatStream(t *testing.T) { - sf := NewJSONStreamFormatter() - res := sf.FormatStream("stream") - if string(res) != `{"stream":"stream"}`+"\r\n" { - t.Fatalf("%q", res) - } -} - -func TestJSONFormatStatus(t *testing.T) { - sf := NewJSONStreamFormatter() - res := sf.FormatStatus("ID", "%s%d", "a", 1) - if string(res) != `{"status":"a1","id":"ID"}`+"\r\n" { - t.Fatalf("%q", res) - } -} - -func TestJSONFormatSimpleError(t *testing.T) { - sf := NewJSONStreamFormatter() - res := sf.FormatError(errors.New("Error for formatter")) - if string(res) != `{"errorDetail":{"message":"Error for formatter"},"error":"Error for formatter"}`+"\r\n" { - t.Fatalf("%q", res) - } -} - -func TestJSONFormatJSONError(t *testing.T) { - sf := NewJSONStreamFormatter() - err := &jsonmessage.JSONError{Code: 50, Message: "Json error"} - res := sf.FormatError(err) - if string(res) != `{"errorDetail":{"code":50,"message":"Json error"},"error":"Json error"}`+"\r\n" { - t.Fatalf("%q", res) - } -} - -func TestJSONFormatProgress(t *testing.T) { - sf := NewJSONStreamFormatter() +func TestRawProgressFormatterFormatProgress(t *testing.T) { + sf := rawProgressFormatter{} progress := &jsonmessage.JSONProgress{ Current: 15, Total: 30, Start: 1, } - res := sf.FormatProgress("id", "action", progress, nil) + res := sf.formatProgress("id", "action", progress, nil) + out := string(res) + assert.True(t, strings.HasPrefix(out, "action [====")) + assert.Contains(t, out, "15B/30B") + assert.True(t, strings.HasSuffix(out, "\r")) +} + +func TestFormatStatus(t *testing.T) { + res := FormatStatus("ID", "%s%d", "a", 1) + expected := `{"status":"a1","id":"ID"}` + streamNewline + assert.Equal(t, expected, string(res)) +} + +func TestFormatError(t *testing.T) { + res := FormatError(errors.New("Error for formatter")) + expected := `{"errorDetail":{"message":"Error for formatter"},"error":"Error for formatter"}` + "\r\n" + assert.Equal(t, expected, string(res)) +} + +func TestFormatJSONError(t *testing.T) { + err := &jsonmessage.JSONError{Code: 50, Message: "Json error"} + res := FormatError(err) + expected := `{"errorDetail":{"code":50,"message":"Json error"},"error":"Json error"}` + streamNewline + assert.Equal(t, expected, string(res)) +} + +func TestJsonProgressFormatterFormatProgress(t *testing.T) { + sf := &jsonProgressFormatter{} + progress := &jsonmessage.JSONProgress{ + Current: 15, + Total: 30, + Start: 1, + } + res := sf.formatProgress("id", "action", progress, nil) msg := &jsonmessage.JSONMessage{} - if err := json.Unmarshal(res, msg); err != nil { - t.Fatal(err) - } - if msg.ID != "id" { - t.Fatalf("ID must be 'id', got: %s", msg.ID) - } - if msg.Status != "action" { - t.Fatalf("Status must be 'action', got: %s", msg.Status) - } + require.NoError(t, json.Unmarshal(res, msg)) + assert.Equal(t, "id", msg.ID) + assert.Equal(t, "action", msg.Status) // The progress will always be in the format of: // [=========================> ] 15B/30B 412910h51m30s @@ -102,7 +79,5 @@ func TestJSONFormatProgress(t *testing.T) { expectedProgress, expectedProgressShort, msg.ProgressMessage) } - if !reflect.DeepEqual(msg.Progress, progress) { - t.Fatal("Original progress not equals progress from FormatProgress") - } + assert.Equal(t, progress, msg.Progress) } diff --git a/pkg/streamformatter/streamwriter.go b/pkg/streamformatter/streamwriter.go new file mode 100644 index 0000000000..141d12e20e --- /dev/null +++ b/pkg/streamformatter/streamwriter.go @@ -0,0 +1,47 @@ +package streamformatter + +import ( + "encoding/json" + "io" + + "github.com/docker/docker/pkg/jsonmessage" +) + +type streamWriter struct { + io.Writer + lineFormat func([]byte) string +} + +func (sw *streamWriter) Write(buf []byte) (int, error) { + formattedBuf := sw.format(buf) + n, err := sw.Writer.Write(formattedBuf) + if n != len(formattedBuf) { + return n, io.ErrShortWrite + } + return len(buf), err +} + +func (sw *streamWriter) format(buf []byte) []byte { + msg := &jsonmessage.JSONMessage{Stream: sw.lineFormat(buf)} + b, err := json.Marshal(msg) + if err != nil { + return FormatError(err) + } + return appendNewline(b) +} + +// NewStdoutWriter returns a writer which formats the output as json message +// representing stdout lines +func NewStdoutWriter(out io.Writer) io.Writer { + return &streamWriter{Writer: out, lineFormat: func(buf []byte) string { + return string(buf) + }} +} + +// NewStderrWriter returns a writer which formats the output as json message +// representing stderr lines +func NewStderrWriter(out io.Writer) io.Writer { + return &streamWriter{Writer: out, lineFormat: func(buf []byte) string { + return "\033[91m" + string(buf) + "\033[0m" + }} +} diff --git a/pkg/streamformatter/streamwriter_test.go b/pkg/streamformatter/streamwriter_test.go new file mode 100644 index 0000000000..4935cc595c --- /dev/null +++ b/pkg/streamformatter/streamwriter_test.go @@ -0,0 +1,35 @@ +package streamformatter + +import ( + "testing" + + "bytes" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestStreamWriterStdout(t *testing.T) { + buffer := &bytes.Buffer{} + content := "content" + sw := NewStdoutWriter(buffer) + size, err := sw.Write([]byte(content)) + + require.NoError(t, err) + assert.Equal(t, len(content), size) + + expected := `{"stream":"content"}` + streamNewline + assert.Equal(t, expected, buffer.String()) +} + +func TestStreamWriterStderr(t *testing.T) { + buffer := &bytes.Buffer{} + content := "content" + sw := NewStderrWriter(buffer) + size, err := sw.Write([]byte(content)) + + require.NoError(t, err) + assert.Equal(t, len(content), size) + + expected := `{"stream":"\u001b[91mcontent\u001b[0m"}` + streamNewline + assert.Equal(t, expected, buffer.String()) +}