attach.go 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235
  1. package stream
  2. import (
  3. "io"
  4. "sync"
  5. "golang.org/x/net/context"
  6. "github.com/Sirupsen/logrus"
  7. "github.com/docker/docker/pkg/promise"
  8. )
  9. var defaultEscapeSequence = []byte{16, 17} // ctrl-p, ctrl-q
  10. // DetachError is special error which returned in case of container detach.
  11. type DetachError struct{}
  12. func (DetachError) Error() string {
  13. return "detached from container"
  14. }
  15. // AttachConfig is the config struct used to attach a client to a stream's stdio
  16. type AttachConfig struct {
  17. // Tells the attach copier that the stream's stdin is a TTY and to look for
  18. // escape sequences in stdin to detach from the stream.
  19. // When true the escape sequence is not passed to the underlying stream
  20. TTY bool
  21. // Specifies the detach keys the client will be using
  22. // Only useful when `TTY` is true
  23. DetachKeys []byte
  24. // CloseStdin signals that once done, stdin for the attached stream should be closed
  25. // For example, this would close the attached container's stdin.
  26. CloseStdin bool
  27. // UseStd* indicate whether the client has requested to be connected to the
  28. // given stream or not. These flags are used instead of checking Std* != nil
  29. // at points before the client streams Std* are wired up.
  30. UseStdin, UseStdout, UseStderr bool
  31. // CStd* are the streams directly connected to the container
  32. CStdin io.WriteCloser
  33. CStdout, CStderr io.ReadCloser
  34. // Provide client streams to wire up to
  35. Stdin io.ReadCloser
  36. Stdout, Stderr io.Writer
  37. }
  38. // AttachStreams attaches the container's streams to the AttachConfig
  39. func (c *Config) AttachStreams(cfg *AttachConfig) {
  40. if cfg.UseStdin {
  41. cfg.CStdin = c.StdinPipe()
  42. }
  43. if cfg.UseStdout {
  44. cfg.CStdout = c.StdoutPipe()
  45. }
  46. if cfg.UseStderr {
  47. cfg.CStderr = c.StderrPipe()
  48. }
  49. }
  50. // CopyStreams starts goroutines to copy data in and out to/from the container
  51. func (c *Config) CopyStreams(ctx context.Context, cfg *AttachConfig) chan error {
  52. var (
  53. wg sync.WaitGroup
  54. errors = make(chan error, 3)
  55. )
  56. if cfg.Stdin != nil {
  57. wg.Add(1)
  58. }
  59. if cfg.Stdout != nil {
  60. wg.Add(1)
  61. }
  62. if cfg.Stderr != nil {
  63. wg.Add(1)
  64. }
  65. // Connect stdin of container to the attach stdin stream.
  66. go func() {
  67. if cfg.Stdin == nil {
  68. return
  69. }
  70. logrus.Debug("attach: stdin: begin")
  71. var err error
  72. if cfg.TTY {
  73. _, err = copyEscapable(cfg.CStdin, cfg.Stdin, cfg.DetachKeys)
  74. } else {
  75. _, err = io.Copy(cfg.CStdin, cfg.Stdin)
  76. }
  77. if err == io.ErrClosedPipe {
  78. err = nil
  79. }
  80. if err != nil {
  81. logrus.Errorf("attach: stdin: %s", err)
  82. errors <- err
  83. }
  84. if cfg.CloseStdin && !cfg.TTY {
  85. cfg.CStdin.Close()
  86. } else {
  87. // No matter what, when stdin is closed (io.Copy unblock), close stdout and stderr
  88. if cfg.CStdout != nil {
  89. cfg.CStdout.Close()
  90. }
  91. if cfg.CStderr != nil {
  92. cfg.CStderr.Close()
  93. }
  94. }
  95. logrus.Debug("attach: stdin: end")
  96. wg.Done()
  97. }()
  98. attachStream := func(name string, stream io.Writer, streamPipe io.ReadCloser) {
  99. if stream == nil {
  100. return
  101. }
  102. logrus.Debugf("attach: %s: begin", name)
  103. _, err := io.Copy(stream, streamPipe)
  104. if err == io.ErrClosedPipe {
  105. err = nil
  106. }
  107. if err != nil {
  108. logrus.Errorf("attach: %s: %v", name, err)
  109. errors <- err
  110. }
  111. // Make sure stdin gets closed
  112. if cfg.Stdin != nil {
  113. cfg.Stdin.Close()
  114. }
  115. streamPipe.Close()
  116. logrus.Debugf("attach: %s: end", name)
  117. wg.Done()
  118. }
  119. go attachStream("stdout", cfg.Stdout, cfg.CStdout)
  120. go attachStream("stderr", cfg.Stderr, cfg.CStderr)
  121. return promise.Go(func() error {
  122. done := make(chan struct{})
  123. go func() {
  124. wg.Wait()
  125. close(done)
  126. }()
  127. select {
  128. case <-done:
  129. case <-ctx.Done():
  130. // close all pipes
  131. if cfg.CStdin != nil {
  132. cfg.CStdin.Close()
  133. }
  134. if cfg.CStdout != nil {
  135. cfg.CStdout.Close()
  136. }
  137. if cfg.CStderr != nil {
  138. cfg.CStderr.Close()
  139. }
  140. <-done
  141. }
  142. close(errors)
  143. for err := range errors {
  144. if err != nil {
  145. return err
  146. }
  147. }
  148. return nil
  149. })
  150. }
  151. // ttyProxy is used only for attaches with a TTY. It is used to proxy
  152. // stdin keypresses from the underlying reader and look for the passed in
  153. // escape key sequence to signal a detach.
  154. type ttyProxy struct {
  155. escapeKeys []byte
  156. escapeKeyPos int
  157. r io.Reader
  158. }
  159. func (r *ttyProxy) Read(buf []byte) (int, error) {
  160. nr, err := r.r.Read(buf)
  161. preserve := func() {
  162. // this preserves the original key presses in the passed in buffer
  163. nr += r.escapeKeyPos
  164. preserve := make([]byte, 0, r.escapeKeyPos+len(buf))
  165. preserve = append(preserve, r.escapeKeys[:r.escapeKeyPos]...)
  166. preserve = append(preserve, buf...)
  167. r.escapeKeyPos = 0
  168. copy(buf[0:nr], preserve)
  169. }
  170. if nr != 1 || err != nil {
  171. if r.escapeKeyPos > 0 {
  172. preserve()
  173. }
  174. return nr, err
  175. }
  176. if buf[0] != r.escapeKeys[r.escapeKeyPos] {
  177. if r.escapeKeyPos > 0 {
  178. preserve()
  179. }
  180. return nr, nil
  181. }
  182. if r.escapeKeyPos == len(r.escapeKeys)-1 {
  183. return 0, DetachError{}
  184. }
  185. // Looks like we've got an escape key, but we need to match again on the next
  186. // read.
  187. // Store the current escape key we found so we can look for the next one on
  188. // the next read.
  189. // Since this is an escape key, make sure we don't let the caller read it
  190. // If later on we find that this is not the escape sequence, we'll add the
  191. // keys back
  192. r.escapeKeyPos++
  193. return nr - r.escapeKeyPos, nil
  194. }
  195. func copyEscapable(dst io.Writer, src io.ReadCloser, keys []byte) (written int64, err error) {
  196. if len(keys) == 0 {
  197. keys = defaultEscapeSequence
  198. }
  199. pr := &ttyProxy{escapeKeys: keys, r: src}
  200. defer src.Close()
  201. return io.Copy(dst, pr)
  202. }