streams.go 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208
  1. package engine
  2. import (
  3. "bufio"
  4. "container/ring"
  5. "fmt"
  6. "io"
  7. "sync"
  8. )
// Output is a fan-out writer: every Write is forwarded to each destination
// registered with Add. It also tracks the background goroutines started by
// the AddTail, AddString, AddEnv and AddTable helpers so that Close can wait
// for them.
type Output struct {
	// Mutex is held by Add, Write and Close while they touch dests.
	sync.Mutex
	// dests holds the registered destinations, in registration order.
	dests []io.Writer
	// tasks counts the background reader goroutines still running.
	tasks sync.WaitGroup
}
  14. // NewOutput returns a new Output object with no destinations attached.
  15. // Writing to an empty Output will cause the written data to be discarded.
  16. func NewOutput() *Output {
  17. return &Output{}
  18. }
  19. // Add attaches a new destination to the Output. Any data subsequently written
  20. // to the output will be written to the new destination in addition to all the others.
  21. // This method is thread-safe.
  22. // FIXME: Add cannot fail
  23. func (o *Output) Add(dst io.Writer) error {
  24. o.Mutex.Lock()
  25. defer o.Mutex.Unlock()
  26. o.dests = append(o.dests, dst)
  27. return nil
  28. }
  29. // AddPipe creates an in-memory pipe with io.Pipe(), adds its writing end as a destination,
  30. // and returns its reading end for consumption by the caller.
  31. // This is a rough equivalent similar to Cmd.StdoutPipe() in the standard os/exec package.
  32. // This method is thread-safe.
  33. func (o *Output) AddPipe() (io.Reader, error) {
  34. r, w := io.Pipe()
  35. o.Add(w)
  36. return r, nil
  37. }
  38. // AddTail starts a new goroutine which will read all subsequent data written to the output,
  39. // line by line, and append the last `n` lines to `dst`.
  40. func (o *Output) AddTail(dst *[]string, n int) error {
  41. src, err := o.AddPipe()
  42. if err != nil {
  43. return err
  44. }
  45. o.tasks.Add(1)
  46. go func() {
  47. defer o.tasks.Done()
  48. Tail(src, n, dst)
  49. }()
  50. return nil
  51. }
  52. // AddString starts a new goroutine which will read all subsequent data written to the output,
  53. // line by line, and store the last line into `dst`.
  54. func (o *Output) AddString(dst *string) error {
  55. src, err := o.AddPipe()
  56. if err != nil {
  57. return err
  58. }
  59. o.tasks.Add(1)
  60. go func() {
  61. defer o.tasks.Done()
  62. lines := make([]string, 0, 1)
  63. Tail(src, 1, &lines)
  64. if len(lines) == 0 {
  65. *dst = ""
  66. } else {
  67. *dst = lines[0]
  68. }
  69. }()
  70. return nil
  71. }
  72. // Write writes the same data to all registered destinations.
  73. // This method is thread-safe.
  74. func (o *Output) Write(p []byte) (n int, err error) {
  75. o.Mutex.Lock()
  76. defer o.Mutex.Unlock()
  77. var firstErr error
  78. for _, dst := range o.dests {
  79. _, err := dst.Write(p)
  80. if err != nil && firstErr == nil {
  81. firstErr = err
  82. }
  83. }
  84. return len(p), firstErr
  85. }
  86. // Close unregisters all destinations and waits for all background
  87. // AddTail and AddString tasks to complete.
  88. // The Close method of each destination is called if it exists.
  89. func (o *Output) Close() error {
  90. o.Mutex.Lock()
  91. defer o.Mutex.Unlock()
  92. var firstErr error
  93. for _, dst := range o.dests {
  94. if closer, ok := dst.(io.WriteCloser); ok {
  95. err := closer.Close()
  96. if err != nil && firstErr == nil {
  97. firstErr = err
  98. }
  99. }
  100. }
  101. o.tasks.Wait()
  102. return firstErr
  103. }
// Input is a reader whose source is attached after construction.
// Read and Add lock the embedded mutex while they access src.
type Input struct {
	// src is the attached source; nil until Add has been called.
	src io.Reader
	sync.Mutex
}
  108. // NewInput returns a new Input object with no source attached.
  109. // Reading to an empty Input will return io.EOF.
  110. func NewInput() *Input {
  111. return &Input{}
  112. }
  113. // Read reads from the input in a thread-safe way.
  114. func (i *Input) Read(p []byte) (n int, err error) {
  115. i.Mutex.Lock()
  116. defer i.Mutex.Unlock()
  117. if i.src == nil {
  118. return 0, io.EOF
  119. }
  120. return i.src.Read(p)
  121. }
  122. // Add attaches a new source to the input.
  123. // Add can only be called once per input. Subsequent calls will
  124. // return an error.
  125. func (i *Input) Add(src io.Reader) error {
  126. i.Mutex.Lock()
  127. defer i.Mutex.Unlock()
  128. if i.src != nil {
  129. return fmt.Errorf("Maximum number of sources reached: 1")
  130. }
  131. i.src = src
  132. return nil
  133. }
  134. // Tail reads from `src` line per line, and returns the last `n` lines as an array.
  135. // A ring buffer is used to only store `n` lines at any time.
  136. func Tail(src io.Reader, n int, dst *[]string) {
  137. scanner := bufio.NewScanner(src)
  138. r := ring.New(n)
  139. for scanner.Scan() {
  140. if n == 0 {
  141. continue
  142. }
  143. r.Value = scanner.Text()
  144. r = r.Next()
  145. }
  146. r.Do(func(v interface{}) {
  147. if v == nil {
  148. return
  149. }
  150. *dst = append(*dst, v.(string))
  151. })
  152. }
  153. // AddEnv starts a new goroutine which will decode all subsequent data
  154. // as a stream of json-encoded objects, and point `dst` to the last
  155. // decoded object.
  156. // The result `env` can be queried using the type-neutral Env interface.
  157. // It is not safe to query `env` until the Output is closed.
  158. func (o *Output) AddEnv() (dst *Env, err error) {
  159. src, err := o.AddPipe()
  160. if err != nil {
  161. return nil, err
  162. }
  163. dst = &Env{}
  164. o.tasks.Add(1)
  165. go func() {
  166. defer o.tasks.Done()
  167. decoder := NewDecoder(src)
  168. for {
  169. env, err := decoder.Decode()
  170. if err != nil {
  171. return
  172. }
  173. *dst = *env
  174. }
  175. }()
  176. return dst, nil
  177. }
  178. func (o *Output) AddTable() (dst *Table, err error) {
  179. src, err := o.AddPipe()
  180. if err != nil {
  181. return nil, err
  182. }
  183. dst = NewTable("", 0)
  184. o.tasks.Add(1)
  185. go func() {
  186. defer o.tasks.Done()
  187. if _, err := dst.ReadFrom(src); err != nil {
  188. return
  189. }
  190. }()
  191. return dst, nil
  192. }