Browse Source

Merged attachStdin

Solomon Hykes 12 years ago
parent
commit
15c3096e89
4 changed files with 253 additions and 129 deletions
  1. 49 98
      commands.go
  2. 69 10
      commands_test.go
  3. 125 21
      container.go
  4. 10 0
      runtime_test.go

+ 49 - 98
commands.go

@@ -13,7 +13,6 @@ import (
 	"runtime"
 	"strconv"
 	"strings"
-	"sync"
 	"text/tabwriter"
 	"time"
 	"unicode"
@@ -536,6 +535,7 @@ func (srv *Server) CmdPull(stdin io.ReadCloser, stdout io.Writer, args ...string
 		return nil
 	}
 
+	// FIXME: CmdPull should be a wrapper around Runtime.Pull()
 	if srv.runtime.graph.LookupRemoteImage(remote, srv.runtime.authConfig) {
 		if err := srv.runtime.graph.PullImage(stdout, remote, srv.runtime.authConfig); err != nil {
 			return err
@@ -799,56 +799,7 @@ func (srv *Server) CmdAttach(stdin io.ReadCloser, stdout io.Writer, args ...stri
 	if container == nil {
 		return fmt.Errorf("No such container: %s", name)
 	}
-
-	cStdout, err := container.StdoutPipe()
-	if err != nil {
-		return err
-	}
-	cStderr, err := container.StderrPipe()
-	if err != nil {
-		return err
-	}
-
-	var wg sync.WaitGroup
-	if container.Config.OpenStdin {
-		cStdin, err := container.StdinPipe()
-		if err != nil {
-			return err
-		}
-		wg.Add(1)
-		go func() {
-			Debugf("Begin stdin pipe [attach]")
-			io.Copy(cStdin, stdin)
-
-			// When stdin get closed, it means the client has been detached
-			// Make sure all pipes are closed.
-			if err := cStdout.Close(); err != nil {
-				Debugf("Error closing stdin pipe: %s", err)
-			}
-			if err := cStderr.Close(); err != nil {
-				Debugf("Error closing stderr pipe: %s", err)
-			}
-
-			wg.Add(-1)
-			Debugf("End of stdin pipe [attach]")
-		}()
-	}
-	wg.Add(1)
-	go func() {
-		Debugf("Begin stdout pipe [attach]")
-		io.Copy(stdout, cStdout)
-		wg.Add(-1)
-		Debugf("End of stdout pipe [attach]")
-	}()
-	wg.Add(1)
-	go func() {
-		Debugf("Begin stderr pipe [attach]")
-		io.Copy(stdout, cStderr)
-		wg.Add(-1)
-		Debugf("End of stderr pipe [attach]")
-	}()
-	wg.Wait()
-	return nil
+	return <-container.Attach(stdin, stdout, stdout)
 }
 
 // Ports type - Used to parse multiple -p flags
@@ -879,6 +830,33 @@ func (opts *ListOpts) Set(value string) error {
 	return nil
 }
 
+// AttachOpts stores arguments to 'docker run -a', eg. which streams to attach to
+type AttachOpts map[string]bool
+
+func NewAttachOpts() *AttachOpts {
+	opts := make(map[string]bool)
+	return (*AttachOpts)(&opts)
+}
+
+func (opts *AttachOpts) String() string {
+	return fmt.Sprint(*opts)
+}
+
+func (opts *AttachOpts) Set(val string) error {
+	if val != "stdin" && val != "stdout" && val != "stderr" {
+		return fmt.Errorf("Unsupported stream name: %s", val)
+	}
+	(*opts)[val] = true
+	return nil
+}
+
+func (opts *AttachOpts) Get(val string) bool {
+	if res, exists := (*opts)[val]; exists {
+		return res
+	}
+	return false
+}
+
 func (srv *Server) CmdTag(stdin io.ReadCloser, stdout io.Writer, args ...string) error {
 	cmd := rcli.Subcmd(stdout, "tag", "[OPTIONS] IMAGE REPOSITORY [TAG]", "Tag an image into a repository")
 	force := cmd.Bool("f", false, "Force")
@@ -922,56 +900,29 @@ func (srv *Server) CmdRun(stdin io.ReadCloser, stdout io.Writer, args ...string)
 			return err
 		}
 	}
-	if config.OpenStdin {
-		cmdStdin, err := container.StdinPipe()
-		if err != nil {
-			return err
-		}
-		if !config.Detach {
-			Go(func() error {
-				_, err := io.Copy(cmdStdin, stdin)
-				cmdStdin.Close()
-				return err
-			})
-		}
+	var (
+		cStdin           io.Reader
+		cStdout, cStderr io.Writer
+	)
+	if config.AttachStdin {
+		cStdin = stdin
 	}
-	// Run the container
-	if !config.Detach {
-		cmdStderr, err := container.StderrPipe()
-		if err != nil {
-			return err
-		}
-		cmdStdout, err := container.StdoutPipe()
-		if err != nil {
-			return err
-		}
-		if err := container.Start(); err != nil {
-			return err
-		}
-		sendingStdout := Go(func() error {
-			_, err := io.Copy(stdout, cmdStdout)
-			return err
-		})
-		sendingStderr := Go(func() error {
-			_, err := io.Copy(stdout, cmdStderr)
-			return err
-		})
-		errSendingStdout := <-sendingStdout
-		errSendingStderr := <-sendingStderr
-		if errSendingStdout != nil {
-			return errSendingStdout
-		}
-		if errSendingStderr != nil {
-			return errSendingStderr
-		}
-		container.Wait()
-	} else {
-		if err := container.Start(); err != nil {
-			return err
-		}
+	if config.AttachStdout {
+		cStdout = stdout
+	}
+	if config.AttachStderr {
+		cStderr = stdout // FIXME: rcli can't differentiate stdout from stderr
+	}
+	attachErr := container.Attach(cStdin, cStdout, cStderr)
+	Debugf("Starting\n")
+	if err := container.Start(); err != nil {
+		return err
+	}
+	if cStdout == nil && cStderr == nil {
 		fmt.Fprintln(stdout, container.ShortId())
 	}
-	return nil
+	Debugf("Waiting for attach to return\n")
+	return <-attachErr
 }
 
 func NewServer() (*Server, error) {

+ 69 - 10
commands_test.go

@@ -94,9 +94,9 @@ func TestRunDisconnect(t *testing.T) {
 	stdout, stdoutPipe := io.Pipe()
 	c1 := make(chan struct{})
 	go func() {
-		if err := srv.CmdRun(stdin, stdoutPipe, "-i", GetTestImage(runtime).Id, "/bin/cat"); err != nil {
-			t.Fatal(err)
-		}
+		// We're simulating a disconnect so the return value doesn't matter. What matters is the
+		// fact that CmdRun returns.
+		srv.CmdRun(stdin, stdoutPipe, "-i", GetTestImage(runtime).Id, "/bin/cat")
 		close(c1)
 	}()
 
@@ -117,10 +117,69 @@ func TestRunDisconnect(t *testing.T) {
 		<-c1
 	})
 
-	// Check the status of the container
-	container := runtime.containers.Back().Value.(*Container)
-	if container.State.Running {
-		t.Fatalf("/bin/cat is still running after closing stdin")
+	// Client disconnect after run -i should cause stdin to be closed, which should
+	// cause /bin/cat to exit.
+	setTimeout(t, "Waiting for /bin/cat to exit timed out", 2*time.Second, func() {
+		container := runtime.List()[0]
+		container.Wait()
+		if container.State.Running {
+			t.Fatalf("/bin/cat is still running after closing stdin")
+		}
+	})
+}
+
+// TestAttachStdin checks attaching to stdin without stdout and stderr.
+// 'docker run -i -a stdin' should sends the client's stdin to the command,
+// then detach from it and print the container id.
+func TestAttachStdin(t *testing.T) {
+	runtime, err := newTestRuntime()
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer nuke(runtime)
+	srv := &Server{runtime: runtime}
+
+	stdinR, stdinW := io.Pipe()
+	var stdout bytes.Buffer
+
+	ch := make(chan struct{})
+	go func() {
+		srv.CmdRun(stdinR, &stdout, "-i", "-a", "stdin", GetTestImage(runtime).Id, "sh", "-c", "echo hello; cat")
+		close(ch)
+	}()
+
+	// Send input to the command, close stdin, wait for CmdRun to return
+	setTimeout(t, "Read/Write timed out", 2*time.Second, func() {
+		if _, err := stdinW.Write([]byte("hi there\n")); err != nil {
+			t.Fatal(err)
+		}
+		stdinW.Close()
+		<-ch
+	})
+
+	// Check output
+	cmdOutput := string(stdout.Bytes())
+	container := runtime.List()[0]
+	if cmdOutput != container.ShortId()+"\n" {
+		t.Fatalf("Wrong output: should be '%s', not '%s'\n", container.ShortId()+"\n", cmdOutput)
+	}
+
+	setTimeout(t, "Waiting for command to exit timed out", 2*time.Second, func() {
+		container.Wait()
+	})
+
+	// Check logs
+	if cmdLogs, err := container.ReadLog("stdout"); err != nil {
+		t.Fatal(err)
+	} else {
+		if output, err := ioutil.ReadAll(cmdLogs); err != nil {
+			t.Fatal(err)
+		} else {
+			expectedLog := "hello\nhi there\n"
+			if string(output) != expectedLog {
+				t.Fatalf("Unexpected logs: should be '%s', not '%s'\n", expectedLog, output)
+			}
+		}
 	}
 }
 
@@ -158,9 +217,9 @@ func TestAttachDisconnect(t *testing.T) {
 	// Attach to it
 	c1 := make(chan struct{})
 	go func() {
-		if err := srv.CmdAttach(stdin, stdoutPipe, container.Id); err != nil {
-			t.Fatal(err)
-		}
+		// We're simulating a disconnect so the return value doesn't matter. What matters is the
+		// fact that CmdAttach returns.
+		srv.CmdAttach(stdin, stdoutPipe, container.Id)
 		close(c1)
 	}()
 

+ 125 - 21
container.go

@@ -48,17 +48,20 @@ type Container struct {
 }
 
 type Config struct {
-	Hostname   string
-	User       string
-	Memory     int64 // Memory limit (in bytes)
-	MemorySwap int64 // Total memory usage (memory + swap); set `-1' to disable swap
-	Detach     bool
-	Ports      []int
-	Tty        bool // Attach standard streams to a tty, including stdin if it is not closed.
-	OpenStdin  bool // Open stdin
-	Env        []string
-	Cmd        []string
-	Image      string // Name of the image as it was passed by the operator (eg. could be symbolic)
+	Hostname     string
+	User         string
+	Memory       int64 // Memory limit (in bytes)
+	MemorySwap   int64 // Total memory usage (memory + swap); set `-1' to disable swap
+	AttachStdin  bool
+	AttachStdout bool
+	AttachStderr bool
+	Ports        []int
+	Tty          bool // Attach standard streams to a tty, including stdin if it is not closed.
+	OpenStdin    bool // Open stdin
+	StdinOnce    bool // If true, close stdin after the 1 attached client disconnects.
+	Env          []string
+	Cmd          []string
+	Image        string // Name of the image as it was passed by the operator (eg. could be symbolic)
 }
 
 func ParseRun(args []string, stdout io.Writer) (*Config, error) {
@@ -70,6 +73,8 @@ func ParseRun(args []string, stdout io.Writer) (*Config, error) {
 	flHostname := cmd.String("h", "", "Container host name")
 	flUser := cmd.String("u", "", "Username or UID")
 	flDetach := cmd.Bool("d", false, "Detached mode: leave the container running in the background")
+	flAttach := NewAttachOpts()
+	cmd.Var(flAttach, "a", "Attach to stdin, stdout or stderr.")
 	flStdin := cmd.Bool("i", false, "Keep stdin open even if not attached")
 	flTty := cmd.Bool("t", false, "Allocate a pseudo-tty")
 	flMemory := cmd.Int64("m", 0, "Memory limit (in bytes)")
@@ -83,6 +88,19 @@ func ParseRun(args []string, stdout io.Writer) (*Config, error) {
 	if err := cmd.Parse(args); err != nil {
 		return nil, err
 	}
+	if *flDetach && len(*flAttach) > 0 {
+		return nil, fmt.Errorf("Conflicting options: -a and -d")
+	}
+	// If neither -d or -a are set, attach to everything by default
+	if len(*flAttach) == 0 && !*flDetach {
+		if !*flDetach {
+			flAttach.Set("stdout")
+			flAttach.Set("stderr")
+			if *flStdin {
+				flAttach.Set("stdin")
+			}
+		}
+	}
 	parsedArgs := cmd.Args()
 	runCmd := []string{}
 	image := ""
@@ -93,16 +111,22 @@ func ParseRun(args []string, stdout io.Writer) (*Config, error) {
 		runCmd = parsedArgs[1:]
 	}
 	config := &Config{
-		Hostname:  *flHostname,
-		Ports:     flPorts,
-		User:      *flUser,
-		Tty:       *flTty,
-		OpenStdin: *flStdin,
-		Memory:    *flMemory,
-		Detach:    *flDetach,
-		Env:       flEnv,
-		Cmd:       runCmd,
-		Image:     image,
+		Hostname:     *flHostname,
+		Ports:        flPorts,
+		User:         *flUser,
+		Tty:          *flTty,
+		OpenStdin:    *flStdin,
+		Memory:       *flMemory,
+		AttachStdin:  flAttach.Get("stdin"),
+		AttachStdout: flAttach.Get("stdout"),
+		AttachStderr: flAttach.Get("stderr"),
+		Env:          flEnv,
+		Cmd:          runCmd,
+		Image:        image,
+	}
+	// When allocating stdin in attached mode, close stdin at client disconnect
+	if config.OpenStdin && config.AttachStdin {
+		config.StdinOnce = true
 	}
 	return config, nil
 }
@@ -233,6 +257,86 @@ func (container *Container) start() error {
 	return container.cmd.Start()
 }
 
+func (container *Container) Attach(stdin io.Reader, stdout io.Writer, stderr io.Writer) chan error {
+	var cStdout io.ReadCloser
+	var cStderr io.ReadCloser
+	var nJobs int
+	errors := make(chan error, 3)
+	if stdin != nil && container.Config.OpenStdin {
+		nJobs += 1
+		if cStdin, err := container.StdinPipe(); err != nil {
+			errors <- err
+		} else {
+			go func() {
+				Debugf("[start] attach stdin\n")
+				defer Debugf("[end]  attach stdin\n")
+				if container.Config.StdinOnce {
+					defer cStdin.Close()
+				}
+				_, err := io.Copy(cStdin, stdin)
+				if err != nil {
+					Debugf("[error] attach stdout: %s\n", err)
+				}
+				errors <- err
+			}()
+		}
+	}
+	if stdout != nil {
+		nJobs += 1
+		if p, err := container.StdoutPipe(); err != nil {
+			errors <- err
+		} else {
+			cStdout = p
+			go func() {
+				Debugf("[start] attach stdout\n")
+				defer Debugf("[end]  attach stdout\n")
+				_, err := io.Copy(stdout, cStdout)
+				if err != nil {
+					Debugf("[error] attach stdout: %s\n", err)
+				}
+				errors <- err
+			}()
+		}
+	}
+	if stderr != nil {
+		nJobs += 1
+		if p, err := container.StderrPipe(); err != nil {
+			errors <- err
+		} else {
+			cStderr = p
+			go func() {
+				Debugf("[start] attach stderr\n")
+				defer Debugf("[end]  attach stderr\n")
+				_, err := io.Copy(stderr, cStderr)
+				if err != nil {
+					Debugf("[error] attach stderr: %s\n", err)
+				}
+				errors <- err
+			}()
+		}
+	}
+	return Go(func() error {
+		if cStdout != nil {
+			defer cStdout.Close()
+		}
+		if cStderr != nil {
+			defer cStderr.Close()
+		}
+		// FIXME: how do clean up the stdin goroutine without the unwanted side effect
+		// of closing the passed stdin? Add an intermediary io.Pipe?
+		for i := 0; i < nJobs; i += 1 {
+			Debugf("Waiting for job %d/%d\n", i+1, nJobs)
+			if err := <-errors; err != nil {
+				Debugf("Job %d returned error %s. Aborting all jobs\n", i+1, err)
+				return err
+			}
+			Debugf("Job %d completed successfully\n", i+1)
+		}
+		Debugf("All jobs completed successfully\n")
+		return nil
+	})
+}
+
 func (container *Container) Start() error {
 	if container.State.Running {
 		return fmt.Errorf("The container %s is already running.", container.Id)

+ 10 - 0
runtime_test.go

@@ -6,6 +6,7 @@ import (
 	"os"
 	"os/exec"
 	"os/user"
+	"sync"
 	"testing"
 )
 
@@ -17,6 +18,15 @@ var unitTestStoreBase string
 var srv *Server
 
 func nuke(runtime *Runtime) error {
+	var wg sync.WaitGroup
+	for _, container := range runtime.List() {
+		wg.Add(1)
+		go func() {
+			container.Kill()
+			wg.Add(-1)
+		}()
+	}
+	wg.Wait()
 	return os.RemoveAll(runtime.root)
 }