瀏覽代碼

Move types from progressreader and broadcastwriter to broadcaster

progressreader.Broadcaster becomes broadcaster.Buffered and
broadcastwriter.Writer becomes broadcaster.Unbuffered.

The package broadcastwriter is thus renamed to broadcaster.

Signed-off-by: Tibor Vass <tibor@docker.com>
Tibor Vass 9 年之前
父節點
當前提交
2391233404
共有 9 個文件被更改,包括 62 次插入66 次删除
  1. 5 5
      daemon/container.go
  2. 3 3
      daemon/daemon.go
  3. 3 3
      daemon/exec.go
  4. 3 3
      graph/pools_test.go
  5. 2 1
      graph/pull_v2.go
  6. 7 7
      graph/tags.go
  7. 17 17
      pkg/broadcaster/buffered.go
  8. 7 12
      pkg/broadcaster/unbuffered.go
  9. 15 15
      pkg/broadcaster/unbuffered_test.go

+ 5 - 5
daemon/container.go

@@ -22,7 +22,7 @@ import (
 	derr "github.com/docker/docker/errors"
 	"github.com/docker/docker/image"
 	"github.com/docker/docker/pkg/archive"
-	"github.com/docker/docker/pkg/broadcastwriter"
+	"github.com/docker/docker/pkg/broadcaster"
 	"github.com/docker/docker/pkg/fileutils"
 	"github.com/docker/docker/pkg/ioutils"
 	"github.com/docker/docker/pkg/mount"
@@ -41,8 +41,8 @@ var (
 )
 
 type streamConfig struct {
-	stdout    *broadcastwriter.BroadcastWriter
-	stderr    *broadcastwriter.BroadcastWriter
+	stdout    *broadcaster.Unbuffered
+	stderr    *broadcaster.Unbuffered
 	stdin     io.ReadCloser
 	stdinPipe io.WriteCloser
 }
@@ -318,13 +318,13 @@ func (streamConfig *streamConfig) StdinPipe() io.WriteCloser {
 
 func (streamConfig *streamConfig) StdoutPipe() io.ReadCloser {
 	reader, writer := io.Pipe()
-	streamConfig.stdout.AddWriter(writer)
+	streamConfig.stdout.Add(writer)
 	return ioutils.NewBufReader(reader)
 }
 
 func (streamConfig *streamConfig) StderrPipe() io.ReadCloser {
 	reader, writer := io.Pipe()
-	streamConfig.stderr.AddWriter(writer)
+	streamConfig.stderr.Add(writer)
 	return ioutils.NewBufReader(reader)
 }
 

+ 3 - 3
daemon/daemon.go

@@ -32,7 +32,7 @@ import (
 	"github.com/docker/docker/graph"
 	"github.com/docker/docker/image"
 	"github.com/docker/docker/pkg/archive"
-	"github.com/docker/docker/pkg/broadcastwriter"
+	"github.com/docker/docker/pkg/broadcaster"
 	"github.com/docker/docker/pkg/discovery"
 	"github.com/docker/docker/pkg/fileutils"
 	"github.com/docker/docker/pkg/graphdb"
@@ -194,8 +194,8 @@ func (daemon *Daemon) Register(container *Container) error {
 	container.daemon = daemon
 
 	// Attach to stdout and stderr
-	container.stderr = broadcastwriter.New()
-	container.stdout = broadcastwriter.New()
+	container.stderr = new(broadcaster.Unbuffered)
+	container.stdout = new(broadcaster.Unbuffered)
 	// Attach to stdin
 	if container.Config.OpenStdin {
 		container.stdin, container.stdinPipe = io.Pipe()

+ 3 - 3
daemon/exec.go

@@ -10,7 +10,7 @@ import (
 	"github.com/Sirupsen/logrus"
 	"github.com/docker/docker/daemon/execdriver"
 	derr "github.com/docker/docker/errors"
-	"github.com/docker/docker/pkg/broadcastwriter"
+	"github.com/docker/docker/pkg/broadcaster"
 	"github.com/docker/docker/pkg/ioutils"
 	"github.com/docker/docker/pkg/pools"
 	"github.com/docker/docker/pkg/stringid"
@@ -233,8 +233,8 @@ func (d *Daemon) ContainerExecStart(name string, stdin io.ReadCloser, stdout io.
 		cStderr = stderr
 	}
 
-	ec.streamConfig.stderr = broadcastwriter.New()
-	ec.streamConfig.stdout = broadcastwriter.New()
+	ec.streamConfig.stderr = new(broadcaster.Unbuffered)
+	ec.streamConfig.stdout = new(broadcaster.Unbuffered)
 	// Attach to stdin
 	if ec.OpenStdin {
 		ec.streamConfig.stdin, ec.streamConfig.stdinPipe = io.Pipe()

+ 3 - 3
graph/pools_test.go

@@ -3,7 +3,7 @@ package graph
 import (
 	"testing"
 
-	"github.com/docker/docker/pkg/progressreader"
+	"github.com/docker/docker/pkg/broadcaster"
 	"github.com/docker/docker/pkg/reexec"
 )
 
@@ -13,8 +13,8 @@ func init() {
 
 func TestPools(t *testing.T) {
 	s := &TagStore{
-		pullingPool: make(map[string]*progressreader.Broadcaster),
-		pushingPool: make(map[string]*progressreader.Broadcaster),
+		pullingPool: make(map[string]*broadcaster.Buffered),
+		pushingPool: make(map[string]*broadcaster.Buffered),
 	}
 
 	if _, found := s.poolAdd("pull", "test1"); found {

+ 2 - 1
graph/pull_v2.go

@@ -11,6 +11,7 @@ import (
 	"github.com/docker/distribution/digest"
 	"github.com/docker/distribution/manifest"
 	"github.com/docker/docker/image"
+	"github.com/docker/docker/pkg/broadcaster"
 	"github.com/docker/docker/pkg/progressreader"
 	"github.com/docker/docker/pkg/streamformatter"
 	"github.com/docker/docker/pkg/stringid"
@@ -110,7 +111,7 @@ type downloadInfo struct {
 	size        int64
 	err         chan error
 	poolKey     string
-	broadcaster *progressreader.Broadcaster
+	broadcaster *broadcaster.Buffered
 }
 
 type errVerification struct{}

+ 7 - 7
graph/tags.go

@@ -16,8 +16,8 @@ import (
 	"github.com/docker/docker/daemon/events"
 	"github.com/docker/docker/graph/tags"
 	"github.com/docker/docker/image"
+	"github.com/docker/docker/pkg/broadcaster"
 	"github.com/docker/docker/pkg/parsers"
-	"github.com/docker/docker/pkg/progressreader"
 	"github.com/docker/docker/pkg/stringid"
 	"github.com/docker/docker/registry"
 	"github.com/docker/docker/trust"
@@ -37,8 +37,8 @@ type TagStore struct {
 	sync.Mutex
 	// FIXME: move push/pull-related fields
 	// to a helper type
-	pullingPool     map[string]*progressreader.Broadcaster
-	pushingPool     map[string]*progressreader.Broadcaster
+	pullingPool     map[string]*broadcaster.Buffered
+	pushingPool     map[string]*broadcaster.Buffered
 	registryService *registry.Service
 	eventsService   *events.Events
 	trustService    *trust.Store
@@ -94,8 +94,8 @@ func NewTagStore(path string, cfg *TagStoreConfig) (*TagStore, error) {
 		graph:           cfg.Graph,
 		trustKey:        cfg.Key,
 		Repositories:    make(map[string]Repository),
-		pullingPool:     make(map[string]*progressreader.Broadcaster),
-		pushingPool:     make(map[string]*progressreader.Broadcaster),
+		pullingPool:     make(map[string]*broadcaster.Buffered),
+		pushingPool:     make(map[string]*broadcaster.Buffered),
 		registryService: cfg.Registry,
 		eventsService:   cfg.Events,
 		trustService:    cfg.Trust,
@@ -437,7 +437,7 @@ func validateDigest(dgst string) error {
 // poolAdd checks if a push or pull is already running, and returns
 // (broadcaster, true) if a running operation is found. Otherwise, it creates a
 // new one and returns (broadcaster, false).
-func (store *TagStore) poolAdd(kind, key string) (*progressreader.Broadcaster, bool) {
+func (store *TagStore) poolAdd(kind, key string) (*broadcaster.Buffered, bool) {
 	store.Lock()
 	defer store.Unlock()
 
@@ -448,7 +448,7 @@ func (store *TagStore) poolAdd(kind, key string) (*progressreader.Broadcaster, b
 		return p, true
 	}
 
-	broadcaster := progressreader.NewBroadcaster()
+	broadcaster := broadcaster.NewBuffered()
 
 	switch kind {
 	case "pull":

+ 17 - 17
pkg/progressreader/broadcaster.go → pkg/broadcaster/buffered.go

@@ -1,4 +1,4 @@
-package progressreader
+package broadcaster
 
 import (
 	"errors"
@@ -6,10 +6,10 @@ import (
 	"sync"
 )
 
-// Broadcaster keeps track of one or more observers watching the progress
+// Buffered keeps track of one or more observers watching the progress
 // of an operation. For example, if multiple clients are trying to pull an
-// image, they share a Broadcaster for the download operation.
-type Broadcaster struct {
+// image, they share a Buffered struct for the download operation.
+type Buffered struct {
 	sync.Mutex
 	// c is a channel that observers block on, waiting for the operation
 	// to finish.
@@ -29,9 +29,9 @@ type Broadcaster struct {
 	result error
 }
 
-// NewBroadcaster returns a Broadcaster structure
-func NewBroadcaster() *Broadcaster {
-	b := &Broadcaster{
+// NewBuffered returns an initialized Buffered structure.
+func NewBuffered() *Buffered {
+	b := &Buffered{
 		c: make(chan struct{}),
 	}
 	b.cond = sync.NewCond(b)
@@ -39,7 +39,7 @@ func NewBroadcaster() *Broadcaster {
 }
 
 // closed returns true if and only if the broadcaster has been closed
-func (broadcaster *Broadcaster) closed() bool {
+func (broadcaster *Buffered) closed() bool {
 	select {
 	case <-broadcaster.c:
 		return true
@@ -51,7 +51,7 @@ func (broadcaster *Broadcaster) closed() bool {
 // receiveWrites runs as a goroutine so that writes don't block the Write
 // function. It writes the new data in broadcaster.history each time there's
 // activity on the broadcaster.cond condition variable.
-func (broadcaster *Broadcaster) receiveWrites(observer io.Writer) {
+func (broadcaster *Buffered) receiveWrites(observer io.Writer) {
 	n := 0
 
 	broadcaster.Lock()
@@ -98,13 +98,13 @@ func (broadcaster *Broadcaster) receiveWrites(observer io.Writer) {
 
 // Write adds data to the history buffer, and also writes it to all current
 // observers.
-func (broadcaster *Broadcaster) Write(p []byte) (n int, err error) {
+func (broadcaster *Buffered) Write(p []byte) (n int, err error) {
 	broadcaster.Lock()
 	defer broadcaster.Unlock()
 
 	// Is the broadcaster closed? If so, the write should fail.
 	if broadcaster.closed() {
-		return 0, errors.New("attempted write to closed progressreader Broadcaster")
+		return 0, errors.New("attempted write to a closed broadcaster.Buffered")
 	}
 
 	// Add message in p to the history slice
@@ -117,15 +117,15 @@ func (broadcaster *Broadcaster) Write(p []byte) (n int, err error) {
 	return len(p), nil
 }
 
-// Add adds an observer to the Broadcaster. The new observer receives the
+// Add adds an observer to the broadcaster. The new observer receives the
 // data from the history buffer, and also all subsequent data.
-func (broadcaster *Broadcaster) Add(w io.Writer) error {
+func (broadcaster *Buffered) Add(w io.Writer) error {
 	// The lock is acquired here so that Add can't race with Close
 	broadcaster.Lock()
 	defer broadcaster.Unlock()
 
 	if broadcaster.closed() {
-		return errors.New("attempted to add observer to closed progressreader Broadcaster")
+		return errors.New("attempted to add observer to a closed broadcaster.Buffered")
 	}
 
 	broadcaster.wg.Add(1)
@@ -136,7 +136,7 @@ func (broadcaster *Broadcaster) Add(w io.Writer) error {
 
 // CloseWithError signals to all observers that the operation has finished. Its
 // argument is a result that should be returned to waiters blocking on Wait.
-func (broadcaster *Broadcaster) CloseWithError(result error) {
+func (broadcaster *Buffered) CloseWithError(result error) {
 	broadcaster.Lock()
 	if broadcaster.closed() {
 		broadcaster.Unlock()
@@ -153,14 +153,14 @@ func (broadcaster *Broadcaster) CloseWithError(result error) {
 
 // Close signals to all observers that the operation has finished. It causes
 // all calls to Wait to return nil.
-func (broadcaster *Broadcaster) Close() {
+func (broadcaster *Buffered) Close() {
 	broadcaster.CloseWithError(nil)
 }
 
 // Wait blocks until the operation is marked as completed by the Close method,
 // and all writer goroutines have completed. It returns the argument that was
 // passed to Close.
-func (broadcaster *Broadcaster) Wait() error {
+func (broadcaster *Buffered) Wait() error {
 	<-broadcaster.c
 	broadcaster.wg.Wait()
 	return broadcaster.result

+ 7 - 12
pkg/broadcastwriter/broadcastwriter.go → pkg/broadcaster/unbuffered.go

@@ -1,18 +1,18 @@
-package broadcastwriter
+package broadcaster
 
 import (
 	"io"
 	"sync"
 )
 
-// BroadcastWriter accumulate multiple io.WriteCloser by stream.
-type BroadcastWriter struct {
+// Unbuffered accumulates multiple io.WriteCloser by stream.
+type Unbuffered struct {
 	mu      sync.Mutex
 	writers []io.WriteCloser
 }
 
-// AddWriter adds new io.WriteCloser.
-func (w *BroadcastWriter) AddWriter(writer io.WriteCloser) {
+// Add adds new io.WriteCloser.
+func (w *Unbuffered) Add(writer io.WriteCloser) {
 	w.mu.Lock()
 	w.writers = append(w.writers, writer)
 	w.mu.Unlock()
@@ -20,7 +20,7 @@ func (w *BroadcastWriter) AddWriter(writer io.WriteCloser) {
 
 // Write writes bytes to all writers. Failed writers will be evicted during
 // this call.
-func (w *BroadcastWriter) Write(p []byte) (n int, err error) {
+func (w *Unbuffered) Write(p []byte) (n int, err error) {
 	w.mu.Lock()
 	var evict []int
 	for i, sw := range w.writers {
@@ -38,7 +38,7 @@ func (w *BroadcastWriter) Write(p []byte) (n int, err error) {
 
 // Clean closes and removes all writers. Last non-eol-terminated part of data
 // will be saved.
-func (w *BroadcastWriter) Clean() error {
+func (w *Unbuffered) Clean() error {
 	w.mu.Lock()
 	for _, sw := range w.writers {
 		sw.Close()
@@ -47,8 +47,3 @@ func (w *BroadcastWriter) Clean() error {
 	w.mu.Unlock()
 	return nil
 }
-
-// New creates a new BroadcastWriter.
-func New() *BroadcastWriter {
-	return &BroadcastWriter{}
-}

+ 15 - 15
pkg/broadcastwriter/broadcastwriter_test.go → pkg/broadcaster/unbuffered_test.go

@@ -1,4 +1,4 @@
-package broadcastwriter
+package broadcaster
 
 import (
 	"bytes"
@@ -28,14 +28,14 @@ func (dw *dummyWriter) Close() error {
 	return nil
 }
 
-func TestBroadcastWriter(t *testing.T) {
-	writer := New()
+func TestUnbuffered(t *testing.T) {
+	writer := new(Unbuffered)
 
 	// Test 1: Both bufferA and bufferB should contain "foo"
 	bufferA := &dummyWriter{}
-	writer.AddWriter(bufferA)
+	writer.Add(bufferA)
 	bufferB := &dummyWriter{}
-	writer.AddWriter(bufferB)
+	writer.Add(bufferB)
 	writer.Write([]byte("foo"))
 
 	if bufferA.String() != "foo" {
@@ -49,7 +49,7 @@ func TestBroadcastWriter(t *testing.T) {
 	// Test2: bufferA and bufferB should contain "foobar",
 	// while bufferC should only contain "bar"
 	bufferC := &dummyWriter{}
-	writer.AddWriter(bufferC)
+	writer.Add(bufferC)
 	writer.Write([]byte("bar"))
 
 	if bufferA.String() != "foobar" {
@@ -87,7 +87,7 @@ func TestBroadcastWriter(t *testing.T) {
 	bufferB.failOnWrite = true
 	bufferC.failOnWrite = true
 	bufferD := &dummyWriter{}
-	writer.AddWriter(bufferD)
+	writer.Add(bufferD)
 	writer.Write([]byte("yo"))
 	writer.Write([]byte("ink"))
 	if strings.Contains(bufferB.String(), "yoink") {
@@ -114,24 +114,24 @@ func (d devNullCloser) Write(buf []byte) (int, error) {
 }
 
 // This test checks for races. It is only useful when run with the race detector.
-func TestRaceBroadcastWriter(t *testing.T) {
-	writer := New()
+func TestRaceUnbuffered(t *testing.T) {
+	writer := new(Unbuffered)
 	c := make(chan bool)
 	go func() {
-		writer.AddWriter(devNullCloser(0))
+		writer.Add(devNullCloser(0))
 		c <- true
 	}()
 	writer.Write([]byte("hello"))
 	<-c
 }
 
-func BenchmarkBroadcastWriter(b *testing.B) {
-	writer := New()
+func BenchmarkUnbuffered(b *testing.B) {
+	writer := new(Unbuffered)
 	setUpWriter := func() {
 		for i := 0; i < 100; i++ {
-			writer.AddWriter(devNullCloser(0))
-			writer.AddWriter(devNullCloser(0))
-			writer.AddWriter(devNullCloser(0))
+			writer.Add(devNullCloser(0))
+			writer.Add(devNullCloser(0))
+			writer.Add(devNullCloser(0))
 		}
 	}
 	testLine := "Line that thinks that it is log line from docker"