Pārlūkot izejas kodu

beam/examples/beamsh: 'beamsend' command serializes all messages and sends them over a network connection

Docker-DCO-1.1-Signed-off-by: Solomon Hykes <solomon@docker.com> (github: shykes)
Solomon Hykes 11 gadi atpakaļ
vecāks
revīzija
6d5c75a224
1 mainītis faili ar 60 papildinājumiem un 0 dzēšanām
  1. 60 0
      pkg/beam/examples/beamsh/beamsh.go

+ 60 - 0
pkg/beam/examples/beamsh/beamsh.go

@@ -305,6 +305,66 @@ func GetHandler(name string) Handler {
 				beam.Send(out, data.Empty().Set("type", "socket").Set("remoteaddr", conn.RemoteAddr().String()).Bytes(), f)
 			}
 		}
+	} else if name == "beamsend" {
+		return func(args []string, in *net.UnixConn, out *net.UnixConn) {
+			if len(args) != 2 {
+				if err := beam.Send(out, data.Empty().Set("status", "1").Set("message", "wrong number of arguments").Bytes(), nil); err != nil {
+					Fatal(err)
+				}
+				return
+			}
+			u, err := url.Parse(args[1])
+			if err != nil {
+				beam.Send(out, data.Empty().Set("status", "1").Set("message", err.Error()).Bytes(), nil)
+				return
+			}
+			var tasks sync.WaitGroup
+			for {
+				payload, attachment, err := beam.Receive(in)
+				if err != nil {
+					Logf("receive failed with err=%v\n", err)
+					break
+				}
+				conn, err := net.Dial(u.Scheme, u.Host)
+				if err != nil {
+					beam.Send(out, data.Empty().Set("cmd", "msg", "connect error: " + err.Error()).Bytes(), nil)
+					return
+				}
+				beam.Send(out, data.Empty().Set("cmd", "msg", "connection established").Bytes(), nil)
+				tasks.Add(1)
+				func(payload []byte, attachment *os.File, conn net.Conn) {
+					defer tasks.Done()
+					defer conn.Close()
+					// even when successful, conn.File() returns a duplicate,
+					// so we must close the original
+					if _, err := conn.Write([]byte(data.EncodeString(string(payload)))); err != nil {
+						return
+					}
+					if attachment == nil {
+						return
+					}
+					var iotasks sync.WaitGroup
+					iotasks.Add(2)
+					go func(attachment *os.File, conn net.Conn) {
+						defer iotasks.Done()
+						Debugf("copying the connection to [%d]\n", attachment.Fd())
+						io.Copy(attachment, conn)
+						attachment.Close()
+						Debugf("done copying the connection to [%d]\n", attachment.Fd())
+					}(attachment, conn)
+					go func(attachment *os.File, conn net.Conn) {
+						defer iotasks.Done()
+						Debugf("copying [%d] to the connection\n", attachment.Fd())
+						io.Copy(conn, attachment)
+						conn.Close()
+						Debugf("done copying [%d] to the connection\n", attachment.Fd())
+					}(attachment, conn)
+					iotasks.Wait()
+					attachment.Close()
+				}(payload, attachment, conn)
+			}
+			tasks.Wait()
+		}
 	} else if name == "connect" {
 		return func(args []string, in *net.UnixConn, out *net.UnixConn) {
 			if len(args) != 2 {