123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208 |
- package engine
- import (
- "bufio"
- "container/ring"
- "fmt"
- "io"
- "sync"
- )
- type Output struct {
- sync.Mutex
- dests []io.Writer
- tasks sync.WaitGroup
- }
- // NewOutput returns a new Output object with no destinations attached.
- // Writing to an empty Output will cause the written data to be discarded.
- func NewOutput() *Output {
- return &Output{}
- }
- // Add attaches a new destination to the Output. Any data subsequently written
- // to the output will be written to the new destination in addition to all the others.
- // This method is thread-safe.
- // FIXME: Add cannot fail
- func (o *Output) Add(dst io.Writer) error {
- o.Mutex.Lock()
- defer o.Mutex.Unlock()
- o.dests = append(o.dests, dst)
- return nil
- }
- // AddPipe creates an in-memory pipe with io.Pipe(), adds its writing end as a destination,
- // and returns its reading end for consumption by the caller.
- // This is a rough equivalent similar to Cmd.StdoutPipe() in the standard os/exec package.
- // This method is thread-safe.
- func (o *Output) AddPipe() (io.Reader, error) {
- r, w := io.Pipe()
- o.Add(w)
- return r, nil
- }
- // AddTail starts a new goroutine which will read all subsequent data written to the output,
- // line by line, and append the last `n` lines to `dst`.
- func (o *Output) AddTail(dst *[]string, n int) error {
- src, err := o.AddPipe()
- if err != nil {
- return err
- }
- o.tasks.Add(1)
- go func() {
- defer o.tasks.Done()
- Tail(src, n, dst)
- }()
- return nil
- }
- // AddString starts a new goroutine which will read all subsequent data written to the output,
- // line by line, and store the last line into `dst`.
- func (o *Output) AddString(dst *string) error {
- src, err := o.AddPipe()
- if err != nil {
- return err
- }
- o.tasks.Add(1)
- go func() {
- defer o.tasks.Done()
- lines := make([]string, 0, 1)
- Tail(src, 1, &lines)
- if len(lines) == 0 {
- *dst = ""
- } else {
- *dst = lines[0]
- }
- }()
- return nil
- }
- // Write writes the same data to all registered destinations.
- // This method is thread-safe.
- func (o *Output) Write(p []byte) (n int, err error) {
- o.Mutex.Lock()
- defer o.Mutex.Unlock()
- var firstErr error
- for _, dst := range o.dests {
- _, err := dst.Write(p)
- if err != nil && firstErr == nil {
- firstErr = err
- }
- }
- return len(p), firstErr
- }
- // Close unregisters all destinations and waits for all background
- // AddTail and AddString tasks to complete.
- // The Close method of each destination is called if it exists.
- func (o *Output) Close() error {
- o.Mutex.Lock()
- defer o.Mutex.Unlock()
- var firstErr error
- for _, dst := range o.dests {
- if closer, ok := dst.(io.WriteCloser); ok {
- err := closer.Close()
- if err != nil && firstErr == nil {
- firstErr = err
- }
- }
- }
- o.tasks.Wait()
- return firstErr
- }
- type Input struct {
- src io.Reader
- sync.Mutex
- }
- // NewInput returns a new Input object with no source attached.
- // Reading to an empty Input will return io.EOF.
- func NewInput() *Input {
- return &Input{}
- }
- // Read reads from the input in a thread-safe way.
- func (i *Input) Read(p []byte) (n int, err error) {
- i.Mutex.Lock()
- defer i.Mutex.Unlock()
- if i.src == nil {
- return 0, io.EOF
- }
- return i.src.Read(p)
- }
- // Add attaches a new source to the input.
- // Add can only be called once per input. Subsequent calls will
- // return an error.
- func (i *Input) Add(src io.Reader) error {
- i.Mutex.Lock()
- defer i.Mutex.Unlock()
- if i.src != nil {
- return fmt.Errorf("Maximum number of sources reached: 1")
- }
- i.src = src
- return nil
- }
- // Tail reads from `src` line per line, and returns the last `n` lines as an array.
- // A ring buffer is used to only store `n` lines at any time.
- func Tail(src io.Reader, n int, dst *[]string) {
- scanner := bufio.NewScanner(src)
- r := ring.New(n)
- for scanner.Scan() {
- if n == 0 {
- continue
- }
- r.Value = scanner.Text()
- r = r.Next()
- }
- r.Do(func(v interface{}) {
- if v == nil {
- return
- }
- *dst = append(*dst, v.(string))
- })
- }
- // AddEnv starts a new goroutine which will decode all subsequent data
- // as a stream of json-encoded objects, and point `dst` to the last
- // decoded object.
- // The result `env` can be queried using the type-neutral Env interface.
- // It is not safe to query `env` until the Output is closed.
- func (o *Output) AddEnv() (dst *Env, err error) {
- src, err := o.AddPipe()
- if err != nil {
- return nil, err
- }
- dst = &Env{}
- o.tasks.Add(1)
- go func() {
- defer o.tasks.Done()
- decoder := NewDecoder(src)
- for {
- env, err := decoder.Decode()
- if err != nil {
- return
- }
- *dst = *env
- }
- }()
- return dst, nil
- }
- func (o *Output) AddTable() (dst *Table, err error) {
- src, err := o.AddPipe()
- if err != nil {
- return nil, err
- }
- dst = NewTable("", 0)
- o.tasks.Add(1)
- go func() {
- defer o.tasks.Done()
- if _, err := dst.ReadFrom(src); err != nil {
- return
- }
- }()
- return dst, nil
- }
|