streams.go 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188
  1. package engine
  2. import (
  3. "bytes"
  4. "fmt"
  5. "io"
  6. "strings"
  7. "sync"
  8. "unicode"
  9. )
  10. type Output struct {
  11. sync.Mutex
  12. dests []io.Writer
  13. tasks sync.WaitGroup
  14. used bool
  15. }
  16. // Tail returns the n last lines of a buffer
  17. // stripped out of trailing white spaces, if any.
  18. //
  19. // if n <= 0, returns an empty string
  20. func Tail(buffer *bytes.Buffer, n int) string {
  21. if n <= 0 {
  22. return ""
  23. }
  24. s := strings.TrimRightFunc(buffer.String(), unicode.IsSpace)
  25. i := len(s) - 1
  26. for ; i >= 0 && n > 0; i-- {
  27. if s[i] == '\n' {
  28. n--
  29. if n == 0 {
  30. break
  31. }
  32. }
  33. }
  34. // when i == -1, return the whole string which is s[0:]
  35. return s[i+1:]
  36. }
  37. // NewOutput returns a new Output object with no destinations attached.
  38. // Writing to an empty Output will cause the written data to be discarded.
  39. func NewOutput() *Output {
  40. return &Output{}
  41. }
  42. // Return true if something was written on this output
  43. func (o *Output) Used() bool {
  44. o.Lock()
  45. defer o.Unlock()
  46. return o.used
  47. }
  48. // Add attaches a new destination to the Output. Any data subsequently written
  49. // to the output will be written to the new destination in addition to all the others.
  50. // This method is thread-safe.
  51. func (o *Output) Add(dst io.Writer) {
  52. o.Lock()
  53. defer o.Unlock()
  54. o.dests = append(o.dests, dst)
  55. }
  56. // Set closes and remove existing destination and then attaches a new destination to
  57. // the Output. Any data subsequently written to the output will be written to the new
  58. // destination in addition to all the others. This method is thread-safe.
  59. func (o *Output) Set(dst io.Writer) {
  60. o.Close()
  61. o.Lock()
  62. defer o.Unlock()
  63. o.dests = []io.Writer{dst}
  64. }
  65. // AddPipe creates an in-memory pipe with io.Pipe(), adds its writing end as a destination,
  66. // and returns its reading end for consumption by the caller.
  67. // This is a rough equivalent similar to Cmd.StdoutPipe() in the standard os/exec package.
  68. // This method is thread-safe.
  69. func (o *Output) AddPipe() (io.Reader, error) {
  70. r, w := io.Pipe()
  71. o.Add(w)
  72. return r, nil
  73. }
  74. // Write writes the same data to all registered destinations.
  75. // This method is thread-safe.
  76. func (o *Output) Write(p []byte) (n int, err error) {
  77. o.Lock()
  78. defer o.Unlock()
  79. o.used = true
  80. var firstErr error
  81. for _, dst := range o.dests {
  82. _, err := dst.Write(p)
  83. if err != nil && firstErr == nil {
  84. firstErr = err
  85. }
  86. }
  87. return len(p), firstErr
  88. }
  89. // Close unregisters all destinations and waits for all background
  90. // AddTail and AddString tasks to complete.
  91. // The Close method of each destination is called if it exists.
  92. func (o *Output) Close() error {
  93. o.Lock()
  94. defer o.Unlock()
  95. var firstErr error
  96. for _, dst := range o.dests {
  97. if closer, ok := dst.(io.Closer); ok {
  98. err := closer.Close()
  99. if err != nil && firstErr == nil {
  100. firstErr = err
  101. }
  102. }
  103. }
  104. o.tasks.Wait()
  105. o.dests = nil
  106. return firstErr
  107. }
  108. type Input struct {
  109. src io.Reader
  110. sync.Mutex
  111. }
  112. // NewInput returns a new Input object with no source attached.
  113. // Reading to an empty Input will return io.EOF.
  114. func NewInput() *Input {
  115. return &Input{}
  116. }
  117. // Read reads from the input in a thread-safe way.
  118. func (i *Input) Read(p []byte) (n int, err error) {
  119. i.Mutex.Lock()
  120. defer i.Mutex.Unlock()
  121. if i.src == nil {
  122. return 0, io.EOF
  123. }
  124. return i.src.Read(p)
  125. }
  126. // Closes the src
  127. // Not thread safe on purpose
  128. func (i *Input) Close() error {
  129. if i.src != nil {
  130. if closer, ok := i.src.(io.Closer); ok {
  131. return closer.Close()
  132. }
  133. }
  134. return nil
  135. }
  136. // Add attaches a new source to the input.
  137. // Add can only be called once per input. Subsequent calls will
  138. // return an error.
  139. func (i *Input) Add(src io.Reader) error {
  140. i.Mutex.Lock()
  141. defer i.Mutex.Unlock()
  142. if i.src != nil {
  143. return fmt.Errorf("Maximum number of sources reached: 1")
  144. }
  145. i.src = src
  146. return nil
  147. }
  148. // AddEnv starts a new goroutine which will decode all subsequent data
  149. // as a stream of json-encoded objects, and point `dst` to the last
  150. // decoded object.
  151. // The result `env` can be queried using the type-neutral Env interface.
  152. // It is not safe to query `env` until the Output is closed.
  153. func (o *Output) AddEnv() (dst *Env, err error) {
  154. src, err := o.AddPipe()
  155. if err != nil {
  156. return nil, err
  157. }
  158. dst = &Env{}
  159. o.tasks.Add(1)
  160. go func() {
  161. defer o.tasks.Done()
  162. decoder := NewDecoder(src)
  163. for {
  164. env, err := decoder.Decode()
  165. if err != nil {
  166. return
  167. }
  168. *dst = *env
  169. }
  170. }()
  171. return dst, nil
  172. }