Ver código fonte

Merge pull request #6821 from LK4D4/broadcast_writer_refactor

Broadcast writer refactor
Michael Crosby 11 anos atrás
pai
commit
bade039bda

+ 5 - 4
daemon/container.go

@@ -29,6 +29,7 @@ import (
 	"github.com/dotcloud/docker/pkg/symlink"
 	"github.com/dotcloud/docker/runconfig"
 	"github.com/dotcloud/docker/utils"
+	"github.com/dotcloud/docker/utils/broadcastwriter"
 )
 
 const DefaultPathEnv = "/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin"
@@ -66,8 +67,8 @@ type Container struct {
 	ExecDriver     string
 
 	command   *execdriver.Command
-	stdout    *utils.WriteBroadcaster
-	stderr    *utils.WriteBroadcaster
+	stdout    *broadcastwriter.BroadcastWriter
+	stderr    *broadcastwriter.BroadcastWriter
 	stdin     io.ReadCloser
 	stdinPipe io.WriteCloser
 
@@ -502,10 +503,10 @@ func (container *Container) cleanup() {
 			utils.Errorf("%s: Error close stdin: %s", container.ID, err)
 		}
 	}
-	if err := container.stdout.CloseWriters(); err != nil {
+	if err := container.stdout.Clean(); err != nil {
 		utils.Errorf("%s: Error close stdout: %s", container.ID, err)
 	}
-	if err := container.stderr.CloseWriters(); err != nil {
+	if err := container.stderr.Clean(); err != nil {
 		utils.Errorf("%s: Error close stderr: %s", container.ID, err)
 	}
 	if container.command != nil && container.command.Terminal != nil {

+ 4 - 3
daemon/daemon.go

@@ -34,6 +34,7 @@ import (
 	"github.com/dotcloud/docker/pkg/truncindex"
 	"github.com/dotcloud/docker/runconfig"
 	"github.com/dotcloud/docker/utils"
+	"github.com/dotcloud/docker/utils/broadcastwriter"
 )
 
 // Set the max depth to the aufs default that most
@@ -169,8 +170,8 @@ func (daemon *Daemon) register(container *Container, updateSuffixarray bool, con
 	container.daemon = daemon
 
 	// Attach to stdout and stderr
-	container.stderr = utils.NewWriteBroadcaster()
-	container.stdout = utils.NewWriteBroadcaster()
+	container.stderr = broadcastwriter.New()
+	container.stdout = broadcastwriter.New()
 	// Attach to stdin
 	if container.Config.OpenStdin {
 		container.stdin, container.stdinPipe = io.Pipe()
@@ -255,7 +256,7 @@ func (daemon *Daemon) ensureName(container *Container) error {
 	return nil
 }
 
-func (daemon *Daemon) LogToDisk(src *utils.WriteBroadcaster, dst, stream string) error {
+func (daemon *Daemon) LogToDisk(src *broadcastwriter.BroadcastWriter, dst, stream string) error {
 	log, err := os.OpenFile(dst, os.O_RDWR|os.O_APPEND|os.O_CREATE, 0600)
 	if err != nil {
 		return err

+ 92 - 0
utils/broadcastwriter/broadcastwriter.go

@@ -0,0 +1,92 @@
+package broadcastwriter
+
+import (
+	"bytes"
+	"encoding/json"
+	"io"
+	"sync"
+	"time"
+
+	"github.com/dotcloud/docker/utils"
+)
+
+// BroadcastWriter accumulate multiple io.WriteCloser by stream.
+type BroadcastWriter struct {
+	sync.Mutex
+	buf     *bytes.Buffer
+	streams map[string](map[io.WriteCloser]struct{})
+}
+
+// AddWriter adds new io.WriteCloser for stream.
+// If stream is "", then all writes proceed as is. Otherwise every line from
+// input will be packed to serialized utils.JSONLog.
+func (w *BroadcastWriter) AddWriter(writer io.WriteCloser, stream string) {
+	w.Lock()
+	if _, ok := w.streams[stream]; !ok {
+		w.streams[stream] = make(map[io.WriteCloser]struct{})
+	}
+	w.streams[stream][writer] = struct{}{}
+	w.Unlock()
+}
+
+// Write writes bytes to all writers. Failed writers will be evicted during
+// this call.
+func (w *BroadcastWriter) Write(p []byte) (n int, err error) {
+	created := time.Now().UTC()
+	w.Lock()
+	if writers, ok := w.streams[""]; ok {
+		for sw := range writers {
+			if n, err := sw.Write(p); err != nil || n != len(p) {
+				// On error, evict the writer
+				delete(writers, sw)
+			}
+		}
+	}
+	w.buf.Write(p)
+	for {
+		line, err := w.buf.ReadString('\n')
+		if err != nil {
+			w.buf.Write([]byte(line))
+			break
+		}
+		for stream, writers := range w.streams {
+			if stream == "" {
+				continue
+			}
+			b, err := json.Marshal(utils.JSONLog{Log: line, Stream: stream, Created: created})
+			if err != nil {
+				utils.Errorf("Error making JSON log line: %s", err)
+				continue
+			}
+			b = append(b, '\n')
+			for sw := range writers {
+				if _, err := sw.Write(b); err != nil {
+					delete(writers, sw)
+				}
+			}
+		}
+	}
+	w.Unlock()
+	return len(p), nil
+}
+
+// Clean closes and removes all writers. Last non-eol-terminated part of data
+// will be saved.
+func (w *BroadcastWriter) Clean() error {
+	w.Lock()
+	for _, writers := range w.streams {
+		for w := range writers {
+			w.Close()
+		}
+	}
+	w.streams = make(map[string](map[io.WriteCloser]struct{}))
+	w.Unlock()
+	return nil
+}
+
+func New() *BroadcastWriter {
+	return &BroadcastWriter{
+		streams: make(map[string](map[io.WriteCloser]struct{})),
+		buf:     bytes.NewBuffer(nil),
+	}
+}

+ 144 - 0
utils/broadcastwriter/broadcastwriter_test.go

@@ -0,0 +1,144 @@
+package broadcastwriter
+
+import (
+	"bytes"
+	"errors"
+
+	"testing"
+)
+
+type dummyWriter struct {
+	buffer      bytes.Buffer
+	failOnWrite bool
+}
+
+func (dw *dummyWriter) Write(p []byte) (n int, err error) {
+	if dw.failOnWrite {
+		return 0, errors.New("Fake fail")
+	}
+	return dw.buffer.Write(p)
+}
+
+func (dw *dummyWriter) String() string {
+	return dw.buffer.String()
+}
+
+func (dw *dummyWriter) Close() error {
+	return nil
+}
+
+func TestBroadcastWriter(t *testing.T) {
+	writer := New()
+
+	// Test 1: Both bufferA and bufferB should contain "foo"
+	bufferA := &dummyWriter{}
+	writer.AddWriter(bufferA, "")
+	bufferB := &dummyWriter{}
+	writer.AddWriter(bufferB, "")
+	writer.Write([]byte("foo"))
+
+	if bufferA.String() != "foo" {
+		t.Errorf("Buffer contains %v", bufferA.String())
+	}
+
+	if bufferB.String() != "foo" {
+		t.Errorf("Buffer contains %v", bufferB.String())
+	}
+
+	// Test2: bufferA and bufferB should contain "foobar",
+	// while bufferC should only contain "bar"
+	bufferC := &dummyWriter{}
+	writer.AddWriter(bufferC, "")
+	writer.Write([]byte("bar"))
+
+	if bufferA.String() != "foobar" {
+		t.Errorf("Buffer contains %v", bufferA.String())
+	}
+
+	if bufferB.String() != "foobar" {
+		t.Errorf("Buffer contains %v", bufferB.String())
+	}
+
+	if bufferC.String() != "bar" {
+		t.Errorf("Buffer contains %v", bufferC.String())
+	}
+
+	// Test3: Test eviction on failure
+	bufferA.failOnWrite = true
+	writer.Write([]byte("fail"))
+	if bufferA.String() != "foobar" {
+		t.Errorf("Buffer contains %v", bufferA.String())
+	}
+	if bufferC.String() != "barfail" {
+		t.Errorf("Buffer contains %v", bufferC.String())
+	}
+	// Even though we reset the flag, no more writes should go in there
+	bufferA.failOnWrite = false
+	writer.Write([]byte("test"))
+	if bufferA.String() != "foobar" {
+		t.Errorf("Buffer contains %v", bufferA.String())
+	}
+	if bufferC.String() != "barfailtest" {
+		t.Errorf("Buffer contains %v", bufferC.String())
+	}
+
+	writer.Clean()
+}
+
+type devNullCloser int
+
+func (d devNullCloser) Close() error {
+	return nil
+}
+
+func (d devNullCloser) Write(buf []byte) (int, error) {
+	return len(buf), nil
+}
+
+// This test checks for races. It is only useful when run with the race detector.
+func TestRaceBroadcastWriter(t *testing.T) {
+	writer := New()
+	c := make(chan bool)
+	go func() {
+		writer.AddWriter(devNullCloser(0), "")
+		c <- true
+	}()
+	writer.Write([]byte("hello"))
+	<-c
+}
+
+func BenchmarkBroadcastWriter(b *testing.B) {
+	writer := New()
+	setUpWriter := func() {
+		for i := 0; i < 100; i++ {
+			writer.AddWriter(devNullCloser(0), "stdout")
+			writer.AddWriter(devNullCloser(0), "stderr")
+			writer.AddWriter(devNullCloser(0), "")
+		}
+	}
+	testLine := "Line that thinks that it is log line from docker"
+	var buf bytes.Buffer
+	for i := 0; i < 100; i++ {
+		buf.Write([]byte(testLine + "\n"))
+	}
+	// line without eol
+	buf.Write([]byte(testLine))
+	testText := buf.Bytes()
+	b.SetBytes(int64(5 * len(testText)))
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		b.StopTimer()
+		setUpWriter()
+		b.StartTimer()
+
+		for j := 0; j < 5; j++ {
+			if _, err := writer.Write(testText); err != nil {
+				b.Fatal(err)
+			}
+		}
+
+		b.StopTimer()
+		writer.Clean()
+		b.StartTimer()
+	}
+}

+ 0 - 86
utils/utils.go

@@ -265,21 +265,6 @@ func (r *bufReader) Close() error {
 	return closer.Close()
 }
 
-type WriteBroadcaster struct {
-	sync.Mutex
-	buf     *bytes.Buffer
-	streams map[string](map[io.WriteCloser]struct{})
-}
-
-func (w *WriteBroadcaster) AddWriter(writer io.WriteCloser, stream string) {
-	w.Lock()
-	if _, ok := w.streams[stream]; !ok {
-		w.streams[stream] = make(map[io.WriteCloser]struct{})
-	}
-	w.streams[stream][writer] = struct{}{}
-	w.Unlock()
-}
-
 type JSONLog struct {
 	Log     string    `json:"log,omitempty"`
 	Stream  string    `json:"stream,omitempty"`
@@ -316,77 +301,6 @@ func WriteLog(src io.Reader, dst io.WriteCloser, format string) error {
 	}
 }
 
-type LogFormatter struct {
-	wc         io.WriteCloser
-	timeFormat string
-}
-
-func (w *WriteBroadcaster) Write(p []byte) (n int, err error) {
-	created := time.Now().UTC()
-	w.Lock()
-	defer w.Unlock()
-	if writers, ok := w.streams[""]; ok {
-		for sw := range writers {
-			if n, err := sw.Write(p); err != nil || n != len(p) {
-				// On error, evict the writer
-				delete(writers, sw)
-			}
-		}
-	}
-	w.buf.Write(p)
-	lines := []string{}
-	for {
-		line, err := w.buf.ReadString('\n')
-		if err != nil {
-			w.buf.Write([]byte(line))
-			break
-		}
-		lines = append(lines, line)
-	}
-
-	if len(lines) != 0 {
-		for stream, writers := range w.streams {
-			if stream == "" {
-				continue
-			}
-			var lp []byte
-			for _, line := range lines {
-				b, err := json.Marshal(&JSONLog{Log: line, Stream: stream, Created: created})
-				if err != nil {
-					Errorf("Error making JSON log line: %s", err)
-				}
-				lp = append(lp, b...)
-				lp = append(lp, '\n')
-			}
-			for sw := range writers {
-				if _, err := sw.Write(lp); err != nil {
-					delete(writers, sw)
-				}
-			}
-		}
-	}
-	return len(p), nil
-}
-
-func (w *WriteBroadcaster) CloseWriters() error {
-	w.Lock()
-	defer w.Unlock()
-	for _, writers := range w.streams {
-		for w := range writers {
-			w.Close()
-		}
-	}
-	w.streams = make(map[string](map[io.WriteCloser]struct{}))
-	return nil
-}
-
-func NewWriteBroadcaster() *WriteBroadcaster {
-	return &WriteBroadcaster{
-		streams: make(map[string](map[io.WriteCloser]struct{})),
-		buf:     bytes.NewBuffer(nil),
-	}
-}
-
 func GetTotalUsedFds() int {
 	if fds, err := ioutil.ReadDir(fmt.Sprintf("/proc/%d/fd", os.Getpid())); err != nil {
 		Errorf("Error opening /proc/%d/fd: %s", os.Getpid(), err)

+ 0 - 101
utils/utils_test.go

@@ -2,7 +2,6 @@ package utils
 
 import (
 	"bytes"
-	"errors"
 	"io"
 	"io/ioutil"
 	"os"
@@ -35,106 +34,6 @@ func TestBufReader(t *testing.T) {
 	}
 }
 
-type dummyWriter struct {
-	buffer      bytes.Buffer
-	failOnWrite bool
-}
-
-func (dw *dummyWriter) Write(p []byte) (n int, err error) {
-	if dw.failOnWrite {
-		return 0, errors.New("Fake fail")
-	}
-	return dw.buffer.Write(p)
-}
-
-func (dw *dummyWriter) String() string {
-	return dw.buffer.String()
-}
-
-func (dw *dummyWriter) Close() error {
-	return nil
-}
-
-func TestWriteBroadcaster(t *testing.T) {
-	writer := NewWriteBroadcaster()
-
-	// Test 1: Both bufferA and bufferB should contain "foo"
-	bufferA := &dummyWriter{}
-	writer.AddWriter(bufferA, "")
-	bufferB := &dummyWriter{}
-	writer.AddWriter(bufferB, "")
-	writer.Write([]byte("foo"))
-
-	if bufferA.String() != "foo" {
-		t.Errorf("Buffer contains %v", bufferA.String())
-	}
-
-	if bufferB.String() != "foo" {
-		t.Errorf("Buffer contains %v", bufferB.String())
-	}
-
-	// Test2: bufferA and bufferB should contain "foobar",
-	// while bufferC should only contain "bar"
-	bufferC := &dummyWriter{}
-	writer.AddWriter(bufferC, "")
-	writer.Write([]byte("bar"))
-
-	if bufferA.String() != "foobar" {
-		t.Errorf("Buffer contains %v", bufferA.String())
-	}
-
-	if bufferB.String() != "foobar" {
-		t.Errorf("Buffer contains %v", bufferB.String())
-	}
-
-	if bufferC.String() != "bar" {
-		t.Errorf("Buffer contains %v", bufferC.String())
-	}
-
-	// Test3: Test eviction on failure
-	bufferA.failOnWrite = true
-	writer.Write([]byte("fail"))
-	if bufferA.String() != "foobar" {
-		t.Errorf("Buffer contains %v", bufferA.String())
-	}
-	if bufferC.String() != "barfail" {
-		t.Errorf("Buffer contains %v", bufferC.String())
-	}
-	// Even though we reset the flag, no more writes should go in there
-	bufferA.failOnWrite = false
-	writer.Write([]byte("test"))
-	if bufferA.String() != "foobar" {
-		t.Errorf("Buffer contains %v", bufferA.String())
-	}
-	if bufferC.String() != "barfailtest" {
-		t.Errorf("Buffer contains %v", bufferC.String())
-	}
-
-	writer.CloseWriters()
-}
-
-type devNullCloser int
-
-func (d devNullCloser) Close() error {
-	return nil
-}
-
-func (d devNullCloser) Write(buf []byte) (int, error) {
-	return len(buf), nil
-}
-
-// This test checks for races. It is only useful when run with the race detector.
-func TestRaceWriteBroadcaster(t *testing.T) {
-	writer := NewWriteBroadcaster()
-	c := make(chan bool)
-	go func() {
-		writer.AddWriter(devNullCloser(0), "")
-		c <- true
-	}()
-	writer.Write([]byte("hello"))
-	<-c
-}
-
 func assertKernelVersion(t *testing.T, a, b *KernelVersionInfo, result int) {
 	if r := CompareKernelVersion(a, b); r != result {
 		t.Fatalf("Unexpected kernel version comparison result. Found %d, expected %d", r, result)