123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143 |
- package container
- import (
- "strconv"
- "golang.org/x/net/context"
- "github.com/Sirupsen/logrus"
- "github.com/docker/docker/api/types"
- "github.com/docker/docker/api/types/events"
- "github.com/docker/docker/api/types/filters"
- "github.com/docker/docker/api/types/versions"
- "github.com/docker/docker/cli/command"
- clientapi "github.com/docker/docker/client"
- )
- func waitExitOrRemoved(ctx context.Context, dockerCli *command.DockerCli, containerID string, waitRemove bool) chan int {
- if len(containerID) == 0 {
- // containerID can never be empty
- panic("Internal Error: waitExitOrRemoved needs a containerID as parameter")
- }
- var removeErr error
- statusChan := make(chan int)
- exitCode := 125
- // Get events via Events API
- f := filters.NewArgs()
- f.Add("type", "container")
- f.Add("container", containerID)
- options := types.EventsOptions{
- Filters: f,
- }
- eventCtx, cancel := context.WithCancel(ctx)
- eventq, errq := dockerCli.Client().Events(eventCtx, options)
- eventProcessor := func(e events.Message) bool {
- stopProcessing := false
- switch e.Status {
- case "die":
- if v, ok := e.Actor.Attributes["exitCode"]; ok {
- code, cerr := strconv.Atoi(v)
- if cerr != nil {
- logrus.Errorf("failed to convert exitcode '%q' to int: %v", v, cerr)
- } else {
- exitCode = code
- }
- }
- if !waitRemove {
- stopProcessing = true
- } else {
- // If we are talking to an older daemon, `AutoRemove` is not supported.
- // We need to fall back to the old behavior, which is client-side removal
- if versions.LessThan(dockerCli.Client().ClientVersion(), "1.25") {
- go func() {
- removeErr = dockerCli.Client().ContainerRemove(ctx, containerID, types.ContainerRemoveOptions{RemoveVolumes: true})
- if removeErr != nil {
- logrus.Errorf("error removing container: %v", removeErr)
- cancel() // cancel the event Q
- }
- }()
- }
- }
- case "detach":
- exitCode = 0
- stopProcessing = true
- case "destroy":
- stopProcessing = true
- }
- return stopProcessing
- }
- go func() {
- defer func() {
- statusChan <- exitCode // must always send an exit code or the caller will block
- cancel()
- }()
- for {
- select {
- case <-eventCtx.Done():
- if removeErr != nil {
- return
- }
- case evt := <-eventq:
- if eventProcessor(evt) {
- return
- }
- case err := <-errq:
- logrus.Errorf("error getting events from daemon: %v", err)
- return
- }
- }
- }()
- return statusChan
- }
- // getExitCode performs an inspect on the container. It returns
- // the running state and the exit code.
- func getExitCode(ctx context.Context, dockerCli *command.DockerCli, containerID string) (bool, int, error) {
- c, err := dockerCli.Client().ContainerInspect(ctx, containerID)
- if err != nil {
- // If we can't connect, then the daemon probably died.
- if !clientapi.IsErrConnectionFailed(err) {
- return false, -1, err
- }
- return false, -1, nil
- }
- return c.State.Running, c.State.ExitCode, nil
- }
// parallelOperation runs op once for every entry in containers, with at most
// 50 invocations in flight at a time, and returns a channel that yields one
// result per entry in the same order as containers. It returns nil when
// containers is empty.
func parallelOperation(ctx context.Context, containers []string, op func(ctx context.Context, container string) error) chan error {
	if len(containers) == 0 {
		return nil
	}

	const maxInFlight = 50
	var (
		slots   = make(chan struct{}, maxInFlight)
		results = make(chan error)
		// One single-slot mailbox per container name, so results can be
		// forwarded in input order regardless of completion order.
		pending = make(map[string]chan error, len(containers))
	)
	for _, name := range containers {
		pending[name] = make(chan error, 1)
	}

	// Dispatcher: start one worker per container, blocking while all slots
	// are taken.
	go func() {
		for _, name := range containers {
			slots <- struct{}{}
			go func(name string) {
				pending[name] <- op(ctx, name)
				<-slots // release the slot only after the result is handed over
			}(name)
		}
	}()

	// Forwarder: relay the results to the caller in input order.
	go func() {
		for _, name := range containers {
			results <- <-pending[name]
		}
	}()

	return results
}
|