123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222 |
- package engine
- import (
- "bytes"
- "fmt"
- "io"
- "io/ioutil"
- "sync"
- )
// Output fans written data out to a set of destination writers.
type Output struct {
	// Mutex guards dests and used.
	sync.Mutex
	// dests lists the writers that receive every Write.
	dests []io.Writer
	// tasks counts background goroutines that Close waits for.
	tasks sync.WaitGroup
	// used is set once Write has been called.
	used bool
}
// Tail returns the last n lines held in buffer as a single string.
// One trailing "\n", if present, is removed before lines are counted.
// If the buffer holds fewer than n lines, its whole (trimmed) content is
// returned. If n <= 0, Tail returns an empty string.
func Tail(buffer *bytes.Buffer, n int) string {
	if n <= 0 {
		return ""
	}
	data := bytes.TrimSuffix(buffer.Bytes(), []byte{'\n'})

	// Walk backwards one line separator at a time; after n separators the
	// text following the last one found is exactly the n trailing lines.
	end := len(data)
	for ; n > 0; n-- {
		idx := bytes.LastIndexByte(data[:end], '\n')
		if idx < 0 {
			// Ran out of separators: fewer than n lines available.
			return string(data)
		}
		end = idx
	}
	return string(data[end+1:])
}
- // NewOutput returns a new Output object with no destinations attached.
- // Writing to an empty Output will cause the written data to be discarded.
- func NewOutput() *Output {
- return &Output{}
- }
- // Return true if something was written on this output
- func (o *Output) Used() bool {
- o.Lock()
- defer o.Unlock()
- return o.used
- }
- // Add attaches a new destination to the Output. Any data subsequently written
- // to the output will be written to the new destination in addition to all the others.
- // This method is thread-safe.
- func (o *Output) Add(dst io.Writer) {
- o.Lock()
- defer o.Unlock()
- o.dests = append(o.dests, dst)
- }
- // Set closes and remove existing destination and then attaches a new destination to
- // the Output. Any data subsequently written to the output will be written to the new
- // destination in addition to all the others. This method is thread-safe.
- func (o *Output) Set(dst io.Writer) {
- o.Close()
- o.Lock()
- defer o.Unlock()
- o.dests = []io.Writer{dst}
- }
- // AddPipe creates an in-memory pipe with io.Pipe(), adds its writing end as a destination,
- // and returns its reading end for consumption by the caller.
- // This is a rough equivalent similar to Cmd.StdoutPipe() in the standard os/exec package.
- // This method is thread-safe.
- func (o *Output) AddPipe() (io.Reader, error) {
- r, w := io.Pipe()
- o.Add(w)
- return r, nil
- }
- // Write writes the same data to all registered destinations.
- // This method is thread-safe.
- func (o *Output) Write(p []byte) (n int, err error) {
- o.Lock()
- defer o.Unlock()
- o.used = true
- var firstErr error
- for _, dst := range o.dests {
- _, err := dst.Write(p)
- if err != nil && firstErr == nil {
- firstErr = err
- }
- }
- return len(p), firstErr
- }
- // Close unregisters all destinations and waits for all background
- // AddTail and AddString tasks to complete.
- // The Close method of each destination is called if it exists.
- func (o *Output) Close() error {
- o.Lock()
- defer o.Unlock()
- var firstErr error
- for _, dst := range o.dests {
- if closer, ok := dst.(io.Closer); ok {
- err := closer.Close()
- if err != nil && firstErr == nil {
- firstErr = err
- }
- }
- }
- o.tasks.Wait()
- return firstErr
- }
// Input is a reader backed by at most one source.
type Input struct {
	// src is the attached source; it is nil until one is added.
	src io.Reader
	// Mutex serializes Read and Add.
	sync.Mutex
}
- // NewInput returns a new Input object with no source attached.
- // Reading to an empty Input will return io.EOF.
- func NewInput() *Input {
- return &Input{}
- }
- // Read reads from the input in a thread-safe way.
- func (i *Input) Read(p []byte) (n int, err error) {
- i.Mutex.Lock()
- defer i.Mutex.Unlock()
- if i.src == nil {
- return 0, io.EOF
- }
- return i.src.Read(p)
- }
- // Closes the src
- // Not thread safe on purpose
- func (i *Input) Close() error {
- if i.src != nil {
- if closer, ok := i.src.(io.Closer); ok {
- return closer.Close()
- }
- }
- return nil
- }
- // Add attaches a new source to the input.
- // Add can only be called once per input. Subsequent calls will
- // return an error.
- func (i *Input) Add(src io.Reader) error {
- i.Mutex.Lock()
- defer i.Mutex.Unlock()
- if i.src != nil {
- return fmt.Errorf("Maximum number of sources reached: 1")
- }
- i.src = src
- return nil
- }
- // AddEnv starts a new goroutine which will decode all subsequent data
- // as a stream of json-encoded objects, and point `dst` to the last
- // decoded object.
- // The result `env` can be queried using the type-neutral Env interface.
- // It is not safe to query `env` until the Output is closed.
- func (o *Output) AddEnv() (dst *Env, err error) {
- src, err := o.AddPipe()
- if err != nil {
- return nil, err
- }
- dst = &Env{}
- o.tasks.Add(1)
- go func() {
- defer o.tasks.Done()
- decoder := NewDecoder(src)
- for {
- env, err := decoder.Decode()
- if err != nil {
- return
- }
- *dst = *env
- }
- }()
- return dst, nil
- }
- func (o *Output) AddListTable() (dst *Table, err error) {
- src, err := o.AddPipe()
- if err != nil {
- return nil, err
- }
- dst = NewTable("", 0)
- o.tasks.Add(1)
- go func() {
- defer o.tasks.Done()
- content, err := ioutil.ReadAll(src)
- if err != nil {
- return
- }
- if _, err := dst.ReadListFrom(content); err != nil {
- return
- }
- }()
- return dst, nil
- }
- func (o *Output) AddTable() (dst *Table, err error) {
- src, err := o.AddPipe()
- if err != nil {
- return nil, err
- }
- dst = NewTable("", 0)
- o.tasks.Add(1)
- go func() {
- defer o.tasks.Done()
- if _, err := dst.ReadFrom(src); err != nil {
- return
- }
- }()
- return dst, nil
- }
|