Преглед изворни кода

Refactored CmdRun and CmdAttach to use Container.Attach

Solomon Hykes пре 12 година
родитељ
комит
c808940c04
2 измењених фајлова са 98 додато и 93 уклоњено
  1. 17 93
      commands.go
  2. 81 0
      container.go

+ 17 - 93
commands.go

@@ -13,7 +13,6 @@ import (
 	"runtime"
 	"runtime"
 	"strconv"
 	"strconv"
 	"strings"
 	"strings"
-	"sync"
 	"text/tabwriter"
 	"text/tabwriter"
 	"time"
 	"time"
 	"unicode"
 	"unicode"
@@ -533,6 +532,7 @@ func (srv *Server) CmdPull(stdin io.ReadCloser, stdout io.Writer, args ...string
 		return nil
 		return nil
 	}
 	}
 
 
+	// FIXME: CmdPull should be a wrapper around Runtime.Pull()
 	if srv.runtime.graph.LookupRemoteImage(remote, srv.runtime.authConfig) {
 	if srv.runtime.graph.LookupRemoteImage(remote, srv.runtime.authConfig) {
 		if err := srv.runtime.graph.PullImage(stdout, remote, srv.runtime.authConfig); err != nil {
 		if err := srv.runtime.graph.PullImage(stdout, remote, srv.runtime.authConfig); err != nil {
 			return err
 			return err
@@ -796,56 +796,7 @@ func (srv *Server) CmdAttach(stdin io.ReadCloser, stdout io.Writer, args ...stri
 	if container == nil {
 	if container == nil {
 		return fmt.Errorf("No such container: %s", name)
 		return fmt.Errorf("No such container: %s", name)
 	}
 	}
-
-	cStdout, err := container.StdoutPipe()
-	if err != nil {
-		return err
-	}
-	cStderr, err := container.StderrPipe()
-	if err != nil {
-		return err
-	}
-
-	var wg sync.WaitGroup
-	if container.Config.OpenStdin {
-		cStdin, err := container.StdinPipe()
-		if err != nil {
-			return err
-		}
-		wg.Add(1)
-		go func() {
-			Debugf("Begin stdin pipe [attach]")
-			io.Copy(cStdin, stdin)
-
-			// When stdin get closed, it means the client has been detached
-			// Make sure all pipes are closed.
-			if err := cStdout.Close(); err != nil {
-				Debugf("Error closing stdin pipe: %s", err)
-			}
-			if err := cStderr.Close(); err != nil {
-				Debugf("Error closing stderr pipe: %s", err)
-			}
-
-			wg.Add(-1)
-			Debugf("End of stdin pipe [attach]")
-		}()
-	}
-	wg.Add(1)
-	go func() {
-		Debugf("Begin stdout pipe [attach]")
-		io.Copy(stdout, cStdout)
-		wg.Add(-1)
-		Debugf("End of stdout pipe [attach]")
-	}()
-	wg.Add(1)
-	go func() {
-		Debugf("Begin stderr pipe [attach]")
-		io.Copy(stdout, cStderr)
-		wg.Add(-1)
-		Debugf("End of stderr pipe [attach]")
-	}()
-	wg.Wait()
-	return nil
+	return <-container.Attach(stdin, stdout, stdout)
 }
 }
 
 
 // Ports type - Used to parse multiple -p flags
 // Ports type - Used to parse multiple -p flags
@@ -919,55 +870,28 @@ func (srv *Server) CmdRun(stdin io.ReadCloser, stdout io.Writer, args ...string)
 			return err
 			return err
 		}
 		}
 	}
 	}
-	if config.OpenStdin {
-		cmdStdin, err := container.StdinPipe()
-		if err != nil {
-			return err
-		}
-		if !config.Detach {
-			Go(func() error {
-				_, err := io.Copy(cmdStdin, stdin)
-				cmdStdin.Close()
-				return err
-			})
-		}
-	}
 	// Run the container
 	// Run the container
 	if !config.Detach {
 	if !config.Detach {
-		cmdStderr, err := container.StderrPipe()
-		if err != nil {
-			return err
-		}
-		cmdStdout, err := container.StdoutPipe()
-		if err != nil {
-			return err
-		}
-		if err := container.Start(); err != nil {
-			return err
-		}
-		sendingStdout := Go(func() error {
-			_, err := io.Copy(stdout, cmdStdout)
-			return err
-		})
-		sendingStderr := Go(func() error {
-			_, err := io.Copy(stdout, cmdStderr)
-			return err
-		})
-		errSendingStdout := <-sendingStdout
-		errSendingStderr := <-sendingStderr
-		if errSendingStdout != nil {
-			return errSendingStdout
-		}
-		if errSendingStderr != nil {
-			return errSendingStderr
+		var attachErr chan error
+		if config.OpenStdin {
+			config.StdinOnce = true
+			Debugf("Attaching with stdin\n")
+			attachErr = container.Attach(stdin, stdout, stdout)
+		} else {
+			Debugf("Attaching without stdin\n")
+			attachErr = container.Attach(nil, stdout, nil)
 		}
 		}
-		container.Wait()
-	} else {
+		Debugf("Starting\n")
 		if err := container.Start(); err != nil {
 		if err := container.Start(); err != nil {
 			return err
 			return err
 		}
 		}
-		fmt.Fprintln(stdout, container.ShortId())
+		Debugf("Waiting for attach to return\n")
+		return <-attachErr
+	}
+	if err := container.Start(); err != nil {
+		return err
 	}
 	}
