Browse Source

Merge pull request #30446 from jim-minter/attachrace

Resolve race conditions in attach API call
Vincent Demeester 8 years ago
parent
commit
caa78da1c6
4 changed files with 98 additions and 91 deletions
  1. 0 13
      container/container.go
  2. 43 25
      container/stream/attach.go
  3. 49 51
      daemon/attach.go
  4. 6 2
      daemon/exec.go

+ 0 - 13
container/container.go

@@ -365,19 +365,6 @@ func (container *Container) GetExecIDs() []string {
 	return container.ExecCommands.List()
 	return container.ExecCommands.List()
 }
 }
 
 
-// Attach connects to the container's stdio to the client streams
-func (container *Container) Attach(cfg *stream.AttachConfig) chan error {
-	ctx := container.InitAttachContext()
-
-	cfg.TTY = container.Config.Tty
-	if !container.Config.OpenStdin {
-		cfg.Stdin = nil
-	}
-	cfg.CloseStdin = cfg.Stdin != nil && container.Config.StdinOnce
-
-	return container.StreamConfig.Attach(ctx, cfg)
-}
-
 // ShouldRestart decides whether the daemon should restart the container or not.
 // ShouldRestart decides whether the daemon should restart the container or not.
 // This is based on the container's restart policy.
 // This is based on the container's restart policy.
 func (container *Container) ShouldRestart() bool {
 func (container *Container) ShouldRestart() bool {

+ 43 - 25
container/stream/attach.go

@@ -33,33 +33,51 @@ type AttachConfig struct {
 	// For example, this would close the attached container's stdin.
 	// For example, this would close the attached container's stdin.
 	CloseStdin bool
 	CloseStdin bool
 
 
+	// UseStd* indicate whether the client has requested to be connected to the
+	// given stream or not.  These flags are used instead of checking Std* != nil
+	// at points before the client streams Std* are wired up.
+	UseStdin, UseStdout, UseStderr bool
+
+	// CStd* are the streams directly connected to the container
+	CStdin           io.WriteCloser
+	CStdout, CStderr io.ReadCloser
+
 	// Provide client streams to wire up to
 	// Provide client streams to wire up to
 	Stdin          io.ReadCloser
 	Stdin          io.ReadCloser
 	Stdout, Stderr io.Writer
 	Stdout, Stderr io.Writer
 }
 }
 
 
-// Attach attaches the stream config to the streams specified in
-// the AttachOptions
-func (c *Config) Attach(ctx context.Context, cfg *AttachConfig) chan error {
+// AttachStreams attaches the container's streams to the AttachConfig
+func (c *Config) AttachStreams(cfg *AttachConfig) {
+	if cfg.UseStdin {
+		cfg.CStdin = c.StdinPipe()
+	}
+
+	if cfg.UseStdout {
+		cfg.CStdout = c.StdoutPipe()
+	}
+
+	if cfg.UseStderr {
+		cfg.CStderr = c.StderrPipe()
+	}
+}
+
+// CopyStreams starts goroutines to copy data in and out to/from the container
+func (c *Config) CopyStreams(ctx context.Context, cfg *AttachConfig) chan error {
 	var (
 	var (
-		cStdout, cStderr io.ReadCloser
-		cStdin           io.WriteCloser
-		wg               sync.WaitGroup
-		errors           = make(chan error, 3)
+		wg     sync.WaitGroup
+		errors = make(chan error, 3)
 	)
 	)
 
 
 	if cfg.Stdin != nil {
 	if cfg.Stdin != nil {
-		cStdin = c.StdinPipe()
 		wg.Add(1)
 		wg.Add(1)
 	}
 	}
 
 
 	if cfg.Stdout != nil {
 	if cfg.Stdout != nil {
-		cStdout = c.StdoutPipe()
 		wg.Add(1)
 		wg.Add(1)
 	}
 	}
 
 
 	if cfg.Stderr != nil {
 	if cfg.Stderr != nil {
-		cStderr = c.StderrPipe()
 		wg.Add(1)
 		wg.Add(1)
 	}
 	}
 
 
@@ -72,9 +90,9 @@ func (c *Config) Attach(ctx context.Context, cfg *AttachConfig) chan error {
 
 
 		var err error
 		var err error
 		if cfg.TTY {
 		if cfg.TTY {
-			_, err = copyEscapable(cStdin, cfg.Stdin, cfg.DetachKeys)
+			_, err = copyEscapable(cfg.CStdin, cfg.Stdin, cfg.DetachKeys)
 		} else {
 		} else {
-			_, err = io.Copy(cStdin, cfg.Stdin)
+			_, err = io.Copy(cfg.CStdin, cfg.Stdin)
 		}
 		}
 		if err == io.ErrClosedPipe {
 		if err == io.ErrClosedPipe {
 			err = nil
 			err = nil
@@ -84,14 +102,14 @@ func (c *Config) Attach(ctx context.Context, cfg *AttachConfig) chan error {
 			errors <- err
 			errors <- err
 		}
 		}
 		if cfg.CloseStdin && !cfg.TTY {
 		if cfg.CloseStdin && !cfg.TTY {
-			cStdin.Close()
+			cfg.CStdin.Close()
 		} else {
 		} else {
 			// No matter what, when stdin is closed (io.Copy unblock), close stdout and stderr
 			// No matter what, when stdin is closed (io.Copy unblock), close stdout and stderr
-			if cStdout != nil {
-				cStdout.Close()
+			if cfg.CStdout != nil {
+				cfg.CStdout.Close()
 			}
 			}
-			if cStderr != nil {
-				cStderr.Close()
+			if cfg.CStderr != nil {
+				cfg.CStderr.Close()
 			}
 			}
 		}
 		}
 		logrus.Debug("attach: stdin: end")
 		logrus.Debug("attach: stdin: end")
@@ -121,8 +139,8 @@ func (c *Config) Attach(ctx context.Context, cfg *AttachConfig) chan error {
 		wg.Done()
 		wg.Done()
 	}
 	}
 
 
-	go attachStream("stdout", cfg.Stdout, cStdout)
-	go attachStream("stderr", cfg.Stderr, cStderr)
+	go attachStream("stdout", cfg.Stdout, cfg.CStdout)
+	go attachStream("stderr", cfg.Stderr, cfg.CStderr)
 
 
 	return promise.Go(func() error {
 	return promise.Go(func() error {
 		done := make(chan struct{})
 		done := make(chan struct{})
@@ -134,14 +152,14 @@ func (c *Config) Attach(ctx context.Context, cfg *AttachConfig) chan error {
 		case <-done:
 		case <-done:
 		case <-ctx.Done():
 		case <-ctx.Done():
 			// close all pipes
 			// close all pipes
-			if cStdin != nil {
-				cStdin.Close()
+			if cfg.CStdin != nil {
+				cfg.CStdin.Close()
 			}
 			}
-			if cStdout != nil {
-				cStdout.Close()
+			if cfg.CStdout != nil {
+				cfg.CStdout.Close()
 			}
 			}
-			if cStderr != nil {
-				cStderr.Close()
+			if cfg.CStderr != nil {
+				cfg.CStderr.Close()
 			}
 			}
 			<-done
 			<-done
 		}
 		}

+ 49 - 51
daemon/attach.go

@@ -15,14 +15,6 @@ import (
 	"github.com/docker/docker/pkg/term"
 	"github.com/docker/docker/pkg/term"
 )
 )
 
 
-type containerAttachConfig struct {
-	detachKeys     []byte
-	stdin          io.ReadCloser
-	stdout, stderr io.Writer
-	showHistory    bool
-	stream         bool
-}
-
 // ContainerAttach attaches to logs according to the config passed in. See ContainerAttachConfig.
 // ContainerAttach attaches to logs according to the config passed in. See ContainerAttachConfig.
 func (daemon *Daemon) ContainerAttach(prefixOrName string, c *backend.ContainerAttachConfig) error {
 func (daemon *Daemon) ContainerAttach(prefixOrName string, c *backend.ContainerAttachConfig) error {
 	keys := []byte{}
 	keys := []byte{}
@@ -43,6 +35,16 @@ func (daemon *Daemon) ContainerAttach(prefixOrName string, c *backend.ContainerA
 		return errors.NewRequestConflictError(err)
 		return errors.NewRequestConflictError(err)
 	}
 	}
 
 
+	cfg := stream.AttachConfig{
+		UseStdin:   c.UseStdin && container.Config.OpenStdin,
+		UseStdout:  c.UseStdout,
+		UseStderr:  c.UseStderr,
+		TTY:        container.Config.Tty,
+		CloseStdin: container.Config.StdinOnce,
+		DetachKeys: keys,
+	}
+	container.StreamConfig.AttachStreams(&cfg)
+
 	inStream, outStream, errStream, err := c.GetStreams()
 	inStream, outStream, errStream, err := c.GetStreams()
 	if err != nil {
 	if err != nil {
 		return err
 		return err
@@ -54,48 +56,51 @@ func (daemon *Daemon) ContainerAttach(prefixOrName string, c *backend.ContainerA
 		outStream = stdcopy.NewStdWriter(outStream, stdcopy.Stdout)
 		outStream = stdcopy.NewStdWriter(outStream, stdcopy.Stdout)
 	}
 	}
 
 
-	var cfg containerAttachConfig
-
-	if c.UseStdin {
-		cfg.stdin = inStream
+	if cfg.UseStdin {
+		cfg.Stdin = inStream
 	}
 	}
-	if c.UseStdout {
-		cfg.stdout = outStream
+	if cfg.UseStdout {
+		cfg.Stdout = outStream
 	}
 	}
-	if c.UseStderr {
-		cfg.stderr = errStream
+	if cfg.UseStderr {
+		cfg.Stderr = errStream
 	}
 	}
 
 
-	cfg.showHistory = c.Logs
-	cfg.stream = c.Stream
-	cfg.detachKeys = keys
-
-	if err := daemon.containerAttach(container, &cfg); err != nil {
+	if err := daemon.containerAttach(container, &cfg, c.Logs, c.Stream); err != nil {
 		fmt.Fprintf(outStream, "Error attaching: %s\n", err)
 		fmt.Fprintf(outStream, "Error attaching: %s\n", err)
 	}
 	}
 	return nil
 	return nil
 }
 }
 
 
 // ContainerAttachRaw attaches the provided streams to the container's stdio
 // ContainerAttachRaw attaches the provided streams to the container's stdio
-func (daemon *Daemon) ContainerAttachRaw(prefixOrName string, stdin io.ReadCloser, stdout, stderr io.Writer, stream bool) error {
+func (daemon *Daemon) ContainerAttachRaw(prefixOrName string, stdin io.ReadCloser, stdout, stderr io.Writer, doStream bool) error {
 	container, err := daemon.GetContainer(prefixOrName)
 	container, err := daemon.GetContainer(prefixOrName)
 	if err != nil {
 	if err != nil {
 		return err
 		return err
 	}
 	}
-	cfg := &containerAttachConfig{
-		stdin:  stdin,
-		stdout: stdout,
-		stderr: stderr,
-		stream: stream,
+	cfg := stream.AttachConfig{
+		UseStdin:   stdin != nil && container.Config.OpenStdin,
+		UseStdout:  stdout != nil,
+		UseStderr:  stderr != nil,
+		TTY:        container.Config.Tty,
+		CloseStdin: container.Config.StdinOnce,
+	}
+	container.StreamConfig.AttachStreams(&cfg)
+	if cfg.UseStdin {
+		cfg.Stdin = stdin
 	}
 	}
-	return daemon.containerAttach(container, cfg)
+	if cfg.UseStdout {
+		cfg.Stdout = stdout
+	}
+	if cfg.UseStderr {
+		cfg.Stderr = stderr
+	}
+
+	return daemon.containerAttach(container, &cfg, false, doStream)
 }
 }
 
 
-func (daemon *Daemon) containerAttach(c *container.Container, cfg *containerAttachConfig) error {
-	stdin := cfg.stdin
-	stdout := cfg.stdout
-	stderr := cfg.stderr
-	if cfg.showHistory {
+func (daemon *Daemon) containerAttach(c *container.Container, cfg *stream.AttachConfig, logs, doStream bool) error {
+	if logs {
 		logDriver, err := daemon.getLogger(c)
 		logDriver, err := daemon.getLogger(c)
 		if err != nil {
 		if err != nil {
 			return err
 			return err
@@ -113,11 +118,11 @@ func (daemon *Daemon) containerAttach(c *container.Container, cfg *containerAtta
 				if !ok {
 				if !ok {
 					break LogLoop
 					break LogLoop
 				}
 				}
-				if msg.Source == "stdout" && stdout != nil {
-					stdout.Write(msg.Line)
+				if msg.Source == "stdout" && cfg.Stdout != nil {
+					cfg.Stdout.Write(msg.Line)
 				}
 				}
-				if msg.Source == "stderr" && stderr != nil {
-					stderr.Write(msg.Line)
+				if msg.Source == "stderr" && cfg.Stderr != nil {
+					cfg.Stderr.Write(msg.Line)
 				}
 				}
 			case err := <-logs.Err:
 			case err := <-logs.Err:
 				logrus.Errorf("Error streaming logs: %v", err)
 				logrus.Errorf("Error streaming logs: %v", err)
@@ -128,19 +133,18 @@ func (daemon *Daemon) containerAttach(c *container.Container, cfg *containerAtta
 
 
 	daemon.LogContainerEvent(c, "attach")
 	daemon.LogContainerEvent(c, "attach")
 
 
-	if !cfg.stream {
+	if !doStream {
 		return nil
 		return nil
 	}
 	}
 
 
-	var stdinPipe io.ReadCloser
-	if stdin != nil {
+	if cfg.Stdin != nil {
 		r, w := io.Pipe()
 		r, w := io.Pipe()
-		go func() {
+		go func(stdin io.ReadCloser) {
 			defer w.Close()
 			defer w.Close()
 			defer logrus.Debug("Closing buffered stdin pipe")
 			defer logrus.Debug("Closing buffered stdin pipe")
 			io.Copy(w, stdin)
 			io.Copy(w, stdin)
-		}()
-		stdinPipe = r
+		}(cfg.Stdin)
+		cfg.Stdin = r
 	}
 	}
 
 
 	waitChan := make(chan struct{})
 	waitChan := make(chan struct{})
@@ -154,14 +158,8 @@ func (daemon *Daemon) containerAttach(c *container.Container, cfg *containerAtta
 		}()
 		}()
 	}
 	}
 
 
-	aCfg := &stream.AttachConfig{
-		Stdin:      stdinPipe,
-		Stdout:     stdout,
-		Stderr:     stderr,
-		DetachKeys: cfg.detachKeys,
-	}
-
-	err := <-c.Attach(aCfg)
+	ctx := c.InitAttachContext()
+	err := <-c.StreamConfig.CopyStreams(ctx, cfg)
 	if err != nil {
 	if err != nil {
 		if _, ok := err.(stream.DetachError); ok {
 		if _, ok := err.(stream.DetachError); ok {
 			daemon.LogContainerEvent(c, "detach")
 			daemon.LogContainerEvent(c, "detach")

+ 6 - 2
daemon/exec.go

@@ -210,15 +210,19 @@ func (d *Daemon) ContainerExecStart(ctx context.Context, name string, stdin io.R
 		return err
 		return err
 	}
 	}
 
 
-	attachConfig := &stream.AttachConfig{
+	attachConfig := stream.AttachConfig{
 		TTY:        ec.Tty,
 		TTY:        ec.Tty,
+		UseStdin:   cStdin != nil,
+		UseStdout:  cStdout != nil,
+		UseStderr:  cStderr != nil,
 		Stdin:      cStdin,
 		Stdin:      cStdin,
 		Stdout:     cStdout,
 		Stdout:     cStdout,
 		Stderr:     cStderr,
 		Stderr:     cStderr,
 		DetachKeys: ec.DetachKeys,
 		DetachKeys: ec.DetachKeys,
 		CloseStdin: true,
 		CloseStdin: true,
 	}
 	}
-	attachErr := ec.StreamConfig.Attach(ctx, attachConfig)
+	ec.StreamConfig.AttachStreams(&attachConfig)
+	attachErr := ec.StreamConfig.CopyStreams(ctx, &attachConfig)
 
 
 	systemPid, err := d.containerd.AddProcess(ctx, c.ID, name, p, ec.InitializeStdio)
 	systemPid, err := d.containerd.AddProcess(ctx, c.ID, name, p, ec.InitializeStdio)
 	if err != nil {
 	if err != nil {