Browse Source

daemon: fix hanging attaches on initial start failures

Attach can hang forever if there is no data to send. This PR notifies the
Attach goroutine when the container stops, so that it can detach.

Signed-off-by: Alexander Morozov <lk4d4@docker.com>
Alexander Morozov 9 years ago
parent
commit
7bb815e296
4 changed files with 127 additions and 37 deletions
  1. 83 36
      container/container.go
  2. 3 1
      daemon/exec.go
  3. 1 0
      daemon/start.go
  4. 40 0
      integration-cli/docker_cli_run_test.go

+ 83 - 36
container/container.go

@@ -10,6 +10,8 @@ import (
 	"syscall"
 	"syscall"
 	"time"
 	"time"
 
 
+	"golang.org/x/net/context"
+
 	"github.com/Sirupsen/logrus"
 	"github.com/Sirupsen/logrus"
 	"github.com/docker/docker/daemon/exec"
 	"github.com/docker/docker/daemon/exec"
 	"github.com/docker/docker/daemon/execdriver"
 	"github.com/docker/docker/daemon/execdriver"
@@ -62,8 +64,9 @@ type CommonContainer struct {
 	monitor                *containerMonitor
 	monitor                *containerMonitor
 	ExecCommands           *exec.Store `json:"-"`
 	ExecCommands           *exec.Store `json:"-"`
 	// logDriver for closing
 	// logDriver for closing
-	LogDriver logger.Logger  `json:"-"`
-	LogCopier *logger.Copier `json:"-"`
+	LogDriver     logger.Logger  `json:"-"`
+	LogCopier     *logger.Copier `json:"-"`
+	attachContext *attachContext
 }
 }
 
 
 // NewBaseContainer creates a new container with its
 // NewBaseContainer creates a new container with its
