streams.go 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143
  1. package stream
  2. import (
  3. "fmt"
  4. "io"
  5. "io/ioutil"
  6. "strings"
  7. "sync"
  8. "github.com/Sirupsen/logrus"
  9. "github.com/docker/docker/libcontainerd"
  10. "github.com/docker/docker/pkg/broadcaster"
  11. "github.com/docker/docker/pkg/ioutils"
  12. "github.com/docker/docker/pkg/pools"
  13. )
  14. // Config holds information about I/O streams managed together.
  15. //
  16. // config.StdinPipe returns a WriteCloser which can be used to feed data
  17. // to the standard input of the streamConfig's active process.
  18. // config.StdoutPipe and streamConfig.StderrPipe each return a ReadCloser
  19. // which can be used to retrieve the standard output (and error) generated
  20. // by the container's active process. The output (and error) are actually
  21. // copied and delivered to all StdoutPipe and StderrPipe consumers, using
  22. // a kind of "broadcaster".
  23. type Config struct {
  24. sync.WaitGroup
  25. stdout *broadcaster.Unbuffered
  26. stderr *broadcaster.Unbuffered
  27. stdin io.ReadCloser
  28. stdinPipe io.WriteCloser
  29. }
  30. // NewConfig creates a stream config and initializes
  31. // the standard err and standard out to new unbuffered broadcasters.
  32. func NewConfig() *Config {
  33. return &Config{
  34. stderr: new(broadcaster.Unbuffered),
  35. stdout: new(broadcaster.Unbuffered),
  36. }
  37. }
  38. // Stdout returns the standard output in the configuration.
  39. func (c *Config) Stdout() *broadcaster.Unbuffered {
  40. return c.stdout
  41. }
  42. // Stderr returns the standard error in the configuration.
  43. func (c *Config) Stderr() *broadcaster.Unbuffered {
  44. return c.stderr
  45. }
  46. // Stdin returns the standard input in the configuration.
  47. func (c *Config) Stdin() io.ReadCloser {
  48. return c.stdin
  49. }
  50. // StdinPipe returns an input writer pipe as an io.WriteCloser.
  51. func (c *Config) StdinPipe() io.WriteCloser {
  52. return c.stdinPipe
  53. }
  54. // StdoutPipe creates a new io.ReadCloser with an empty bytes pipe.
  55. // It adds this new out pipe to the Stdout broadcaster.
  56. func (c *Config) StdoutPipe() io.ReadCloser {
  57. bytesPipe := ioutils.NewBytesPipe()
  58. c.stdout.Add(bytesPipe)
  59. return bytesPipe
  60. }
  61. // StderrPipe creates a new io.ReadCloser with an empty bytes pipe.
  62. // It adds this new err pipe to the Stderr broadcaster.
  63. func (c *Config) StderrPipe() io.ReadCloser {
  64. bytesPipe := ioutils.NewBytesPipe()
  65. c.stderr.Add(bytesPipe)
  66. return bytesPipe
  67. }
  68. // NewInputPipes creates new pipes for both standard inputs, Stdin and StdinPipe.
  69. func (c *Config) NewInputPipes() {
  70. c.stdin, c.stdinPipe = io.Pipe()
  71. }
  72. // NewNopInputPipe creates a new input pipe that will silently drop all messages in the input.
  73. func (c *Config) NewNopInputPipe() {
  74. c.stdinPipe = ioutils.NopWriteCloser(ioutil.Discard)
  75. }
  76. // CloseStreams ensures that the configured streams are properly closed.
  77. func (c *Config) CloseStreams() error {
  78. var errors []string
  79. if c.stdin != nil {
  80. if err := c.stdin.Close(); err != nil {
  81. errors = append(errors, fmt.Sprintf("error close stdin: %s", err))
  82. }
  83. }
  84. if err := c.stdout.Clean(); err != nil {
  85. errors = append(errors, fmt.Sprintf("error close stdout: %s", err))
  86. }
  87. if err := c.stderr.Clean(); err != nil {
  88. errors = append(errors, fmt.Sprintf("error close stderr: %s", err))
  89. }
  90. if len(errors) > 0 {
  91. return fmt.Errorf(strings.Join(errors, "\n"))
  92. }
  93. return nil
  94. }
  95. // CopyToPipe connects streamconfig with a libcontainerd.IOPipe
  96. func (c *Config) CopyToPipe(iop libcontainerd.IOPipe) {
  97. copyFunc := func(w io.Writer, r io.Reader) {
  98. c.Add(1)
  99. go func() {
  100. if _, err := pools.Copy(w, r); err != nil {
  101. logrus.Errorf("stream copy error: %+v", err)
  102. }
  103. c.Done()
  104. }()
  105. }
  106. if iop.Stdout != nil {
  107. copyFunc(c.Stdout(), iop.Stdout)
  108. }
  109. if iop.Stderr != nil {
  110. copyFunc(c.Stderr(), iop.Stderr)
  111. }
  112. if stdin := c.Stdin(); stdin != nil {
  113. if iop.Stdin != nil {
  114. go func() {
  115. pools.Copy(iop.Stdin, stdin)
  116. if err := iop.Stdin.Close(); err != nil {
  117. logrus.Warnf("failed to close stdin: %+v", err)
  118. }
  119. }()
  120. }
  121. }
  122. }