controller.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360
  1. package exec
  2. import (
  3. "fmt"
  4. "time"
  5. "github.com/docker/swarmkit/api"
  6. "github.com/docker/swarmkit/api/equality"
  7. "github.com/docker/swarmkit/log"
  8. "github.com/docker/swarmkit/protobuf/ptypes"
  9. "github.com/pkg/errors"
  10. "github.com/sirupsen/logrus"
  11. "golang.org/x/net/context"
  12. )
// Controller controls execution of a task.
//
// Every method reports failure through its error return. All methods except
// Close take the context governing the call.
type Controller interface {
	// Update the task definition seen by the controller. Will return
	// ErrTaskUpdateFailed if the provided task definition changes fields that
	// cannot be changed.
	//
	// Will be ignored if the task has exited.
	Update(ctx context.Context, t *api.Task) error

	// Prepare the task for execution. This should ensure that all resources
	// are created such that a call to start should execute immediately.
	//
	// Do treats a returned ErrTaskPrepared the same as a nil error.
	Prepare(ctx context.Context) error

	// Start the target and return when it has started successfully.
	//
	// Do treats a returned ErrTaskStarted the same as a nil error.
	Start(ctx context.Context) error

	// Wait blocks until the target has exited.
	Wait(ctx context.Context) error

	// Shutdown requests to exit the target gracefully.
	Shutdown(ctx context.Context) error

	// Terminate the target.
	Terminate(ctx context.Context) error

	// Remove all resources allocated by the controller.
	Remove(ctx context.Context) error

	// Close closes any ephemeral resources associated with controller instance.
	Close() error
}
// ControllerLogs defines a component that makes logs accessible.
//
// Can usually be accessed on a controller instance via type assertion.
type ControllerLogs interface {
	// Logs will write to publisher until the context is cancelled or an
	// error occurs. The options argument carries the log subscription
	// options for the request.
	Logs(ctx context.Context, publisher LogPublisher, options api.LogSubscriptionOptions) error
}
// LogPublisher defines the protocol for receiving a log message.
type LogPublisher interface {
	// Publish hands a single log message to the publisher. A non-nil error
	// reports that the message could not be published.
	Publish(ctx context.Context, message api.LogMessage) error
}
// LogPublisherFunc implements LogPublisher with just a function: its Publish
// method calls the function itself with the same arguments.
type LogPublisherFunc func(ctx context.Context, message api.LogMessage) error
  51. // Publish calls the wrapped function.
  52. func (fn LogPublisherFunc) Publish(ctx context.Context, message api.LogMessage) error {
  53. return fn(ctx, message)
  54. }
