pipeline.go 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142
  1. package pty
  2. import (
  3. "encoding/json"
  4. "github.com/creack/pty"
  5. "github.com/gorilla/websocket"
  6. "github.com/pkg/errors"
  7. "os"
  8. "os/exec"
  9. "time"
  10. "unicode/utf8"
  11. )
  12. type Pipeline struct {
  13. Pty *os.File
  14. ws *websocket.Conn
  15. }
  16. type Message struct {
  17. Type MsgType
  18. Data json.RawMessage
  19. }
  20. const bufferSize = 2048
  21. func NewPipeLine(conn *websocket.Conn) (p *Pipeline, err error) {
  22. c := exec.Command("login")
  23. ptmx, err := pty.StartWithSize(c, &pty.Winsize{Cols: 90, Rows: 60})
  24. if err != nil {
  25. return nil, errors.Wrap(err, "start pty error")
  26. }
  27. p = &Pipeline{
  28. Pty: ptmx,
  29. ws: conn,
  30. }
  31. return
  32. }
  33. func (p *Pipeline) ReadWsAndWritePty(errorChan chan error) {
  34. for {
  35. msgType, payload, err := p.ws.ReadMessage()
  36. if err != nil {
  37. if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseNoStatusReceived,
  38. websocket.CloseNormalClosure) {
  39. errorChan <- errors.Wrap(err, "Error ReadWsAndWritePty unexpected close")
  40. }
  41. return
  42. }
  43. if msgType != websocket.TextMessage {
  44. errorChan <- errors.Errorf("Error ReadWsAndWritePty Invalid msgType: %v", msgType)
  45. return
  46. }
  47. var msg Message
  48. err = json.Unmarshal(payload, &msg)
  49. if err != nil {
  50. errorChan <- errors.Wrap(err, "Error ReadWsAndWritePty json.Unmarshal")
  51. return
  52. }
  53. switch msg.Type {
  54. case TypeData:
  55. var data string
  56. err = json.Unmarshal(msg.Data, &data)
  57. if err != nil {
  58. errorChan <- errors.Wrap(err, "Error ReadWsAndWritePty json.Unmarshal msg.Data")
  59. return
  60. }
  61. _, err = p.Pty.Write([]byte(data))
  62. if err != nil {
  63. errorChan <- errors.Wrap(err, "Error ReadWsAndWritePty write pty")
  64. return
  65. }
  66. case TypeResize:
  67. var win struct {
  68. Cols uint16
  69. Rows uint16
  70. }
  71. err = json.Unmarshal(msg.Data, &win)
  72. if err != nil {
  73. errorChan <- errors.Wrap(err, "Error ReadSktAndWritePty Invalid resize message")
  74. return
  75. }
  76. err = pty.Setsize(p.Pty, &pty.Winsize{Rows: win.Rows, Cols: win.Cols})
  77. if err != nil {
  78. errorChan <- errors.Wrap(err, "Error ReadSktAndWritePty set pty size")
  79. return
  80. }
  81. case TypePing:
  82. err = p.ws.WriteControl(websocket.PongMessage, []byte{}, time.Now().Add(time.Second))
  83. if err != nil {
  84. errorChan <- errors.Wrap(err, "Error ReadSktAndWritePty write pong")
  85. return
  86. }
  87. default:
  88. errorChan <- errors.Errorf("Error ReadWsAndWritePty unknown msg.Type %v", msg.Type)
  89. return
  90. }
  91. }
  92. }
  93. func (p *Pipeline) ReadPtyAndWriteWs(errorChan chan error) {
  94. buf := make([]byte, bufferSize)
  95. for {
  96. n, err := p.Pty.Read(buf)
  97. if err != nil {
  98. errorChan <- errors.Wrap(err, "Error ReadPtyAndWriteWs read pty")
  99. return
  100. }
  101. processedOutput := validString(string(buf[:n]))
  102. err = p.ws.WriteMessage(websocket.TextMessage, []byte(processedOutput))
  103. if err != nil {
  104. if websocket.IsUnexpectedCloseError(err, websocket.CloseNormalClosure) {
  105. errorChan <- errors.Wrap(err, "Error ReadPtyAndWriteWs websocket write")
  106. }
  107. return
  108. }
  109. }
  110. }
  111. func validString(s string) string {
  112. if !utf8.ValidString(s) {
  113. v := make([]rune, 0, len(s))
  114. for i, r := range s {
  115. if r == utf8.RuneError {
  116. _, size := utf8.DecodeRuneInString(s[i:])
  117. if size == 1 {
  118. continue
  119. }
  120. }
  121. v = append(v, r)
  122. }
  123. s = string(v)
  124. }
  125. return s
  126. }