Browse Source

Merge pull request #5687 from shykes/pr_out_engine_fix_a_timeout_bug_in_sender_receiver

Solomon Hykes 11 năm trước cách đây
mục cha
commit
a7e61a21c0

+ 5 - 3
engine/remote.go

@@ -90,19 +90,21 @@ func (rcv *Receiver) Run() error {
 			f.Close()
 			return err
 		}
+		f.Close()
+		defer peer.Close()
 		cmd := data.Message(p).Get("cmd")
 		job := rcv.Engine.Job(cmd[0], cmd[1:]...)
-		stdout, err := beam.SendPipe(peer, data.Empty().Set("cmd", "log", "stdout").Bytes())
+		stdout, err := beam.SendRPipe(peer, data.Empty().Set("cmd", "log", "stdout").Bytes())
 		if err != nil {
 			return err
 		}
 		job.Stdout.Add(stdout)
-		stderr, err := beam.SendPipe(peer, data.Empty().Set("cmd", "log", "stderr").Bytes())
+		stderr, err := beam.SendRPipe(peer, data.Empty().Set("cmd", "log", "stderr").Bytes())
 		if err != nil {
 			return err
 		}
 		job.Stderr.Add(stderr)
-		stdin, err := beam.SendPipe(peer, data.Empty().Set("cmd", "log", "stdin").Bytes())
+		stdin, err := beam.SendWPipe(peer, data.Empty().Set("cmd", "log", "stdin").Bytes())
 		if err != nil {
 			return err
 		}

+ 92 - 1
engine/remote_test.go

@@ -1,3 +1,94 @@
 package engine
 
-import ()
+import (
+	"bufio"
+	"bytes"
+	"fmt"
+	"github.com/dotcloud/docker/pkg/beam"
+	"strings"
+	"testing"
+	"time"
+)
+
+func TestHelloWorld(t *testing.T) {
+	for i := 0; i < 10; i++ {
+		testRemote(t,
+
+			// Sender side
+			func(eng *Engine) {
+				job := eng.Job("echo", "hello", "world")
+				out := &bytes.Buffer{}
+				job.Stdout.Add(out)
+				job.Run()
+				if job.status != StatusOK {
+					t.Fatalf("#%v", job.StatusCode())
+				}
+				lines := bufio.NewScanner(out)
+				var i int
+				for lines.Scan() {
+					if lines.Text() != "hello world" {
+						t.Fatalf("%#v", lines.Text())
+					}
+					i++
+				}
+				if i != 1000 {
+					t.Fatalf("%#v", i)
+				}
+			},
+
+			// Receiver side
+			func(eng *Engine) {
+				eng.Register("echo", func(job *Job) Status {
+					// Simulate more output with a delay in the middle
+					for i := 0; i < 500; i++ {
+						fmt.Fprintf(job.Stdout, "%s\n", strings.Join(job.Args, " "))
+					}
+					time.Sleep(5 * time.Millisecond)
+					for i := 0; i < 500; i++ {
+						fmt.Fprintf(job.Stdout, "%s\n", strings.Join(job.Args, " "))
+					}
+					return StatusOK
+				})
+			},
+		)
+	}
+}
+
+// Helpers
+
+func testRemote(t *testing.T, senderSide, receiverSide func(*Engine)) {
+	sndConn, rcvConn, err := beam.USocketPair()
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer sndConn.Close()
+	defer rcvConn.Close()
+	sender := NewSender(sndConn)
+	receiver := NewReceiver(rcvConn)
+
+	// Setup the sender side
+	eng := New()
+	sender.Install(eng)
+
+	// Setup the receiver side
+	receiverSide(receiver.Engine)
+	go receiver.Run()
+
+	timeout(t, func() {
+		senderSide(eng)
+	})
+}
+
+func timeout(t *testing.T, f func()) {
+	onTimeout := time.After(100 * time.Millisecond)
+	onDone := make(chan bool)
+	go func() {
+		f()
+		close(onDone)
+	}()
+	select {
+	case <-onTimeout:
+		t.Fatalf("timeout")
+	case <-onDone:
+	}
+}

+ 36 - 5
pkg/beam/beam.go

@@ -29,17 +29,48 @@ type ReceiveSender interface {
 	Sender
 }
 
-func SendPipe(dst Sender, data []byte) (*os.File, error) {
+const (
+	R int = 1 << (32 - 1 - iota)
+	W
+)
+
+func sendPipe(dst Sender, data []byte, mode int) (*os.File, error) {
 	r, w, err := os.Pipe()
 	if err != nil {
 		return nil, err
 	}
-	if err := dst.Send(data, r); err != nil {
-		r.Close()
-		w.Close()
+	var (
+		remote *os.File
+		local  *os.File
+	)
+	if mode == R {
+		remote = r
+		local = w
+	} else if mode == W {
+		remote = w
+		local = r
+	}
+	if err := dst.Send(data, remote); err != nil {
+		local.Close()
+		remote.Close()
 		return nil, err
 	}
-	return w, nil
+	return local, nil
+
+}
+
+// SendRPipe create a pipe and sends its *read* end attached in a beam message
+// to `dst`, with `data` as the message payload.
+// It returns the *write* end of the pipe, or an error.
+func SendRPipe(dst Sender, data []byte) (*os.File, error) {
+	return sendPipe(dst, data, R)
+}
+
+// SendWPipe create a pipe and sends its *read* end attached in a beam message
+// to `dst`, with `data` as the message payload.
+// It returns the *write* end of the pipe, or an error.
+func SendWPipe(dst Sender, data []byte) (*os.File, error) {
+	return sendPipe(dst, data, W)
 }
 
 func SendConn(dst Sender, data []byte) (conn *UnixConn, err error) {

+ 2 - 2
pkg/beam/examples/beamsh/beamsh.go

@@ -257,12 +257,12 @@ func Handlers(sink beam.Sender) (*beam.UnixConn, error) {
 				if handler == nil {
 					return
 				}
-				stdout, err := beam.SendPipe(conn, data.Empty().Set("cmd", "log", "stdout").Set("fromcmd", cmd...).Bytes())
+				stdout, err := beam.SendRPipe(conn, data.Empty().Set("cmd", "log", "stdout").Set("fromcmd", cmd...).Bytes())
 				if err != nil {
 					return
 				}
 				defer stdout.Close()
-				stderr, err := beam.SendPipe(conn, data.Empty().Set("cmd", "log", "stderr").Set("fromcmd", cmd...).Bytes())
+				stderr, err := beam.SendRPipe(conn, data.Empty().Set("cmd", "log", "stderr").Set("fromcmd", cmd...).Bytes())
 				if err != nil {
 					return
 				}

+ 1 - 1
pkg/beam/examples/beamsh/builtins.go

@@ -272,7 +272,7 @@ func CmdPrint(args []string, stdout, stderr io.Writer, in beam.Receiver, out bea
 		}
 		// Skip commands
 		if a != nil && data.Message(payload).Get("cmd") == nil {
-			dup, err := beam.SendPipe(out, payload)
+			dup, err := beam.SendRPipe(out, payload)
 			if err != nil {
 				a.Close()
 				return

+ 1 - 1
pkg/beam/router.go

@@ -78,7 +78,7 @@ func (route *Route) Tee(dst Sender) *Route {
 			return inner(payload, attachment)
 		}
 		// Setup the tee
-		w, err := SendPipe(dst, payload)
+		w, err := SendRPipe(dst, payload)
 		if err != nil {
 			return err
 		}