states.go 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159
  1. package swarm
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/docker/docker/api/types"
  6. "github.com/docker/docker/api/types/filters"
  7. swarmtypes "github.com/docker/docker/api/types/swarm"
  8. "github.com/docker/docker/client"
  9. "gotest.tools/v3/poll"
  10. )
  11. // NoTasksForService verifies that there are no more tasks for the given service
  12. func NoTasksForService(ctx context.Context, client client.ServiceAPIClient, serviceID string) func(log poll.LogT) poll.Result {
  13. return func(log poll.LogT) poll.Result {
  14. tasks, err := client.TaskList(ctx, types.TaskListOptions{
  15. Filters: filters.NewArgs(
  16. filters.Arg("service", serviceID),
  17. ),
  18. })
  19. if err == nil {
  20. if len(tasks) == 0 {
  21. return poll.Success()
  22. }
  23. if len(tasks) > 0 {
  24. return poll.Continue("task count for service %s at %d waiting for 0", serviceID, len(tasks))
  25. }
  26. return poll.Continue("waiting for tasks for service %s to be deleted", serviceID)
  27. }
  28. // TODO we should not use an error as indication that the tasks are gone. There may be other reasons for an error to occur.
  29. return poll.Success()
  30. }
  31. }
  32. // NoTasks verifies that all tasks are gone
  33. func NoTasks(ctx context.Context, client client.ServiceAPIClient) func(log poll.LogT) poll.Result {
  34. return func(log poll.LogT) poll.Result {
  35. tasks, err := client.TaskList(ctx, types.TaskListOptions{})
  36. switch {
  37. case err != nil:
  38. return poll.Error(err)
  39. case len(tasks) == 0:
  40. return poll.Success()
  41. default:
  42. return poll.Continue("waiting for all tasks to be removed: task count at %d", len(tasks))
  43. }
  44. }
  45. }
  46. // RunningTasksCount verifies there are `instances` tasks running for `serviceID`
  47. func RunningTasksCount(ctx context.Context, client client.ServiceAPIClient, serviceID string, instances uint64) func(log poll.LogT) poll.Result {
  48. return func(log poll.LogT) poll.Result {
  49. filter := filters.NewArgs()
  50. filter.Add("service", serviceID)
  51. tasks, err := client.TaskList(ctx, types.TaskListOptions{
  52. Filters: filter,
  53. })
  54. var running int
  55. var taskError string
  56. for _, task := range tasks {
  57. switch task.Status.State {
  58. case swarmtypes.TaskStateRunning:
  59. running++
  60. case swarmtypes.TaskStateFailed:
  61. if task.Status.Err != "" {
  62. taskError = task.Status.Err
  63. }
  64. }
  65. }
  66. switch {
  67. case err != nil:
  68. return poll.Error(err)
  69. case running > int(instances):
  70. return poll.Continue("waiting for tasks to terminate")
  71. case running < int(instances) && taskError != "":
  72. return poll.Continue("waiting for tasks to enter run state. task failed with error: %s", taskError)
  73. case running == int(instances):
  74. return poll.Success()
  75. default:
  76. return poll.Continue("running task count at %d waiting for %d (total tasks: %d)", running, instances, len(tasks))
  77. }
  78. }
  79. }
  80. // JobComplete is a poll function for determining that a ReplicatedJob is
  81. // completed additionally, while polling, it verifies that the job never
  82. // exceeds MaxConcurrent running tasks
  83. func JobComplete(ctx context.Context, client client.CommonAPIClient, service swarmtypes.Service) func(log poll.LogT) poll.Result {
  84. filter := filters.NewArgs(filters.Arg("service", service.ID))
  85. var jobIteration swarmtypes.Version
  86. if service.JobStatus != nil {
  87. jobIteration = service.JobStatus.JobIteration
  88. }
  89. maxConcurrent := int(*service.Spec.Mode.ReplicatedJob.MaxConcurrent)
  90. totalCompletions := int(*service.Spec.Mode.ReplicatedJob.TotalCompletions)
  91. previousResult := ""
  92. return func(log poll.LogT) poll.Result {
  93. tasks, err := client.TaskList(ctx, types.TaskListOptions{
  94. Filters: filter,
  95. })
  96. if err != nil {
  97. poll.Error(err)
  98. }
  99. var running int
  100. var completed int
  101. var runningSlot []int
  102. var runningID []string
  103. for _, task := range tasks {
  104. // make sure the task has the same job iteration
  105. if task.JobIteration == nil || task.JobIteration.Index != jobIteration.Index {
  106. continue
  107. }
  108. switch task.Status.State {
  109. case swarmtypes.TaskStateRunning:
  110. running++
  111. runningSlot = append(runningSlot, task.Slot)
  112. runningID = append(runningID, task.ID)
  113. case swarmtypes.TaskStateComplete:
  114. completed++
  115. }
  116. }
  117. switch {
  118. case running > maxConcurrent:
  119. return poll.Error(fmt.Errorf(
  120. "number of running tasks (%v) exceeds max (%v)", running, maxConcurrent,
  121. ))
  122. case (completed + running) > totalCompletions:
  123. return poll.Error(fmt.Errorf(
  124. "number of tasks exceeds total (%v), %v running and %v completed",
  125. totalCompletions, running, completed,
  126. ))
  127. case completed == totalCompletions && running == 0:
  128. return poll.Success()
  129. default:
  130. newRes := fmt.Sprintf(
  131. "Completed: %2d Running: %v\n\t%v",
  132. completed, runningSlot, runningID,
  133. )
  134. if newRes == previousResult {
  135. } else {
  136. previousResult = newRes
  137. }
  138. return poll.Continue(
  139. "Job not yet finished, %v completed and %v running out of %v total",
  140. completed, running, totalCompletions,
  141. )
  142. }
  143. }
  144. }