Merge remote-tracking branch 'robryk/writebroadcaster-stuff'

This commit is contained in:
Solomon Hykes 2013-04-02 20:35:13 -07:00
commit 22d1622926
3 changed files with 46 additions and 29 deletions

View file

@ -171,14 +171,14 @@ func (container *Container) startPty() error {
// Copy the PTYs to our broadcasters
go func() {
defer container.stdout.Close()
defer container.stdout.CloseWriters()
Debugf("[startPty] Begin of stdout pipe")
io.Copy(container.stdout, stdoutMaster)
Debugf("[startPty] End of stdout pipe")
}()
go func() {
defer container.stderr.Close()
defer container.stderr.CloseWriters()
Debugf("[startPty] Begin of stderr pipe")
io.Copy(container.stderr, stderrMaster)
Debugf("[startPty] End of stderr pipe")
@ -391,10 +391,10 @@ func (container *Container) monitor() {
Debugf("%s: Error close stdin: %s", container.Id, err)
}
}
if err := container.stdout.Close(); err != nil {
if err := container.stdout.CloseWriters(); err != nil {
Debugf("%s: Error close stdout: %s", container.Id, err)
}
if err := container.stderr.Close(); err != nil {
if err := container.stderr.CloseWriters(); err != nil {
Debugf("%s: Error close stderr: %s", container.Id, err)
}

View file

@ -2,7 +2,6 @@ package docker
import (
"bytes"
"container/list"
"errors"
"fmt"
"github.com/dotcloud/docker/rcli"
@ -215,52 +214,48 @@ func (r *bufReader) Close() error {
}
type writeBroadcaster struct {
writers *list.List
mu sync.Mutex
writers map[io.WriteCloser]struct{}
}
func (w *writeBroadcaster) AddWriter(writer io.WriteCloser) {
w.writers.PushBack(writer)
w.mu.Lock()
w.writers[writer] = struct{}{}
w.mu.Unlock()
}
// FIXME: Is that function used?
// FIXME: This relies on the concrete writer type used having equality operator
func (w *writeBroadcaster) RemoveWriter(writer io.WriteCloser) {
for e := w.writers.Front(); e != nil; e = e.Next() {
v := e.Value.(io.Writer)
if v == writer {
w.writers.Remove(e)
return
}
}
w.mu.Lock()
delete(w.writers, writer)
w.mu.Unlock()
}
func (w *writeBroadcaster) Write(p []byte) (n int, err error) {
failed := []*list.Element{}
for e := w.writers.Front(); e != nil; e = e.Next() {
writer := e.Value.(io.Writer)
w.mu.Lock()
defer w.mu.Unlock()
for writer := range w.writers {
if n, err := writer.Write(p); err != nil || n != len(p) {
// On error, evict the writer
failed = append(failed, e)
delete(w.writers, writer)
}
}
// We cannot remove while iterating, so it has to be done in
// a separate step
for _, e := range failed {
w.writers.Remove(e)
}
return len(p), nil
}
func (w *writeBroadcaster) Close() error {
for e := w.writers.Front(); e != nil; e = e.Next() {
writer := e.Value.(io.WriteCloser)
func (w *writeBroadcaster) CloseWriters() error {
w.mu.Lock()
defer w.mu.Unlock()
for writer := range w.writers {
writer.Close()
}
w.writers.Init()
w.writers = make(map[io.WriteCloser]struct{})
return nil
}
func newWriteBroadcaster() *writeBroadcaster {
return &writeBroadcaster{list.New()}
return &writeBroadcaster{writers: make(map[io.WriteCloser]struct{})}
}
func getTotalUsedFds() int {

View file

@ -122,7 +122,29 @@ func TestWriteBroadcaster(t *testing.T) {
t.Errorf("Buffer contains %v", bufferC.String())
}
writer.Close()
writer.CloseWriters()
}
type devNullCloser int
func (d devNullCloser) Close() error {
return nil
}
func (d devNullCloser) Write(buf []byte) (int, error) {
return len(buf), nil
}
// This test checks for races. It is only useful when run with the race detector.
func TestRaceWriteBroadcaster(t *testing.T) {
writer := newWriteBroadcaster()
c := make(chan bool)
go func() {
writer.AddWriter(devNullCloser(0))
c <- true
}()
writer.Write([]byte("hello"))
<-c
}
// Test the behavior of TruncIndex, an index for querying IDs from a non-conflicting prefix.