// LogPublisherProvider defines the protocol for receiving a log publisher.
type LogPublisherProvider interface {
	// Publisher returns a LogPublisher for the subscription identified by
	// subscriptionID, together with a func() and an error.
	//
	// NOTE(review): the returned func() is presumably a cancel/cleanup
	// callback to invoke once the publisher is no longer needed — confirm
	// against the implementations.
	Publisher(ctx context.Context, subscriptionID string) (LogPublisher, func(), error)
}
// ContainerStatuser reports status of a container.
//
// This can be implemented by controllers or error types. Do probes for it via
// type assertion on both the controller and on errors returned by controller
// operations.
type ContainerStatuser interface {
	// ContainerStatus returns the status of the target container, if
	// available. When the container is not available, the status will be nil.
	ContainerStatus(ctx context.Context) (*api.ContainerStatus, error)
}
// PortStatuser reports status of ports which are allocated by the executor.
//
// Do probes the controller for this interface via type assertion when
// assembling the returned task status.
type PortStatuser interface {
	// PortStatus returns the status on a list of PortConfigs
	// which are managed at the host level by the controller.
	PortStatus(ctx context.Context) (*api.PortStatus, error)
}
  73. // Resolve attempts to get a controller from the executor and reports the
  74. // correct status depending on the tasks current state according to the result.
  75. //
  76. // Unlike Do, if an error is returned, the status should still be reported. The
  77. // error merely reports the failure at getting the controller.
  78. func Resolve(ctx context.Context, task *api.Task, executor Executor) (Controller, *api.TaskStatus, error) {
  79. status := task.Status.Copy()
  80. defer func() {
  81. logStateChange(ctx, task.DesiredState, task.Status.State, status.State)
  82. }()
  83. ctlr, err := executor.Controller(task)
  84. // depending on the tasks state, a failed controller resolution has varying
  85. // impact. The following expresses that impact.
  86. if err != nil {
  87. status.Message = "resolving controller failed"
  88. status.Err = err.Error()
  89. // before the task has been started, we consider it a rejection.
  90. // if task is running, consider the task has failed
  91. // otherwise keep the existing state
  92. if task.Status.State < api.TaskStateStarting {
  93. status.State = api.TaskStateRejected
  94. } else if task.Status.State <= api.TaskStateRunning {
  95. status.State = api.TaskStateFailed
  96. }
  97. } else if task.Status.State < api.TaskStateAccepted {
  98. // we always want to proceed to accepted when we resolve the controller
  99. status.Message = "accepted"
  100. status.State = api.TaskStateAccepted
  101. status.Err = ""
  102. }
  103. return ctlr, status, err
  104. }
  105. // Do progresses the task state using the controller performing a single
  106. // operation on the controller. The return TaskStatus should be marked as the
  107. // new state of the task.
  108. //
  109. // The returned status should be reported and placed back on to task
  110. // before the next call. The operation can be cancelled by creating a
  111. // cancelling context.
  112. //
  113. // Errors from the task controller will reported on the returned status. Any
  114. // errors coming from this function should not be reported as related to the
  115. // individual task.
  116. //
  117. // If ErrTaskNoop is returned, it means a second call to Do will result in no
  118. // change. If ErrTaskDead is returned, calls to Do will no longer result in any
  119. // action.
  120. func Do(ctx context.Context, task *api.Task, ctlr Controller) (*api.TaskStatus, error) {
  121. status := task.Status.Copy()
  122. // stay in the current state.
  123. noop := func(errs ...error) (*api.TaskStatus, error) {
  124. return status, ErrTaskNoop
  125. }
  126. retry := func() (*api.TaskStatus, error) {
  127. // while we retry on all errors, this allows us to explicitly declare
  128. // retry cases.
  129. return status, ErrTaskRetry
  130. }
  131. // transition moves the task to the next state.
  132. transition := func(state api.TaskState, msg string) (*api.TaskStatus, error) {
  133. current := status.State
  134. status.State = state
  135. status.Message = msg
  136. status.Err = ""
  137. if current > state {
  138. panic("invalid state transition")
  139. }
  140. return status, nil
  141. }
  142. // containerStatus exitCode keeps track of whether or not we've set it in
  143. // this particular method. Eventually, we assemble this as part of a defer.
  144. var (
  145. containerStatus *api.ContainerStatus
  146. portStatus *api.PortStatus
  147. exitCode int
  148. )
  149. // returned when a fatal execution of the task is fatal. In this case, we
  150. // proceed to a terminal error state and set the appropriate fields.
  151. //
  152. // Common checks for the nature of an error should be included here. If the
  153. // error is determined not to be fatal for the task,
  154. fatal := func(err error) (*api.TaskStatus, error) {
  155. if err == nil {
  156. panic("err must not be nil when fatal")
  157. }
  158. if cs, ok := err.(ContainerStatuser); ok {
  159. var err error
  160. containerStatus, err = cs.ContainerStatus(ctx)
  161. if err != nil && !contextDoneError(err) {
  162. log.G(ctx).WithError(err).Error("error resolving container status on fatal")
  163. }
  164. }
  165. // make sure we've set the *correct* exit code
  166. if ec, ok := err.(ExitCoder); ok {
  167. exitCode = ec.ExitCode()
  168. }
  169. if cause := errors.Cause(err); cause == context.DeadlineExceeded || cause == context.Canceled {
  170. return retry()
  171. }
  172. status.Err = err.Error() // still reported on temporary
  173. if IsTemporary(err) {
  174. return retry()
  175. }
  176. // only at this point do we consider the error fatal to the task.
  177. log.G(ctx).WithError(err).Error("fatal task error")
  178. // NOTE(stevvooe): The following switch dictates the terminal failure
  179. // state based on the state in which the failure was encountered.
  180. switch {
  181. case status.State < api.TaskStateStarting:
  182. status.State = api.TaskStateRejected
  183. case status.State >= api.TaskStateStarting:
  184. status.State = api.TaskStateFailed
  185. }
  186. return status, nil
  187. }
  188. // below, we have several callbacks that are run after the state transition
  189. // is completed.
  190. defer func() {
  191. logStateChange(ctx, task.DesiredState, task.Status.State, status.State)
  192. if !equality.TaskStatusesEqualStable(status, &task.Status) {
  193. status.Timestamp = ptypes.MustTimestampProto(time.Now())
  194. }
  195. }()
  196. // extract the container status from the container, if supported.
  197. defer func() {
  198. // only do this if in an active state
  199. if status.State < api.TaskStateStarting {
  200. return
  201. }
  202. if containerStatus == nil {
  203. // collect this, if we haven't
  204. cctlr, ok := ctlr.(ContainerStatuser)
  205. if !ok {
  206. return
  207. }
  208. var err error
  209. containerStatus, err = cctlr.ContainerStatus(ctx)
  210. if err != nil && !contextDoneError(err) {
  211. log.G(ctx).WithError(err).Error("container status unavailable")
  212. }
  213. // at this point, things have gone fairly wrong. Remain positive
  214. // and let's get something out the door.
  215. if containerStatus == nil {
  216. containerStatus = new(api.ContainerStatus)
  217. containerStatusTask := task.Status.GetContainer()
  218. if containerStatusTask != nil {
  219. *containerStatus = *containerStatusTask // copy it over.
  220. }
  221. }
  222. }
  223. // at this point, we *must* have a containerStatus.
  224. if exitCode != 0 {
  225. containerStatus.ExitCode = int32(exitCode)
  226. }
  227. status.RuntimeStatus = &api.TaskStatus_Container{
  228. Container: containerStatus,
  229. }
  230. if portStatus == nil {
  231. pctlr, ok := ctlr.(PortStatuser)
  232. if !ok {
  233. return
  234. }
  235. var err error
  236. portStatus, err = pctlr.PortStatus(ctx)
  237. if err != nil && !contextDoneError(err) {
  238. log.G(ctx).WithError(err).Error("container port status unavailable")
  239. }
  240. }
  241. status.PortStatus = portStatus
  242. }()
  243. if task.DesiredState == api.TaskStateShutdown {
  244. if status.State >= api.TaskStateCompleted {
  245. return noop()
  246. }
  247. if err := ctlr.Shutdown(ctx); err != nil {
  248. return fatal(err)
  249. }
  250. return transition(api.TaskStateShutdown, "shutdown")
  251. }
  252. if status.State > task.DesiredState {
  253. return noop() // way beyond desired state, pause
  254. }
  255. // the following states may proceed past desired state.
  256. switch status.State {
  257. case api.TaskStatePreparing:
  258. if err := ctlr.Prepare(ctx); err != nil && err != ErrTaskPrepared {
  259. return fatal(err)
  260. }
  261. return transition(api.TaskStateReady, "prepared")
  262. case api.TaskStateStarting:
  263. if err := ctlr.Start(ctx); err != nil && err != ErrTaskStarted {
  264. return fatal(err)
  265. }
  266. return transition(api.TaskStateRunning, "started")
  267. case api.TaskStateRunning:
  268. if err := ctlr.Wait(ctx); err != nil {
  269. return fatal(err)
  270. }
  271. return transition(api.TaskStateCompleted, "finished")
  272. }
  273. // The following represent "pause" states. We can only proceed when the
  274. // desired state is beyond our current state.
  275. if status.State >= task.DesiredState {
  276. return noop()
  277. }
  278. switch status.State {
  279. case api.TaskStateNew, api.TaskStatePending, api.TaskStateAssigned:
  280. return transition(api.TaskStateAccepted, "accepted")
  281. case api.TaskStateAccepted:
  282. return transition(api.TaskStatePreparing, "preparing")
  283. case api.TaskStateReady:
  284. return transition(api.TaskStateStarting, "starting")
  285. default: // terminal states
  286. return noop()
  287. }
  288. }
  289. func logStateChange(ctx context.Context, desired, previous, next api.TaskState) {
  290. if previous != next {
  291. fields := logrus.Fields{
  292. "state.transition": fmt.Sprintf("%v->%v", previous, next),
  293. "state.desired": desired,
  294. }
  295. log.G(ctx).WithFields(fields).Debug("state changed")
  296. }
  297. }
  298. func contextDoneError(err error) bool {
  299. cause := errors.Cause(err)
  300. return cause == context.Canceled || cause == context.DeadlineExceeded
  301. }