engine.go 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255
  1. package engine
  2. import (
  3. "bufio"
  4. "fmt"
  5. "io"
  6. "os"
  7. "sort"
  8. "strings"
  9. "sync"
  10. "time"
  11. "github.com/docker/docker/pkg/ioutils"
  12. "github.com/docker/docker/pkg/stringid"
  13. )
  14. // Installer is a standard interface for objects which can "install" themselves
  15. // on an engine by registering handlers.
  16. // This can be used as an entrypoint for external plugins etc.
  17. type Installer interface {
  18. Install(*Engine) error
  19. }
  20. type Handler func(*Job) error
  21. var globalHandlers map[string]Handler
  22. func init() {
  23. globalHandlers = make(map[string]Handler)
  24. }
  25. func Register(name string, handler Handler) error {
  26. _, exists := globalHandlers[name]
  27. if exists {
  28. return fmt.Errorf("Can't overwrite global handler for command %s", name)
  29. }
  30. globalHandlers[name] = handler
  31. return nil
  32. }
// unregister removes the global handler stored under name.
// It is a no-op when no handler is registered under that name.
func unregister(name string) {
	delete(globalHandlers, name)
}
// The Engine is the core of Docker.
// It acts as a store for *containers*, and allows manipulation of these
// containers by executing *jobs*.
type Engine struct {
	// handlers maps a command name to the handler that Job attaches to jobs
	// created under that name.
	handlers map[string]Handler
	// catchall, when non-nil, is the handler Job falls back to for non-empty
	// names that have no entry in handlers.
	catchall Handler
	// hack holds data for temporary hackery (see hack.go).
	hack Hack
	// id identifies the engine; New fills it with a random ID and String
	// exposes its first 8 characters.
	id string
	// Stdout, Stderr and Stdin are set by New to the process's standard
	// streams.
	Stdout io.Writer
	Stderr io.Writer
	Stdin  io.Reader
	// Logging, when true, makes Job add the engine's Stderr as a destination
	// of every new job's Stderr. New turns it on.
	Logging bool
	// tasks is waited on by Shutdown before the shutdown handlers are run.
	tasks sync.WaitGroup
	// l is the lock for shutdown: it guards shutdown and onShutdown.
	l sync.RWMutex
	// shutdownWait is incremented by OnShutdown for each registered handler
	// and decremented by Shutdown each time a handler returns.
	shutdownWait sync.WaitGroup
	// shutdown is set by Shutdown and reported by IsShutdown.
	shutdown bool
	// onShutdown lists the shutdown handlers, in registration order.
	onShutdown []func()
}
  54. func (eng *Engine) Register(name string, handler Handler) error {
  55. _, exists := eng.handlers[name]
  56. if exists {
  57. return fmt.Errorf("Can't overwrite handler for command %s", name)
  58. }
  59. eng.handlers[name] = handler
  60. return nil
  61. }
