progress.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409
  1. package progress
  2. import (
  3. "errors"
  4. "fmt"
  5. "io"
  6. "os"
  7. "os/signal"
  8. "time"
  9. "github.com/docker/docker/api/types"
  10. "github.com/docker/docker/api/types/filters"
  11. "github.com/docker/docker/api/types/swarm"
  12. "github.com/docker/docker/client"
  13. "github.com/docker/docker/pkg/progress"
  14. "github.com/docker/docker/pkg/streamformatter"
  15. "github.com/docker/docker/pkg/stringid"
  16. "golang.org/x/net/context"
  17. )
  18. var (
  19. numberedStates = map[swarm.TaskState]int64{
  20. swarm.TaskStateNew: 1,
  21. swarm.TaskStateAllocated: 2,
  22. swarm.TaskStatePending: 3,
  23. swarm.TaskStateAssigned: 4,
  24. swarm.TaskStateAccepted: 5,
  25. swarm.TaskStatePreparing: 6,
  26. swarm.TaskStateReady: 7,
  27. swarm.TaskStateStarting: 8,
  28. swarm.TaskStateRunning: 9,
  29. }
  30. longestState int
  31. )
  32. const (
  33. maxProgress = 9
  34. maxProgressBars = 20
  35. )
  36. type progressUpdater interface {
  37. update(service swarm.Service, tasks []swarm.Task, activeNodes map[string]swarm.Node, rollback bool) (bool, error)
  38. }
  39. func init() {
  40. for state := range numberedStates {
  41. if len(state) > longestState {
  42. longestState = len(state)
  43. }
  44. }
  45. }
  46. func stateToProgress(state swarm.TaskState, rollback bool) int64 {
  47. if !rollback {
  48. return numberedStates[state]
  49. }
  50. return int64(len(numberedStates)) - numberedStates[state]
  51. }
  52. // ServiceProgress outputs progress information for convergence of a service.
  53. func ServiceProgress(ctx context.Context, client client.APIClient, serviceID string, progressWriter io.WriteCloser) error {
  54. defer progressWriter.Close()
  55. progressOut := streamformatter.NewJSONProgressOutput(progressWriter, false)
  56. sigint := make(chan os.Signal, 1)
  57. signal.Notify(sigint, os.Interrupt)
  58. defer signal.Stop(sigint)
  59. taskFilter := filters.NewArgs()
  60. taskFilter.Add("service", serviceID)
  61. taskFilter.Add("_up-to-date", "true")
  62. getUpToDateTasks := func() ([]swarm.Task, error) {
  63. return client.TaskList(ctx, types.TaskListOptions{Filters: taskFilter})
  64. }
  65. var (
  66. updater progressUpdater
  67. converged bool
  68. convergedAt time.Time
  69. monitor = 5 * time.Second
  70. rollback bool
  71. )
  72. for {
  73. service, _, err := client.ServiceInspectWithRaw(ctx, serviceID, types.ServiceInspectOptions{})
  74. if err != nil {
  75. return err
  76. }
  77. if service.Spec.UpdateConfig != nil && service.Spec.UpdateConfig.Monitor != 0 {
  78. monitor = service.Spec.UpdateConfig.Monitor
  79. }
  80. if updater == nil {
  81. updater, err = initializeUpdater(service, progressOut)
  82. if err != nil {
  83. return err
  84. }
  85. }
  86. if service.UpdateStatus != nil {
  87. switch service.UpdateStatus.State {
  88. case swarm.UpdateStateUpdating:
  89. rollback = false
  90. case swarm.UpdateStateCompleted:
  91. if !converged {
  92. return nil
  93. }
  94. case swarm.UpdateStatePaused:
  95. return fmt.Errorf("service update paused: %s", service.UpdateStatus.Message)
  96. case swarm.UpdateStateRollbackStarted:
  97. if !rollback && service.UpdateStatus.Message != "" {
  98. progressOut.WriteProgress(progress.Progress{
  99. ID: "rollback",
  100. Action: service.UpdateStatus.Message,
  101. })
  102. }
  103. rollback = true
  104. case swarm.UpdateStateRollbackPaused:
  105. return fmt.Errorf("service rollback paused: %s", service.UpdateStatus.Message)
  106. case swarm.UpdateStateRollbackCompleted:
  107. if !converged {
  108. return fmt.Errorf("service rolled back: %s", service.UpdateStatus.Message)
  109. }
  110. }
  111. }
  112. if converged && time.Since(convergedAt) >= monitor {
  113. return nil
  114. }
  115. tasks, err := getUpToDateTasks()
  116. if err != nil {
  117. return err
  118. }
  119. activeNodes, err := getActiveNodes(ctx, client)
  120. if err != nil {
  121. return err
  122. }
  123. converged, err = updater.update(service, tasks, activeNodes, rollback)
  124. if err != nil {
  125. return err
  126. }
  127. if converged {
  128. if convergedAt.IsZero() {
  129. convergedAt = time.Now()
  130. }
  131. wait := monitor - time.Since(convergedAt)
  132. if wait >= 0 {
  133. progressOut.WriteProgress(progress.Progress{
  134. // Ideally this would have no ID, but
  135. // the progress rendering code behaves
  136. // poorly on an "action" with no ID. It
  137. // returns the cursor to the beginning
  138. // of the line, so the first character
  139. // may be difficult to read. Then the
  140. // output is overwritten by the shell
  141. // prompt when the command finishes.
  142. ID: "verify",
  143. Action: fmt.Sprintf("Waiting %d seconds to verify that tasks are stable...", wait/time.Second+1),
  144. })
  145. }
  146. } else {
  147. if !convergedAt.IsZero() {
  148. progressOut.WriteProgress(progress.Progress{
  149. ID: "verify",
  150. Action: "Detected task failure",
  151. })
  152. }
  153. convergedAt = time.Time{}
  154. }
  155. select {
  156. case <-time.After(200 * time.Millisecond):
  157. case <-sigint:
  158. if !converged {
  159. progress.Message(progressOut, "", "Operation continuing in background.")
  160. progress.Messagef(progressOut, "", "Use `docker service ps %s` to check progress.", serviceID)
  161. }
  162. return nil
  163. }
  164. }
  165. }
  166. func getActiveNodes(ctx context.Context, client client.APIClient) (map[string]swarm.Node, error) {
  167. nodes, err := client.NodeList(ctx, types.NodeListOptions{})
  168. if err != nil {
  169. return nil, err
  170. }
  171. activeNodes := make(map[string]swarm.Node)
  172. for _, n := range nodes {
  173. if n.Status.State != swarm.NodeStateDown {
  174. activeNodes[n.ID] = n
  175. }
  176. }
  177. return activeNodes, nil
  178. }
  179. func initializeUpdater(service swarm.Service, progressOut progress.Output) (progressUpdater, error) {
  180. if service.Spec.Mode.Replicated != nil && service.Spec.Mode.Replicated.Replicas != nil {
  181. return &replicatedProgressUpdater{
  182. progressOut: progressOut,
  183. }, nil
  184. }
  185. if service.Spec.Mode.Global != nil {
  186. return &globalProgressUpdater{
  187. progressOut: progressOut,
  188. }, nil
  189. }
  190. return nil, errors.New("unrecognized service mode")
  191. }
  192. func writeOverallProgress(progressOut progress.Output, numerator, denominator int, rollback bool) {
  193. if rollback {
  194. progressOut.WriteProgress(progress.Progress{
  195. ID: "overall progress",
  196. Action: fmt.Sprintf("rolling back update: %d out of %d tasks", numerator, denominator),
  197. })
  198. return
  199. }
  200. progressOut.WriteProgress(progress.Progress{
  201. ID: "overall progress",
  202. Action: fmt.Sprintf("%d out of %d tasks", numerator, denominator),
  203. })
  204. }
  205. type replicatedProgressUpdater struct {
  206. progressOut progress.Output
  207. // used for maping slots to a contiguous space
  208. // this also causes progress bars to appear in order
  209. slotMap map[int]int
  210. initialized bool
  211. done bool
  212. }
  213. func (u *replicatedProgressUpdater) update(service swarm.Service, tasks []swarm.Task, activeNodes map[string]swarm.Node, rollback bool) (bool, error) {
  214. if service.Spec.Mode.Replicated == nil || service.Spec.Mode.Replicated.Replicas == nil {
  215. return false, errors.New("no replica count")
  216. }
  217. replicas := *service.Spec.Mode.Replicated.Replicas
  218. if !u.initialized {
  219. u.slotMap = make(map[int]int)
  220. // Draw progress bars in order
  221. writeOverallProgress(u.progressOut, 0, int(replicas), rollback)
  222. if replicas <= maxProgressBars {
  223. for i := uint64(1); i <= replicas; i++ {
  224. progress.Update(u.progressOut, fmt.Sprintf("%d/%d", i, replicas), " ")
  225. }
  226. }
  227. u.initialized = true
  228. }
  229. // If there are multiple tasks with the same slot number, favor the one
  230. // with the *lowest* desired state. This can happen in restart
  231. // scenarios.
  232. tasksBySlot := make(map[int]swarm.Task)
  233. for _, task := range tasks {
  234. if numberedStates[task.DesiredState] == 0 {
  235. continue
  236. }
  237. if existingTask, ok := tasksBySlot[task.Slot]; ok {
  238. if numberedStates[existingTask.DesiredState] <= numberedStates[task.DesiredState] {
  239. continue
  240. }
  241. }
  242. if _, nodeActive := activeNodes[task.NodeID]; nodeActive {
  243. tasksBySlot[task.Slot] = task
  244. }
  245. }
  246. // If we had reached a converged state, check if we are still converged.
  247. if u.done {
  248. for _, task := range tasksBySlot {
  249. if task.Status.State != swarm.TaskStateRunning {
  250. u.done = false
  251. break
  252. }
  253. }
  254. }
  255. running := uint64(0)
  256. for _, task := range tasksBySlot {
  257. mappedSlot := u.slotMap[task.Slot]
  258. if mappedSlot == 0 {
  259. mappedSlot = len(u.slotMap) + 1
  260. u.slotMap[task.Slot] = mappedSlot
  261. }
  262. if !u.done && replicas <= maxProgressBars && uint64(mappedSlot) <= replicas {
  263. u.progressOut.WriteProgress(progress.Progress{
  264. ID: fmt.Sprintf("%d/%d", mappedSlot, replicas),
  265. Action: fmt.Sprintf("%-[1]*s", longestState, task.Status.State),
  266. Current: stateToProgress(task.Status.State, rollback),
  267. Total: maxProgress,
  268. HideCounts: true,
  269. })
  270. }
  271. if task.Status.State == swarm.TaskStateRunning {
  272. running++
  273. }
  274. }
  275. if !u.done {
  276. writeOverallProgress(u.progressOut, int(running), int(replicas), rollback)
  277. if running == replicas {
  278. u.done = true
  279. }
  280. }
  281. return running == replicas, nil
  282. }
  283. type globalProgressUpdater struct {
  284. progressOut progress.Output
  285. initialized bool
  286. done bool
  287. }
  288. func (u *globalProgressUpdater) update(service swarm.Service, tasks []swarm.Task, activeNodes map[string]swarm.Node, rollback bool) (bool, error) {
  289. // If there are multiple tasks with the same node ID, favor the one
  290. // with the *lowest* desired state. This can happen in restart
  291. // scenarios.
  292. tasksByNode := make(map[string]swarm.Task)
  293. for _, task := range tasks {
  294. if numberedStates[task.DesiredState] == 0 {
  295. continue
  296. }
  297. if existingTask, ok := tasksByNode[task.NodeID]; ok {
  298. if numberedStates[existingTask.DesiredState] <= numberedStates[task.DesiredState] {
  299. continue
  300. }
  301. }
  302. tasksByNode[task.NodeID] = task
  303. }
  304. // We don't have perfect knowledge of how many nodes meet the
  305. // constraints for this service. But the orchestrator creates tasks
  306. // for all eligible nodes at the same time, so we should see all those
  307. // nodes represented among the up-to-date tasks.
  308. nodeCount := len(tasksByNode)
  309. if !u.initialized {
  310. if nodeCount == 0 {
  311. // Two possibilities: either the orchestrator hasn't created
  312. // the tasks yet, or the service doesn't meet constraints for
  313. // any node. Either way, we wait.
  314. u.progressOut.WriteProgress(progress.Progress{
  315. ID: "overall progress",
  316. Action: "waiting for new tasks",
  317. })
  318. return false, nil
  319. }
  320. writeOverallProgress(u.progressOut, 0, nodeCount, rollback)
  321. u.initialized = true
  322. }
  323. // If we had reached a converged state, check if we are still converged.
  324. if u.done {
  325. for _, task := range tasksByNode {
  326. if task.Status.State != swarm.TaskStateRunning {
  327. u.done = false
  328. break
  329. }
  330. }
  331. }
  332. running := 0
  333. for _, task := range tasksByNode {
  334. if node, nodeActive := activeNodes[task.NodeID]; nodeActive {
  335. if !u.done && nodeCount <= maxProgressBars {
  336. u.progressOut.WriteProgress(progress.Progress{
  337. ID: stringid.TruncateID(node.ID),
  338. Action: fmt.Sprintf("%-[1]*s", longestState, task.Status.State),
  339. Current: stateToProgress(task.Status.State, rollback),
  340. Total: maxProgress,
  341. HideCounts: true,
  342. })
  343. }
  344. if task.Status.State == swarm.TaskStateRunning {
  345. running++
  346. }
  347. }
  348. }
  349. if !u.done {
  350. writeOverallProgress(u.progressOut, running, nodeCount, rollback)
  351. if running == nodeCount {
  352. u.done = true
  353. }
  354. }
  355. return running == nodeCount, nil
  356. }