streams.go 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146
  1. package stream
  2. import (
  3. "fmt"
  4. "io"
  5. "io/ioutil"
  6. "strings"
  7. "sync"
  8. "github.com/Sirupsen/logrus"
  9. "github.com/docker/docker/libcontainerd"
  10. "github.com/docker/docker/pkg/broadcaster"
  11. "github.com/docker/docker/pkg/ioutils"
  12. "github.com/docker/docker/pkg/pools"
  13. )
  14. // Config holds information about I/O streams managed together.
  15. //
  16. // config.StdinPipe returns a WriteCloser which can be used to feed data
  17. // to the standard input of the streamConfig's active process.
  18. // config.StdoutPipe and streamConfig.StderrPipe each return a ReadCloser
  19. // which can be used to retrieve the standard output (and error) generated
  20. // by the container's active process. The output (and error) are actually
  21. // copied and delivered to all StdoutPipe and StderrPipe consumers, using
  22. // a kind of "broadcaster".
  23. type Config struct {
  24. sync.WaitGroup
  25. stdout *broadcaster.Unbuffered
  26. stderr *broadcaster.Unbuffered
  27. stdin io.ReadCloser
  28. stdinPipe io.WriteCloser
  29. }
  30. // NewConfig creates a stream config and initializes
  31. // the standard err and standard out to new unbuffered broadcasters.
  32. func NewConfig() *Config {
  33. return &Config{
  34. stderr: new(broadcaster.Unbuffered),
  35. stdout: new(broadcaster.Unbuffered),
  36. }
  37. }
  38. // Stdout returns the standard output in the configuration.
  39. func (c *Config) Stdout() *broadcaster.Unbuffered {
  40. return c.stdout
  41. }
  42. // Stderr returns the standard error in the configuration.
  43. func (c *Config) Stderr() *broadcaster.Unbuffered {
  44. return c.stderr
  45. }
  46. // Stdin returns the standard input in the configuration.
  47. func (c *Config) Stdin() io.ReadCloser {
  48. return c.stdin
  49. }
  50. // StdinPipe returns an input writer pipe as an io.WriteCloser.
  51. func (c *Config) StdinPipe() io.WriteCloser {
  52. return c.stdinPipe
  53. }
  54. // StdoutPipe creates a new io.ReadCloser with an empty bytes pipe.
  55. // It adds this new out pipe to the Stdout broadcaster.
  56. // This will block stdout if unconsumed.
  57. func (c *Config) StdoutPipe() io.ReadCloser {
  58. bytesPipe := ioutils.NewBytesPipe()
  59. c.stdout.Add(bytesPipe)
  60. return bytesPipe
  61. }
  62. // StderrPipe creates a new io.ReadCloser with an empty bytes pipe.
  63. // It adds this new err pipe to the Stderr broadcaster.
  64. // This will block stderr if unconsumed.
  65. func (c *Config) StderrPipe() io.ReadCloser {
  66. bytesPipe := ioutils.NewBytesPipe()
  67. c.stderr.Add(bytesPipe)
  68. return bytesPipe
  69. }
  70. // NewInputPipes creates new pipes for both standard inputs, Stdin and StdinPipe.
  71. func (c *Config) NewInputPipes() {
  72. c.stdin, c.stdinPipe = io.Pipe()
  73. }
  74. // NewNopInputPipe creates a new input pipe that will silently drop all messages in the input.
  75. func (c *Config) NewNopInputPipe() {
  76. c.stdinPipe = ioutils.NopWriteCloser(ioutil.Discard)
  77. }
  78. // CloseStreams ensures that the configured streams are properly closed.
  79. func (c *Config) CloseStreams() error {
  80. var errors []string
  81. if c.stdin != nil {
  82. if err := c.stdin.Close(); err != nil {
  83. errors = append(errors, fmt.Sprintf("error close stdin: %s", err))
  84. }
  85. }
  86. if err := c.stdout.Clean(); err != nil {
  87. errors = append(errors, fmt.Sprintf("error close stdout: %s", err))
  88. }
  89. if err := c.stderr.Clean(); err != nil {
  90. errors = append(errors, fmt.Sprintf("error close stderr: %s", err))
  91. }
  92. if len(errors) > 0 {
  93. return fmt.Errorf(strings.Join(errors, "\n"))
  94. }
  95. return nil
  96. }
  97. // CopyToPipe connects streamconfig with a libcontainerd.IOPipe
  98. func (c *Config) CopyToPipe(iop libcontainerd.IOPipe) {
  99. copyFunc := func(w io.Writer, r io.ReadCloser) {
  100. c.Add(1)
  101. go func() {
  102. if _, err := pools.Copy(w, r); err != nil {
  103. logrus.Errorf("stream copy error: %+v", err)
  104. }
  105. r.Close()
  106. c.Done()
  107. }()
  108. }
  109. if iop.Stdout != nil {
  110. copyFunc(c.Stdout(), iop.Stdout)
  111. }
  112. if iop.Stderr != nil {
  113. copyFunc(c.Stderr(), iop.Stderr)
  114. }
  115. if stdin := c.Stdin(); stdin != nil {
  116. if iop.Stdin != nil {
  117. go func() {
  118. pools.Copy(iop.Stdin, stdin)
  119. if err := iop.Stdin.Close(); err != nil {
  120. logrus.Warnf("failed to close stdin: %+v", err)
  121. }
  122. }()
  123. }
  124. }
  125. }