stdcopy.go 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172
  1. package stdcopy
  2. import (
  3. "encoding/binary"
  4. "errors"
  5. "io"
  6. "github.com/docker/docker/pkg/log"
  7. )
  8. const (
  9. StdWriterPrefixLen = 8
  10. StdWriterFdIndex = 0
  11. StdWriterSizeIndex = 4
  12. )
  13. type StdType [StdWriterPrefixLen]byte
  14. var (
  15. Stdin StdType = StdType{0: 0}
  16. Stdout StdType = StdType{0: 1}
  17. Stderr StdType = StdType{0: 2}
  18. )
  19. type StdWriter struct {
  20. io.Writer
  21. prefix StdType
  22. sizeBuf []byte
  23. }
  24. func (w *StdWriter) Write(buf []byte) (n int, err error) {
  25. var n1, n2 int
  26. if w == nil || w.Writer == nil {
  27. return 0, errors.New("Writer not instanciated")
  28. }
  29. binary.BigEndian.PutUint32(w.prefix[4:], uint32(len(buf)))
  30. n1, err = w.Writer.Write(w.prefix[:])
  31. if err != nil {
  32. n = n1 - StdWriterPrefixLen
  33. } else {
  34. n2, err = w.Writer.Write(buf)
  35. n = n1 + n2 - StdWriterPrefixLen
  36. }
  37. if n < 0 {
  38. n = 0
  39. }
  40. return
  41. }
  42. // NewStdWriter instanciates a new Writer.
  43. // Everything written to it will be encapsulated using a custom format,
  44. // and written to the underlying `w` stream.
  45. // This allows multiple write streams (e.g. stdout and stderr) to be muxed into a single connection.
  46. // `t` indicates the id of the stream to encapsulate.
  47. // It can be utils.Stdin, utils.Stdout, utils.Stderr.
  48. func NewStdWriter(w io.Writer, t StdType) *StdWriter {
  49. if len(t) != StdWriterPrefixLen {
  50. return nil
  51. }
  52. return &StdWriter{
  53. Writer: w,
  54. prefix: t,
  55. sizeBuf: make([]byte, 4),
  56. }
  57. }
  58. var ErrInvalidStdHeader = errors.New("Unrecognized input header")
  59. // StdCopy is a modified version of io.Copy.
  60. //
  61. // StdCopy will demultiplex `src`, assuming that it contains two streams,
  62. // previously multiplexed together using a StdWriter instance.
  63. // As it reads from `src`, StdCopy will write to `dstout` and `dsterr`.
  64. //
  65. // StdCopy will read until it hits EOF on `src`. It will then return a nil error.
  66. // In other words: if `err` is non nil, it indicates a real underlying error.
  67. //
  68. // `written` will hold the total number of bytes written to `dstout` and `dsterr`.
  69. func StdCopy(dstout, dsterr io.Writer, src io.Reader) (written int64, err error) {
  70. var (
  71. buf = make([]byte, 32*1024+StdWriterPrefixLen+1)
  72. bufLen = len(buf)
  73. nr, nw int
  74. er, ew error
  75. out io.Writer
  76. frameSize int
  77. )
  78. for {
  79. // Make sure we have at least a full header
  80. for nr < StdWriterPrefixLen {
  81. var nr2 int
  82. nr2, er = src.Read(buf[nr:])
  83. nr += nr2
  84. if er == io.EOF {
  85. if nr < StdWriterPrefixLen {
  86. log.Debugf("Corrupted prefix: %v", buf[:nr])
  87. return written, nil
  88. }
  89. break
  90. }
  91. if er != nil {
  92. log.Debugf("Error reading header: %s", er)
  93. return 0, er
  94. }
  95. }
  96. // Check the first byte to know where to write
  97. switch buf[StdWriterFdIndex] {
  98. case 0:
  99. fallthrough
  100. case 1:
  101. // Write on stdout
  102. out = dstout
  103. case 2:
  104. // Write on stderr
  105. out = dsterr
  106. default:
  107. log.Debugf("Error selecting output fd: (%d)", buf[StdWriterFdIndex])
  108. return 0, ErrInvalidStdHeader
  109. }
  110. // Retrieve the size of the frame
  111. frameSize = int(binary.BigEndian.Uint32(buf[StdWriterSizeIndex : StdWriterSizeIndex+4]))
  112. log.Debugf("framesize: %d", frameSize)
  113. // Check if the buffer is big enough to read the frame.
  114. // Extend it if necessary.
  115. if frameSize+StdWriterPrefixLen > bufLen {
  116. log.Debugf("Extending buffer cap by %d (was %d)", frameSize+StdWriterPrefixLen-bufLen+1, len(buf))
  117. buf = append(buf, make([]byte, frameSize+StdWriterPrefixLen-bufLen+1)...)
  118. bufLen = len(buf)
  119. }
  120. // While the amount of bytes read is less than the size of the frame + header, we keep reading
  121. for nr < frameSize+StdWriterPrefixLen {
  122. var nr2 int
  123. nr2, er = src.Read(buf[nr:])
  124. nr += nr2
  125. if er == io.EOF {
  126. if nr < frameSize+StdWriterPrefixLen {
  127. log.Debugf("Corrupted frame: %v", buf[StdWriterPrefixLen:nr])
  128. return written, nil
  129. }
  130. break
  131. }
  132. if er != nil {
  133. log.Debugf("Error reading frame: %s", er)
  134. return 0, er
  135. }
  136. }
  137. // Write the retrieved frame (without header)
  138. nw, ew = out.Write(buf[StdWriterPrefixLen : frameSize+StdWriterPrefixLen])
  139. if ew != nil {
  140. log.Debugf("Error writing frame: %s", ew)
  141. return 0, ew
  142. }
  143. // If the frame has not been fully written: error
  144. if nw != frameSize {
  145. log.Debugf("Error Short Write: (%d on %d)", nw, frameSize)
  146. return 0, io.ErrShortWrite
  147. }
  148. written += int64(nw)
  149. // Move the rest of the buffer to the beginning
  150. copy(buf, buf[frameSize+StdWriterPrefixLen:])
  151. // Move the index
  152. nr -= frameSize + StdWriterPrefixLen
  153. }
  154. }