attach.go 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178
  1. package stream
  2. import (
  3. "io"
  4. "sync"
  5. "golang.org/x/net/context"
  6. "github.com/Sirupsen/logrus"
  7. "github.com/docker/docker/pkg/promise"
  8. "github.com/docker/docker/pkg/term"
  9. )
  10. var defaultEscapeSequence = []byte{16, 17} // ctrl-p, ctrl-q
  11. // AttachConfig is the config struct used to attach a client to a stream's stdio
  12. type AttachConfig struct {
  13. // Tells the attach copier that the stream's stdin is a TTY and to look for
  14. // escape sequences in stdin to detach from the stream.
  15. // When true the escape sequence is not passed to the underlying stream
  16. TTY bool
  17. // Specifies the detach keys the client will be using
  18. // Only useful when `TTY` is true
  19. DetachKeys []byte
  20. // CloseStdin signals that once done, stdin for the attached stream should be closed
  21. // For example, this would close the attached container's stdin.
  22. CloseStdin bool
  23. // UseStd* indicate whether the client has requested to be connected to the
  24. // given stream or not. These flags are used instead of checking Std* != nil
  25. // at points before the client streams Std* are wired up.
  26. UseStdin, UseStdout, UseStderr bool
  27. // CStd* are the streams directly connected to the container
  28. CStdin io.WriteCloser
  29. CStdout, CStderr io.ReadCloser
  30. // Provide client streams to wire up to
  31. Stdin io.ReadCloser
  32. Stdout, Stderr io.Writer
  33. }
  34. // AttachStreams attaches the container's streams to the AttachConfig
  35. func (c *Config) AttachStreams(cfg *AttachConfig) {
  36. if cfg.UseStdin {
  37. cfg.CStdin = c.StdinPipe()
  38. }
  39. if cfg.UseStdout {
  40. cfg.CStdout = c.StdoutPipe()
  41. }
  42. if cfg.UseStderr {
  43. cfg.CStderr = c.StderrPipe()
  44. }
  45. }
  46. // CopyStreams starts goroutines to copy data in and out to/from the container
  47. func (c *Config) CopyStreams(ctx context.Context, cfg *AttachConfig) chan error {
  48. var (
  49. wg sync.WaitGroup
  50. errors = make(chan error, 3)
  51. )
  52. if cfg.Stdin != nil {
  53. wg.Add(1)
  54. }
  55. if cfg.Stdout != nil {
  56. wg.Add(1)
  57. }
  58. if cfg.Stderr != nil {
  59. wg.Add(1)
  60. }
  61. // Connect stdin of container to the attach stdin stream.
  62. go func() {
  63. if cfg.Stdin == nil {
  64. return
  65. }
  66. logrus.Debug("attach: stdin: begin")
  67. var err error
  68. if cfg.TTY {
  69. _, err = copyEscapable(cfg.CStdin, cfg.Stdin, cfg.DetachKeys)
  70. } else {
  71. _, err = io.Copy(cfg.CStdin, cfg.Stdin)
  72. }
  73. if err == io.ErrClosedPipe {
  74. err = nil
  75. }
  76. if err != nil {
  77. logrus.Errorf("attach: stdin: %s", err)
  78. errors <- err
  79. }
  80. if cfg.CloseStdin && !cfg.TTY {
  81. cfg.CStdin.Close()
  82. } else {
  83. // No matter what, when stdin is closed (io.Copy unblock), close stdout and stderr
  84. if cfg.CStdout != nil {
  85. cfg.CStdout.Close()
  86. }
  87. if cfg.CStderr != nil {
  88. cfg.CStderr.Close()
  89. }
  90. }
  91. logrus.Debug("attach: stdin: end")
  92. wg.Done()
  93. }()
  94. attachStream := func(name string, stream io.Writer, streamPipe io.ReadCloser) {
  95. if stream == nil {
  96. return
  97. }
  98. logrus.Debugf("attach: %s: begin", name)
  99. _, err := io.Copy(stream, streamPipe)
  100. if err == io.ErrClosedPipe {
  101. err = nil
  102. }
  103. if err != nil {
  104. logrus.Errorf("attach: %s: %v", name, err)
  105. errors <- err
  106. }
  107. // Make sure stdin gets closed
  108. if cfg.Stdin != nil {
  109. cfg.Stdin.Close()
  110. }
  111. streamPipe.Close()
  112. logrus.Debugf("attach: %s: end", name)
  113. wg.Done()
  114. }
  115. go attachStream("stdout", cfg.Stdout, cfg.CStdout)
  116. go attachStream("stderr", cfg.Stderr, cfg.CStderr)
  117. return promise.Go(func() error {
  118. done := make(chan struct{})
  119. go func() {
  120. wg.Wait()
  121. close(done)
  122. }()
  123. select {
  124. case <-done:
  125. case <-ctx.Done():
  126. // close all pipes
  127. if cfg.CStdin != nil {
  128. cfg.CStdin.Close()
  129. }
  130. if cfg.CStdout != nil {
  131. cfg.CStdout.Close()
  132. }
  133. if cfg.CStderr != nil {
  134. cfg.CStderr.Close()
  135. }
  136. <-done
  137. }
  138. close(errors)
  139. for err := range errors {
  140. if err != nil {
  141. return err
  142. }
  143. }
  144. return nil
  145. })
  146. }
  147. func copyEscapable(dst io.Writer, src io.ReadCloser, keys []byte) (written int64, err error) {
  148. if len(keys) == 0 {
  149. keys = defaultEscapeSequence
  150. }
  151. pr := term.NewEscapeProxy(src, keys)
  152. defer src.Close()
  153. return io.Copy(dst, pr)
  154. }