states.go 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165
  1. package swarm
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/docker/docker/api/types"
  6. "github.com/docker/docker/api/types/filters"
  7. swarmtypes "github.com/docker/docker/api/types/swarm"
  8. "github.com/docker/docker/client"
  9. "gotest.tools/v3/poll"
  10. )
  11. // NoTasksForService verifies that there are no more tasks for the given service
  12. func NoTasksForService(ctx context.Context, client client.ServiceAPIClient, serviceID string) func(log poll.LogT) poll.Result {
  13. return func(log poll.LogT) poll.Result {
  14. tasks, err := client.TaskList(ctx, types.TaskListOptions{
  15. Filters: filters.NewArgs(
  16. filters.Arg("service", serviceID),
  17. ),
  18. })
  19. if err == nil {
  20. if len(tasks) == 0 {
  21. return poll.Success()
  22. }
  23. if len(tasks) > 0 {
  24. return poll.Continue("task count for service %s at %d waiting for 0", serviceID, len(tasks))
  25. }
  26. return poll.Continue("waiting for tasks for service %s to be deleted", serviceID)
  27. }
  28. // TODO we should not use an error as indication that the tasks are gone. There may be other reasons for an error to occur.
  29. return poll.Success()
  30. }
  31. }
  32. // NoTasks verifies that all tasks are gone
  33. func NoTasks(ctx context.Context, client client.ServiceAPIClient) func(log poll.LogT) poll.Result {
  34. return func(log poll.LogT) poll.Result {
  35. tasks, err := client.TaskList(ctx, types.TaskListOptions{})
  36. switch {
  37. case err != nil:
  38. return poll.Error(err)
  39. case len(tasks) == 0:
  40. return poll.Success()
  41. default:
  42. return poll.Continue("waiting for all tasks to be removed: task count at %d", len(tasks))
  43. }
  44. }
  45. }
  46. // RunningTasksCount verifies there are `instances` tasks running for `serviceID`
  47. func RunningTasksCount(client client.ServiceAPIClient, serviceID string, instances uint64) func(log poll.LogT) poll.Result {
  48. return func(log poll.LogT) poll.Result {
  49. filter := filters.NewArgs()
  50. filter.Add("service", serviceID)
  51. tasks, err := client.TaskList(context.Background(), types.TaskListOptions{
  52. Filters: filter,
  53. })
  54. var running int
  55. var taskError string
  56. for _, task := range tasks {
  57. switch task.Status.State {
  58. case swarmtypes.TaskStateRunning:
  59. running++
  60. case swarmtypes.TaskStateFailed:
  61. if task.Status.Err != "" {
  62. taskError = task.Status.Err
  63. }
  64. }
  65. }
  66. switch {
  67. case err != nil:
  68. return poll.Error(err)
  69. case running > int(instances):
  70. return poll.Continue("waiting for tasks to terminate")
  71. case running < int(instances) && taskError != "":
  72. return poll.Continue("waiting for tasks to enter run state. task failed with error: %s", taskError)
  73. case running == int(instances):
  74. return poll.Success()
  75. default:
  76. return poll.Continue("running task count at %d waiting for %d (total tasks: %d)", running, instances, len(tasks))
  77. }
  78. }
  79. }
  80. // JobComplete is a poll function for determining that a ReplicatedJob is
  81. // completed additionally, while polling, it verifies that the job never
  82. // exceeds MaxConcurrent running tasks
  83. func JobComplete(client client.CommonAPIClient, service swarmtypes.Service) func(log poll.LogT) poll.Result {
  84. filter := filters.NewArgs()
  85. filter.Add("service", service.ID)
  86. var jobIteration swarmtypes.Version
  87. if service.JobStatus != nil {
  88. jobIteration = service.JobStatus.JobIteration
  89. }
  90. maxRaw := service.Spec.Mode.ReplicatedJob.MaxConcurrent
  91. totalRaw := service.Spec.Mode.ReplicatedJob.TotalCompletions
  92. max := int(*maxRaw)
  93. total := int(*totalRaw)
  94. previousResult := ""
  95. return func(log poll.LogT) poll.Result {
  96. tasks, err := client.TaskList(context.Background(), types.TaskListOptions{
  97. Filters: filter,
  98. })
  99. if err != nil {
  100. poll.Error(err)
  101. }
  102. var running int
  103. var completed int
  104. var runningSlot []int
  105. var runningID []string
  106. for _, task := range tasks {
  107. // make sure the task has the same job iteration
  108. if task.JobIteration == nil || task.JobIteration.Index != jobIteration.Index {
  109. continue
  110. }
  111. switch task.Status.State {
  112. case swarmtypes.TaskStateRunning:
  113. running++
  114. runningSlot = append(runningSlot, task.Slot)
  115. runningID = append(runningID, task.ID)
  116. case swarmtypes.TaskStateComplete:
  117. completed++
  118. }
  119. }
  120. switch {
  121. case running > max:
  122. return poll.Error(fmt.Errorf(
  123. "number of running tasks (%v) exceeds max (%v)", running, max,
  124. ))
  125. case (completed + running) > total:
  126. return poll.Error(fmt.Errorf(
  127. "number of tasks exceeds total (%v), %v running and %v completed",
  128. total, running, completed,
  129. ))
  130. case completed == total && running == 0:
  131. return poll.Success()
  132. default:
  133. newRes := fmt.Sprintf(
  134. "Completed: %2d Running: %v\n\t%v",
  135. completed, runningSlot, runningID,
  136. )
  137. if newRes == previousResult {
  138. } else {
  139. previousResult = newRes
  140. }
  141. return poll.Continue(
  142. "Job not yet finished, %v completed and %v running out of %v total",
  143. completed, running, total,
  144. )
  145. }
  146. }
  147. }