stdcopy.go 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155
  1. package utils
  2. import (
  3. "encoding/binary"
  4. "errors"
  5. "io"
  6. )
  7. const (
  8. StdWriterPrefixLen = 8
  9. StdWriterFdIndex = 0
  10. StdWriterSizeIndex = 4
  11. )
  12. type StdType [StdWriterPrefixLen]byte
  13. var (
  14. Stdin StdType = StdType{0: 0}
  15. Stdout StdType = StdType{0: 1}
  16. Stderr StdType = StdType{0: 2}
  17. )
  18. type StdWriter struct {
  19. io.Writer
  20. prefix StdType
  21. sizeBuf []byte
  22. }
  23. func (w *StdWriter) Write(buf []byte) (n int, err error) {
  24. if w == nil || w.Writer == nil {
  25. return 0, errors.New("Writer not instanciated")
  26. }
  27. binary.BigEndian.PutUint32(w.prefix[4:], uint32(len(buf)))
  28. buf = append(w.prefix[:], buf...)
  29. n, err = w.Writer.Write(buf)
  30. return n - StdWriterPrefixLen, err
  31. }
  32. // NewStdWriter instanciates a new Writer.
  33. // Everything written to it will be encapsulated using a custom format,
  34. // and written to the underlying `w` stream.
  35. // This allows multiple write streams (e.g. stdout and stderr) to be muxed into a single connection.
  36. // `t` indicates the id of the stream to encapsulate.
  37. // It can be utils.Stdin, utils.Stdout, utils.Stderr.
  38. func NewStdWriter(w io.Writer, t StdType) *StdWriter {
  39. if len(t) != StdWriterPrefixLen {
  40. return nil
  41. }
  42. return &StdWriter{
  43. Writer: w,
  44. prefix: t,
  45. sizeBuf: make([]byte, 4),
  46. }
  47. }
  48. var ErrInvalidStdHeader = errors.New("Unrecognized input header")
  49. // StdCopy is a modified version of io.Copy.
  50. //
  51. // StdCopy will demultiplex `src`, assuming that it contains two streams,
  52. // previously multiplexed together using a StdWriter instance.
  53. // As it reads from `src`, StdCopy will write to `dstout` and `dsterr`.
  54. //
  55. // StdCopy will read until it hits EOF on `src`. It will then return a nil error.
  56. // In other words: if `err` is non nil, it indicates a real underlying error.
  57. //
  58. // `written` will hold the total number of bytes written to `dstout` and `dsterr`.
  59. func StdCopy(dstout, dsterr io.Writer, src io.Reader) (written int64, err error) {
  60. var (
  61. buf = make([]byte, 32*1024+StdWriterPrefixLen+1)
  62. bufLen = len(buf)
  63. nr, nw int
  64. er, ew error
  65. out io.Writer
  66. frameSize int
  67. )
  68. for {
  69. // Make sure we have at least a full header
  70. for nr < StdWriterPrefixLen {
  71. var nr2 int
  72. nr2, er = src.Read(buf[nr:])
  73. if er == io.EOF {
  74. return written, nil
  75. }
  76. if er != nil {
  77. return 0, er
  78. }
  79. nr += nr2
  80. }
  81. // Check the first byte to know where to write
  82. switch buf[StdWriterFdIndex] {
  83. case 0:
  84. fallthrough
  85. case 1:
  86. // Write on stdout
  87. out = dstout
  88. case 2:
  89. // Write on stderr
  90. out = dsterr
  91. default:
  92. Debugf("Error selecting output fd: (%d)", buf[StdWriterFdIndex])
  93. return 0, ErrInvalidStdHeader
  94. }
  95. // Retrieve the size of the frame
  96. frameSize = int(binary.BigEndian.Uint32(buf[StdWriterSizeIndex : StdWriterSizeIndex+4]))
  97. Debugf("framesize: %d", frameSize)
  98. // Check if the buffer is big enough to read the frame.
  99. // Extend it if necessary.
  100. if frameSize+StdWriterPrefixLen > bufLen {
  101. Debugf("Extending buffer cap by %d (was %d)", frameSize+StdWriterPrefixLen-bufLen+1, len(buf))
  102. buf = append(buf, make([]byte, frameSize+StdWriterPrefixLen-bufLen+1)...)
  103. bufLen = len(buf)
  104. }
  105. // While the amount of bytes read is less than the size of the frame + header, we keep reading
  106. for nr < frameSize+StdWriterPrefixLen {
  107. var nr2 int
  108. nr2, er = src.Read(buf[nr:])
  109. if er == io.EOF {
  110. return written, nil
  111. }
  112. if er != nil {
  113. Debugf("Error reading frame: %s", er)
  114. return 0, er
  115. }
  116. nr += nr2
  117. }
  118. // Write the retrieved frame (without header)
  119. nw, ew = out.Write(buf[StdWriterPrefixLen : frameSize+StdWriterPrefixLen])
  120. if nw > 0 {
  121. written += int64(nw)
  122. }
  123. if ew != nil {
  124. Debugf("Error writing frame: %s", ew)
  125. return 0, ew
  126. }
  127. // If the frame has not been fully written: error
  128. if nw != frameSize {
  129. Debugf("Error Short Write: (%d on %d)", nw, frameSize)
  130. return 0, io.ErrShortWrite
  131. }
  132. // Move the rest of the buffer to the beginning
  133. copy(buf, buf[frameSize+StdWriterPrefixLen:])
  134. // Move the index
  135. nr -= frameSize + StdWriterPrefixLen
  136. }
  137. }