engine.go 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252
  1. package engine
  2. import (
  3. "bufio"
  4. "fmt"
  5. "io"
  6. "os"
  7. "sort"
  8. "strings"
  9. "sync"
  10. "time"
  11. "github.com/docker/docker/pkg/ioutils"
  12. "github.com/docker/docker/utils"
  13. )
  14. // Installer is a standard interface for objects which can "install" themselves
  15. // on an engine by registering handlers.
  16. // This can be used as an entrypoint for external plugins etc.
  17. type Installer interface {
  18. Install(*Engine) error
  19. }
  20. type Handler func(*Job) Status
  21. var globalHandlers map[string]Handler
  22. func init() {
  23. globalHandlers = make(map[string]Handler)
  24. }
  25. func Register(name string, handler Handler) error {
  26. _, exists := globalHandlers[name]
  27. if exists {
  28. return fmt.Errorf("Can't overwrite global handler for command %s", name)
  29. }
  30. globalHandlers[name] = handler
  31. return nil
  32. }
  33. func unregister(name string) {
  34. delete(globalHandlers, name)
  35. }
  36. // The Engine is the core of Docker.
  37. // It acts as a store for *containers*, and allows manipulation of these
  38. // containers by executing *jobs*.
  39. type Engine struct {
  40. handlers map[string]Handler
  41. catchall Handler
  42. hack Hack // data for temporary hackery (see hack.go)
  43. id string
  44. Stdout io.Writer
  45. Stderr io.Writer
  46. Stdin io.Reader
  47. Logging bool
  48. tasks sync.WaitGroup
  49. l sync.RWMutex // lock for shutdown
  50. shutdown bool
  51. onShutdown []func() // shutdown handlers
  52. }
  53. func (eng *Engine) Register(name string, handler Handler) error {
  54. _, exists := eng.handlers[name]
  55. if exists {
  56. return fmt.Errorf("Can't overwrite handler for command %s", name)
  57. }
  58. eng.handlers[name] = handler
  59. return nil
  60. }
  61. func (eng *Engine) RegisterCatchall(catchall Handler) {
  62. eng.catchall = catchall
  63. }
  64. // New initializes a new engine.
  65. func New() *Engine {
  66. eng := &Engine{
  67. handlers: make(map[string]Handler),
  68. id: utils.RandomString(),
  69. Stdout: os.Stdout,
  70. Stderr: os.Stderr,
  71. Stdin: os.Stdin,
  72. Logging: true,
  73. }
  74. eng.Register("commands", func(job *Job) Status {
  75. for _, name := range eng.commands() {
  76. job.Printf("%s\n", name)
  77. }
  78. return StatusOK
  79. })
  80. // Copy existing global handlers
  81. for k, v := range globalHandlers {
  82. eng.handlers[k] = v
  83. }
  84. return eng
  85. }
  86. func (eng *Engine) String() string {
  87. return fmt.Sprintf("%s", eng.id[:8])
  88. }
  89. // Commands returns a list of all currently registered commands,
  90. // sorted alphabetically.
  91. func (eng *Engine) commands() []string {
  92. names := make([]string, 0, len(eng.handlers))
  93. for name := range eng.handlers {
  94. names = append(names, name)
  95. }
  96. sort.Strings(names)
  97. return names
  98. }
  99. // Job creates a new job which can later be executed.
  100. // This function mimics `Command` from the standard os/exec package.
  101. func (eng *Engine) Job(name string, args ...string) *Job {
  102. job := &Job{
  103. Eng: eng,
  104. Name: name,
  105. Args: args,
  106. Stdin: NewInput(),
  107. Stdout: NewOutput(),
  108. Stderr: NewOutput(),
  109. env: &Env{},
  110. closeIO: true,
  111. }
  112. if eng.Logging {
  113. job.Stderr.Add(ioutils.NopWriteCloser(eng.Stderr))
  114. }
  115. // Catchall is shadowed by specific Register.
  116. if handler, exists := eng.handlers[name]; exists {
  117. job.handler = handler
  118. } else if eng.catchall != nil && name != "" {
  119. // empty job names are illegal, catchall or not.
  120. job.handler = eng.catchall
  121. }
  122. return job
  123. }
  124. // OnShutdown registers a new callback to be called by Shutdown.
  125. // This is typically used by services to perform cleanup.
  126. func (eng *Engine) OnShutdown(h func()) {
  127. eng.l.Lock()
  128. eng.onShutdown = append(eng.onShutdown, h)
  129. eng.l.Unlock()
  130. }
  131. // Shutdown permanently shuts down eng as follows:
  132. // - It refuses all new jobs, permanently.
  133. // - It waits for all active jobs to complete (with no timeout)
  134. // - It calls all shutdown handlers concurrently (if any)
  135. // - It returns when all handlers complete, or after 15 seconds,
  136. // whichever happens first.
  137. func (eng *Engine) Shutdown() {
  138. eng.l.Lock()
  139. if eng.shutdown {
  140. eng.l.Unlock()
  141. return
  142. }
  143. eng.shutdown = true
  144. eng.l.Unlock()
  145. // We don't need to protect the rest with a lock, to allow
  146. // for other calls to immediately fail with "shutdown" instead
  147. // of hanging for 15 seconds.
  148. // This requires all concurrent calls to check for shutdown, otherwise
  149. // it might cause a race.
  150. // Wait for all jobs to complete.
  151. // Timeout after 5 seconds.
  152. tasksDone := make(chan struct{})
  153. go func() {
  154. eng.tasks.Wait()
  155. close(tasksDone)
  156. }()
  157. select {
  158. case <-time.After(time.Second * 5):
  159. case <-tasksDone:
  160. }
  161. // Call shutdown handlers, if any.
  162. // Timeout after 10 seconds.
  163. var wg sync.WaitGroup
  164. for _, h := range eng.onShutdown {
  165. wg.Add(1)
  166. go func(h func()) {
  167. defer wg.Done()
  168. h()
  169. }(h)
  170. }
  171. done := make(chan struct{})
  172. go func() {
  173. wg.Wait()
  174. close(done)
  175. }()
  176. select {
  177. case <-time.After(time.Second * 10):
  178. case <-done:
  179. }
  180. return
  181. }
  182. // IsShutdown returns true if the engine is in the process
  183. // of shutting down, or already shut down.
  184. // Otherwise it returns false.
  185. func (eng *Engine) IsShutdown() bool {
  186. eng.l.RLock()
  187. defer eng.l.RUnlock()
  188. return eng.shutdown
  189. }
  190. // ParseJob creates a new job from a text description using a shell-like syntax.
  191. //
  192. // The following syntax is used to parse `input`:
  193. //
  194. // * Words are separated using standard whitespaces as separators.
  195. // * Quotes and backslashes are not interpreted.
  196. // * Words of the form 'KEY=[VALUE]' are added to the job environment.
  197. // * All other words are added to the job arguments.
  198. //
  199. // For example:
  200. //
  201. // job, _ := eng.ParseJob("VERBOSE=1 echo hello TEST=true world")
  202. //
  203. // The resulting job will have:
  204. // job.Args={"echo", "hello", "world"}
  205. // job.Env={"VERBOSE":"1", "TEST":"true"}
  206. //
  207. func (eng *Engine) ParseJob(input string) (*Job, error) {
  208. // FIXME: use a full-featured command parser
  209. scanner := bufio.NewScanner(strings.NewReader(input))
  210. scanner.Split(bufio.ScanWords)
  211. var (
  212. cmd []string
  213. env Env
  214. )
  215. for scanner.Scan() {
  216. word := scanner.Text()
  217. kv := strings.SplitN(word, "=", 2)
  218. if len(kv) == 2 {
  219. env.Set(kv[0], kv[1])
  220. } else {
  221. cmd = append(cmd, word)
  222. }
  223. }
  224. if len(cmd) == 0 {
  225. return nil, fmt.Errorf("empty command: '%s'", input)
  226. }
  227. job := eng.Job(cmd[0], cmd[1:]...)
  228. job.Env().Init(&env)
  229. return job, nil
  230. }