streams.go 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225
  1. package engine
  2. import (
  3. "bytes"
  4. "fmt"
  5. "io"
  6. "io/ioutil"
  7. "strings"
  8. "sync"
  9. "unicode"
  10. )
  11. type Output struct {
  12. sync.Mutex
  13. dests []io.Writer
  14. tasks sync.WaitGroup
  15. used bool
  16. }
  17. // Tail returns the n last lines of a buffer
  18. // stripped out of trailing white spaces, if any.
  19. //
  20. // if n <= 0, returns an empty string
  21. func Tail(buffer *bytes.Buffer, n int) string {
  22. if n <= 0 {
  23. return ""
  24. }
  25. s := strings.TrimRightFunc(buffer.String(), unicode.IsSpace)
  26. i := len(s) - 1
  27. for ; i >= 0 && n > 0; i-- {
  28. if s[i] == '\n' {
  29. n--
  30. if n == 0 {
  31. break
  32. }
  33. }
  34. }
  35. // when i == -1, return the whole string which is s[0:]
  36. return s[i+1:]
  37. }
  38. // NewOutput returns a new Output object with no destinations attached.
  39. // Writing to an empty Output will cause the written data to be discarded.
  40. func NewOutput() *Output {
  41. return &Output{}
  42. }
  43. // Return true if something was written on this output
  44. func (o *Output) Used() bool {
  45. o.Lock()
  46. defer o.Unlock()
  47. return o.used
  48. }
  49. // Add attaches a new destination to the Output. Any data subsequently written
  50. // to the output will be written to the new destination in addition to all the others.
  51. // This method is thread-safe.
  52. func (o *Output) Add(dst io.Writer) {
  53. o.Lock()
  54. defer o.Unlock()
  55. o.dests = append(o.dests, dst)
  56. }
  57. // Set closes and remove existing destination and then attaches a new destination to
  58. // the Output. Any data subsequently written to the output will be written to the new
  59. // destination in addition to all the others. This method is thread-safe.
  60. func (o *Output) Set(dst io.Writer) {
  61. o.Close()
  62. o.Lock()
  63. defer o.Unlock()
  64. o.dests = []io.Writer{dst}
  65. }
  66. // AddPipe creates an in-memory pipe with io.Pipe(), adds its writing end as a destination,
  67. // and returns its reading end for consumption by the caller.
  68. // This is a rough equivalent similar to Cmd.StdoutPipe() in the standard os/exec package.
  69. // This method is thread-safe.
  70. func (o *Output) AddPipe() (io.Reader, error) {
  71. r, w := io.Pipe()
  72. o.Add(w)
  73. return r, nil
  74. }
  75. // Write writes the same data to all registered destinations.
  76. // This method is thread-safe.
  77. func (o *Output) Write(p []byte) (n int, err error) {
  78. o.Lock()
  79. defer o.Unlock()
  80. o.used = true
  81. var firstErr error
  82. for _, dst := range o.dests {
  83. _, err := dst.Write(p)
  84. if err != nil && firstErr == nil {
  85. firstErr = err
  86. }
  87. }
  88. return len(p), firstErr
  89. }
  90. // Close unregisters all destinations and waits for all background
  91. // AddTail and AddString tasks to complete.
  92. // The Close method of each destination is called if it exists.
  93. func (o *Output) Close() error {
  94. o.Lock()
  95. defer o.Unlock()
  96. var firstErr error
  97. for _, dst := range o.dests {
  98. if closer, ok := dst.(io.Closer); ok {
  99. err := closer.Close()
  100. if err != nil && firstErr == nil {
  101. firstErr = err
  102. }
  103. }
  104. }
  105. o.tasks.Wait()
  106. o.dests = nil
  107. return firstErr
  108. }
  109. type Input struct {
  110. src io.Reader
  111. sync.Mutex
  112. }
  113. // NewInput returns a new Input object with no source attached.
  114. // Reading to an empty Input will return io.EOF.
  115. func NewInput() *Input {
  116. return &Input{}
  117. }
  118. // Read reads from the input in a thread-safe way.
  119. func (i *Input) Read(p []byte) (n int, err error) {
  120. i.Mutex.Lock()
  121. defer i.Mutex.Unlock()
  122. if i.src == nil {
  123. return 0, io.EOF
  124. }
  125. return i.src.Read(p)
  126. }
  127. // Closes the src
  128. // Not thread safe on purpose
  129. func (i *Input) Close() error {
  130. if i.src != nil {
  131. if closer, ok := i.src.(io.Closer); ok {
  132. return closer.Close()
  133. }
  134. }
  135. return nil
  136. }
  137. // Add attaches a new source to the input.
  138. // Add can only be called once per input. Subsequent calls will
  139. // return an error.
  140. func (i *Input) Add(src io.Reader) error {
  141. i.Mutex.Lock()
  142. defer i.Mutex.Unlock()
  143. if i.src != nil {
  144. return fmt.Errorf("Maximum number of sources reached: 1")
  145. }
  146. i.src = src
  147. return nil
  148. }
  149. // AddEnv starts a new goroutine which will decode all subsequent data
  150. // as a stream of json-encoded objects, and point `dst` to the last
  151. // decoded object.
  152. // The result `env` can be queried using the type-neutral Env interface.
  153. // It is not safe to query `env` until the Output is closed.
  154. func (o *Output) AddEnv() (dst *Env, err error) {
  155. src, err := o.AddPipe()
  156. if err != nil {
  157. return nil, err
  158. }
  159. dst = &Env{}
  160. o.tasks.Add(1)
  161. go func() {
  162. defer o.tasks.Done()
  163. decoder := NewDecoder(src)
  164. for {
  165. env, err := decoder.Decode()
  166. if err != nil {
  167. return
  168. }
  169. *dst = *env
  170. }
  171. }()
  172. return dst, nil
  173. }
  174. func (o *Output) AddListTable() (dst *Table, err error) {
  175. src, err := o.AddPipe()
  176. if err != nil {
  177. return nil, err
  178. }
  179. dst = NewTable("", 0)
  180. o.tasks.Add(1)
  181. go func() {
  182. defer o.tasks.Done()
  183. content, err := ioutil.ReadAll(src)
  184. if err != nil {
  185. return
  186. }
  187. if _, err := dst.ReadListFrom(content); err != nil {
  188. return
  189. }
  190. }()
  191. return dst, nil
  192. }
  193. func (o *Output) AddTable() (dst *Table, err error) {
  194. src, err := o.AddPipe()
  195. if err != nil {
  196. return nil, err
  197. }
  198. dst = NewTable("", 0)
  199. o.tasks.Add(1)
  200. go func() {
  201. defer o.tasks.Done()
  202. if _, err := dst.ReadFrom(src); err != nil {
  203. return
  204. }
  205. }()
  206. return dst, nil
  207. }