diff --git a/container/container.go b/container/container.go index 5d3e3c7cbc..d0ae38fc23 100644 --- a/container/container.go +++ b/container/container.go @@ -10,6 +10,8 @@ import ( "syscall" "time" + "golang.org/x/net/context" + "github.com/Sirupsen/logrus" "github.com/docker/docker/daemon/exec" "github.com/docker/docker/daemon/execdriver" @@ -62,8 +64,9 @@ type CommonContainer struct { monitor *containerMonitor ExecCommands *exec.Store `json:"-"` // logDriver for closing - LogDriver logger.Logger `json:"-"` - LogCopier *logger.Copier `json:"-"` + LogDriver logger.Logger `json:"-"` + LogCopier *logger.Copier `json:"-"` + attachContext *attachContext } // NewBaseContainer creates a new container with its @@ -71,12 +74,13 @@ type CommonContainer struct { func NewBaseContainer(id, root string) *Container { return &Container{ CommonContainer: CommonContainer{ - ID: id, - State: NewState(), - ExecCommands: exec.NewStore(), - Root: root, - MountPoints: make(map[string]*volume.MountPoint), - StreamConfig: runconfig.NewStreamConfig(), + ID: id, + State: NewState(), + ExecCommands: exec.NewStore(), + Root: root, + MountPoints: make(map[string]*volume.MountPoint), + StreamConfig: runconfig.NewStreamConfig(), + attachContext: &attachContext{}, }, } } @@ -345,12 +349,13 @@ func (container *Container) GetExecIDs() []string { // Attach connects to the container's TTY, delegating to standard // streams or websockets depending on the configuration. func (container *Container) Attach(stdin io.ReadCloser, stdout io.Writer, stderr io.Writer, keys []byte) chan error { - return AttachStreams(container.StreamConfig, container.Config.OpenStdin, container.Config.StdinOnce, container.Config.Tty, stdin, stdout, stderr, keys) + ctx := container.InitAttachContext() + return AttachStreams(ctx, container.StreamConfig, container.Config.OpenStdin, container.Config.StdinOnce, container.Config.Tty, stdin, stdout, stderr, keys) } // AttachStreams connects streams to a TTY. // Used by exec too. Should this move somewhere else? -func AttachStreams(streamConfig *runconfig.StreamConfig, openStdin, stdinOnce, tty bool, stdin io.ReadCloser, stdout io.Writer, stderr io.Writer, keys []byte) chan error { +func AttachStreams(ctx context.Context, streamConfig *runconfig.StreamConfig, openStdin, stdinOnce, tty bool, stdin io.ReadCloser, stdout io.Writer, stderr io.Writer, keys []byte) chan error { var ( cStdout, cStderr io.ReadCloser cStdin io.WriteCloser @@ -379,21 +384,6 @@ func AttachStreams(streamConfig *runconfig.StreamConfig, openStdin, stdinOnce, t return } logrus.Debugf("attach: stdin: begin") - defer func() { - if stdinOnce && !tty { - cStdin.Close() - } else { - // No matter what, when stdin is closed (io.Copy unblock), close stdout and stderr - if cStdout != nil { - cStdout.Close() - } - if cStderr != nil { - cStderr.Close() - } - } - wg.Done() - logrus.Debugf("attach: stdin: end") - }() var err error if tty { @@ -408,23 +398,26 @@ func AttachStreams(streamConfig *runconfig.StreamConfig, openStdin, stdinOnce, t if err != nil { logrus.Errorf("attach: stdin: %s", err) errors <- err - return } + if stdinOnce && !tty { + cStdin.Close() + } else { + // No matter what, when stdin is closed (io.Copy unblock), close stdout and stderr + if cStdout != nil { + cStdout.Close() + } + if cStderr != nil { + cStderr.Close() + } + } + logrus.Debugf("attach: stdin: end") + wg.Done() }() attachStream := func(name string, stream io.Writer, streamPipe io.ReadCloser) { if stream == nil { return } - defer func() { - // Make sure stdin gets closed - if stdin != nil { - stdin.Close() - } - streamPipe.Close() - wg.Done() - logrus.Debugf("attach: %s: end", name) - }() logrus.Debugf("attach: %s: begin", name) _, err := io.Copy(stream, streamPipe) @@ -435,13 +428,39 @@ func AttachStreams(streamConfig *runconfig.StreamConfig, openStdin, stdinOnce, t logrus.Errorf("attach: %s: %v", name, err) errors <- err } + // Make sure stdin gets closed + if stdin != nil { + stdin.Close() + } + streamPipe.Close() + logrus.Debugf("attach: %s: end", name) + wg.Done() } go attachStream("stdout", stdout, cStdout) go attachStream("stderr", stderr, cStderr) return promise.Go(func() error { - wg.Wait() + done := make(chan struct{}) + go func() { + wg.Wait() + close(done) + }() + select { + case <-done: + case <-ctx.Done(): + // close all pipes + if cStdin != nil { + cStdin.Close() + } + if cStdout != nil { + cStdout.Close() + } + if cStderr != nil { + cStderr.Close() + } + <-done + } close(errors) for err := range errors { if err != nil { @@ -597,3 +616,31 @@ func (container *Container) UpdateMonitor(restartPolicy containertypes.RestartPo } monitor.mux.Unlock() } + +type attachContext struct { + ctx context.Context + cancel context.CancelFunc + mu sync.Mutex +} + +// InitAttachContext initialize or returns existing context for attach calls to +// track container liveness. +func (container *Container) InitAttachContext() context.Context { + container.attachContext.mu.Lock() + defer container.attachContext.mu.Unlock() + if container.attachContext.ctx == nil { + container.attachContext.ctx, container.attachContext.cancel = context.WithCancel(context.Background()) + } + return container.attachContext.ctx +} + +// CancelAttachContext cancel attach context. All attach calls should detach +// after this call. +func (container *Container) CancelAttachContext() { + container.attachContext.mu.Lock() + if container.attachContext.ctx != nil { + container.attachContext.cancel() + container.attachContext.ctx = nil + } + container.attachContext.mu.Unlock() +} diff --git a/daemon/exec.go b/daemon/exec.go index 56798a5979..c3a1dd8500 100644 --- a/daemon/exec.go +++ b/daemon/exec.go @@ -6,6 +6,8 @@ import ( "strings" "time" + "golang.org/x/net/context" + "github.com/Sirupsen/logrus" "github.com/docker/docker/container" "github.com/docker/docker/daemon/exec" @@ -181,7 +183,7 @@ func (d *Daemon) ContainerExecStart(name string, stdin io.ReadCloser, stdout io. ec.NewNopInputPipe() } - attachErr := container.AttachStreams(ec.StreamConfig, ec.OpenStdin, true, ec.ProcessConfig.Tty, cStdin, cStdout, cStderr, ec.DetachKeys) + attachErr := container.AttachStreams(context.Background(), ec.StreamConfig, ec.OpenStdin, true, ec.ProcessConfig.Tty, cStdin, cStdout, cStderr, ec.DetachKeys) execErr := make(chan error) diff --git a/daemon/start.go b/daemon/start.go index 532c6b48db..757b179a1a 100644 --- a/daemon/start.go +++ b/daemon/start.go @@ -175,4 +175,5 @@ func (daemon *Daemon) Cleanup(container *container.Container) { if err := container.UnmountVolumes(false, daemon.LogVolumeEvent); err != nil { logrus.Warnf("%s cleanup: Failed to umount volumes: %v", container.ID, err) } + container.CancelAttachContext() } diff --git a/integration-cli/docker_cli_run_test.go b/integration-cli/docker_cli_run_test.go index 18ee275bbe..8900422d96 100644 --- a/integration-cli/docker_cli_run_test.go +++ b/integration-cli/docker_cli_run_test.go @@ -3,6 +3,7 @@ package main import ( "bufio" "bytes" + "encoding/json" "fmt" "io/ioutil" "net" @@ -4187,3 +4188,42 @@ func (s *DockerSuite) TestRunNamedVolumesFromNotRemoved(c *check.C) { out, _ := dockerCmd(c, "volume", "ls", "-q") c.Assert(strings.TrimSpace(out), checker.Equals, "test") } + +func (s *DockerSuite) TestRunAttachFailedNoLeak(c *check.C) { + type info struct { + NGoroutines int + } + getNGoroutines := func() int { + var i info + status, b, err := sockRequest("GET", "/info", nil) + c.Assert(err, checker.IsNil) + c.Assert(status, checker.Equals, 200) + c.Assert(json.Unmarshal(b, &i), checker.IsNil) + return i.NGoroutines + } + nroutines := getNGoroutines() + + runSleepingContainer(c, "--name=test", "-p", "8000:8000") + + out, _, err := dockerCmdWithError("run", "-p", "8000:8000", "busybox", "true") + c.Assert(err, checker.NotNil) + // check for windows error as well + c.Assert(strings.Contains(string(out), "port is already allocated") || strings.Contains(string(out), "were not connected because a duplicate name exists"), checker.Equals, true, check.Commentf("Output: %s", out)) + dockerCmd(c, "rm", "-f", "test") + + // NGoroutines is not updated right away, so we need to wait before failing + t := time.After(30 * time.Second) + for { + select { + case <-t: + n := getNGoroutines() + c.Assert(n <= nroutines, checker.Equals, true, check.Commentf("leaked goroutines: expected less than or equal to %d, got: %d", nroutines, n)) + + default: + if n := getNGoroutines(); n <= nroutines { + return + } + time.Sleep(200 * time.Millisecond) + } + } +}