controller.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358
  1. package exec
  2. import (
  3. "fmt"
  4. "time"
  5. "github.com/Sirupsen/logrus"
  6. "github.com/docker/swarmkit/api"
  7. "github.com/docker/swarmkit/api/equality"
  8. "github.com/docker/swarmkit/log"
  9. "github.com/docker/swarmkit/protobuf/ptypes"
  10. "github.com/pkg/errors"
  11. "golang.org/x/net/context"
  12. )
  // Controller controls execution of a task.
  //
  // Every method except Close takes a context as its first argument.
  type Controller interface {
  	// Update the task definition seen by the controller. Will return
  	// ErrTaskUpdateFailed if the provided task definition changes fields that
  	// cannot be changed.
  	//
  	// Will be ignored if the task has exited.
  	Update(ctx context.Context, t *api.Task) error

  	// Prepare the task for execution. This should ensure that all resources
  	// are created such that a call to Start should execute immediately.
  	Prepare(ctx context.Context) error

  	// Start the target and return when it has started successfully.
  	Start(ctx context.Context) error

  	// Wait blocks until the target has exited.
  	Wait(ctx context.Context) error

  	// Shutdown requests that the target exit gracefully.
  	Shutdown(ctx context.Context) error

  	// Terminate the target.
  	Terminate(ctx context.Context) error

  	// Remove all resources allocated by the controller.
  	Remove(ctx context.Context) error

  	// Close closes any ephemeral resources associated with the controller
  	// instance.
  	Close() error
  }
  // ControllerLogs defines a component that makes logs accessible.
  //
  // Can usually be accessed on a controller instance via type assertion.
  type ControllerLogs interface {
  	// Logs writes log messages to publisher until the context is cancelled
  	// or an error occurs. The options argument carries the log subscription
  	// options for the request.
  	Logs(ctx context.Context, publisher LogPublisher, options api.LogSubscriptionOptions) error
  }
  // LogPublisher defines the protocol for receiving a log message.
  type LogPublisher interface {
  	// Publish hands a single log message to the publisher and reports any
  	// error encountered while doing so.
  	Publish(ctx context.Context, message api.LogMessage) error
  }
  49. // LogPublisherFunc implements publisher with just a function.
  50. type LogPublisherFunc func(ctx context.Context, message api.LogMessage) error
  51. // Publish calls the wrapped function.
  52. func (fn LogPublisherFunc) Publish(ctx context.Context, message api.LogMessage) error {
  53. return fn(ctx, message)
  54. }
  // LogPublisherProvider defines the protocol for receiving a log publisher.
  type LogPublisherProvider interface {
  	// Publisher returns a LogPublisher for the subscription identified by
  	// subscriptionID, along with a func() and an error.
  	//
  	// NOTE(review): the returned func() is presumably a cancel/cleanup
  	// callback to invoke once publishing is finished — confirm against the
  	// implementations.
  	Publisher(ctx context.Context, subscriptionID string) (LogPublisher, func(), error)
  }
  // ContainerStatuser reports status of a container.
  //
  // This can be implemented by controllers or error types; Do checks both the
  // controller and fatal errors for this interface via type assertion.
  type ContainerStatuser interface {
  	// ContainerStatus returns the status of the target container, if
  	// available. When the container is not available, the status will be nil.
  	ContainerStatus(ctx context.Context) (*api.ContainerStatus, error)
  }
  // PortStatuser reports status of ports which are allocated by the executor.
  //
  // Do checks the controller for this interface via type assertion.
  type PortStatuser interface {
  	// PortStatus returns the status on a list of PortConfigs
  	// which are managed at the host level by the controller.
  	PortStatus(ctx context.Context) (*api.PortStatus, error)
  }
  73. // Resolve attempts to get a controller from the executor and reports the
  74. // correct status depending on the tasks current state according to the result.
  75. //
  76. // Unlike Do, if an error is returned, the status should still be reported. The
  77. // error merely reports the failure at getting the controller.
  78. func Resolve(ctx context.Context, task *api.Task, executor Executor) (Controller, *api.TaskStatus, error) {
  79. status := task.Status.Copy()
  80. defer func() {
  81. logStateChange(ctx, task.DesiredState, task.Status.State, status.State)
  82. }()
  83. ctlr, err := executor.Controller(task)
  84. // depending on the tasks state, a failed controller resolution has varying
  85. // impact. The following expresses that impact.
  86. if err != nil {
  87. status.Message = "resolving controller failed"
  88. status.Err = err.Error()
  89. // before the task has been started, we consider it a rejection.
  90. // if task is running, consider the task has failed
  91. // otherwise keep the existing state
  92. if task.Status.State < api.TaskStateStarting {
  93. status.State = api.TaskStateRejected
  94. } else if task.Status.State <= api.TaskStateRunning {
  95. status.State = api.TaskStateFailed
  96. }
  97. } else if task.Status.State < api.TaskStateAccepted {
  98. // we always want to proceed to accepted when we resolve the controller
  99. status.Message = "accepted"
  100. status.State = api.TaskStateAccepted
  101. }
  102. return ctlr, status, err
  103. }
  // Do progresses the task state using the controller, performing a single
  // operation on the controller. The returned TaskStatus should be marked as the
  // new state of the task.
  //
  // The returned status should be reported and placed back on to task
  // before the next call. The operation can be cancelled by cancelling the
  // context.
  //
  // Errors from the task controller will be reported on the returned status. Any
  // errors coming from this function should not be reported as related to the
  // individual task.
  //
  // If ErrTaskNoop is returned, it means a second call to Do will result in no
  // change. If ErrTaskRetry is returned, the operation hit a cancelled/expired
  // context or a temporary error. If ErrTaskDead is returned, calls to Do will
  // no longer result in any action.
  //
  // NOTE(review): ErrTaskDead is not returned anywhere in this function as
  // written — confirm whether that sentence is still accurate.
  func Do(ctx context.Context, task *api.Task, ctlr Controller) (*api.TaskStatus, error) {
  	// Work on a copy so that task.Status keeps the previous state for the
  	// comparisons made in the deferred functions below.
  	status := task.Status.Copy()

  	// stay in the current state. The variadic errs argument is accepted but
  	// not used.
  	noop := func(errs ...error) (*api.TaskStatus, error) {
  		return status, ErrTaskNoop
  	}

  	retry := func() (*api.TaskStatus, error) {
  		// while we retry on all errors, this allows us to explicitly declare
  		// retry cases.
  		return status, ErrTaskRetry
  	}

  	// transition moves the task to the next state, recording msg as the
  	// status message. It panics if the requested state is lower than the
  	// state the task was in.
  	transition := func(state api.TaskState, msg string) (*api.TaskStatus, error) {
  		current := status.State
  		status.State = state
  		status.Message = msg

  		if current > state {
  			panic("invalid state transition")
  		}

  		return status, nil
  	}

  	// containerStatus and exitCode keep track of whether or not we've set
  	// them in this particular call. Eventually, we assemble them into the
  	// status as part of a defer. portStatus is only ever assigned inside that
  	// deferred function.
  	var (
  		containerStatus *api.ContainerStatus
  		portStatus      *api.PortStatus
  		exitCode        int
  	)

  	// fatal is returned when a controller operation fails. When the failure
  	// is fatal for the task, we proceed to a terminal error state and set the
  	// appropriate fields.
  	//
  	// Common checks for the nature of an error should be included here. If the
  	// error is determined not to be fatal for the task (the context was
  	// cancelled or timed out, or the error is temporary), a retry is requested
  	// instead and the state is left unchanged.
  	fatal := func(err error) (*api.TaskStatus, error) {
  		if err == nil {
  			panic("err must not be nil when fatal")
  		}

  		// The error itself may carry the container status.
  		if cs, ok := err.(ContainerStatuser); ok {
  			// shadowed on purpose: a failure to fetch the status must not
  			// replace the error we are handling.
  			var err error
  			containerStatus, err = cs.ContainerStatus(ctx)
  			if err != nil && !contextDoneError(err) {
  				log.G(ctx).WithError(err).Error("error resolving container status on fatal")
  			}
  		}

  		// make sure we've set the *correct* exit code
  		if ec, ok := err.(ExitCoder); ok {
  			exitCode = ec.ExitCode()
  		}

  		// A cancelled or expired context is retried without recording the
  		// error text on the status.
  		if cause := errors.Cause(err); cause == context.DeadlineExceeded || cause == context.Canceled {
  			return retry()
  		}

  		status.Err = err.Error() // still reported on temporary

  		if IsTemporary(err) {
  			return retry()
  		}

  		// only at this point do we consider the error fatal to the task.
  		log.G(ctx).WithError(err).Error("fatal task error")

  		// NOTE(stevvooe): The following switch dictates the terminal failure
  		// state based on the state in which the failure was encountered.
  		switch {
  		case status.State < api.TaskStateStarting:
  			status.State = api.TaskStateRejected
  		case status.State >= api.TaskStateStarting:
  			status.State = api.TaskStateFailed
  		}

  		return status, nil
  	}

  	// below, we have several callbacks that are run after the state transition
  	// is completed.
  	//
  	// Deferred functions run in last-in-first-out order, so this one runs
  	// after the status-collecting defer registered further down; the equality
  	// check therefore sees the status after RuntimeStatus and PortStatus have
  	// been filled in.
  	defer func() {
  		logStateChange(ctx, task.DesiredState, task.Status.State, status.State)

  		// Only refresh the timestamp when the status is considered changed.
  		if !equality.TaskStatusesEqualStable(status, &task.Status) {
  			status.Timestamp = ptypes.MustTimestampProto(time.Now())
  		}
  	}()

  	// extract the container status from the container, if supported.
  	defer func() {
  		// only do this if in an active state
  		if status.State < api.TaskStateStarting {
  			return
  		}

  		if containerStatus == nil {
  			// collect this, if we haven't
  			cctlr, ok := ctlr.(ContainerStatuser)
  			if !ok {
  				// Without a ContainerStatuser nothing is recorded; note that
  				// this return also skips the port status collection below.
  				return
  			}

  			var err error
  			containerStatus, err = cctlr.ContainerStatus(ctx)
  			if err != nil && !contextDoneError(err) {
  				log.G(ctx).WithError(err).Error("container status unavailable")
  			}

  			// at this point, things have gone fairly wrong. Remain positive
  			// and let's get something out the door.
  			if containerStatus == nil {
  				containerStatus = new(api.ContainerStatus)
  				containerStatusTask := task.Status.GetContainer()
  				if containerStatusTask != nil {
  					*containerStatus = *containerStatusTask // copy it over.
  				}
  			}
  		}

  		// at this point, we *must* have a containerStatus.
  		// A non-zero exit code captured by fatal overrides whatever the
  		// container status reported.
  		if exitCode != 0 {
  			containerStatus.ExitCode = int32(exitCode)
  		}

  		status.RuntimeStatus = &api.TaskStatus_Container{
  			Container: containerStatus,
  		}

  		if portStatus == nil {
  			pctlr, ok := ctlr.(PortStatuser)
  			if !ok {
  				return
  			}

  			var err error
  			portStatus, err = pctlr.PortStatus(ctx)
  			if err != nil && !contextDoneError(err) {
  				log.G(ctx).WithError(err).Error("container port status unavailable")
  			}
  		}

  		status.PortStatus = portStatus
  	}()

  	// A desired state of shutdown is handled before the normal progression:
  	// unless the task has already completed (or gone beyond), ask the
  	// controller to shut down.
  	if task.DesiredState == api.TaskStateShutdown {
  		if status.State >= api.TaskStateCompleted {
  			return noop()
  		}

  		if err := ctlr.Shutdown(ctx); err != nil {
  			return fatal(err)
  		}

  		return transition(api.TaskStateShutdown, "shutdown")
  	}

  	if status.State > task.DesiredState {
  		return noop() // way beyond desired state, pause
  	}

  	// the following states may proceed past desired state.
  	switch status.State {
  	case api.TaskStatePreparing:
  		// ErrTaskPrepared is not treated as a failure.
  		if err := ctlr.Prepare(ctx); err != nil && err != ErrTaskPrepared {
  			return fatal(err)
  		}

  		return transition(api.TaskStateReady, "prepared")
  	case api.TaskStateStarting:
  		// ErrTaskStarted is not treated as a failure.
  		if err := ctlr.Start(ctx); err != nil && err != ErrTaskStarted {
  			return fatal(err)
  		}

  		return transition(api.TaskStateRunning, "started")
  	case api.TaskStateRunning:
  		if err := ctlr.Wait(ctx); err != nil {
  			return fatal(err)
  		}

  		return transition(api.TaskStateCompleted, "finished")
  	}

  	// The following represent "pause" states. We can only proceed when the
  	// desired state is beyond our current state.
  	if status.State >= task.DesiredState {
  		return noop()
  	}

  	switch status.State {
  	case api.TaskStateNew, api.TaskStatePending, api.TaskStateAssigned:
  		return transition(api.TaskStateAccepted, "accepted")
  	case api.TaskStateAccepted:
  		return transition(api.TaskStatePreparing, "preparing")
  	case api.TaskStateReady:
  		return transition(api.TaskStateStarting, "starting")
  	default: // terminal states
  		return noop()
  	}
  }
  287. func logStateChange(ctx context.Context, desired, previous, next api.TaskState) {
  288. if previous != next {
  289. fields := logrus.Fields{
  290. "state.transition": fmt.Sprintf("%v->%v", previous, next),
  291. "state.desired": desired,
  292. }
  293. log.G(ctx).WithFields(fields).Debug("state changed")
  294. }
  295. }
  296. func contextDoneError(err error) bool {
  297. cause := errors.Cause(err)
  298. return cause == context.Canceled || cause == context.DeadlineExceeded
  299. }