diff --git a/daemon/container.go b/daemon/container.go index c7a6774601..419314b8ba 100644 --- a/daemon/container.go +++ b/daemon/container.go @@ -503,10 +503,10 @@ func (container *Container) cleanup() { utils.Errorf("%s: Error close stdin: %s", container.ID, err) } } - if err := container.stdout.Close(); err != nil { + if err := container.stdout.Clean(); err != nil { utils.Errorf("%s: Error close stdout: %s", container.ID, err) } - if err := container.stderr.Close(); err != nil { + if err := container.stderr.Clean(); err != nil { utils.Errorf("%s: Error close stderr: %s", container.ID, err) } if container.command != nil && container.command.Terminal != nil { diff --git a/utils/broadcastwriter/broadcastwriter.go b/utils/broadcastwriter/broadcastwriter.go index 81ca9e5bdd..9bd2c1471e 100644 --- a/utils/broadcastwriter/broadcastwriter.go +++ b/utils/broadcastwriter/broadcastwriter.go @@ -10,12 +10,16 @@ import ( "github.com/dotcloud/docker/utils" ) +// BroadcastWriter accumulate multiple io.WriteCloser by stream. type BroadcastWriter struct { sync.Mutex buf *bytes.Buffer streams map[string](map[io.WriteCloser]struct{}) } +// AddWriter adds new io.WriteCloser for stream. +// If stream is "", then all writes proceed as is. Otherwise every line from +// input will be packed to serialized utils.JSONLog. func (w *BroadcastWriter) AddWriter(writer io.WriteCloser, stream string) { w.Lock() if _, ok := w.streams[stream]; !ok { @@ -25,10 +29,11 @@ func (w *BroadcastWriter) AddWriter(writer io.WriteCloser, stream string) { w.Unlock() } +// Write writes bytes to all writers. Failed writers will be evicted during +// this call. func (w *BroadcastWriter) Write(p []byte) (n int, err error) { created := time.Now().UTC() w.Lock() - defer w.Unlock() if writers, ok := w.streams[""]; ok { for sw := range writers { if n, err := sw.Write(p); err != nil || n != len(p) { @@ -38,49 +43,44 @@ func (w *BroadcastWriter) Write(p []byte) (n int, err error) { } } w.buf.Write(p) - lines := []string{} for { line, err := w.buf.ReadString('\n') if err != nil { w.buf.Write([]byte(line)) break } - lines = append(lines, line) - } - - if len(lines) != 0 { for stream, writers := range w.streams { if stream == "" { continue } - var lp []byte - for _, line := range lines { - b, err := json.Marshal(&utils.JSONLog{Log: line, Stream: stream, Created: created}) - if err != nil { - utils.Errorf("Error making JSON log line: %s", err) - } - lp = append(lp, b...) - lp = append(lp, '\n') + b, err := json.Marshal(utils.JSONLog{Log: line, Stream: stream, Created: created}) + if err != nil { + utils.Errorf("Error making JSON log line: %s", err) + continue } + b = append(b, '\n') for sw := range writers { - if _, err := sw.Write(lp); err != nil { + if _, err := sw.Write(b); err != nil { delete(writers, sw) } } } } + w.Unlock() return len(p), nil } -func (w *BroadcastWriter) Close() error { +// Clean closes and removes all writers. Last non-eol-terminated part of data +// will be saved. +func (w *BroadcastWriter) Clean() error { w.Lock() - defer w.Unlock() for _, writers := range w.streams { for w := range writers { w.Close() } } w.streams = make(map[string](map[io.WriteCloser]struct{})) + w.Unlock() return nil } diff --git a/utils/broadcastwriter/broadcastwriter_test.go b/utils/broadcastwriter/broadcastwriter_test.go index d5c9152467..62ca12659a 100644 --- a/utils/broadcastwriter/broadcastwriter_test.go +++ b/utils/broadcastwriter/broadcastwriter_test.go @@ -82,7 +82,7 @@ func TestBroadcastWriter(t *testing.T) { t.Errorf("Buffer contains %v", bufferC.String()) } - writer.Close() + writer.Clean() } type devNullCloser int @@ -138,7 +138,7 @@ func BenchmarkBroadcastWriter(b *testing.B) { } b.StopTimer() - writer.Close() + writer.Clean() b.StartTimer() } }