@@ -71,12 +74,13 @@ type CommonContainer struct {
 func NewBaseContainer(id, root string) *Container {
 func NewBaseContainer(id, root string) *Container {
 	return &Container{
 	return &Container{
 		CommonContainer: CommonContainer{
 		CommonContainer: CommonContainer{
-			ID:           id,
-			State:        NewState(),
-			ExecCommands: exec.NewStore(),
-			Root:         root,
-			MountPoints:  make(map[string]*volume.MountPoint),
-			StreamConfig: runconfig.NewStreamConfig(),
+			ID:            id,
+			State:         NewState(),
+			ExecCommands:  exec.NewStore(),
+			Root:          root,
+			MountPoints:   make(map[string]*volume.MountPoint),
+			StreamConfig:  runconfig.NewStreamConfig(),
+			attachContext: &attachContext{},
 		},
 		},
 	}
 	}
 }
 }
@@ -345,12 +349,13 @@ func (container *Container) GetExecIDs() []string {
 // Attach connects to the container's TTY, delegating to standard
 // Attach connects to the container's TTY, delegating to standard
 // streams or websockets depending on the configuration.
 // streams or websockets depending on the configuration.
 func (container *Container) Attach(stdin io.ReadCloser, stdout io.Writer, stderr io.Writer, keys []byte) chan error {
 func (container *Container) Attach(stdin io.ReadCloser, stdout io.Writer, stderr io.Writer, keys []byte) chan error {
-	return AttachStreams(container.StreamConfig, container.Config.OpenStdin, container.Config.StdinOnce, container.Config.Tty, stdin, stdout, stderr, keys)
+	ctx := container.InitAttachContext()
+	return AttachStreams(ctx, container.StreamConfig, container.Config.OpenStdin, container.Config.StdinOnce, container.Config.Tty, stdin, stdout, stderr, keys)
 }
 }
 
 
 // AttachStreams connects streams to a TTY.
 // AttachStreams connects streams to a TTY.
 // Used by exec too. Should this move somewhere else?
 // Used by exec too. Should this move somewhere else?
-func AttachStreams(streamConfig *runconfig.StreamConfig, openStdin, stdinOnce, tty bool, stdin io.ReadCloser, stdout io.Writer, stderr io.Writer, keys []byte) chan error {
+func AttachStreams(ctx context.Context, streamConfig *runconfig.StreamConfig, openStdin, stdinOnce, tty bool, stdin io.ReadCloser, stdout io.Writer, stderr io.Writer, keys []byte) chan error {
 	var (
 	var (
 		cStdout, cStderr io.ReadCloser
 		cStdout, cStderr io.ReadCloser
 		cStdin           io.WriteCloser
 		cStdin           io.WriteCloser
@@ -379,21 +384,6 @@ func AttachStreams(streamConfig *runconfig.StreamConfig, openStdin, stdinOnce, t
 			return
 			return
 		}
 		}
 		logrus.Debugf("attach: stdin: begin")
 		logrus.Debugf("attach: stdin: begin")
-		defer func() {
-			if stdinOnce && !tty {
-				cStdin.Close()
-			} else {
-				// No matter what, when stdin is closed (io.Copy unblock), close stdout and stderr
-				if cStdout != nil {
-					cStdout.Close()
-				}
-				if cStderr != nil {
-					cStderr.Close()
-				}
-			}
-			wg.Done()
-			logrus.Debugf("attach: stdin: end")
-		}()
 
 
 		var err error
 		var err error
 		if tty {
 		if tty {
@@ -408,23 +398,26 @@ func AttachStreams(streamConfig *runconfig.StreamConfig, openStdin, stdinOnce, t
 		if err != nil {
 		if err != nil {
 			logrus.Errorf("attach: stdin: %s", err)
 			logrus.Errorf("attach: stdin: %s", err)
 			errors <- err
 			errors <- err
-			return
 		}
 		}
+		if stdinOnce && !tty {
+			cStdin.Close()
+		} else {
+			// No matter what, when stdin is closed (io.Copy unblock), close stdout and stderr
+			if cStdout != nil {
+				cStdout.Close()
+			}
+			if cStderr != nil {
+				cStderr.Close()
+			}
+		}
+		logrus.Debugf("attach: stdin: end")
+		wg.Done()
 	}()
 	}()
 
 
 	attachStream := func(name string, stream io.Writer, streamPipe io.ReadCloser) {
 	attachStream := func(name string, stream io.Writer, streamPipe io.ReadCloser) {
 		if stream == nil {
 		if stream == nil {
 			return
 			return
 		}
 		}
-		defer func() {
-			// Make sure stdin gets closed
-			if stdin != nil {
-				stdin.Close()
-			}
-			streamPipe.Close()
-			wg.Done()
-			logrus.Debugf("attach: %s: end", name)
-		}()
 
 
 		logrus.Debugf("attach: %s: begin", name)
 		logrus.Debugf("attach: %s: begin", name)
 		_, err := io.Copy(stream, streamPipe)
 		_, err := io.Copy(stream, streamPipe)
@@ -435,13 +428,39 @@ func AttachStreams(streamConfig *runconfig.StreamConfig, openStdin, stdinOnce, t
 			logrus.Errorf("attach: %s: %v", name, err)
 			logrus.Errorf("attach: %s: %v", name, err)
 			errors <- err
 			errors <- err
 		}
 		}
+		// Make sure stdin gets closed
+		if stdin != nil {
+			stdin.Close()
+		}
+		streamPipe.Close()
+		logrus.Debugf("attach: %s: end", name)
+		wg.Done()
 	}
 	}
 
 
 	go attachStream("stdout", stdout, cStdout)
 	go attachStream("stdout", stdout, cStdout)
 	go attachStream("stderr", stderr, cStderr)
 	go attachStream("stderr", stderr, cStderr)
 
 
 	return promise.Go(func() error {
 	return promise.Go(func() error {
-		wg.Wait()
+		done := make(chan struct{})
+		go func() {
+			wg.Wait()
+			close(done)
+		}()
+		select {
+		case <-done:
+		case <-ctx.Done():
+			// close all pipes
+			if cStdin != nil {
+				cStdin.Close()
+			}
+			if cStdout != nil {
+				cStdout.Close()
+			}
+			if cStderr != nil {
+				cStderr.Close()
+			}
+			<-done
+		}
 		close(errors)
 		close(errors)
 		for err := range errors {
 		for err := range errors {
 			if err != nil {
 			if err != nil {
@@ -597,3 +616,31 @@ func (container *Container) UpdateMonitor(restartPolicy containertypes.RestartPo
 	}
 	}
 	monitor.mux.Unlock()
 	monitor.mux.Unlock()
 }
 }
+
+type attachContext struct {
+	ctx    context.Context
+	cancel context.CancelFunc
+	mu     sync.Mutex
+}
+
+// InitAttachContext initializes or returns the existing context for attach
+// calls to track container liveness.
+func (container *Container) InitAttachContext() context.Context {
+	container.attachContext.mu.Lock()
+	defer container.attachContext.mu.Unlock()
+	if container.attachContext.ctx == nil {
+		container.attachContext.ctx, container.attachContext.cancel = context.WithCancel(context.Background())
+	}
+	return container.attachContext.ctx
+}
+
+// CancelAttachContext cancels the attach context. All attach calls should
+// detach after this call.
+func (container *Container) CancelAttachContext() {
+	container.attachContext.mu.Lock()
+	if container.attachContext.ctx != nil {
+		container.attachContext.cancel()
+		container.attachContext.ctx = nil
+	}
+	container.attachContext.mu.Unlock()
+}

+ 3 - 1
daemon/exec.go

@@ -6,6 +6,8 @@ import (
 	"strings"
 	"strings"
 	"time"
 	"time"
 
 
+	"golang.org/x/net/context"
+
 	"github.com/Sirupsen/logrus"
 	"github.com/Sirupsen/logrus"
 	"github.com/docker/docker/container"
 	"github.com/docker/docker/container"
 	"github.com/docker/docker/daemon/exec"
 	"github.com/docker/docker/daemon/exec"
@@ -181,7 +183,7 @@ func (d *Daemon) ContainerExecStart(name string, stdin io.ReadCloser, stdout io.
 		ec.NewNopInputPipe()
 		ec.NewNopInputPipe()
 	}
 	}
 
 
-	attachErr := container.AttachStreams(ec.StreamConfig, ec.OpenStdin, true, ec.ProcessConfig.Tty, cStdin, cStdout, cStderr, ec.DetachKeys)
+	attachErr := container.AttachStreams(context.Background(), ec.StreamConfig, ec.OpenStdin, true, ec.ProcessConfig.Tty, cStdin, cStdout, cStderr, ec.DetachKeys)
 
 
 	execErr := make(chan error)
 	execErr := make(chan error)
 
 

+ 1 - 0
daemon/start.go

@@ -175,4 +175,5 @@ func (daemon *Daemon) Cleanup(container *container.Container) {
 	if err := container.UnmountVolumes(false, daemon.LogVolumeEvent); err != nil {
 	if err := container.UnmountVolumes(false, daemon.LogVolumeEvent); err != nil {
 		logrus.Warnf("%s cleanup: Failed to umount volumes: %v", container.ID, err)
 		logrus.Warnf("%s cleanup: Failed to umount volumes: %v", container.ID, err)
 	}
 	}
+	container.CancelAttachContext()
 }
 }

+ 40 - 0
integration-cli/docker_cli_run_test.go

@@ -3,6 +3,7 @@ package main
 import (
 import (
 	"bufio"
 	"bufio"
 	"bytes"
 	"bytes"
+	"encoding/json"
 	"fmt"
 	"fmt"
 	"io/ioutil"
 	"io/ioutil"
 	"net"
 	"net"
@@ -4187,3 +4188,42 @@ func (s *DockerSuite) TestRunNamedVolumesFromNotRemoved(c *check.C) {
 	out, _ := dockerCmd(c, "volume", "ls", "-q")
 	out, _ := dockerCmd(c, "volume", "ls", "-q")
 	c.Assert(strings.TrimSpace(out), checker.Equals, "test")
 	c.Assert(strings.TrimSpace(out), checker.Equals, "test")
 }
 }
+
+func (s *DockerSuite) TestRunAttachFailedNoLeak(c *check.C) {
+	type info struct {
+		NGoroutines int
+	}
+	getNGoroutines := func() int {
+		var i info
+		status, b, err := sockRequest("GET", "/info", nil)
+		c.Assert(err, checker.IsNil)
+		c.Assert(status, checker.Equals, 200)
+		c.Assert(json.Unmarshal(b, &i), checker.IsNil)
+		return i.NGoroutines
+	}
+	nroutines := getNGoroutines()
+
+	runSleepingContainer(c, "--name=test", "-p", "8000:8000")
+
+	out, _, err := dockerCmdWithError("run", "-p", "8000:8000", "busybox", "true")
+	c.Assert(err, checker.NotNil)
+	// check for the Windows error as well
+	c.Assert(strings.Contains(string(out), "port is already allocated") || strings.Contains(string(out), "were not connected because a duplicate name exists"), checker.Equals, true, check.Commentf("Output: %s", out))
+	dockerCmd(c, "rm", "-f", "test")
+
+	// NGoroutines is not updated right away, so we need to wait before failing
+	t := time.After(30 * time.Second)
+	for {
+		select {
+		case <-t:
+			n := getNGoroutines()
+			c.Assert(n <= nroutines, checker.Equals, true, check.Commentf("leaked goroutines: expected less than or equal to %d, got: %d", nroutines, n))
+
+		default:
+			if n := getNGoroutines(); n <= nroutines {
+				return
+			}
+			time.Sleep(200 * time.Millisecond)
+		}
+	}
+}