states.go 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163
  1. package swarm
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/docker/docker/api/types"
  6. "github.com/docker/docker/api/types/filters"
  7. swarmtypes "github.com/docker/docker/api/types/swarm"
  8. "github.com/docker/docker/client"
  9. "gotest.tools/v3/poll"
  10. )
  11. // NoTasksForService verifies that there are no more tasks for the given service
  12. func NoTasksForService(ctx context.Context, client client.ServiceAPIClient, serviceID string) func(log poll.LogT) poll.Result {
  13. return func(log poll.LogT) poll.Result {
  14. tasks, err := client.TaskList(ctx, types.TaskListOptions{
  15. Filters: filters.NewArgs(
  16. filters.Arg("service", serviceID),
  17. ),
  18. })
  19. if err == nil {
  20. if len(tasks) == 0 {
  21. return poll.Success()
  22. }
  23. if len(tasks) > 0 {
  24. return poll.Continue("task count for service %s at %d waiting for 0", serviceID, len(tasks))
  25. }
  26. return poll.Continue("waiting for tasks for service %s to be deleted", serviceID)
  27. }
  28. // TODO we should not use an error as indication that the tasks are gone. There may be other reasons for an error to occur.
  29. return poll.Success()
  30. }
  31. }
  32. // NoTasks verifies that all tasks are gone
  33. func NoTasks(ctx context.Context, client client.ServiceAPIClient) func(log poll.LogT) poll.Result {
  34. return func(log poll.LogT) poll.Result {
  35. tasks, err := client.TaskList(ctx, types.TaskListOptions{})
  36. switch {
  37. case err != nil:
  38. return poll.Error(err)
  39. case len(tasks) == 0:
  40. return poll.Success()
  41. default:
  42. return poll.Continue("waiting for all tasks to be removed: task count at %d", len(tasks))
  43. }
  44. }
  45. }
  46. // RunningTasksCount verifies there are `instances` tasks running for `serviceID`
  47. func RunningTasksCount(client client.ServiceAPIClient, serviceID string, instances uint64) func(log poll.LogT) poll.Result {
  48. return func(log poll.LogT) poll.Result {
  49. filter := filters.NewArgs()
  50. filter.Add("service", serviceID)
  51. tasks, err := client.TaskList(context.Background(), types.TaskListOptions{
  52. Filters: filter,
  53. })
  54. var running int
  55. var taskError string
  56. for _, task := range tasks {
  57. switch task.Status.State {
  58. case swarmtypes.TaskStateRunning:
  59. running++
  60. case swarmtypes.TaskStateFailed:
  61. if task.Status.Err != "" {
  62. taskError = task.Status.Err
  63. }
  64. }
  65. }
  66. switch {
  67. case err != nil:
  68. return poll.Error(err)
  69. case running > int(instances):
  70. return poll.Continue("waiting for tasks to terminate")
  71. case running < int(instances) && taskError != "":
  72. return poll.Continue("waiting for tasks to enter run state. task failed with error: %s", taskError)
  73. case running == int(instances):
  74. return poll.Success()
  75. default:
  76. return poll.Continue("running task count at %d waiting for %d (total tasks: %d)", running, instances, len(tasks))
  77. }
  78. }
  79. }
  80. // JobComplete is a poll function for determining that a ReplicatedJob is
  81. // completed additionally, while polling, it verifies that the job never
  82. // exceeds MaxConcurrent running tasks
  83. func JobComplete(client client.CommonAPIClient, service swarmtypes.Service) func(log poll.LogT) poll.Result {
  84. filter := filters.NewArgs(filters.Arg("service", service.ID))
  85. var jobIteration swarmtypes.Version
  86. if service.JobStatus != nil {
  87. jobIteration = service.JobStatus.JobIteration
  88. }
  89. maxRaw := service.Spec.Mode.ReplicatedJob.MaxConcurrent
  90. totalRaw := service.Spec.Mode.ReplicatedJob.TotalCompletions
  91. max := int(*maxRaw)
  92. total := int(*totalRaw)
  93. previousResult := ""
  94. return func(log poll.LogT) poll.Result {
  95. tasks, err := client.TaskList(context.Background(), types.TaskListOptions{
  96. Filters: filter,
  97. })
  98. if err != nil {
  99. poll.Error(err)
  100. }
  101. var running int
  102. var completed int
  103. var runningSlot []int
  104. var runningID []string
  105. for _, task := range tasks {
  106. // make sure the task has the same job iteration
  107. if task.JobIteration == nil || task.JobIteration.Index != jobIteration.Index {
  108. continue
  109. }
  110. switch task.Status.State {
  111. case swarmtypes.TaskStateRunning:
  112. running++
  113. runningSlot = append(runningSlot, task.Slot)
  114. runningID = append(runningID, task.ID)
  115. case swarmtypes.TaskStateComplete:
  116. completed++
  117. }
  118. }
  119. switch {
  120. case running > max:
  121. return poll.Error(fmt.Errorf(
  122. "number of running tasks (%v) exceeds max (%v)", running, max,
  123. ))
  124. case (completed + running) > total:
  125. return poll.Error(fmt.Errorf(
  126. "number of tasks exceeds total (%v), %v running and %v completed",
  127. total, running, completed,
  128. ))
  129. case completed == total && running == 0:
  130. return poll.Success()
  131. default:
  132. newRes := fmt.Sprintf(
  133. "Completed: %2d Running: %v\n\t%v",
  134. completed, runningSlot, runningID,
  135. )
  136. if newRes == previousResult {
  137. } else {
  138. previousResult = newRes
  139. }
  140. return poll.Continue(
  141. "Job not yet finished, %v completed and %v running out of %v total",
  142. completed, running, total,
  143. )
  144. }
  145. }
  146. }