Explorar o código

Engine: ensure all pipes are properly closed by Receiver and Sender

Docker-DCO-1.1-Signed-off-by: Solomon Hykes <solomon@docker.com> (github: shykes)

[michael@docker.com: fix stdin closing in engine.Job.Run]
[michael@docker.com: fix fd leak in engine.Receiver.Run]
Docker-DCO-1.1-Signed-off-by: Michael Crosby <michael@docker.com> (github: crosbymichael)

Docker-Tested-By: Solomon Hykes <solomon@docker.com>
Docker-Tested-by: Michael Crosby <michael@docker.com>
Solomon Hykes %!s(int64=11) %!d(string=hai) anos
pai
achega
d61190169d
Modificáronse 3 ficheiros con 45 adicións e 7 borrados
  1. 3 0
      engine/job.go
  2. 14 7
      engine/remote.go
  3. 28 0
      engine/remote_test.go

+ 3 - 0
engine/job.go

@@ -72,6 +72,9 @@ func (job *Job) Run() error {
 	if err := job.Stderr.Close(); err != nil {
 	if err := job.Stderr.Close(); err != nil {
 		return err
 		return err
 	}
 	}
+	if err := job.Stdin.Close(); err != nil {
+		return err
+	}
 	if job.status != 0 {
 	if job.status != 0 {
 		return fmt.Errorf("%s", errorMessage)
 		return fmt.Errorf("%s", errorMessage)
 	}
 	}

+ 14 - 7
engine/remote.go

@@ -36,20 +36,27 @@ func (s *Sender) Handle(job *Job) Status {
 	r := beam.NewRouter(nil)
 	r := beam.NewRouter(nil)
 	r.NewRoute().KeyStartsWith("cmd", "log", "stdout").HasAttachment().Handler(func(p []byte, stdout *os.File) error {
 	r.NewRoute().KeyStartsWith("cmd", "log", "stdout").HasAttachment().Handler(func(p []byte, stdout *os.File) error {
 		tasks.Add(1)
 		tasks.Add(1)
-		io.Copy(job.Stdout, stdout)
-		tasks.Done()
+		go func() {
+			io.Copy(job.Stdout, stdout)
+			stdout.Close()
+			tasks.Done()
+		}()
 		return nil
 		return nil
 	})
 	})
 	r.NewRoute().KeyStartsWith("cmd", "log", "stderr").HasAttachment().Handler(func(p []byte, stderr *os.File) error {
 	r.NewRoute().KeyStartsWith("cmd", "log", "stderr").HasAttachment().Handler(func(p []byte, stderr *os.File) error {
 		tasks.Add(1)
 		tasks.Add(1)
-		io.Copy(job.Stderr, stderr)
-		tasks.Done()
+		go func() {
+			io.Copy(job.Stderr, stderr)
+			stderr.Close()
+			tasks.Done()
+		}()
 		return nil
 		return nil
 	})
 	})
 	r.NewRoute().KeyStartsWith("cmd", "log", "stdin").HasAttachment().Handler(func(p []byte, stdin *os.File) error {
 	r.NewRoute().KeyStartsWith("cmd", "log", "stdin").HasAttachment().Handler(func(p []byte, stdin *os.File) error {
-		tasks.Add(1)
-		io.Copy(stdin, job.Stdin)
-		tasks.Done()
+		go func() {
+			io.Copy(stdin, job.Stdin)
+			stdin.Close()
+		}()
 		return nil
 		return nil
 	})
 	})
 	var status int
 	var status int

+ 28 - 0
engine/remote_test.go

@@ -5,6 +5,7 @@ import (
 	"bytes"
 	"bytes"
 	"fmt"
 	"fmt"
 	"github.com/dotcloud/docker/pkg/beam"
 	"github.com/dotcloud/docker/pkg/beam"
+	"io"
 	"strings"
 	"strings"
 	"testing"
 	"testing"
 	"time"
 	"time"
@@ -54,6 +55,33 @@ func TestHelloWorld(t *testing.T) {
 	}
 	}
 }
 }
 
 
+func TestStdin(t *testing.T) {
+	testRemote(t,
+
+		func(eng *Engine) {
+			job := eng.Job("mirror")
+			job.Stdin.Add(strings.NewReader("hello world!\n"))
+			out := &bytes.Buffer{}
+			job.Stdout.Add(out)
+			if err := job.Run(); err != nil {
+				t.Fatal(err)
+			}
+			if out.String() != "hello world!\n" {
+				t.Fatalf("%#v", out.String())
+			}
+		},
+
+		func(eng *Engine) {
+			eng.Register("mirror", func(job *Job) Status {
+				if _, err := io.Copy(job.Stdout, job.Stdin); err != nil {
+					t.Fatal(err)
+				}
+				return StatusOK
+			})
+		},
+	)
+}
+
 // Helpers
 // Helpers
 
 
 func testRemote(t *testing.T, senderSide, receiverSide func(*Engine)) {
 func testRemote(t *testing.T, senderSide, receiverSide func(*Engine)) {