exec.go 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300
  1. // build linux
  2. package daemon
  3. import (
  4. "fmt"
  5. "io"
  6. "io/ioutil"
  7. "strings"
  8. "sync"
  9. log "github.com/Sirupsen/logrus"
  10. "github.com/docker/docker/daemon/execdriver"
  11. "github.com/docker/docker/daemon/execdriver/lxc"
  12. "github.com/docker/docker/engine"
  13. "github.com/docker/docker/pkg/broadcastwriter"
  14. "github.com/docker/docker/pkg/ioutils"
  15. "github.com/docker/docker/pkg/promise"
  16. "github.com/docker/docker/runconfig"
  17. "github.com/docker/docker/utils"
  18. )
  19. type execConfig struct {
  20. sync.Mutex
  21. ID string
  22. Running bool
  23. ProcessConfig execdriver.ProcessConfig
  24. StreamConfig
  25. OpenStdin bool
  26. OpenStderr bool
  27. OpenStdout bool
  28. Container *Container
  29. }
  30. type execStore struct {
  31. s map[string]*execConfig
  32. sync.Mutex
  33. }
  34. func newExecStore() *execStore {
  35. return &execStore{s: make(map[string]*execConfig, 0)}
  36. }
  37. func (e *execStore) Add(id string, execConfig *execConfig) {
  38. e.Lock()
  39. e.s[id] = execConfig
  40. e.Unlock()
  41. }
  42. func (e *execStore) Get(id string) *execConfig {
  43. e.Lock()
  44. res := e.s[id]
  45. e.Unlock()
  46. return res
  47. }
  48. func (e *execStore) Delete(id string) {
  49. e.Lock()
  50. delete(e.s, id)
  51. e.Unlock()
  52. }
  53. func (execConfig *execConfig) Resize(h, w int) error {
  54. return execConfig.ProcessConfig.Terminal.Resize(h, w)
  55. }
  56. func (d *Daemon) registerExecCommand(execConfig *execConfig) {
  57. // Storing execs in container inorder to kill them gracefully whenever the container is stopped or removed.
  58. execConfig.Container.execCommands.Add(execConfig.ID, execConfig)
  59. // Storing execs in daemon for easy access via remote API.
  60. d.execCommands.Add(execConfig.ID, execConfig)
  61. }
  62. func (d *Daemon) getExecConfig(name string) (*execConfig, error) {
  63. if execConfig := d.execCommands.Get(name); execConfig != nil {
  64. if !execConfig.Container.IsRunning() {
  65. return nil, fmt.Errorf("Container %s is not running", execConfig.Container.ID)
  66. }
  67. return execConfig, nil
  68. }
  69. return nil, fmt.Errorf("No such exec instance '%s' found in daemon", name)
  70. }
  71. func (d *Daemon) unregisterExecCommand(execConfig *execConfig) {
  72. execConfig.Container.execCommands.Delete(execConfig.ID)
  73. d.execCommands.Delete(execConfig.ID)
  74. }
  75. func (d *Daemon) getActiveContainer(name string) (*Container, error) {
  76. container := d.Get(name)
  77. if container == nil {
  78. return nil, fmt.Errorf("No such container: %s", name)
  79. }
  80. if !container.IsRunning() {
  81. return nil, fmt.Errorf("Container %s is not running", name)
  82. }
  83. return container, nil
  84. }
  85. func (d *Daemon) ContainerExecCreate(job *engine.Job) engine.Status {
  86. if len(job.Args) != 1 {
  87. return job.Errorf("Usage: %s [options] container command [args]", job.Name)
  88. }
  89. if strings.HasPrefix(d.execDriver.Name(), lxc.DriverName) {
  90. return job.Error(lxc.ErrExec)
  91. }
  92. var name = job.Args[0]
  93. container, err := d.getActiveContainer(name)
  94. if err != nil {
  95. return job.Error(err)
  96. }
  97. config := runconfig.ExecConfigFromJob(job)
  98. entrypoint, args := d.getEntrypointAndArgs(nil, config.Cmd)
  99. processConfig := execdriver.ProcessConfig{
  100. Privileged: config.Privileged,
  101. User: config.User,
  102. Tty: config.Tty,
  103. Entrypoint: entrypoint,
  104. Arguments: args,
  105. }
  106. execConfig := &execConfig{
  107. ID: utils.GenerateRandomID(),
  108. OpenStdin: config.AttachStdin,
  109. OpenStdout: config.AttachStdout,
  110. OpenStderr: config.AttachStderr,
  111. StreamConfig: StreamConfig{},
  112. ProcessConfig: processConfig,
  113. Container: container,
  114. Running: false,
  115. }
  116. d.registerExecCommand(execConfig)
  117. job.Printf("%s\n", execConfig.ID)
  118. return engine.StatusOK
  119. }
  120. func (d *Daemon) ContainerExecStart(job *engine.Job) engine.Status {
  121. if len(job.Args) != 1 {
  122. return job.Errorf("Usage: %s [options] exec", job.Name)
  123. }
  124. var (
  125. cStdin io.ReadCloser
  126. cStdout, cStderr io.Writer
  127. execName = job.Args[0]
  128. )
  129. execConfig, err := d.getExecConfig(execName)
  130. if err != nil {
  131. return job.Error(err)
  132. }
  133. func() {
  134. execConfig.Lock()
  135. defer execConfig.Unlock()
  136. if execConfig.Running {
  137. err = fmt.Errorf("Error: Exec command %s is already running", execName)
  138. }
  139. execConfig.Running = true
  140. }()
  141. if err != nil {
  142. return job.Error(err)
  143. }
  144. log.Debugf("starting exec command %s in container %s", execConfig.ID, execConfig.Container.ID)
  145. container := execConfig.Container
  146. if execConfig.OpenStdin {
  147. r, w := io.Pipe()
  148. go func() {
  149. defer w.Close()
  150. defer log.Debugf("Closing buffered stdin pipe")
  151. io.Copy(w, job.Stdin)
  152. }()
  153. cStdin = r
  154. }
  155. if execConfig.OpenStdout {
  156. cStdout = job.Stdout
  157. }
  158. if execConfig.OpenStderr {
  159. cStderr = job.Stderr
  160. }
  161. execConfig.StreamConfig.stderr = broadcastwriter.New()
  162. execConfig.StreamConfig.stdout = broadcastwriter.New()
  163. // Attach to stdin
  164. if execConfig.OpenStdin {
  165. execConfig.StreamConfig.stdin, execConfig.StreamConfig.stdinPipe = io.Pipe()
  166. } else {
  167. execConfig.StreamConfig.stdinPipe = ioutils.NopWriteCloser(ioutil.Discard) // Silently drop stdin
  168. }
  169. attachErr := d.attach(&execConfig.StreamConfig, execConfig.OpenStdin, false, execConfig.ProcessConfig.Tty, cStdin, cStdout, cStderr)
  170. execErr := make(chan error)
  171. // Remove exec from daemon and container.
  172. defer d.unregisterExecCommand(execConfig)
  173. go func() {
  174. err := container.Exec(execConfig)
  175. if err != nil {
  176. execErr <- fmt.Errorf("Cannot run exec command %s in container %s: %s", execName, container.ID, err)
  177. }
  178. }()
  179. select {
  180. case err := <-attachErr:
  181. if err != nil {
  182. return job.Errorf("attach failed with error: %s", err)
  183. }
  184. break
  185. case err := <-execErr:
  186. return job.Error(err)
  187. }
  188. return engine.StatusOK
  189. }
  190. func (d *Daemon) Exec(c *Container, execConfig *execConfig, pipes *execdriver.Pipes, startCallback execdriver.StartCallback) (int, error) {
  191. return d.execDriver.Exec(c.command, &execConfig.ProcessConfig, pipes, startCallback)
  192. }
  193. func (container *Container) Exec(execConfig *execConfig) error {
  194. container.Lock()
  195. defer container.Unlock()
  196. waitStart := make(chan struct{})
  197. callback := func(processConfig *execdriver.ProcessConfig, pid int) {
  198. if processConfig.Tty {
  199. // The callback is called after the process Start()
  200. // so we are in the parent process. In TTY mode, stdin/out/err is the PtySlave
  201. // which we close here.
  202. if c, ok := processConfig.Stdout.(io.Closer); ok {
  203. c.Close()
  204. }
  205. }
  206. close(waitStart)
  207. }
  208. // We use a callback here instead of a goroutine and an chan for
  209. // syncronization purposes
  210. cErr := promise.Go(func() error { return container.monitorExec(execConfig, callback) })
  211. // Exec should not return until the process is actually running
  212. select {
  213. case <-waitStart:
  214. case err := <-cErr:
  215. return err
  216. }
  217. return nil
  218. }
  219. func (container *Container) monitorExec(execConfig *execConfig, callback execdriver.StartCallback) error {
  220. var (
  221. err error
  222. exitCode int
  223. )
  224. pipes := execdriver.NewPipes(execConfig.StreamConfig.stdin, execConfig.StreamConfig.stdout, execConfig.StreamConfig.stderr, execConfig.OpenStdin)
  225. exitCode, err = container.daemon.Exec(container, execConfig, pipes, callback)
  226. if err != nil {
  227. log.Errorf("Error running command in existing container %s: %s", container.ID, err)
  228. }
  229. log.Debugf("Exec task in container %s exited with code %d", container.ID, exitCode)
  230. if execConfig.OpenStdin {
  231. if err := execConfig.StreamConfig.stdin.Close(); err != nil {
  232. log.Errorf("Error closing stdin while running in %s: %s", container.ID, err)
  233. }
  234. }
  235. if err := execConfig.StreamConfig.stdout.Clean(); err != nil {
  236. log.Errorf("Error closing stdout while running in %s: %s", container.ID, err)
  237. }
  238. if err := execConfig.StreamConfig.stderr.Clean(); err != nil {
  239. log.Errorf("Error closing stderr while running in %s: %s", container.ID, err)
  240. }
  241. if execConfig.ProcessConfig.Terminal != nil {
  242. if err := execConfig.ProcessConfig.Terminal.Close(); err != nil {
  243. log.Errorf("Error closing terminal while running in container %s: %s", container.ID, err)
  244. }
  245. }
  246. return err
  247. }