diff --git a/container/container.go b/container/container.go index a79ec89562..b004db2f4e 100644 --- a/container/container.go +++ b/container/container.go @@ -31,7 +31,6 @@ import ( "github.com/docker/docker/opts" "github.com/docker/docker/pkg/idtools" "github.com/docker/docker/pkg/ioutils" - "github.com/docker/docker/pkg/promise" "github.com/docker/docker/pkg/signal" "github.com/docker/docker/pkg/symlink" "github.com/docker/docker/restartmanager" @@ -58,13 +57,6 @@ var ( errInvalidNetwork = fmt.Errorf("invalid network settings while building port map info") ) -// DetachError is special error which returned in case of container detach. -type DetachError struct{} - -func (DetachError) Error() string { - return "detached from container" -} - // CommonContainer holds the fields for a container which are // applicable across all platforms supported by the daemon. type CommonContainer struct { @@ -373,183 +365,17 @@ func (container *Container) GetExecIDs() []string { return container.ExecCommands.List() } -// Attach connects to the container's TTY, delegating to standard -// streams or websockets depending on the configuration. -func (container *Container) Attach(stdin io.ReadCloser, stdout io.Writer, stderr io.Writer, keys []byte) chan error { +// Attach connects to the container's stdio to the client streams +func (container *Container) Attach(cfg *stream.AttachConfig) chan error { ctx := container.InitAttachContext() - return AttachStreams(ctx, container.StreamConfig, container.Config.OpenStdin, container.Config.StdinOnce, container.Config.Tty, stdin, stdout, stderr, keys) -} -// AttachStreams connects streams to a TTY. -// Used by exec too. Should this move somewhere else? -func AttachStreams(ctx context.Context, streamConfig *stream.Config, openStdin, stdinOnce, tty bool, stdin io.ReadCloser, stdout io.Writer, stderr io.Writer, keys []byte) chan error { - var ( - cStdout, cStderr io.ReadCloser - cStdin io.WriteCloser - wg sync.WaitGroup - errors = make(chan error, 3) - ) - - if stdin != nil && openStdin { - cStdin = streamConfig.StdinPipe() - wg.Add(1) + cfg.TTY = container.Config.Tty + if !container.Config.OpenStdin { + cfg.Stdin = nil } + cfg.CloseStdin = cfg.Stdin != nil && container.Config.StdinOnce - if stdout != nil { - cStdout = streamConfig.StdoutPipe() - wg.Add(1) - } - - if stderr != nil { - cStderr = streamConfig.StderrPipe() - wg.Add(1) - } - - // Connect stdin of container to the http conn. - go func() { - if stdin == nil || !openStdin { - return - } - logrus.Debug("attach: stdin: begin") - - var err error - if tty { - _, err = copyEscapable(cStdin, stdin, keys) - } else { - _, err = io.Copy(cStdin, stdin) - } - if err == io.ErrClosedPipe { - err = nil - } - if err != nil { - logrus.Errorf("attach: stdin: %s", err) - errors <- err - } - if stdinOnce && !tty { - cStdin.Close() - } else { - // No matter what, when stdin is closed (io.Copy unblock), close stdout and stderr - if cStdout != nil { - cStdout.Close() - } - if cStderr != nil { - cStderr.Close() - } - } - logrus.Debug("attach: stdin: end") - wg.Done() - }() - - attachStream := func(name string, stream io.Writer, streamPipe io.ReadCloser) { - if stream == nil { - return - } - - logrus.Debugf("attach: %s: begin", name) - _, err := io.Copy(stream, streamPipe) - if err == io.ErrClosedPipe { - err = nil - } - if err != nil { - logrus.Errorf("attach: %s: %v", name, err) - errors <- err - } - // Make sure stdin gets closed - if stdin != nil { - stdin.Close() - } - streamPipe.Close() - logrus.Debugf("attach: %s: end", name) - wg.Done() - } - - go attachStream("stdout", stdout, cStdout) - go attachStream("stderr", stderr, cStderr) - - return promise.Go(func() error { - done := make(chan struct{}) - go func() { - wg.Wait() - close(done) - }() - select { - case <-done: - case <-ctx.Done(): - // close all pipes - if cStdin != nil { - cStdin.Close() - } - if cStdout != nil { - cStdout.Close() - } - if cStderr != nil { - cStderr.Close() - } - <-done - } - close(errors) - for err := range errors { - if err != nil { - return err - } - } - return nil - }) -} - -// Code c/c from io.Copy() modified to handle escape sequence -func copyEscapable(dst io.Writer, src io.ReadCloser, keys []byte) (written int64, err error) { - if len(keys) == 0 { - // Default keys : ctrl-p ctrl-q - keys = []byte{16, 17} - } - buf := make([]byte, 32*1024) - for { - nr, er := src.Read(buf) - if nr > 0 { - // ---- Docker addition - preservBuf := []byte{} - for i, key := range keys { - preservBuf = append(preservBuf, buf[0:nr]...) - if nr != 1 || buf[0] != key { - break - } - if i == len(keys)-1 { - src.Close() - return 0, DetachError{} - } - nr, er = src.Read(buf) - } - var nw int - var ew error - if len(preservBuf) > 0 { - nw, ew = dst.Write(preservBuf) - nr = len(preservBuf) - } else { - // ---- End of docker - nw, ew = dst.Write(buf[0:nr]) - } - if nw > 0 { - written += int64(nw) - } - if ew != nil { - err = ew - break - } - if nr != nw { - err = io.ErrShortWrite - break - } - } - if er == io.EOF { - break - } - if er != nil { - err = er - break - } - } - return written, err + return container.StreamConfig.Attach(ctx, cfg) } // ShouldRestart decides whether the daemon should restart the container or not. diff --git a/container/stream/attach.go b/container/stream/attach.go new file mode 100644 index 0000000000..5f642933e3 --- /dev/null +++ b/container/stream/attach.go @@ -0,0 +1,209 @@ +package stream + +import ( + "io" + "sync" + + "golang.org/x/net/context" + + "github.com/Sirupsen/logrus" + "github.com/docker/docker/pkg/promise" +) + +// DetachError is special error which returned in case of container detach. +type DetachError struct{} + +func (DetachError) Error() string { + return "detached from container" +} + +// AttachConfig is the config struct used to attach a client to a stream's stdio +type AttachConfig struct { + // Tells the attach copier that the stream's stdin is a TTY and to look for + // escape sequences in stdin to detach from the stream. + // When true the escape sequence is not passed to the underlying stream + TTY bool + // Specifies the detach keys the client will be using + // Only useful when `TTY` is true + DetachKeys []byte + + // CloseStdin signals that once done, stdin for the attached stream should be closed + // For example, this would close the attached container's stdin. + CloseStdin bool + + // Provide client streams to wire up to + Stdin io.ReadCloser + Stdout, Stderr io.Writer +} + +// Attach attaches the stream config to the streams specified in +// the AttachOptions +func (c *Config) Attach(ctx context.Context, cfg *AttachConfig) chan error { + var ( + cStdout, cStderr io.ReadCloser + cStdin io.WriteCloser + wg sync.WaitGroup + errors = make(chan error, 3) + ) + + if cfg.Stdin != nil { + cStdin = c.StdinPipe() + wg.Add(1) + } + + if cfg.Stdout != nil { + cStdout = c.StdoutPipe() + wg.Add(1) + } + + if cfg.Stderr != nil { + cStderr = c.StderrPipe() + wg.Add(1) + } + + // Connect stdin of container to the attach stdin stream. + go func() { + if cfg.Stdin == nil { + return + } + logrus.Debug("attach: stdin: begin") + + var err error + if cfg.TTY { + _, err = copyEscapable(cStdin, cfg.Stdin, cfg.DetachKeys) + } else { + _, err = io.Copy(cStdin, cfg.Stdin) + } + if err == io.ErrClosedPipe { + err = nil + } + if err != nil { + logrus.Errorf("attach: stdin: %s", err) + errors <- err + } + if cfg.CloseStdin && !cfg.TTY { + cStdin.Close() + } else { + // No matter what, when stdin is closed (io.Copy unblock), close stdout and stderr + if cStdout != nil { + cStdout.Close() + } + if cStderr != nil { + cStderr.Close() + } + } + logrus.Debug("attach: stdin: end") + wg.Done() + }() + + attachStream := func(name string, stream io.Writer, streamPipe io.ReadCloser) { + if stream == nil { + return + } + + logrus.Debugf("attach: %s: begin", name) + _, err := io.Copy(stream, streamPipe) + if err == io.ErrClosedPipe { + err = nil + } + if err != nil { + logrus.Errorf("attach: %s: %v", name, err) + errors <- err + } + // Make sure stdin gets closed + if cfg.Stdin != nil { + cfg.Stdin.Close() + } + streamPipe.Close() + logrus.Debugf("attach: %s: end", name) + wg.Done() + } + + go attachStream("stdout", cfg.Stdout, cStdout) + go attachStream("stderr", cfg.Stderr, cStderr) + + return promise.Go(func() error { + done := make(chan struct{}) + go func() { + wg.Wait() + close(done) + }() + select { + case <-done: + case <-ctx.Done(): + // close all pipes + if cStdin != nil { + cStdin.Close() + } + if cStdout != nil { + cStdout.Close() + } + if cStderr != nil { + cStderr.Close() + } + <-done + } + close(errors) + for err := range errors { + if err != nil { + return err + } + } + return nil + }) +} + +// Code c/c from io.Copy() modified to handle escape sequence +func copyEscapable(dst io.Writer, src io.ReadCloser, keys []byte) (written int64, err error) { + if len(keys) == 0 { + // Default keys : ctrl-p ctrl-q + keys = []byte{16, 17} + } + buf := make([]byte, 32*1024) + for { + nr, er := src.Read(buf) + if nr > 0 { + // ---- Docker addition + preservBuf := []byte{} + for i, key := range keys { + preservBuf = append(preservBuf, buf[0:nr]...) + if nr != 1 || buf[0] != key { + break + } + if i == len(keys)-1 { + src.Close() + return 0, DetachError{} + } + nr, er = src.Read(buf) + } + var nw int + var ew error + if len(preservBuf) > 0 { + nw, ew = dst.Write(preservBuf) + nr = len(preservBuf) + } else { + // ---- End of docker + nw, ew = dst.Write(buf[0:nr]) + } + if nw > 0 { + written += int64(nw) + } + if ew != nil { + err = ew + break + } + if nr != nw { + err = io.ErrShortWrite + break + } + } + if er == io.EOF { + break + } + if er != nil { + err = er + break + } + } + return written, err +} diff --git a/container/stream/streams.go b/container/stream/streams.go index bbfe4af61d..735bab5107 100644 --- a/container/stream/streams.go +++ b/container/stream/streams.go @@ -62,6 +62,7 @@ func (c *Config) StdinPipe() io.WriteCloser { // StdoutPipe creates a new io.ReadCloser with an empty bytes pipe. // It adds this new out pipe to the Stdout broadcaster. +// This will block stdout if unconsumed. func (c *Config) StdoutPipe() io.ReadCloser { bytesPipe := ioutils.NewBytesPipe() c.stdout.Add(bytesPipe) @@ -70,6 +71,7 @@ func (c *Config) StdoutPipe() io.ReadCloser { // StderrPipe creates a new io.ReadCloser with an empty bytes pipe. // It adds this new err pipe to the Stderr broadcaster. +// This will block stderr if unconsumed. func (c *Config) StderrPipe() io.ReadCloser { bytesPipe := ioutils.NewBytesPipe() c.stderr.Add(bytesPipe) diff --git a/daemon/attach.go b/daemon/attach.go index 917237dd89..5bdbb35b31 100644 --- a/daemon/attach.go +++ b/daemon/attach.go @@ -9,11 +9,20 @@ import ( "github.com/docker/docker/api/errors" "github.com/docker/docker/api/types/backend" "github.com/docker/docker/container" + "github.com/docker/docker/container/stream" "github.com/docker/docker/daemon/logger" "github.com/docker/docker/pkg/stdcopy" "github.com/docker/docker/pkg/term" ) +type containerAttachConfig struct { + detachKeys []byte + stdin io.ReadCloser + stdout, stderr io.Writer + showHistory bool + stream bool +} + // ContainerAttach attaches to logs according to the config passed in. See ContainerAttachConfig. func (daemon *Daemon) ContainerAttach(prefixOrName string, c *backend.ContainerAttachConfig) error { keys := []byte{} @@ -45,20 +54,23 @@ func (daemon *Daemon) ContainerAttach(prefixOrName string, c *backend.ContainerA outStream = stdcopy.NewStdWriter(outStream, stdcopy.Stdout) } - var stdin io.ReadCloser - var stdout, stderr io.Writer + var cfg containerAttachConfig if c.UseStdin { - stdin = inStream + cfg.stdin = inStream } if c.UseStdout { - stdout = outStream + cfg.stdout = outStream } if c.UseStderr { - stderr = errStream + cfg.stderr = errStream } - if err := daemon.containerAttach(container, stdin, stdout, stderr, c.Logs, c.Stream, keys); err != nil { + cfg.showHistory = c.Logs + cfg.stream = c.Stream + cfg.detachKeys = keys + + if err := daemon.containerAttach(container, &cfg); err != nil { fmt.Fprintf(outStream, "Error attaching: %s\n", err) } return nil @@ -70,11 +82,20 @@ func (daemon *Daemon) ContainerAttachRaw(prefixOrName string, stdin io.ReadClose if err != nil { return err } - return daemon.containerAttach(container, stdin, stdout, stderr, false, stream, nil) + cfg := &containerAttachConfig{ + stdin: stdin, + stdout: stdout, + stderr: stderr, + stream: stream, + } + return daemon.containerAttach(container, cfg) } -func (daemon *Daemon) containerAttach(c *container.Container, stdin io.ReadCloser, stdout, stderr io.Writer, logs, stream bool, keys []byte) error { - if logs { +func (daemon *Daemon) containerAttach(c *container.Container, cfg *containerAttachConfig) error { + stdin := cfg.stdin + stdout := cfg.stdout + stderr := cfg.stderr + if cfg.showHistory { logDriver, err := daemon.getLogger(c) if err != nil { return err @@ -107,41 +128,47 @@ func (daemon *Daemon) containerAttach(c *container.Container, stdin io.ReadClose daemon.LogContainerEvent(c, "attach") - //stream - if stream { - var stdinPipe io.ReadCloser - if stdin != nil { - r, w := io.Pipe() - go func() { - defer w.Close() - defer logrus.Debug("Closing buffered stdin pipe") - io.Copy(w, stdin) - }() - stdinPipe = r - } + if !cfg.stream { + return nil + } - waitChan := make(chan struct{}) - if c.Config.StdinOnce && !c.Config.Tty { - go func() { - c.WaitStop(-1 * time.Second) - close(waitChan) - }() - } + var stdinPipe io.ReadCloser + if stdin != nil { + r, w := io.Pipe() + go func() { + defer w.Close() + defer logrus.Debug("Closing buffered stdin pipe") + io.Copy(w, stdin) + }() + stdinPipe = r + } - err := <-c.Attach(stdinPipe, stdout, stderr, keys) - if err != nil { - if _, ok := err.(container.DetachError); ok { - daemon.LogContainerEvent(c, "detach") - } else { - logrus.Errorf("attach failed with error: %v", err) - } - } - - // If we are in stdinonce mode, wait for the process to end - // otherwise, simply return - if c.Config.StdinOnce && !c.Config.Tty { + waitChan := make(chan struct{}) + if c.Config.StdinOnce && !c.Config.Tty { + defer func() { <-waitChan + }() + go func() { + c.WaitStop(-1 * time.Second) + close(waitChan) + }() + } + + aCfg := &stream.AttachConfig{ + Stdin: stdinPipe, + Stdout: stdout, + Stderr: stderr, + DetachKeys: cfg.detachKeys, + } + + err := <-c.Attach(aCfg) + if err != nil { + if _, ok := err.(stream.DetachError); ok { + daemon.LogContainerEvent(c, "detach") + } else { + logrus.Errorf("attach failed with error: %v", err) } } + return nil } diff --git a/daemon/exec.go b/daemon/exec.go index ab5315a745..99db2204c6 100644 --- a/daemon/exec.go +++ b/daemon/exec.go @@ -13,6 +13,7 @@ import ( "github.com/docker/docker/api/types" "github.com/docker/docker/api/types/strslice" "github.com/docker/docker/container" + "github.com/docker/docker/container/stream" "github.com/docker/docker/daemon/exec" "github.com/docker/docker/libcontainerd" "github.com/docker/docker/pkg/pools" @@ -209,7 +210,15 @@ func (d *Daemon) ContainerExecStart(ctx context.Context, name string, stdin io.R return err } - attachErr := container.AttachStreams(ctx, ec.StreamConfig, ec.OpenStdin, true, ec.Tty, cStdin, cStdout, cStderr, ec.DetachKeys) + attachConfig := &stream.AttachConfig{ + TTY: ec.Tty, + Stdin: cStdin, + Stdout: cStdout, + Stderr: cStderr, + DetachKeys: ec.DetachKeys, + CloseStdin: true, + } + attachErr := ec.StreamConfig.Attach(ctx, attachConfig) systemPid, err := d.containerd.AddProcess(ctx, c.ID, name, p, ec.InitializeStdio) if err != nil { @@ -233,7 +242,7 @@ func (d *Daemon) ContainerExecStart(ctx context.Context, name string, stdin io.R return fmt.Errorf("context cancelled") case err := <-attachErr: if err != nil { - if _, ok := err.(container.DetachError); !ok { + if _, ok := err.(stream.DetachError); !ok { return fmt.Errorf("exec attach failed with error: %v", err) } d.LogContainerEvent(c, "exec_detach")