controller.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354
  1. package exec
  2. import (
  3. "fmt"
  4. "time"
  5. "github.com/Sirupsen/logrus"
  6. "github.com/docker/swarmkit/api"
  7. "github.com/docker/swarmkit/api/equality"
  8. "github.com/docker/swarmkit/log"
  9. "github.com/docker/swarmkit/protobuf/ptypes"
  10. "github.com/pkg/errors"
  11. "golang.org/x/net/context"
  12. )
  13. // Controller controls execution of a task.
  14. type Controller interface {
  15. // Update the task definition seen by the controller. Will return
  16. // ErrTaskUpdateFailed if the provided task definition changes fields that
  17. // cannot be changed.
  18. //
  19. // Will be ignored if the task has exited.
  20. Update(ctx context.Context, t *api.Task) error
  21. // Prepare the task for execution. This should ensure that all resources
  22. // are created such that a call to start should execute immediately.
  23. Prepare(ctx context.Context) error
  24. // Start the target and return when it has started successfully.
  25. Start(ctx context.Context) error
  26. // Wait blocks until the target has exited.
  27. Wait(ctx context.Context) error
  28. // Shutdown requests to exit the target gracefully.
  29. Shutdown(ctx context.Context) error
  30. // Terminate the target.
  31. Terminate(ctx context.Context) error
  32. // Remove all resources allocated by the controller.
  33. Remove(ctx context.Context) error
  34. // Close closes any ephemeral resources associated with controller instance.
  35. Close() error
  36. }
  37. // ControllerLogs defines a component that makes logs accessible.
  38. //
  39. // Can usually be accessed on a controller instance via type assertion.
  40. type ControllerLogs interface {
  41. // Logs will write publisher until the context is cancelled or an error
  42. // occurs.
  43. Logs(ctx context.Context, publisher LogPublisher, options api.LogSubscriptionOptions) error
  44. }
  45. // LogPublisher defines the protocol for receiving a log message.
  46. type LogPublisher interface {
  47. Publish(ctx context.Context, message api.LogMessage) error
  48. }
  49. // LogPublisherFunc implements publisher with just a function.
  50. type LogPublisherFunc func(ctx context.Context, message api.LogMessage) error
  51. // Publish calls the wrapped function.
  52. func (fn LogPublisherFunc) Publish(ctx context.Context, message api.LogMessage) error {
  53. return fn(ctx, message)
  54. }
  55. // LogPublisherProvider defines the protocol for receiving a log publisher
  56. type LogPublisherProvider interface {
  57. Publisher(ctx context.Context, subscriptionID string) (LogPublisher, func(), error)
  58. }
  59. // ContainerStatuser reports status of a container.
  60. //
  61. // This can be implemented by controllers or error types.
  62. type ContainerStatuser interface {
  63. // ContainerStatus returns the status of the target container, if
  64. // available. When the container is not available, the status will be nil.
  65. ContainerStatus(ctx context.Context) (*api.ContainerStatus, error)
  66. }
  67. // PortStatuser reports status of ports which are allocated by the executor
  68. type PortStatuser interface {
  69. // PortStatus returns the status on a list of PortConfigs
  70. // which are managed at the host level by the controller.
  71. PortStatus(ctx context.Context) (*api.PortStatus, error)
  72. }
  73. // Resolve attempts to get a controller from the executor and reports the
  74. // correct status depending on the tasks current state according to the result.
  75. //
  76. // Unlike Do, if an error is returned, the status should still be reported. The
  77. // error merely reports the failure at getting the controller.
  78. func Resolve(ctx context.Context, task *api.Task, executor Executor) (Controller, *api.TaskStatus, error) {
  79. status := task.Status.Copy()
  80. defer func() {
  81. logStateChange(ctx, task.DesiredState, task.Status.State, status.State)
  82. }()
  83. ctlr, err := executor.Controller(task)
  84. // depending on the tasks state, a failed controller resolution has varying
  85. // impact. The following expresses that impact.
  86. if task.Status.State < api.TaskStateStarting {
  87. if err != nil {
  88. // before the task has been started, we consider it a rejection.
  89. status.Message = "resolving controller failed"
  90. status.Err = err.Error()
  91. status.State = api.TaskStateRejected
  92. } else if task.Status.State < api.TaskStateAccepted {
  93. // we always want to proceed to accepted when we resolve the contoller
  94. status.Message = "accepted"
  95. status.State = api.TaskStateAccepted
  96. }
  97. }
  98. return ctlr, status, err
  99. }
  100. // Do progresses the task state using the controller performing a single
  101. // operation on the controller. The return TaskStatus should be marked as the
  102. // new state of the task.
  103. //
  104. // The returned status should be reported and placed back on to task
  105. // before the next call. The operation can be cancelled by creating a
  106. // cancelling context.
  107. //
  108. // Errors from the task controller will reported on the returned status. Any
  109. // errors coming from this function should not be reported as related to the
  110. // individual task.
  111. //
  112. // If ErrTaskNoop is returned, it means a second call to Do will result in no
  113. // change. If ErrTaskDead is returned, calls to Do will no longer result in any
  114. // action.
  115. func Do(ctx context.Context, task *api.Task, ctlr Controller) (*api.TaskStatus, error) {
  116. status := task.Status.Copy()
  117. // stay in the current state.
  118. noop := func(errs ...error) (*api.TaskStatus, error) {
  119. return status, ErrTaskNoop
  120. }
  121. retry := func() (*api.TaskStatus, error) {
  122. // while we retry on all errors, this allows us to explicitly declare
  123. // retry cases.
  124. return status, ErrTaskRetry
  125. }
  126. // transition moves the task to the next state.
  127. transition := func(state api.TaskState, msg string) (*api.TaskStatus, error) {
  128. current := status.State
  129. status.State = state
  130. status.Message = msg
  131. if current > state {
  132. panic("invalid state transition")
  133. }
  134. return status, nil
  135. }
  136. // containerStatus exitCode keeps track of whether or not we've set it in
  137. // this particular method. Eventually, we assemble this as part of a defer.
  138. var (
  139. containerStatus *api.ContainerStatus
  140. portStatus *api.PortStatus
  141. exitCode int
  142. )
  143. // returned when a fatal execution of the task is fatal. In this case, we
  144. // proceed to a terminal error state and set the appropriate fields.
  145. //
  146. // Common checks for the nature of an error should be included here. If the
  147. // error is determined not to be fatal for the task,
  148. fatal := func(err error) (*api.TaskStatus, error) {
  149. if err == nil {
  150. panic("err must not be nil when fatal")
  151. }
  152. if cs, ok := err.(ContainerStatuser); ok {
  153. var err error
  154. containerStatus, err = cs.ContainerStatus(ctx)
  155. if err != nil && !contextDoneError(err) {
  156. log.G(ctx).WithError(err).Error("error resolving container status on fatal")
  157. }
  158. }
  159. // make sure we've set the *correct* exit code
  160. if ec, ok := err.(ExitCoder); ok {
  161. exitCode = ec.ExitCode()
  162. }
  163. if cause := errors.Cause(err); cause == context.DeadlineExceeded || cause == context.Canceled {
  164. return retry()
  165. }
  166. status.Err = err.Error() // still reported on temporary
  167. if IsTemporary(err) {
  168. return retry()
  169. }
  170. // only at this point do we consider the error fatal to the task.
  171. log.G(ctx).WithError(err).Error("fatal task error")
  172. // NOTE(stevvooe): The following switch dictates the terminal failure
  173. // state based on the state in which the failure was encountered.
  174. switch {
  175. case status.State < api.TaskStateStarting:
  176. status.State = api.TaskStateRejected
  177. case status.State >= api.TaskStateStarting:
  178. status.State = api.TaskStateFailed
  179. }
  180. return status, nil
  181. }
  182. // below, we have several callbacks that are run after the state transition
  183. // is completed.
  184. defer func() {
  185. logStateChange(ctx, task.DesiredState, task.Status.State, status.State)
  186. if !equality.TaskStatusesEqualStable(status, &task.Status) {
  187. status.Timestamp = ptypes.MustTimestampProto(time.Now())
  188. }
  189. }()
  190. // extract the container status from the container, if supported.
  191. defer func() {
  192. // only do this if in an active state
  193. if status.State < api.TaskStateStarting {
  194. return
  195. }
  196. if containerStatus == nil {
  197. // collect this, if we haven't
  198. cctlr, ok := ctlr.(ContainerStatuser)
  199. if !ok {
  200. return
  201. }
  202. var err error
  203. containerStatus, err = cctlr.ContainerStatus(ctx)
  204. if err != nil && !contextDoneError(err) {
  205. log.G(ctx).WithError(err).Error("container status unavailable")
  206. }
  207. // at this point, things have gone fairly wrong. Remain positive
  208. // and let's get something out the door.
  209. if containerStatus == nil {
  210. containerStatus = new(api.ContainerStatus)
  211. containerStatusTask := task.Status.GetContainer()
  212. if containerStatusTask != nil {
  213. *containerStatus = *containerStatusTask // copy it over.
  214. }
  215. }
  216. }
  217. // at this point, we *must* have a containerStatus.
  218. if exitCode != 0 {
  219. containerStatus.ExitCode = int32(exitCode)
  220. }
  221. status.RuntimeStatus = &api.TaskStatus_Container{
  222. Container: containerStatus,
  223. }
  224. if portStatus == nil {
  225. pctlr, ok := ctlr.(PortStatuser)
  226. if !ok {
  227. return
  228. }
  229. var err error
  230. portStatus, err = pctlr.PortStatus(ctx)
  231. if err != nil && !contextDoneError(err) {
  232. log.G(ctx).WithError(err).Error("container port status unavailable")
  233. }
  234. }
  235. status.PortStatus = portStatus
  236. }()
  237. if task.DesiredState == api.TaskStateShutdown {
  238. if status.State >= api.TaskStateCompleted {
  239. return noop()
  240. }
  241. if err := ctlr.Shutdown(ctx); err != nil {
  242. return fatal(err)
  243. }
  244. return transition(api.TaskStateShutdown, "shutdown")
  245. }
  246. if status.State > task.DesiredState {
  247. return noop() // way beyond desired state, pause
  248. }
  249. // the following states may proceed past desired state.
  250. switch status.State {
  251. case api.TaskStatePreparing:
  252. if err := ctlr.Prepare(ctx); err != nil && err != ErrTaskPrepared {
  253. return fatal(err)
  254. }
  255. return transition(api.TaskStateReady, "prepared")
  256. case api.TaskStateStarting:
  257. if err := ctlr.Start(ctx); err != nil && err != ErrTaskStarted {
  258. return fatal(err)
  259. }
  260. return transition(api.TaskStateRunning, "started")
  261. case api.TaskStateRunning:
  262. if err := ctlr.Wait(ctx); err != nil {
  263. return fatal(err)
  264. }
  265. return transition(api.TaskStateCompleted, "finished")
  266. }
  267. // The following represent "pause" states. We can only proceed when the
  268. // desired state is beyond our current state.
  269. if status.State >= task.DesiredState {
  270. return noop()
  271. }
  272. switch status.State {
  273. case api.TaskStateNew, api.TaskStatePending, api.TaskStateAssigned:
  274. return transition(api.TaskStateAccepted, "accepted")
  275. case api.TaskStateAccepted:
  276. return transition(api.TaskStatePreparing, "preparing")
  277. case api.TaskStateReady:
  278. return transition(api.TaskStateStarting, "starting")
  279. default: // terminal states
  280. return noop()
  281. }
  282. }
  283. func logStateChange(ctx context.Context, desired, previous, next api.TaskState) {
  284. if previous != next {
  285. fields := logrus.Fields{
  286. "state.transition": fmt.Sprintf("%v->%v", previous, next),
  287. "state.desired": desired,
  288. }
  289. log.G(ctx).WithFields(fields).Debug("state changed")
  290. }
  291. }
  292. func contextDoneError(err error) bool {
  293. cause := errors.Cause(err)
  294. return cause == context.Canceled || cause == context.DeadlineExceeded
  295. }