+	fmt.Fprintln(stdout, container.ShortId())
 	return nil
 	return nil
 }
 }
 
 

+ 81 - 0
container.go

@@ -56,6 +56,7 @@ type Config struct {
 	Ports      []int
 	Ports      []int
 	Tty        bool // Attach standard streams to a tty, including stdin if it is not closed.
 	Tty        bool // Attach standard streams to a tty, including stdin if it is not closed.
 	OpenStdin  bool // Open stdin
 	OpenStdin  bool // Open stdin
+	StdinOnce  bool // If true, close stdin after the 1 attached client disconnects.
 	Env        []string
 	Env        []string
 	Cmd        []string
 	Cmd        []string
 	Image      string // Name of the image as it was passed by the operator (eg. could be symbolic)
 	Image      string // Name of the image as it was passed by the operator (eg. could be symbolic)
@@ -229,6 +230,86 @@ func (container *Container) start() error {
 	return container.cmd.Start()
 	return container.cmd.Start()
 }
 }
 
 
+func (container *Container) Attach(stdin io.Reader, stdout io.Writer, stderr io.Writer) chan error {
+	var cStdout io.ReadCloser
+	var cStderr io.ReadCloser
+	var nJobs int
+	errors := make(chan error, 3)
+	if stdin != nil && container.Config.OpenStdin {
+		nJobs += 1
+		if cStdin, err := container.StdinPipe(); err != nil {
+			errors <- err
+		} else {
+			go func() {
+				Debugf("[start] attach stdin\n")
+				defer Debugf("[end]  attach stdin\n")
+				if container.Config.StdinOnce {
+					defer cStdin.Close()
+				}
+				_, err := io.Copy(cStdin, stdin)
+				if err != nil {
+					Debugf("[error] attach stdout: %s\n", err)
+				}
+				errors <- err
+			}()
+		}
+	}
+	if stdout != nil {
+		nJobs += 1
+		if p, err := container.StdoutPipe(); err != nil {
+			errors <- err
+		} else {
+			cStdout = p
+			go func() {
+				Debugf("[start] attach stdout\n")
+				defer Debugf("[end]  attach stdout\n")
+				_, err := io.Copy(stdout, cStdout)
+				if err != nil {
+					Debugf("[error] attach stdout: %s\n", err)
+				}
+				errors <- err
+			}()
+		}
+	}
+	if stderr != nil {
+		nJobs += 1
+		if p, err := container.StderrPipe(); err != nil {
+			errors <- err
+		} else {
+			cStderr = p
+			go func() {
+				Debugf("[start] attach stderr\n")
+				defer Debugf("[end]  attach stderr\n")
+				_, err := io.Copy(stderr, cStderr)
+				if err != nil {
+					Debugf("[error] attach stderr: %s\n", err)
+				}
+				errors <- err
+			}()
+		}
+	}
+	return Go(func() error {
+		if cStdout != nil {
+			defer cStdout.Close()
+		}
+		if cStderr != nil {
+			defer cStderr.Close()
+		}
+		// FIXME: how do clean up the stdin goroutine without the unwanted side effect
+		// of closing the passed stdin? Add an intermediary io.Pipe?
+		for i := 0; i < nJobs; i += 1 {
+			Debugf("Waiting for job %d/%d\n", i+1, nJobs)
+			if err := <-errors; err != nil {
+				Debugf("Job %d returned error %s. Aborting all jobs\n", i+1, err)
+				return err
+			}
+			Debugf("Job %d completed successfully\n", i+1)
+		}
+		Debugf("All jobs completed successfully\n")
+		return nil
+	})
+}
+
 func (container *Container) Start() error {
 func (container *Container) Start() error {
 	if container.State.Running {
 	if container.State.Running {
 		return fmt.Errorf("The container %s is already running.", container.Id)
 		return fmt.Errorf("The container %s is already running.", container.Id)