streams.go 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222
  1. package engine
  2. import (
  3. "bytes"
  4. "fmt"
  5. "io"
  6. "io/ioutil"
  7. "sync"
  8. )
  // Output is a fan-out writer: every Write is copied to each registered
  // destination. The embedded mutex guards dests and used.
  type Output struct {
  	sync.Mutex
  	// dests holds the destinations that receive every Write.
  	dests []io.Writer
  	// tasks counts the background goroutines started by the AddEnv,
  	// AddTable and AddListTable helpers; Close waits on it.
  	tasks sync.WaitGroup
  	// used is set to true by Write and reported by Used.
  	used bool
  }
  // Tail returns the last n lines held in buffer as a single string.
  // One trailing '\n', if present, is removed before counting lines.
  // If n <= 0 the result is the empty string; if the buffer holds fewer
  // than n lines, all of its content (minus that trailing '\n') is returned.
  // The buffer itself is not modified.
  func Tail(buffer *bytes.Buffer, n int) string {
  	if n <= 0 {
  		return ""
  	}
  	data := bytes.TrimSuffix(buffer.Bytes(), []byte{'\n'})
  	// Walk backwards over line separators; after the loop, cut is the
  	// index of the n-th '\n' counted from the end.
  	cut := len(data)
  	for ; n > 0; n-- {
  		idx := bytes.LastIndexByte(data[:cut], '\n')
  		if idx < 0 {
  			// Fewer than n separators: everything is part of the tail.
  			return string(data)
  		}
  		cut = idx
  	}
  	return string(data[cut+1:])
  }
  36. // NewOutput returns a new Output object with no destinations attached.
  37. // Writing to an empty Output will cause the written data to be discarded.
  38. func NewOutput() *Output {
  39. return &Output{}
  40. }
  41. // Return true if something was written on this output
  42. func (o *Output) Used() bool {
  43. o.Lock()
  44. defer o.Unlock()
  45. return o.used
  46. }
  47. // Add attaches a new destination to the Output. Any data subsequently written
  48. // to the output will be written to the new destination in addition to all the others.
  49. // This method is thread-safe.
  50. func (o *Output) Add(dst io.Writer) {
  51. o.Lock()
  52. defer o.Unlock()
  53. o.dests = append(o.dests, dst)
  54. }
  55. // Set closes and remove existing destination and then attaches a new destination to
  56. // the Output. Any data subsequently written to the output will be written to the new
  57. // destination in addition to all the others. This method is thread-safe.
  58. func (o *Output) Set(dst io.Writer) {
  59. o.Close()
  60. o.Lock()
  61. defer o.Unlock()
  62. o.dests = []io.Writer{dst}
  63. }
  64. // AddPipe creates an in-memory pipe with io.Pipe(), adds its writing end as a destination,
  65. // and returns its reading end for consumption by the caller.
  66. // This is a rough equivalent similar to Cmd.StdoutPipe() in the standard os/exec package.
  67. // This method is thread-safe.
  68. func (o *Output) AddPipe() (io.Reader, error) {
  69. r, w := io.Pipe()
  70. o.Add(w)
  71. return r, nil
  72. }
  73. // Write writes the same data to all registered destinations.
  74. // This method is thread-safe.
  75. func (o *Output) Write(p []byte) (n int, err error) {
  76. o.Lock()
  77. defer o.Unlock()
  78. o.used = true
  79. var firstErr error
  80. for _, dst := range o.dests {
  81. _, err := dst.Write(p)
  82. if err != nil && firstErr == nil {
  83. firstErr = err
  84. }
  85. }
  86. return len(p), firstErr
  87. }
  88. // Close unregisters all destinations and waits for all background
  89. // AddTail and AddString tasks to complete.
  90. // The Close method of each destination is called if it exists.
  91. func (o *Output) Close() error {
  92. o.Lock()
  93. defer o.Unlock()
  94. var firstErr error
  95. for _, dst := range o.dests {
  96. if closer, ok := dst.(io.Closer); ok {
  97. err := closer.Close()
  98. if err != nil && firstErr == nil {
  99. firstErr = err
  100. }
  101. }
  102. }
  103. o.tasks.Wait()
  104. return firstErr
  105. }
  // Input wraps a single, optional io.Reader source. The embedded mutex is
  // taken by Read and Add; Close deliberately does not take it.
  type Input struct {
  	// src is the attached source, or nil when none has been added yet.
  	src io.Reader
  	sync.Mutex
  }
  110. // NewInput returns a new Input object with no source attached.
  111. // Reading to an empty Input will return io.EOF.
  112. func NewInput() *Input {
  113. return &Input{}
  114. }
  115. // Read reads from the input in a thread-safe way.
  116. func (i *Input) Read(p []byte) (n int, err error) {
  117. i.Mutex.Lock()
  118. defer i.Mutex.Unlock()
  119. if i.src == nil {
  120. return 0, io.EOF
  121. }
  122. return i.src.Read(p)
  123. }
  124. // Closes the src
  125. // Not thread safe on purpose
  126. func (i *Input) Close() error {
  127. if i.src != nil {
  128. if closer, ok := i.src.(io.Closer); ok {
  129. return closer.Close()
  130. }
  131. }
  132. return nil
  133. }
  134. // Add attaches a new source to the input.
  135. // Add can only be called once per input. Subsequent calls will
  136. // return an error.
  137. func (i *Input) Add(src io.Reader) error {
  138. i.Mutex.Lock()
  139. defer i.Mutex.Unlock()
  140. if i.src != nil {
  141. return fmt.Errorf("Maximum number of sources reached: 1")
  142. }
  143. i.src = src
  144. return nil
  145. }
  146. // AddEnv starts a new goroutine which will decode all subsequent data
  147. // as a stream of json-encoded objects, and point `dst` to the last
  148. // decoded object.
  149. // The result `env` can be queried using the type-neutral Env interface.
  150. // It is not safe to query `env` until the Output is closed.
  151. func (o *Output) AddEnv() (dst *Env, err error) {
  152. src, err := o.AddPipe()
  153. if err != nil {
  154. return nil, err
  155. }
  156. dst = &Env{}
  157. o.tasks.Add(1)
  158. go func() {
  159. defer o.tasks.Done()
  160. decoder := NewDecoder(src)
  161. for {
  162. env, err := decoder.Decode()
  163. if err != nil {
  164. return
  165. }
  166. *dst = *env
  167. }
  168. }()
  169. return dst, nil
  170. }
  171. func (o *Output) AddListTable() (dst *Table, err error) {
  172. src, err := o.AddPipe()
  173. if err != nil {
  174. return nil, err
  175. }
  176. dst = NewTable("", 0)
  177. o.tasks.Add(1)
  178. go func() {
  179. defer o.tasks.Done()
  180. content, err := ioutil.ReadAll(src)
  181. if err != nil {
  182. return
  183. }
  184. if _, err := dst.ReadListFrom(content); err != nil {
  185. return
  186. }
  187. }()
  188. return dst, nil
  189. }
  190. func (o *Output) AddTable() (dst *Table, err error) {
  191. src, err := o.AddPipe()
  192. if err != nil {
  193. return nil, err
  194. }
  195. dst = NewTable("", 0)
  196. o.tasks.Add(1)
  197. go func() {
  198. defer o.tasks.Done()
  199. if _, err := dst.ReadFrom(src); err != nil {
  200. return
  201. }
  202. }()
  203. return dst, nil
  204. }