// RegisterCatchall sets the engine's catchall handler, replacing any
// previously set one. Job falls back to the catchall for non-empty job names
// that have no handler registered under that exact name.
func (eng *Engine) RegisterCatchall(catchall Handler) {
	eng.catchall = catchall
}
  65. // New initializes a new engine.
  66. func New() *Engine {
  67. eng := &Engine{
  68. handlers: make(map[string]Handler),
  69. id: stringid.GenerateRandomID(),
  70. Stdout: os.Stdout,
  71. Stderr: os.Stderr,
  72. Stdin: os.Stdin,
  73. Logging: true,
  74. }
  75. eng.Register("commands", func(job *Job) error {
  76. for _, name := range eng.commands() {
  77. job.Printf("%s\n", name)
  78. }
  79. return nil
  80. })
  81. // Copy existing global handlers
  82. for k, v := range globalHandlers {
  83. eng.handlers[k] = v
  84. }
  85. return eng
  86. }
  87. func (eng *Engine) String() string {
  88. return fmt.Sprintf("%s", eng.id[:8])
  89. }
  90. // Commands returns a list of all currently registered commands,
  91. // sorted alphabetically.
  92. func (eng *Engine) commands() []string {
  93. names := make([]string, 0, len(eng.handlers))
  94. for name := range eng.handlers {
  95. names = append(names, name)
  96. }
  97. sort.Strings(names)
  98. return names
  99. }
  100. // Job creates a new job which can later be executed.
  101. // This function mimics `Command` from the standard os/exec package.
  102. func (eng *Engine) Job(name string, args ...string) *Job {
  103. job := &Job{
  104. Eng: eng,
  105. Name: name,
  106. Args: args,
  107. Stdin: NewInput(),
  108. Stdout: NewOutput(),
  109. Stderr: NewOutput(),
  110. env: &Env{},
  111. closeIO: true,
  112. cancelled: make(chan struct{}),
  113. }
  114. if eng.Logging {
  115. job.Stderr.Add(ioutils.NopWriteCloser(eng.Stderr))
  116. }
  117. // Catchall is shadowed by specific Register.
  118. if handler, exists := eng.handlers[name]; exists {
  119. job.handler = handler
  120. } else if eng.catchall != nil && name != "" {
  121. // empty job names are illegal, catchall or not.
  122. job.handler = eng.catchall
  123. }
  124. return job
  125. }
  126. // OnShutdown registers a new callback to be called by Shutdown.
  127. // This is typically used by services to perform cleanup.
  128. func (eng *Engine) OnShutdown(h func()) {
  129. eng.l.Lock()
  130. eng.onShutdown = append(eng.onShutdown, h)
  131. eng.shutdownWait.Add(1)
  132. eng.l.Unlock()
  133. }
  134. // Shutdown permanently shuts down eng as follows:
  135. // - It refuses all new jobs, permanently.
  136. // - It waits for all active jobs to complete (with no timeout)
  137. // - It calls all shutdown handlers concurrently (if any)
  138. // - It returns when all handlers complete, or after 15 seconds,
  139. // whichever happens first.
  140. func (eng *Engine) Shutdown() {
  141. eng.l.Lock()
  142. if eng.shutdown {
  143. eng.l.Unlock()
  144. eng.shutdownWait.Wait()
  145. return
  146. }
  147. eng.shutdown = true
  148. eng.l.Unlock()
  149. // We don't need to protect the rest with a lock, to allow
  150. // for other calls to immediately fail with "shutdown" instead
  151. // of hanging for 15 seconds.
  152. // This requires all concurrent calls to check for shutdown, otherwise
  153. // it might cause a race.
  154. // Wait for all jobs to complete.
  155. // Timeout after 5 seconds.
  156. tasksDone := make(chan struct{})
  157. go func() {
  158. eng.tasks.Wait()
  159. close(tasksDone)
  160. }()
  161. select {
  162. case <-time.After(time.Second * 5):
  163. case <-tasksDone:
  164. }
  165. // Call shutdown handlers, if any.
  166. // Timeout after 10 seconds.
  167. for _, h := range eng.onShutdown {
  168. go func(h func()) {
  169. h()
  170. eng.shutdownWait.Done()
  171. }(h)
  172. }
  173. done := make(chan struct{})
  174. go func() {
  175. eng.shutdownWait.Wait()
  176. close(done)
  177. }()
  178. select {
  179. case <-time.After(time.Second * 10):
  180. case <-done:
  181. }
  182. return
  183. }
  184. // IsShutdown returns true if the engine is in the process
  185. // of shutting down, or already shut down.
  186. // Otherwise it returns false.
  187. func (eng *Engine) IsShutdown() bool {
  188. eng.l.RLock()
  189. defer eng.l.RUnlock()
  190. return eng.shutdown
  191. }
  192. // ParseJob creates a new job from a text description using a shell-like syntax.
  193. //
  194. // The following syntax is used to parse `input`:
  195. //
  196. // * Words are separated using standard whitespaces as separators.
  197. // * Quotes and backslashes are not interpreted.
  198. // * Words of the form 'KEY=[VALUE]' are added to the job environment.
  199. // * All other words are added to the job arguments.
  200. //
  201. // For example:
  202. //
  203. // job, _ := eng.ParseJob("VERBOSE=1 echo hello TEST=true world")
  204. //
  205. // The resulting job will have:
  206. // job.Args={"echo", "hello", "world"}
  207. // job.Env={"VERBOSE":"1", "TEST":"true"}
  208. //
  209. func (eng *Engine) ParseJob(input string) (*Job, error) {
  210. // FIXME: use a full-featured command parser
  211. scanner := bufio.NewScanner(strings.NewReader(input))
  212. scanner.Split(bufio.ScanWords)
  213. var (
  214. cmd []string
  215. env Env
  216. )
  217. for scanner.Scan() {
  218. word := scanner.Text()
  219. kv := strings.SplitN(word, "=", 2)
  220. if len(kv) == 2 {
  221. env.Set(kv[0], kv[1])
  222. } else {
  223. cmd = append(cmd, word)
  224. }
  225. }
  226. if len(cmd) == 0 {
  227. return nil, fmt.Errorf("empty command: '%s'", input)
  228. }
  229. job := eng.Job(cmd[0], cmd[1:]...)
  230. job.Env().Init(&env)
  231. return job, nil
  232. }