瀏覽代碼

Merge remote-tracking branch 'origin/328-i_o_error_uncloced_connection-fix'

Solomon Hykes 12 年之前
父節點
當前提交
a7f191d51d
共有 3 個文件被更改,包括 99 次插入11 次删除
  1. 15 5
      commands.go
  2. 52 0
      commands_test.go
  3. 32 6
      container.go

+ 15 - 5
commands.go

@@ -799,7 +799,8 @@ func (srv *Server) CmdAttach(stdin io.ReadCloser, stdout io.Writer, args ...stri
 	if container == nil {
 		return fmt.Errorf("No such container: %s", name)
 	}
-	return <-container.Attach(stdin, stdout, stdout)
+
+	return <-container.Attach(stdin, nil, stdout, stdout)
 }
 
 // Ports type - Used to parse multiple -p flags
@@ -901,11 +902,17 @@ func (srv *Server) CmdRun(stdin io.ReadCloser, stdout io.Writer, args ...string)
 		}
 	}
 	var (
-		cStdin           io.Reader
+		cStdin           io.ReadCloser
 		cStdout, cStderr io.Writer
 	)
 	if config.AttachStdin {
-		cStdin = stdin
+		r, w := io.Pipe()
+		go func() {
+			defer w.Close()
+			defer Debugf("Closing buffered stdin pipe")
+			io.Copy(w, stdin)
+		}()
+		cStdin = r
 	}
 	if config.AttachStdout {
 		cStdout = stdout
@@ -913,7 +920,8 @@ func (srv *Server) CmdRun(stdin io.ReadCloser, stdout io.Writer, args ...string)
 	if config.AttachStderr {
 		cStderr = stdout // FIXME: rcli can't differentiate stdout from stderr
 	}
-	attachErr := container.Attach(cStdin, cStdout, cStderr)
+
+	attachErr := container.Attach(cStdin, stdin, cStdout, cStderr)
 	Debugf("Starting\n")
 	if err := container.Start(); err != nil {
 		return err
@@ -922,7 +930,9 @@ func (srv *Server) CmdRun(stdin io.ReadCloser, stdout io.Writer, args ...string)
 		fmt.Fprintln(stdout, container.ShortId())
 	}
 	Debugf("Waiting for attach to return\n")
-	return <-attachErr
+	<-attachErr
+	// Expecting I/O pipe error, discarding
+	return nil
 }
 
 func NewServer() (*Server, error) {

+ 52 - 0
commands_test.go

@@ -80,6 +80,57 @@ func TestRunHostname(t *testing.T) {
 	}
 }
 
+func TestRunExit(t *testing.T) {
+	runtime, err := newTestRuntime()
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer nuke(runtime)
+
+	srv := &Server{runtime: runtime}
+
+	stdin, stdinPipe := io.Pipe()
+	stdout, stdoutPipe := io.Pipe()
+	c1 := make(chan struct{})
+	go func() {
+		srv.CmdRun(stdin, stdoutPipe, "-i", GetTestImage(runtime).Id, "/bin/cat")
+		close(c1)
+	}()
+
+	setTimeout(t, "Read/Write assertion timed out", 2*time.Second, func() {
+		if err := assertPipe("hello\n", "hello", stdout, stdinPipe, 15); err != nil {
+			t.Fatal(err)
+		}
+	})
+
+	container := runtime.List()[0]
+
+	// Closing /bin/cat stdin, expect it to exit
+	p, err := container.StdinPipe()
+	if err != nil {
+		t.Fatal(err)
+	}
+	if err := p.Close(); err != nil {
+		t.Fatal(err)
+	}
+
+	// as the process exited, CmdRun must finish and unblock. Wait for it
+	setTimeout(t, "Waiting for CmdRun timed out", 2*time.Second, func() {
+		<-c1
+	})
+
+	// Make sure that the client has been disconnected
+	setTimeout(t, "The client should have been disconnected once the remote process exited.", 2*time.Second, func() {
+		// Expecting pipe i/o error, just check that read does not block
+		stdin.Read([]byte{})
+	})
+
+	// Cleanup pipes
+	if err := closeWrap(stdin, stdinPipe, stdout, stdoutPipe); err != nil {
+		t.Fatal(err)
+	}
+}
+
 // Expected behaviour: the process dies when the client disconnects
 func TestRunDisconnect(t *testing.T) {
 	runtime, err := newTestRuntime()
@@ -237,6 +288,7 @@ func TestAttachDisconnect(t *testing.T) {
 	setTimeout(t, "Waiting for CmdAttach timed out", 2*time.Second, func() {
 		<-c1
 	})
+
 	// We closed stdin, expect /bin/cat to still be running
 	// Wait a little bit to make sure container.monitor() did his thing
 	err = container.WaitTimeout(500 * time.Millisecond)

+ 32 - 6
container.go

@@ -257,9 +257,9 @@ func (container *Container) start() error {
 	return container.cmd.Start()
 }
 
-func (container *Container) Attach(stdin io.Reader, stdout io.Writer, stderr io.Writer) chan error {
-	var cStdout io.ReadCloser
-	var cStderr io.ReadCloser
+func (container *Container) Attach(stdin io.ReadCloser, stdinCloser io.Closer, stdout io.Writer, stderr io.Writer) chan error {
+	var cStdout, cStderr io.ReadCloser
+
 	var nJobs int
 	errors := make(chan error, 3)
 	if stdin != nil && container.Config.OpenStdin {
@@ -269,15 +269,23 @@ func (container *Container) Attach(stdin io.Reader, stdout io.Writer, stderr io.
 		} else {
 			go func() {
 				Debugf("[start] attach stdin\n")
-				defer Debugf("[end]  attach stdin\n")
+				defer Debugf("[end] attach stdin\n")
+				// No matter what, when stdin is closed (io.Copy unblock), close stdout and stderr
+				if cStdout != nil {
+					defer cStdout.Close()
+				}
+				if cStderr != nil {
+					defer cStderr.Close()
+				}
 				if container.Config.StdinOnce {
 					defer cStdin.Close()
 				}
 				_, err := io.Copy(cStdin, stdin)
 				if err != nil {
-					Debugf("[error] attach stdout: %s\n", err)
+					Debugf("[error] attach stdin: %s\n", err)
 				}
-				errors <- err
+				// Discard error, expecting pipe error
+				errors <- nil
 			}()
 		}
 	}
@@ -290,6 +298,15 @@ func (container *Container) Attach(stdin io.Reader, stdout io.Writer, stderr io.
 			go func() {
 				Debugf("[start] attach stdout\n")
 				defer Debugf("[end]  attach stdout\n")
+				// If we are in StdinOnce mode, then close stdin
+				if container.Config.StdinOnce {
+					if stdin != nil {
+						defer stdin.Close()
+					}
+					if stdinCloser != nil {
+						defer stdinCloser.Close()
+					}
+				}
 				_, err := io.Copy(stdout, cStdout)
 				if err != nil {
 					Debugf("[error] attach stdout: %s\n", err)
@@ -307,6 +324,15 @@ func (container *Container) Attach(stdin io.Reader, stdout io.Writer, stderr io.
 			go func() {
 				Debugf("[start] attach stderr\n")
 				defer Debugf("[end]  attach stderr\n")
+				// If we are in StdinOnce mode, then close stdin
+				if container.Config.StdinOnce {
+					if stdin != nil {
+						defer stdin.Close()
+					}
+					if stdinCloser != nil {
+						defer stdinCloser.Close()
+					}
+				}
 				_, err := io.Copy(stderr, cStderr)
 				if err != nil {
 					Debugf("[error] attach stderr: %s\n", err)