utils.go 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143
  1. package container
  2. import (
  3. "strconv"
  4. "golang.org/x/net/context"
  5. "github.com/Sirupsen/logrus"
  6. "github.com/docker/docker/api/types"
  7. "github.com/docker/docker/api/types/events"
  8. "github.com/docker/docker/api/types/filters"
  9. "github.com/docker/docker/api/types/versions"
  10. "github.com/docker/docker/cli/command"
  11. clientapi "github.com/docker/docker/client"
  12. )
  13. func waitExitOrRemoved(ctx context.Context, dockerCli *command.DockerCli, containerID string, waitRemove bool) chan int {
  14. if len(containerID) == 0 {
  15. // containerID can never be empty
  16. panic("Internal Error: waitExitOrRemoved needs a containerID as parameter")
  17. }
  18. var removeErr error
  19. statusChan := make(chan int)
  20. exitCode := 125
  21. // Get events via Events API
  22. f := filters.NewArgs()
  23. f.Add("type", "container")
  24. f.Add("container", containerID)
  25. options := types.EventsOptions{
  26. Filters: f,
  27. }
  28. eventCtx, cancel := context.WithCancel(ctx)
  29. eventq, errq := dockerCli.Client().Events(eventCtx, options)
  30. eventProcessor := func(e events.Message) bool {
  31. stopProcessing := false
  32. switch e.Status {
  33. case "die":
  34. if v, ok := e.Actor.Attributes["exitCode"]; ok {
  35. code, cerr := strconv.Atoi(v)
  36. if cerr != nil {
  37. logrus.Errorf("failed to convert exitcode '%q' to int: %v", v, cerr)
  38. } else {
  39. exitCode = code
  40. }
  41. }
  42. if !waitRemove {
  43. stopProcessing = true
  44. } else {
  45. // If we are talking to an older daemon, `AutoRemove` is not supported.
  46. // We need to fall back to the old behavior, which is client-side removal
  47. if versions.LessThan(dockerCli.Client().ClientVersion(), "1.25") {
  48. go func() {
  49. removeErr = dockerCli.Client().ContainerRemove(ctx, containerID, types.ContainerRemoveOptions{RemoveVolumes: true})
  50. if removeErr != nil {
  51. logrus.Errorf("error removing container: %v", removeErr)
  52. cancel() // cancel the event Q
  53. }
  54. }()
  55. }
  56. }
  57. case "detach":
  58. exitCode = 0
  59. stopProcessing = true
  60. case "destroy":
  61. stopProcessing = true
  62. }
  63. return stopProcessing
  64. }
  65. go func() {
  66. defer func() {
  67. statusChan <- exitCode // must always send an exit code or the caller will block
  68. cancel()
  69. }()
  70. for {
  71. select {
  72. case <-eventCtx.Done():
  73. if removeErr != nil {
  74. return
  75. }
  76. case evt := <-eventq:
  77. if eventProcessor(evt) {
  78. return
  79. }
  80. case err := <-errq:
  81. logrus.Errorf("error getting events from daemon: %v", err)
  82. return
  83. }
  84. }
  85. }()
  86. return statusChan
  87. }
  88. // getExitCode performs an inspect on the container. It returns
  89. // the running state and the exit code.
  90. func getExitCode(ctx context.Context, dockerCli *command.DockerCli, containerID string) (bool, int, error) {
  91. c, err := dockerCli.Client().ContainerInspect(ctx, containerID)
  92. if err != nil {
  93. // If we can't connect, then the daemon probably died.
  94. if !clientapi.IsErrConnectionFailed(err) {
  95. return false, -1, err
  96. }
  97. return false, -1, nil
  98. }
  99. return c.State.Running, c.State.ExitCode, nil
  100. }
  101. func parallelOperation(ctx context.Context, containers []string, op func(ctx context.Context, container string) error) chan error {
  102. if len(containers) == 0 {
  103. return nil
  104. }
  105. const defaultParallel int = 50
  106. sem := make(chan struct{}, defaultParallel)
  107. errChan := make(chan error)
  108. // make sure result is printed in correct order
  109. output := map[string]chan error{}
  110. for _, c := range containers {
  111. output[c] = make(chan error, 1)
  112. }
  113. go func() {
  114. for _, c := range containers {
  115. err := <-output[c]
  116. errChan <- err
  117. }
  118. }()
  119. go func() {
  120. for _, c := range containers {
  121. sem <- struct{}{} // Wait for active queue sem to drain.
  122. go func(container string) {
  123. output[container] <- op(ctx, container)
  124. <-sem
  125. }(c)
  126. }
  127. }()
  128. return errChan
  129. }