Преглед на файлове

Merge pull request #24761 from WeiZhang555/parallel-stop

Enhancement: allow parallel stop, pause, unpause
Daniel Nephin преди 8 години
родител
ревизия
c2decbe5ee
променени са 4 файла, в които са добавени 42 реда и са изтрити 4 реда
  1. 2 1
      cli/command/container/pause.go
  2. 6 2
      cli/command/container/stop.go
  3. 2 1
      cli/command/container/unpause.go
  4. 32 0
      cli/command/container/utils.go

+ 2 - 1
cli/command/container/pause.go

@@ -34,8 +34,9 @@ func runPause(dockerCli *command.DockerCli, opts *pauseOptions) error {
 	ctx := context.Background()
 
 	var errs []string
+	errChan := parallelOperation(ctx, opts.containers, dockerCli.Client().ContainerPause)
 	for _, container := range opts.containers {
-		if err := dockerCli.Client().ContainerPause(ctx, container); err != nil {
+		if err := <-errChan; err != nil {
 			errs = append(errs, err.Error())
 		} else {
 			fmt.Fprintf(dockerCli.Out(), "%s\n", container)

+ 6 - 2
cli/command/container/stop.go

@@ -39,11 +39,15 @@ func NewStopCommand(dockerCli *command.DockerCli) *cobra.Command {
 
 func runStop(dockerCli *command.DockerCli, opts *stopOptions) error {
 	ctx := context.Background()
+	timeout := time.Duration(opts.time) * time.Second
 
 	var errs []string
+
+	errChan := parallelOperation(ctx, opts.containers, func(ctx context.Context, id string) error {
+		return dockerCli.Client().ContainerStop(ctx, id, &timeout)
+	})
 	for _, container := range opts.containers {
-		timeout := time.Duration(opts.time) * time.Second
-		if err := dockerCli.Client().ContainerStop(ctx, container, &timeout); err != nil {
+		if err := <-errChan; err != nil {
 			errs = append(errs, err.Error())
 		} else {
 			fmt.Fprintf(dockerCli.Out(), "%s\n", container)

+ 2 - 1
cli/command/container/unpause.go

@@ -35,8 +35,9 @@ func runUnpause(dockerCli *command.DockerCli, opts *unpauseOptions) error {
 	ctx := context.Background()
 
 	var errs []string
+	errChan := parallelOperation(ctx, opts.containers, dockerCli.Client().ContainerUnpause)
 	for _, container := range opts.containers {
-		if err := dockerCli.Client().ContainerUnpause(ctx, container); err != nil {
+		if err := <-errChan; err != nil {
 			errs = append(errs, err.Error())
 		} else {
 			fmt.Fprintf(dockerCli.Out(), "%s\n", container)

+ 32 - 0
cli/command/container/utils.go

@@ -90,3 +90,35 @@ func getExitCode(dockerCli *command.DockerCli, ctx context.Context, containerID
 	}
 	return c.State.Running, c.State.ExitCode, nil
 }
+
+func parallelOperation(ctx context.Context, cids []string, op func(ctx context.Context, id string) error) chan error {
+	if len(cids) == 0 {
+		return nil
+	}
+	const defaultParallel int = 50
+	sem := make(chan struct{}, defaultParallel)
+	errChan := make(chan error)
+
+	// make sure result is printed in correct order
+	output := map[string]chan error{}
+	for _, c := range cids {
+		output[c] = make(chan error, 1)
+	}
+	go func() {
+		for _, c := range cids {
+			err := <-output[c]
+			errChan <- err
+		}
+	}()
+
+	go func() {
+		for _, c := range cids {
+			sem <- struct{}{} // Wait for active queue sem to drain.
+			go func(container string) {
+				output[container] <- op(ctx, container)
+				<-sem
+			}(c)
+		}
+	}()
+	return errChan
+}