engine.go 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261
  1. package engine
  2. import (
  3. "bufio"
  4. "fmt"
  5. "io"
  6. "os"
  7. "sort"
  8. "strings"
  9. "sync"
  10. "time"
  11. "github.com/docker/docker/pkg/ioutils"
  12. "github.com/docker/docker/pkg/timeutils"
  13. "github.com/docker/docker/utils"
  14. )
  15. // Installer is a standard interface for objects which can "install" themselves
  16. // on an engine by registering handlers.
  17. // This can be used as an entrypoint for external plugins etc.
  18. type Installer interface {
  19. Install(*Engine) error
  20. }
  21. type Handler func(*Job) Status
  22. var globalHandlers map[string]Handler
  23. func init() {
  24. globalHandlers = make(map[string]Handler)
  25. }
  26. func Register(name string, handler Handler) error {
  27. _, exists := globalHandlers[name]
  28. if exists {
  29. return fmt.Errorf("Can't overwrite global handler for command %s", name)
  30. }
  31. globalHandlers[name] = handler
  32. return nil
  33. }
  34. func unregister(name string) {
  35. delete(globalHandlers, name)
  36. }
  37. // The Engine is the core of Docker.
  38. // It acts as a store for *containers*, and allows manipulation of these
  39. // containers by executing *jobs*.
  40. type Engine struct {
  41. handlers map[string]Handler
  42. catchall Handler
  43. hack Hack // data for temporary hackery (see hack.go)
  44. id string
  45. Stdout io.Writer
  46. Stderr io.Writer
  47. Stdin io.Reader
  48. Logging bool
  49. tasks sync.WaitGroup
  50. l sync.RWMutex // lock for shutdown
  51. shutdown bool
  52. onShutdown []func() // shutdown handlers
  53. }
  54. func (eng *Engine) Register(name string, handler Handler) error {
  55. _, exists := eng.handlers[name]
  56. if exists {
  57. return fmt.Errorf("Can't overwrite handler for command %s", name)
  58. }
  59. eng.handlers[name] = handler
  60. return nil
  61. }
  62. func (eng *Engine) RegisterCatchall(catchall Handler) {
  63. eng.catchall = catchall
  64. }
  65. // New initializes a new engine.
  66. func New() *Engine {
  67. eng := &Engine{
  68. handlers: make(map[string]Handler),
  69. id: utils.RandomString(),
  70. Stdout: os.Stdout,
  71. Stderr: os.Stderr,
  72. Stdin: os.Stdin,
  73. Logging: true,
  74. }
  75. eng.Register("commands", func(job *Job) Status {
  76. for _, name := range eng.commands() {
  77. job.Printf("%s\n", name)
  78. }
  79. return StatusOK
  80. })
  81. // Copy existing global handlers
  82. for k, v := range globalHandlers {
  83. eng.handlers[k] = v
  84. }
  85. return eng
  86. }
  87. func (eng *Engine) String() string {
  88. return fmt.Sprintf("%s", eng.id[:8])
  89. }
  90. // Commands returns a list of all currently registered commands,
  91. // sorted alphabetically.
  92. func (eng *Engine) commands() []string {
  93. names := make([]string, 0, len(eng.handlers))
  94. for name := range eng.handlers {
  95. names = append(names, name)
  96. }
  97. sort.Strings(names)
  98. return names
  99. }
  100. // Job creates a new job which can later be executed.
  101. // This function mimics `Command` from the standard os/exec package.
  102. func (eng *Engine) Job(name string, args ...string) *Job {
  103. job := &Job{
  104. Eng: eng,
  105. Name: name,
  106. Args: args,
  107. Stdin: NewInput(),
  108. Stdout: NewOutput(),
  109. Stderr: NewOutput(),
  110. env: &Env{},
  111. closeIO: true,
  112. }
  113. if eng.Logging {
  114. job.Stderr.Add(ioutils.NopWriteCloser(eng.Stderr))
  115. }
  116. // Catchall is shadowed by specific Register.
  117. if handler, exists := eng.handlers[name]; exists {
  118. job.handler = handler
  119. } else if eng.catchall != nil && name != "" {
  120. // empty job names are illegal, catchall or not.
  121. job.handler = eng.catchall
  122. }
  123. return job
  124. }
  125. // OnShutdown registers a new callback to be called by Shutdown.
  126. // This is typically used by services to perform cleanup.
  127. func (eng *Engine) OnShutdown(h func()) {
  128. eng.l.Lock()
  129. eng.onShutdown = append(eng.onShutdown, h)
  130. eng.l.Unlock()
  131. }
  132. // Shutdown permanently shuts down eng as follows:
  133. // - It refuses all new jobs, permanently.
  134. // - It waits for all active jobs to complete (with no timeout)
  135. // - It calls all shutdown handlers concurrently (if any)
  136. // - It returns when all handlers complete, or after 15 seconds,
  137. // whichever happens first.
  138. func (eng *Engine) Shutdown() {
  139. eng.l.Lock()
  140. if eng.shutdown {
  141. eng.l.Unlock()
  142. return
  143. }
  144. eng.shutdown = true
  145. eng.l.Unlock()
  146. // We don't need to protect the rest with a lock, to allow
  147. // for other calls to immediately fail with "shutdown" instead
  148. // of hanging for 15 seconds.
  149. // This requires all concurrent calls to check for shutdown, otherwise
  150. // it might cause a race.
  151. // Wait for all jobs to complete.
  152. // Timeout after 5 seconds.
  153. tasksDone := make(chan struct{})
  154. go func() {
  155. eng.tasks.Wait()
  156. close(tasksDone)
  157. }()
  158. select {
  159. case <-time.After(time.Second * 5):
  160. case <-tasksDone:
  161. }
  162. // Call shutdown handlers, if any.
  163. // Timeout after 10 seconds.
  164. var wg sync.WaitGroup
  165. for _, h := range eng.onShutdown {
  166. wg.Add(1)
  167. go func(h func()) {
  168. defer wg.Done()
  169. h()
  170. }(h)
  171. }
  172. done := make(chan struct{})
  173. go func() {
  174. wg.Wait()
  175. close(done)
  176. }()
  177. select {
  178. case <-time.After(time.Second * 10):
  179. case <-done:
  180. }
  181. return
  182. }
  183. // IsShutdown returns true if the engine is in the process
  184. // of shutting down, or already shut down.
  185. // Otherwise it returns false.
  186. func (eng *Engine) IsShutdown() bool {
  187. eng.l.RLock()
  188. defer eng.l.RUnlock()
  189. return eng.shutdown
  190. }
  191. // ParseJob creates a new job from a text description using a shell-like syntax.
  192. //
  193. // The following syntax is used to parse `input`:
  194. //
  195. // * Words are separated using standard whitespaces as separators.
  196. // * Quotes and backslashes are not interpreted.
  197. // * Words of the form 'KEY=[VALUE]' are added to the job environment.
  198. // * All other words are added to the job arguments.
  199. //
  200. // For example:
  201. //
  202. // job, _ := eng.ParseJob("VERBOSE=1 echo hello TEST=true world")
  203. //
  204. // The resulting job will have:
  205. // job.Args={"echo", "hello", "world"}
  206. // job.Env={"VERBOSE":"1", "TEST":"true"}
  207. //
  208. func (eng *Engine) ParseJob(input string) (*Job, error) {
  209. // FIXME: use a full-featured command parser
  210. scanner := bufio.NewScanner(strings.NewReader(input))
  211. scanner.Split(bufio.ScanWords)
  212. var (
  213. cmd []string
  214. env Env
  215. )
  216. for scanner.Scan() {
  217. word := scanner.Text()
  218. kv := strings.SplitN(word, "=", 2)
  219. if len(kv) == 2 {
  220. env.Set(kv[0], kv[1])
  221. } else {
  222. cmd = append(cmd, word)
  223. }
  224. }
  225. if len(cmd) == 0 {
  226. return nil, fmt.Errorf("empty command: '%s'", input)
  227. }
  228. job := eng.Job(cmd[0], cmd[1:]...)
  229. job.Env().Init(&env)
  230. return job, nil
  231. }
  232. func (eng *Engine) Logf(format string, args ...interface{}) (n int, err error) {
  233. if !eng.Logging {
  234. return 0, nil
  235. }
  236. prefixedFormat := fmt.Sprintf("[%s] [%s] %s\n", time.Now().Format(timeutils.RFC3339NanoFixed), eng, strings.TrimRight(format, "\n"))
  237. return fmt.Fprintf(eng.Stderr, prefixedFormat, args...)
  238. }