streams.go 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170
  1. package stream // import "github.com/docker/docker/container/stream"
  2. import (
  3. "context"
  4. "fmt"
  5. "io"
  6. "strings"
  7. "sync"
  8. "github.com/containerd/containerd/cio"
  9. "github.com/containerd/log"
  10. "github.com/docker/docker/pkg/broadcaster"
  11. "github.com/docker/docker/pkg/ioutils"
  12. "github.com/docker/docker/pkg/pools"
  13. )
  14. // Config holds information about I/O streams managed together.
  15. //
  16. // config.StdinPipe returns a WriteCloser which can be used to feed data
  17. // to the standard input of the streamConfig's active process.
  18. // config.StdoutPipe and streamConfig.StderrPipe each return a ReadCloser
  19. // which can be used to retrieve the standard output (and error) generated
  20. // by the container's active process. The output (and error) are actually
  21. // copied and delivered to all StdoutPipe and StderrPipe consumers, using
  22. // a kind of "broadcaster".
  23. type Config struct {
  24. wg sync.WaitGroup
  25. stdout *broadcaster.Unbuffered
  26. stderr *broadcaster.Unbuffered
  27. stdin io.ReadCloser
  28. stdinPipe io.WriteCloser
  29. dio *cio.DirectIO
  30. }
  31. // NewConfig creates a stream config and initializes
  32. // the standard err and standard out to new unbuffered broadcasters.
  33. func NewConfig() *Config {
  34. return &Config{
  35. stderr: new(broadcaster.Unbuffered),
  36. stdout: new(broadcaster.Unbuffered),
  37. }
  38. }
  39. // Stdout returns the standard output in the configuration.
  40. func (c *Config) Stdout() *broadcaster.Unbuffered {
  41. return c.stdout
  42. }
  43. // Stderr returns the standard error in the configuration.
  44. func (c *Config) Stderr() *broadcaster.Unbuffered {
  45. return c.stderr
  46. }
  47. // Stdin returns the standard input in the configuration.
  48. func (c *Config) Stdin() io.ReadCloser {
  49. return c.stdin
  50. }
  51. // StdinPipe returns an input writer pipe as an io.WriteCloser.
  52. func (c *Config) StdinPipe() io.WriteCloser {
  53. return c.stdinPipe
  54. }
  55. // StdoutPipe creates a new io.ReadCloser with an empty bytes pipe.
  56. // It adds this new out pipe to the Stdout broadcaster.
  57. // This will block stdout if unconsumed.
  58. func (c *Config) StdoutPipe() io.ReadCloser {
  59. bytesPipe := ioutils.NewBytesPipe()
  60. c.stdout.Add(bytesPipe)
  61. return bytesPipe
  62. }
  63. // StderrPipe creates a new io.ReadCloser with an empty bytes pipe.
  64. // It adds this new err pipe to the Stderr broadcaster.
  65. // This will block stderr if unconsumed.
  66. func (c *Config) StderrPipe() io.ReadCloser {
  67. bytesPipe := ioutils.NewBytesPipe()
  68. c.stderr.Add(bytesPipe)
  69. return bytesPipe
  70. }
  71. // NewInputPipes creates new pipes for both standard inputs, Stdin and StdinPipe.
  72. func (c *Config) NewInputPipes() {
  73. c.stdin, c.stdinPipe = io.Pipe()
  74. }
  75. // NewNopInputPipe creates a new input pipe that will silently drop all messages in the input.
  76. func (c *Config) NewNopInputPipe() {
  77. c.stdinPipe = ioutils.NopWriteCloser(io.Discard)
  78. }
  79. // CloseStreams ensures that the configured streams are properly closed.
  80. func (c *Config) CloseStreams() error {
  81. var errors []string
  82. if c.stdin != nil {
  83. if err := c.stdin.Close(); err != nil {
  84. errors = append(errors, fmt.Sprintf("error close stdin: %s", err))
  85. }
  86. }
  87. if err := c.stdout.Clean(); err != nil {
  88. errors = append(errors, fmt.Sprintf("error close stdout: %s", err))
  89. }
  90. if err := c.stderr.Clean(); err != nil {
  91. errors = append(errors, fmt.Sprintf("error close stderr: %s", err))
  92. }
  93. if len(errors) > 0 {
  94. return fmt.Errorf(strings.Join(errors, "\n"))
  95. }
  96. return nil
  97. }
  98. // CopyToPipe connects streamconfig with a libcontainerd.IOPipe
  99. func (c *Config) CopyToPipe(iop *cio.DirectIO) {
  100. ctx := context.TODO()
  101. c.dio = iop
  102. copyFunc := func(w io.Writer, r io.ReadCloser) {
  103. c.wg.Add(1)
  104. go func() {
  105. if _, err := pools.Copy(w, r); err != nil {
  106. log.G(ctx).Errorf("stream copy error: %v", err)
  107. }
  108. r.Close()
  109. c.wg.Done()
  110. }()
  111. }
  112. if iop.Stdout != nil {
  113. copyFunc(c.Stdout(), iop.Stdout)
  114. }
  115. if iop.Stderr != nil {
  116. copyFunc(c.Stderr(), iop.Stderr)
  117. }
  118. if stdin := c.Stdin(); stdin != nil {
  119. if iop.Stdin != nil {
  120. go func() {
  121. pools.Copy(iop.Stdin, stdin)
  122. if err := iop.Stdin.Close(); err != nil {
  123. log.G(ctx).Warnf("failed to close stdin: %v", err)
  124. }
  125. }()
  126. }
  127. }
  128. }
  129. // Wait for the stream to close
  130. // Wait supports timeouts via the context to unblock and forcefully
  131. // close the io streams
  132. func (c *Config) Wait(ctx context.Context) {
  133. done := make(chan struct{}, 1)
  134. go func() {
  135. c.wg.Wait()
  136. close(done)
  137. }()
  138. select {
  139. case <-done:
  140. case <-ctx.Done():
  141. if c.dio != nil {
  142. c.dio.Cancel()
  143. c.dio.Wait()
  144. c.dio.Close()
  145. }
  146. }
  147. }