attach.go 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214
  1. package daemon
  2. import (
  3. "encoding/json"
  4. "io"
  5. "os"
  6. "sync"
  7. "time"
  8. log "github.com/Sirupsen/logrus"
  9. "github.com/docker/docker/engine"
  10. "github.com/docker/docker/pkg/jsonlog"
  11. "github.com/docker/docker/pkg/promise"
  12. "github.com/docker/docker/utils"
  13. )
  14. func (daemon *Daemon) ContainerAttach(job *engine.Job) engine.Status {
  15. if len(job.Args) != 1 {
  16. return job.Errorf("Usage: %s CONTAINER\n", job.Name)
  17. }
  18. var (
  19. name = job.Args[0]
  20. logs = job.GetenvBool("logs")
  21. stream = job.GetenvBool("stream")
  22. stdin = job.GetenvBool("stdin")
  23. stdout = job.GetenvBool("stdout")
  24. stderr = job.GetenvBool("stderr")
  25. )
  26. container := daemon.Get(name)
  27. if container == nil {
  28. return job.Errorf("No such container: %s", name)
  29. }
  30. //logs
  31. if logs {
  32. cLog, err := container.ReadLog("json")
  33. if err != nil && os.IsNotExist(err) {
  34. // Legacy logs
  35. log.Debugf("Old logs format")
  36. if stdout {
  37. cLog, err := container.ReadLog("stdout")
  38. if err != nil {
  39. log.Errorf("Error reading logs (stdout): %s", err)
  40. } else if _, err := io.Copy(job.Stdout, cLog); err != nil {
  41. log.Errorf("Error streaming logs (stdout): %s", err)
  42. }
  43. }
  44. if stderr {
  45. cLog, err := container.ReadLog("stderr")
  46. if err != nil {
  47. log.Errorf("Error reading logs (stderr): %s", err)
  48. } else if _, err := io.Copy(job.Stderr, cLog); err != nil {
  49. log.Errorf("Error streaming logs (stderr): %s", err)
  50. }
  51. }
  52. } else if err != nil {
  53. log.Errorf("Error reading logs (json): %s", err)
  54. } else {
  55. dec := json.NewDecoder(cLog)
  56. for {
  57. l := &jsonlog.JSONLog{}
  58. if err := dec.Decode(l); err == io.EOF {
  59. break
  60. } else if err != nil {
  61. log.Errorf("Error streaming logs: %s", err)
  62. break
  63. }
  64. if l.Stream == "stdout" && stdout {
  65. io.WriteString(job.Stdout, l.Log)
  66. }
  67. if l.Stream == "stderr" && stderr {
  68. io.WriteString(job.Stderr, l.Log)
  69. }
  70. }
  71. }
  72. }
  73. //stream
  74. if stream {
  75. var (
  76. cStdin io.ReadCloser
  77. cStdout, cStderr io.Writer
  78. )
  79. if stdin {
  80. r, w := io.Pipe()
  81. go func() {
  82. defer w.Close()
  83. defer log.Debugf("Closing buffered stdin pipe")
  84. io.Copy(w, job.Stdin)
  85. }()
  86. cStdin = r
  87. }
  88. if stdout {
  89. cStdout = job.Stdout
  90. }
  91. if stderr {
  92. cStderr = job.Stderr
  93. }
  94. <-daemon.attach(&container.StreamConfig, container.Config.OpenStdin, container.Config.StdinOnce, container.Config.Tty, cStdin, cStdout, cStderr)
  95. // If we are in stdinonce mode, wait for the process to end
  96. // otherwise, simply return
  97. if container.Config.StdinOnce && !container.Config.Tty {
  98. container.WaitStop(-1 * time.Second)
  99. }
  100. }
  101. return engine.StatusOK
  102. }
  103. func (daemon *Daemon) attach(streamConfig *StreamConfig, openStdin, stdinOnce, tty bool, stdin io.ReadCloser, stdout io.Writer, stderr io.Writer) chan error {
  104. var (
  105. cStdout, cStderr io.ReadCloser
  106. cStdin io.WriteCloser
  107. wg sync.WaitGroup
  108. errors = make(chan error, 3)
  109. )
  110. if stdin != nil && openStdin {
  111. cStdin = streamConfig.StdinPipe()
  112. wg.Add(1)
  113. }
  114. if stdout != nil {
  115. cStdout = streamConfig.StdoutPipe()
  116. wg.Add(1)
  117. }
  118. if stderr != nil {
  119. cStderr = streamConfig.StderrPipe()
  120. wg.Add(1)
  121. }
  122. // Connect stdin of container to the http conn.
  123. go func() {
  124. if stdin == nil || !openStdin {
  125. return
  126. }
  127. log.Debugf("attach: stdin: begin")
  128. defer func() {
  129. if stdinOnce && !tty {
  130. cStdin.Close()
  131. } else {
  132. // No matter what, when stdin is closed (io.Copy unblock), close stdout and stderr
  133. if cStdout != nil {
  134. cStdout.Close()
  135. }
  136. if cStderr != nil {
  137. cStderr.Close()
  138. }
  139. }
  140. wg.Done()
  141. log.Debugf("attach: stdin: end")
  142. }()
  143. var err error
  144. if tty {
  145. _, err = utils.CopyEscapable(cStdin, stdin)
  146. } else {
  147. _, err = io.Copy(cStdin, stdin)
  148. }
  149. if err == io.ErrClosedPipe {
  150. err = nil
  151. }
  152. if err != nil {
  153. log.Errorf("attach: stdin: %s", err)
  154. errors <- err
  155. return
  156. }
  157. }()
  158. attachStream := func(name string, stream io.Writer, streamPipe io.ReadCloser) {
  159. if stream == nil {
  160. return
  161. }
  162. defer func() {
  163. // Make sure stdin gets closed
  164. if stdin != nil {
  165. stdin.Close()
  166. }
  167. streamPipe.Close()
  168. wg.Done()
  169. log.Debugf("attach: %s: end", name)
  170. }()
  171. log.Debugf("attach: %s: begin", name)
  172. _, err := io.Copy(stream, streamPipe)
  173. if err == io.ErrClosedPipe {
  174. err = nil
  175. }
  176. if err != nil {
  177. log.Errorf("attach: %s: %v", name, err)
  178. errors <- err
  179. }
  180. }
  181. go attachStream("stdout", stdout, cStdout)
  182. go attachStream("stderr", stderr, cStderr)
  183. return promise.Go(func() error {
  184. wg.Wait()
  185. close(errors)
  186. for err := range errors {
  187. if err != nil {
  188. return err
  189. }
  190. }
  191. return nil
  192. })
  193. }