Browse Source

beam/examples/beamsh: remote communication over beam (experimental).

Docker-DCO-1.1-Signed-off-by: Solomon Hykes <solomon@docker.com> (github: shykes)
Solomon Hykes 11 years ago
parent
commit
222fc87ade
2 changed files with 186 additions and 46 deletions
  1. 1 0
      pkg/beam/data/data.go
  2. 185 46
      pkg/beam/examples/beamsh/beamsh.go

+ 1 - 0
pkg/beam/data/data.go

@@ -24,6 +24,7 @@ func encodeString(s string) string {
 }
 
 var EncodeString = encodeString
+var DecodeString = decodeString
 
 func encodeList(l []string) string {
 	values := make([]string, 0, len(l))

+ 185 - 46
pkg/beam/examples/beamsh/beamsh.go

@@ -124,6 +124,168 @@ func executeScript(client *net.UnixConn, script []*dockerscript.Command) error {
 	return nil
 }
 
+func dialer(addr string) (chan net.Conn, error) {
+	u, err := url.Parse(addr)
+	if err != nil {
+		return nil, err
+	}
+	connections := make(chan net.Conn)
+	go func() {
+		defer close(connections)
+		for {
+			conn, err := net.Dial(u.Scheme, u.Host)
+			if err != nil {
+				return
+			}
+			connections <-conn
+		}
+	}()
+	return connections, nil
+}
+
+func listener(addr string) (chan net.Conn, error) {
+	u, err := url.Parse(addr)
+	if err != nil {
+		return nil, err
+	}
+	l, err := net.Listen(u.Scheme, u.Host)
+	if err != nil {
+		return nil, err
+	}
+	connections := make(chan net.Conn)
+	go func() {
+		defer close(connections)
+		for {
+			conn, err := l.Accept()
+			if err != nil {
+				return
+			}
+			Logf("new connection\n")
+			connections<-conn
+		}
+	}()
+	return connections, nil
+}
+
+func msgDesc(payload []byte, attachment *os.File) string {
+	var filedesc string = "<nil>"
+	if attachment != nil {
+		filedesc = fmt.Sprintf("%d", attachment.Fd())
+	}
+	return fmt.Sprintf("'%s'[%s]", payload, filedesc)
+
+}
+
+func SendToConn(connections chan net.Conn, src *net.UnixConn) error {
+	var tasks sync.WaitGroup
+	defer tasks.Wait()
+	for {
+		payload, attachment, err := beam.Receive(src)
+		if err == io.EOF {
+			return nil
+		} else if err != nil {
+			return err
+		}
+		conn, ok := <-connections
+		if !ok {
+			break
+		}
+		Logf("Sending %s\n", msgDesc(payload, attachment))
+		tasks.Add(1)
+		go func(payload []byte, attachment *os.File, conn net.Conn) {
+			defer tasks.Done()
+			if _, err := conn.Write([]byte(data.EncodeString(string(payload)))); err != nil {
+				return
+			}
+			if attachment == nil {
+				conn.Close()
+				return
+			}
+			var iotasks sync.WaitGroup
+			iotasks.Add(2)
+			go func(attachment *os.File, conn net.Conn) {
+				defer iotasks.Done()
+				Debugf("copying the connection to [%d]\n", attachment.Fd())
+				io.Copy(attachment, conn)
+				attachment.Close()
+				Debugf("done copying the connection to [%d]\n", attachment.Fd())
+			}(attachment, conn)
+			go func(attachment *os.File, conn net.Conn) {
+				defer iotasks.Done()
+				Debugf("copying [%d] to the connection\n", attachment.Fd())
+				io.Copy(conn, attachment)
+				conn.Close()
+				Debugf("done copying [%d] to the connection\n", attachment.Fd())
+			}(attachment, conn)
+			iotasks.Wait()
+		}(payload, attachment, conn)
+	}
+	return nil
+}
+
+func bicopy(a, b io.ReadWriteCloser) {
+	var iotasks sync.WaitGroup
+	oneCopy := func(dst io.WriteCloser, src io.Reader) {
+		defer iotasks.Done()
+		io.Copy(dst, src)
+		dst.Close()
+	}
+	iotasks.Add(2)
+	go oneCopy(a, b)
+	go oneCopy(b, a)
+	iotasks.Wait()
+}
+
+func ReceiveFromConn(connections chan net.Conn, dst *net.UnixConn) error {
+	for conn := range connections {
+		err := func () error {
+			Logf("parsing message from network...\n")
+			defer Logf("done parsing message from network\n")
+			buf := make([]byte, 4098)
+			n, err := conn.Read(buf)
+			if n == 0 {
+				conn.Close()
+				if err == io.EOF {
+					return nil
+				} else {
+					return err
+				}
+			}
+			Logf("decoding message from '%s'\n", buf[:n])
+			header, skip, err := data.DecodeString(string(buf[:n]))
+			if err != nil {
+				conn.Close()
+				return err
+			}
+			pub, priv, err := beam.SocketPair()
+			if err != nil {
+				return err
+			}
+			Logf("decoded message: %s\n", data.Message(header).Pretty())
+			go func(skipped []byte, conn net.Conn, f *os.File) {
+				// this closes both conn and f
+				if len(skipped) > 0 {
+					if _, err := f.Write(skipped); err != nil {
+						Logf("ERROR: %v\n", err)
+						f.Close()
+						conn.Close()
+						return
+					}
+				}
+				bicopy(conn, f)
+			}(buf[skip:n], conn, pub)
+			if err := beam.Send(dst, []byte(header), priv); err != nil {
+				return err
+			}
+			return nil
+		}()
+		if err != nil {
+			Logf("Error reading from connection: %v\n", err)
+		}
+	}
+	return nil
+}
+
 //	1) Find a handler for the command (if no handler, fail)
 //	2) Attach new in & out pair to the handler
 //	3) [in the background] Copy handler output to our own output
@@ -224,6 +386,7 @@ func GetHandler(name string) Handler {
 				return
 			}
 			cmd.Stderr = errW
+			cmd.Stdin = os.Stdin
 			beam.Send(out, data.Empty().Set("cmd", "log", "stdout").Bytes(), outR)
 			beam.Send(out, data.Empty().Set("cmd", "log", "stderr").Bytes(), errR)
 			execErr := cmd.Run()
@@ -327,63 +490,39 @@ func GetHandler(name string) Handler {
 		}
 	} else if name == "beamsend" {
 		return func(args []string, in *net.UnixConn, out *net.UnixConn) {
-			if len(args) != 2 {
+			if len(args) < 2 {
 				if err := beam.Send(out, data.Empty().Set("status", "1").Set("message", "wrong number of arguments").Bytes(), nil); err != nil {
 					Fatal(err)
 				}
 				return
 			}
-			u, err := url.Parse(args[1])
+			var connector func(string) (chan net.Conn, error)
+			connector = dialer
+			connections, err := connector(args[1])
 			if err != nil {
 				beam.Send(out, data.Empty().Set("status", "1").Set("message", err.Error()).Bytes(), nil)
 				return
 			}
-			var tasks sync.WaitGroup
-			for {
-				payload, attachment, err := beam.Receive(in)
-				if err != nil {
-					Logf("receive failed with err=%v\n", err)
-					break
-				}
-				conn, err := net.Dial(u.Scheme, u.Host)
-				if err != nil {
-					beam.Send(out, data.Empty().Set("cmd", "msg", "connect error: " + err.Error()).Bytes(), nil)
-					return
+			// Copy in to conn
+			SendToConn(connections, in)
+		}
+	} else if name == "beamreceive" {
+		return func(args []string, in *net.UnixConn, out *net.UnixConn) {
+			if len(args) != 2 {
+				if err := beam.Send(out, data.Empty().Set("status", "1").Set("message", "wrong number of arguments").Bytes(), nil); err != nil {
+					Fatal(err)
 				}
-				beam.Send(out, data.Empty().Set("cmd", "msg", "connection established").Bytes(), nil)
-				tasks.Add(1)
-				func(payload []byte, attachment *os.File, conn net.Conn) {
-					defer tasks.Done()
-					defer conn.Close()
-					// even when successful, conn.File() returns a duplicate,
-					// so we must close the original
-					if _, err := conn.Write([]byte(data.EncodeString(string(payload)))); err != nil {
-						return
-					}
-					if attachment == nil {
-						return
-					}
-					var iotasks sync.WaitGroup
-					iotasks.Add(2)
-					go func(attachment *os.File, conn net.Conn) {
-						defer iotasks.Done()
-						Debugf("copying the connection to [%d]\n", attachment.Fd())
-						io.Copy(attachment, conn)
-						attachment.Close()
-						Debugf("done copying the connection to [%d]\n", attachment.Fd())
-					}(attachment, conn)
-					go func(attachment *os.File, conn net.Conn) {
-						defer iotasks.Done()
-						Debugf("copying [%d] to the connection\n", attachment.Fd())
-						io.Copy(conn, attachment)
-						conn.Close()
-						Debugf("done copying [%d] to the connection\n", attachment.Fd())
-					}(attachment, conn)
-					iotasks.Wait()
-					attachment.Close()
-				}(payload, attachment, conn)
+				return
 			}
-			tasks.Wait()
+			var connector func(string) (chan net.Conn, error)
+			connector = listener
+			connections, err := connector(args[1])
+			if err != nil {
+				beam.Send(out, data.Empty().Set("status", "1").Set("message", err.Error()).Bytes(), nil)
+				return
+			}
+			// Copy in to conn
+			ReceiveFromConn(connections, out)
 		}
 	} else if name == "connect" {
 		return func(args []string, in *net.UnixConn, out *net.UnixConn) {