streams.go 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168
  1. package stream // import "github.com/docker/docker/container/stream"
  2. import (
  3. "context"
  4. "fmt"
  5. "io"
  6. "strings"
  7. "sync"
  8. "github.com/containerd/containerd/cio"
  9. "github.com/docker/docker/pkg/broadcaster"
  10. "github.com/docker/docker/pkg/ioutils"
  11. "github.com/docker/docker/pkg/pools"
  12. "github.com/sirupsen/logrus"
  13. )
  14. // Config holds information about I/O streams managed together.
  15. //
  16. // config.StdinPipe returns a WriteCloser which can be used to feed data
  17. // to the standard input of the streamConfig's active process.
  18. // config.StdoutPipe and streamConfig.StderrPipe each return a ReadCloser
  19. // which can be used to retrieve the standard output (and error) generated
  20. // by the container's active process. The output (and error) are actually
  21. // copied and delivered to all StdoutPipe and StderrPipe consumers, using
  22. // a kind of "broadcaster".
  23. type Config struct {
  24. wg sync.WaitGroup
  25. stdout *broadcaster.Unbuffered
  26. stderr *broadcaster.Unbuffered
  27. stdin io.ReadCloser
  28. stdinPipe io.WriteCloser
  29. dio *cio.DirectIO
  30. }
  31. // NewConfig creates a stream config and initializes
  32. // the standard err and standard out to new unbuffered broadcasters.
  33. func NewConfig() *Config {
  34. return &Config{
  35. stderr: new(broadcaster.Unbuffered),
  36. stdout: new(broadcaster.Unbuffered),
  37. }
  38. }
  39. // Stdout returns the standard output in the configuration.
  40. func (c *Config) Stdout() *broadcaster.Unbuffered {
  41. return c.stdout
  42. }
  43. // Stderr returns the standard error in the configuration.
  44. func (c *Config) Stderr() *broadcaster.Unbuffered {
  45. return c.stderr
  46. }
  47. // Stdin returns the standard input in the configuration.
  48. func (c *Config) Stdin() io.ReadCloser {
  49. return c.stdin
  50. }
  51. // StdinPipe returns an input writer pipe as an io.WriteCloser.
  52. func (c *Config) StdinPipe() io.WriteCloser {
  53. return c.stdinPipe
  54. }
  55. // StdoutPipe creates a new io.ReadCloser with an empty bytes pipe.
  56. // It adds this new out pipe to the Stdout broadcaster.
  57. // This will block stdout if unconsumed.
  58. func (c *Config) StdoutPipe() io.ReadCloser {
  59. bytesPipe := ioutils.NewBytesPipe()
  60. c.stdout.Add(bytesPipe)
  61. return bytesPipe
  62. }
  63. // StderrPipe creates a new io.ReadCloser with an empty bytes pipe.
  64. // It adds this new err pipe to the Stderr broadcaster.
  65. // This will block stderr if unconsumed.
  66. func (c *Config) StderrPipe() io.ReadCloser {
  67. bytesPipe := ioutils.NewBytesPipe()
  68. c.stderr.Add(bytesPipe)
  69. return bytesPipe
  70. }
  71. // NewInputPipes creates new pipes for both standard inputs, Stdin and StdinPipe.
  72. func (c *Config) NewInputPipes() {
  73. c.stdin, c.stdinPipe = io.Pipe()
  74. }
  75. // NewNopInputPipe creates a new input pipe that will silently drop all messages in the input.
  76. func (c *Config) NewNopInputPipe() {
  77. c.stdinPipe = ioutils.NopWriteCloser(io.Discard)
  78. }
  79. // CloseStreams ensures that the configured streams are properly closed.
  80. func (c *Config) CloseStreams() error {
  81. var errors []string
  82. if c.stdin != nil {
  83. if err := c.stdin.Close(); err != nil {
  84. errors = append(errors, fmt.Sprintf("error close stdin: %s", err))
  85. }
  86. }
  87. if err := c.stdout.Clean(); err != nil {
  88. errors = append(errors, fmt.Sprintf("error close stdout: %s", err))
  89. }
  90. if err := c.stderr.Clean(); err != nil {
  91. errors = append(errors, fmt.Sprintf("error close stderr: %s", err))
  92. }
  93. if len(errors) > 0 {
  94. return fmt.Errorf(strings.Join(errors, "\n"))
  95. }
  96. return nil
  97. }
  98. // CopyToPipe connects streamconfig with a libcontainerd.IOPipe
  99. func (c *Config) CopyToPipe(iop *cio.DirectIO) {
  100. c.dio = iop
  101. copyFunc := func(w io.Writer, r io.ReadCloser) {
  102. c.wg.Add(1)
  103. go func() {
  104. if _, err := pools.Copy(w, r); err != nil {
  105. logrus.Errorf("stream copy error: %v", err)
  106. }
  107. r.Close()
  108. c.wg.Done()
  109. }()
  110. }
  111. if iop.Stdout != nil {
  112. copyFunc(c.Stdout(), iop.Stdout)
  113. }
  114. if iop.Stderr != nil {
  115. copyFunc(c.Stderr(), iop.Stderr)
  116. }
  117. if stdin := c.Stdin(); stdin != nil {
  118. if iop.Stdin != nil {
  119. go func() {
  120. pools.Copy(iop.Stdin, stdin)
  121. if err := iop.Stdin.Close(); err != nil {
  122. logrus.Warnf("failed to close stdin: %v", err)
  123. }
  124. }()
  125. }
  126. }
  127. }
  128. // Wait for the stream to close
  129. // Wait supports timeouts via the context to unblock and forcefully
  130. // close the io streams
  131. func (c *Config) Wait(ctx context.Context) {
  132. done := make(chan struct{}, 1)
  133. go func() {
  134. c.wg.Wait()
  135. close(done)
  136. }()
  137. select {
  138. case <-done:
  139. case <-ctx.Done():
  140. if c.dio != nil {
  141. c.dio.Cancel()
  142. c.dio.Wait()
  143. c.dio.Close()
  144. }
  145. }
  146. }