diff --git a/container/container.go b/container/container.go index b004db2f4e..311953685e 100644 --- a/container/container.go +++ b/container/container.go @@ -365,19 +365,6 @@ func (container *Container) GetExecIDs() []string { return container.ExecCommands.List() } -// Attach connects to the container's stdio to the client streams -func (container *Container) Attach(cfg *stream.AttachConfig) chan error { - ctx := container.InitAttachContext() - - cfg.TTY = container.Config.Tty - if !container.Config.OpenStdin { - cfg.Stdin = nil - } - cfg.CloseStdin = cfg.Stdin != nil && container.Config.StdinOnce - - return container.StreamConfig.Attach(ctx, cfg) -} - // ShouldRestart decides whether the daemon should restart the container or not. // This is based on the container's restart policy. func (container *Container) ShouldRestart() bool { diff --git a/container/stream/attach.go b/container/stream/attach.go index 3c5f14378a..c3a630d975 100644 --- a/container/stream/attach.go +++ b/container/stream/attach.go @@ -33,33 +33,51 @@ type AttachConfig struct { // For example, this would close the attached container's stdin. CloseStdin bool + // UseStd* indicate whether the client has requested to be connected to the + // given stream or not. These flags are used instead of checking Std* != nil + // at points before the client streams Std* are wired up. + UseStdin, UseStdout, UseStderr bool + + // CStd* are the streams directly connected to the container + CStdin io.WriteCloser + CStdout, CStderr io.ReadCloser + // Provide client streams to wire up to Stdin io.ReadCloser Stdout, Stderr io.Writer } -// Attach attaches the stream config to the streams specified in -// the AttachOptions -func (c *Config) Attach(ctx context.Context, cfg *AttachConfig) chan error { +// AttachStreams attaches the container's streams to the AttachConfig +func (c *Config) AttachStreams(cfg *AttachConfig) { + if cfg.UseStdin { + cfg.CStdin = c.StdinPipe() + } + + if cfg.UseStdout { + cfg.CStdout = c.StdoutPipe() + } + + if cfg.UseStderr { + cfg.CStderr = c.StderrPipe() + } +} + +// CopyStreams starts goroutines to copy data in and out to/from the container +func (c *Config) CopyStreams(ctx context.Context, cfg *AttachConfig) chan error { var ( - cStdout, cStderr io.ReadCloser - cStdin io.WriteCloser - wg sync.WaitGroup - errors = make(chan error, 3) + wg sync.WaitGroup + errors = make(chan error, 3) ) if cfg.Stdin != nil { - cStdin = c.StdinPipe() wg.Add(1) } if cfg.Stdout != nil { - cStdout = c.StdoutPipe() wg.Add(1) } if cfg.Stderr != nil { - cStderr = c.StderrPipe() wg.Add(1) } @@ -72,9 +90,9 @@ func (c *Config) Attach(ctx context.Context, cfg *AttachConfig) chan error { var err error if cfg.TTY { - _, err = copyEscapable(cStdin, cfg.Stdin, cfg.DetachKeys) + _, err = copyEscapable(cfg.CStdin, cfg.Stdin, cfg.DetachKeys) } else { - _, err = io.Copy(cStdin, cfg.Stdin) + _, err = io.Copy(cfg.CStdin, cfg.Stdin) } if err == io.ErrClosedPipe { err = nil @@ -84,14 +102,14 @@ func (c *Config) Attach(ctx context.Context, cfg *AttachConfig) chan error { errors <- err } if cfg.CloseStdin && !cfg.TTY { - cStdin.Close() + cfg.CStdin.Close() } else { // No matter what, when stdin is closed (io.Copy unblock), close stdout and stderr - if cStdout != nil { - cStdout.Close() + if cfg.CStdout != nil { + cfg.CStdout.Close() } - if cStderr != nil { - cStderr.Close() + if cfg.CStderr != nil { + cfg.CStderr.Close() } } logrus.Debug("attach: stdin: end") @@ -121,8 +139,8 @@ func (c *Config) Attach(ctx context.Context, cfg *AttachConfig) chan error { wg.Done() } - go attachStream("stdout", cfg.Stdout, cStdout) - go attachStream("stderr", cfg.Stderr, cStderr) + go attachStream("stdout", cfg.Stdout, cfg.CStdout) + go attachStream("stderr", cfg.Stderr, cfg.CStderr) return promise.Go(func() error { done := make(chan struct{}) @@ -134,14 +152,14 @@ func (c *Config) Attach(ctx context.Context, cfg *AttachConfig) chan error { case <-done: case <-ctx.Done(): // close all pipes - if cStdin != nil { - cStdin.Close() + if cfg.CStdin != nil { + cfg.CStdin.Close() } - if cStdout != nil { - cStdout.Close() + if cfg.CStdout != nil { + cfg.CStdout.Close() } - if cStderr != nil { - cStderr.Close() + if cfg.CStderr != nil { + cfg.CStderr.Close() } <-done } diff --git a/daemon/attach.go b/daemon/attach.go index 5bdbb35b31..1cc8adb17b 100644 --- a/daemon/attach.go +++ b/daemon/attach.go @@ -15,14 +15,6 @@ import ( "github.com/docker/docker/pkg/term" ) -type containerAttachConfig struct { - detachKeys []byte - stdin io.ReadCloser - stdout, stderr io.Writer - showHistory bool - stream bool -} - // ContainerAttach attaches to logs according to the config passed in. See ContainerAttachConfig. func (daemon *Daemon) ContainerAttach(prefixOrName string, c *backend.ContainerAttachConfig) error { keys := []byte{} @@ -43,6 +35,16 @@ func (daemon *Daemon) ContainerAttach(prefixOrName string, c *backend.ContainerA return errors.NewRequestConflictError(err) } + cfg := stream.AttachConfig{ + UseStdin: c.UseStdin && container.Config.OpenStdin, + UseStdout: c.UseStdout, + UseStderr: c.UseStderr, + TTY: container.Config.Tty, + CloseStdin: container.Config.StdinOnce, + DetachKeys: keys, + } + container.StreamConfig.AttachStreams(&cfg) + inStream, outStream, errStream, err := c.GetStreams() if err != nil { return err @@ -54,48 +56,51 @@ func (daemon *Daemon) ContainerAttach(prefixOrName string, c *backend.ContainerA outStream = stdcopy.NewStdWriter(outStream, stdcopy.Stdout) } - var cfg containerAttachConfig - - if c.UseStdin { - cfg.stdin = inStream + if cfg.UseStdin { + cfg.Stdin = inStream } - if c.UseStdout { - cfg.stdout = outStream + if cfg.UseStdout { + cfg.Stdout = outStream } - if c.UseStderr { - cfg.stderr = errStream + if cfg.UseStderr { + cfg.Stderr = errStream } - cfg.showHistory = c.Logs - cfg.stream = c.Stream - cfg.detachKeys = keys - - if err := daemon.containerAttach(container, &cfg); err != nil { + if err := daemon.containerAttach(container, &cfg, c.Logs, c.Stream); err != nil { fmt.Fprintf(outStream, "Error attaching: %s\n", err) } return nil } // ContainerAttachRaw attaches the provided streams to the container's stdio -func (daemon *Daemon) ContainerAttachRaw(prefixOrName string, stdin io.ReadCloser, stdout, stderr io.Writer, stream bool) error { +func (daemon *Daemon) ContainerAttachRaw(prefixOrName string, stdin io.ReadCloser, stdout, stderr io.Writer, doStream bool) error { container, err := daemon.GetContainer(prefixOrName) if err != nil { return err } - cfg := &containerAttachConfig{ - stdin: stdin, - stdout: stdout, - stderr: stderr, - stream: stream, + cfg := stream.AttachConfig{ + UseStdin: stdin != nil && container.Config.OpenStdin, + UseStdout: stdout != nil, + UseStderr: stderr != nil, + TTY: container.Config.Tty, + CloseStdin: container.Config.StdinOnce, } - return daemon.containerAttach(container, cfg) + container.StreamConfig.AttachStreams(&cfg) + if cfg.UseStdin { + cfg.Stdin = stdin + } + if cfg.UseStdout { + cfg.Stdout = stdout + } + if cfg.UseStderr { + cfg.Stderr = stderr + } + + return daemon.containerAttach(container, &cfg, false, doStream) } -func (daemon *Daemon) containerAttach(c *container.Container, cfg *containerAttachConfig) error { - stdin := cfg.stdin - stdout := cfg.stdout - stderr := cfg.stderr - if cfg.showHistory { +func (daemon *Daemon) containerAttach(c *container.Container, cfg *stream.AttachConfig, logs, doStream bool) error { + if logs { logDriver, err := daemon.getLogger(c) if err != nil { return err @@ -113,11 +118,11 @@ func (daemon *Daemon) containerAttach(c *container.Container, cfg *containerAtta if !ok { break LogLoop } - if msg.Source == "stdout" && stdout != nil { - stdout.Write(msg.Line) + if msg.Source == "stdout" && cfg.Stdout != nil { + cfg.Stdout.Write(msg.Line) } - if msg.Source == "stderr" && stderr != nil { - stderr.Write(msg.Line) + if msg.Source == "stderr" && cfg.Stderr != nil { + cfg.Stderr.Write(msg.Line) } case err := <-logs.Err: logrus.Errorf("Error streaming logs: %v", err) @@ -128,19 +133,18 @@ func (daemon *Daemon) containerAttach(c *container.Container, cfg *containerAtta daemon.LogContainerEvent(c, "attach") - if !cfg.stream { + if !doStream { return nil } - var stdinPipe io.ReadCloser - if stdin != nil { + if cfg.Stdin != nil { r, w := io.Pipe() - go func() { + go func(stdin io.ReadCloser) { defer w.Close() defer logrus.Debug("Closing buffered stdin pipe") io.Copy(w, stdin) - }() - stdinPipe = r + }(cfg.Stdin) + cfg.Stdin = r } waitChan := make(chan struct{}) @@ -154,14 +158,8 @@ func (daemon *Daemon) containerAttach(c *container.Container, cfg *containerAtta }() } - aCfg := &stream.AttachConfig{ - Stdin: stdinPipe, - Stdout: stdout, - Stderr: stderr, - DetachKeys: cfg.detachKeys, - } - - err := <-c.Attach(aCfg) + ctx := c.InitAttachContext() + err := <-c.StreamConfig.CopyStreams(ctx, cfg) if err != nil { if _, ok := err.(stream.DetachError); ok { daemon.LogContainerEvent(c, "detach") diff --git a/daemon/exec.go b/daemon/exec.go index 99db2204c6..001ffbd7fd 100644 --- a/daemon/exec.go +++ b/daemon/exec.go @@ -210,15 +210,19 @@ func (d *Daemon) ContainerExecStart(ctx context.Context, name string, stdin io.R return err } - attachConfig := &stream.AttachConfig{ + attachConfig := stream.AttachConfig{ TTY: ec.Tty, + UseStdin: cStdin != nil, + UseStdout: cStdout != nil, + UseStderr: cStderr != nil, Stdin: cStdin, Stdout: cStdout, Stderr: cStderr, DetachKeys: ec.DetachKeys, CloseStdin: true, } - attachErr := ec.StreamConfig.Attach(ctx, attachConfig) + ec.StreamConfig.AttachStreams(&attachConfig) + attachErr := ec.StreamConfig.CopyStreams(ctx, &attachConfig) systemPid, err := d.containerd.AddProcess(ctx, c.ID, name, p, ec.InitializeStdio) if err != nil {