utils.go 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132
  1. package container
  2. import (
  3. "strconv"
  4. "golang.org/x/net/context"
  5. "github.com/Sirupsen/logrus"
  6. "github.com/docker/docker/api/types"
  7. "github.com/docker/docker/api/types/events"
  8. "github.com/docker/docker/api/types/filters"
  9. "github.com/docker/docker/cli/command"
  10. clientapi "github.com/docker/docker/client"
  11. )
  12. func waitExitOrRemoved(dockerCli *command.DockerCli, ctx context.Context, containerID string, waitRemove bool) chan int {
  13. if len(containerID) == 0 {
  14. // containerID can never be empty
  15. panic("Internal Error: waitExitOrRemoved needs a containerID as parameter")
  16. }
  17. statusChan := make(chan int)
  18. exitCode := 125
  19. eventProcessor := func(e events.Message) bool {
  20. stopProcessing := false
  21. switch e.Status {
  22. case "die":
  23. if v, ok := e.Actor.Attributes["exitCode"]; ok {
  24. code, cerr := strconv.Atoi(v)
  25. if cerr != nil {
  26. logrus.Errorf("failed to convert exitcode '%q' to int: %v", v, cerr)
  27. } else {
  28. exitCode = code
  29. }
  30. }
  31. if !waitRemove {
  32. stopProcessing = true
  33. }
  34. case "detach":
  35. exitCode = 0
  36. stopProcessing = true
  37. case "destroy":
  38. stopProcessing = true
  39. }
  40. if stopProcessing {
  41. statusChan <- exitCode
  42. return true
  43. }
  44. return false
  45. }
  46. // Get events via Events API
  47. f := filters.NewArgs()
  48. f.Add("type", "container")
  49. f.Add("container", containerID)
  50. options := types.EventsOptions{
  51. Filters: f,
  52. }
  53. eventCtx, cancel := context.WithCancel(ctx)
  54. eventq, errq := dockerCli.Client().Events(eventCtx, options)
  55. go func() {
  56. defer cancel()
  57. for {
  58. select {
  59. case evt := <-eventq:
  60. if eventProcessor(evt) {
  61. return
  62. }
  63. case err := <-errq:
  64. logrus.Errorf("error getting events from daemon: %v", err)
  65. statusChan <- exitCode
  66. return
  67. }
  68. }
  69. }()
  70. return statusChan
  71. }
  72. // getExitCode performs an inspect on the container. It returns
  73. // the running state and the exit code.
  74. func getExitCode(dockerCli *command.DockerCli, ctx context.Context, containerID string) (bool, int, error) {
  75. c, err := dockerCli.Client().ContainerInspect(ctx, containerID)
  76. if err != nil {
  77. // If we can't connect, then the daemon probably died.
  78. if err != clientapi.ErrConnectionFailed {
  79. return false, -1, err
  80. }
  81. return false, -1, nil
  82. }
  83. return c.State.Running, c.State.ExitCode, nil
  84. }
  85. func parallelOperation(ctx context.Context, containers []string, op func(ctx context.Context, container string) error) chan error {
  86. if len(containers) == 0 {
  87. return nil
  88. }
  89. const defaultParallel int = 50
  90. sem := make(chan struct{}, defaultParallel)
  91. errChan := make(chan error)
  92. // make sure result is printed in correct order
  93. output := map[string]chan error{}
  94. for _, c := range containers {
  95. output[c] = make(chan error, 1)
  96. }
  97. go func() {
  98. for _, c := range containers {
  99. err := <-output[c]
  100. errChan <- err
  101. }
  102. }()
  103. go func() {
  104. for _, c := range containers {
  105. sem <- struct{}{} // Wait for active queue sem to drain.
  106. go func(container string) {
  107. output[container] <- op(ctx, container)
  108. <-sem
  109. }(c)
  110. }
  111. }()
  112. return errChan
  113. }