|
@@ -8,7 +8,7 @@ import (
|
|
|
"sync"
|
|
|
)
|
|
|
|
|
|
-// Config holds the io configurations.
|
|
|
+// Config holds the IO configurations.
|
|
|
type Config struct {
|
|
|
// Terminal is true if one has been allocated
|
|
|
Terminal bool
|
|
@@ -24,173 +24,157 @@ type Config struct {
|
|
|
type IO interface {
|
|
|
// Config returns the IO configuration.
|
|
|
Config() Config
|
|
|
- // Cancel aborts all current io operations
|
|
|
+ // Cancel aborts all current io operations.
|
|
|
Cancel()
|
|
|
- // Wait blocks until all io copy operations have completed
|
|
|
+ // Wait blocks until all io copy operations have completed.
|
|
|
Wait()
|
|
|
- // Close cleans up all open io resources
|
|
|
+ // Close cleans up all open io resources. Cancel() is always called before
|
|
|
+ // Close()
|
|
|
Close() error
|
|
|
}
|
|
|
|
|
|
-// cio is a basic container IO implementation.
|
|
|
-type cio struct {
|
|
|
- config Config
|
|
|
+// Creator creates new IO sets for a task
|
|
|
+type Creator func(id string) (IO, error)
|
|
|
|
|
|
- closer *wgCloser
|
|
|
-}
|
|
|
+// Attach allows callers to reattach to running tasks
|
|
|
+//
|
|
|
+// There should only be one reader for a task's IO set
|
|
|
+// because fifo's can only be read from one reader or the output
|
|
|
+// will be sent only to the first reads
|
|
|
+type Attach func(*FIFOSet) (IO, error)
|
|
|
|
|
|
-func (c *cio) Config() Config {
|
|
|
- return c.config
|
|
|
+// FIFOSet is a set of file paths to FIFOs for a task's standard IO streams
|
|
|
+type FIFOSet struct {
|
|
|
+ Config
|
|
|
+ close func() error
|
|
|
}
|
|
|
|
|
|
-func (c *cio) Cancel() {
|
|
|
- if c.closer == nil {
|
|
|
- return
|
|
|
+// Close the FIFOSet
|
|
|
+func (f *FIFOSet) Close() error {
|
|
|
+ if f.close != nil {
|
|
|
+ return f.close()
|
|
|
}
|
|
|
- c.closer.Cancel()
|
|
|
+ return nil
|
|
|
}
|
|
|
|
|
|
-func (c *cio) Wait() {
|
|
|
- if c.closer == nil {
|
|
|
- return
|
|
|
- }
|
|
|
- c.closer.Wait()
|
|
|
+// NewFIFOSet returns a new FIFOSet from a Config and a close function
|
|
|
+func NewFIFOSet(config Config, close func() error) *FIFOSet {
|
|
|
+ return &FIFOSet{Config: config, close: close}
|
|
|
}
|
|
|
|
|
|
-func (c *cio) Close() error {
|
|
|
- if c.closer == nil {
|
|
|
- return nil
|
|
|
- }
|
|
|
- return c.closer.Close()
|
|
|
+// Streams used to configure a Creator or Attach
|
|
|
+type Streams struct {
|
|
|
+ Stdin io.Reader
|
|
|
+ Stdout io.Writer
|
|
|
+ Stderr io.Writer
|
|
|
+ Terminal bool
|
|
|
}
|
|
|
|
|
|
-// Creation creates new IO sets for a task
|
|
|
-type Creation func(id string) (IO, error)
|
|
|
+// Opt customize options for creating a Creator or Attach
|
|
|
+type Opt func(*Streams)
|
|
|
|
|
|
-// Attach allows callers to reattach to running tasks
|
|
|
-//
|
|
|
-// There should only be one reader for a task's IO set
|
|
|
-// because fifo's can only be read from one reader or the output
|
|
|
-// will be sent only to the first reads
|
|
|
-type Attach func(*FIFOSet) (IO, error)
|
|
|
+// WithStdio sets stream options to the standard input/output streams
|
|
|
+func WithStdio(opt *Streams) {
|
|
|
+ WithStreams(os.Stdin, os.Stdout, os.Stderr)(opt)
|
|
|
+}
|
|
|
|
|
|
-// NewIO returns an Creation that will provide IO sets without a terminal
|
|
|
-func NewIO(stdin io.Reader, stdout, stderr io.Writer) Creation {
|
|
|
- return NewIOWithTerminal(stdin, stdout, stderr, false)
|
|
|
+// WithTerminal sets the terminal option
|
|
|
+func WithTerminal(opt *Streams) {
|
|
|
+ opt.Terminal = true
|
|
|
}
|
|
|
|
|
|
-// NewIOWithTerminal creates a new io set with the provied io.Reader/Writers for use with a terminal
|
|
|
-func NewIOWithTerminal(stdin io.Reader, stdout, stderr io.Writer, terminal bool) Creation {
|
|
|
- return func(id string) (_ IO, err error) {
|
|
|
- paths, err := NewFifos(id)
|
|
|
- if err != nil {
|
|
|
- return nil, err
|
|
|
- }
|
|
|
- defer func() {
|
|
|
- if err != nil && paths.Dir != "" {
|
|
|
- os.RemoveAll(paths.Dir)
|
|
|
- }
|
|
|
- }()
|
|
|
- cfg := Config{
|
|
|
- Terminal: terminal,
|
|
|
- Stdout: paths.Out,
|
|
|
- Stderr: paths.Err,
|
|
|
- Stdin: paths.In,
|
|
|
- }
|
|
|
- i := &cio{config: cfg}
|
|
|
- set := &ioSet{
|
|
|
- in: stdin,
|
|
|
- out: stdout,
|
|
|
- err: stderr,
|
|
|
- }
|
|
|
- closer, err := copyIO(paths, set, cfg.Terminal)
|
|
|
- if err != nil {
|
|
|
- return nil, err
|
|
|
- }
|
|
|
- i.closer = closer
|
|
|
- return i, nil
|
|
|
+// WithStreams sets the stream options to the specified Reader and Writers
|
|
|
+func WithStreams(stdin io.Reader, stdout, stderr io.Writer) Opt {
|
|
|
+ return func(opt *Streams) {
|
|
|
+ opt.Stdin = stdin
|
|
|
+ opt.Stdout = stdout
|
|
|
+ opt.Stderr = stderr
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-// WithAttach attaches the existing io for a task to the provided io.Reader/Writers
|
|
|
-func WithAttach(stdin io.Reader, stdout, stderr io.Writer) Attach {
|
|
|
- return func(paths *FIFOSet) (IO, error) {
|
|
|
- if paths == nil {
|
|
|
- return nil, fmt.Errorf("cannot attach to existing fifos")
|
|
|
- }
|
|
|
- cfg := Config{
|
|
|
- Terminal: paths.Terminal,
|
|
|
- Stdout: paths.Out,
|
|
|
- Stderr: paths.Err,
|
|
|
- Stdin: paths.In,
|
|
|
- }
|
|
|
- i := &cio{config: cfg}
|
|
|
- set := &ioSet{
|
|
|
- in: stdin,
|
|
|
- out: stdout,
|
|
|
- err: stderr,
|
|
|
- }
|
|
|
- closer, err := copyIO(paths, set, cfg.Terminal)
|
|
|
+// NewCreator returns an IO creator from the options
|
|
|
+func NewCreator(opts ...Opt) Creator {
|
|
|
+ streams := &Streams{}
|
|
|
+ for _, opt := range opts {
|
|
|
+ opt(streams)
|
|
|
+ }
|
|
|
+ return func(id string) (IO, error) {
|
|
|
+ // TODO: accept root as a param
|
|
|
+ root := "/run/containerd/fifo"
|
|
|
+ fifos, err := NewFIFOSetInDir(root, id, streams.Terminal)
|
|
|
if err != nil {
|
|
|
return nil, err
|
|
|
}
|
|
|
- i.closer = closer
|
|
|
- return i, nil
|
|
|
+ return copyIO(fifos, streams)
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-// Stdio returns an IO set to be used for a task
|
|
|
-// that outputs the container's IO as the current processes Stdio
|
|
|
-func Stdio(id string) (IO, error) {
|
|
|
- return NewIO(os.Stdin, os.Stdout, os.Stderr)(id)
|
|
|
-}
|
|
|
-
|
|
|
-// StdioTerminal will setup the IO for the task to use a terminal
|
|
|
-func StdioTerminal(id string) (IO, error) {
|
|
|
- return NewIOWithTerminal(os.Stdin, os.Stdout, os.Stderr, true)(id)
|
|
|
+// NewAttach attaches the existing io for a task to the provided io.Reader/Writers
|
|
|
+func NewAttach(opts ...Opt) Attach {
|
|
|
+ streams := &Streams{}
|
|
|
+ for _, opt := range opts {
|
|
|
+ opt(streams)
|
|
|
+ }
|
|
|
+ return func(fifos *FIFOSet) (IO, error) {
|
|
|
+ if fifos == nil {
|
|
|
+ return nil, fmt.Errorf("cannot attach, missing fifos")
|
|
|
+ }
|
|
|
+ return copyIO(fifos, streams)
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
// NullIO redirects the container's IO into /dev/null
|
|
|
-func NullIO(id string) (IO, error) {
|
|
|
+func NullIO(_ string) (IO, error) {
|
|
|
return &cio{}, nil
|
|
|
}
|
|
|
|
|
|
-// FIFOSet is a set of fifos for use with tasks
|
|
|
-type FIFOSet struct {
|
|
|
- // Dir is the directory holding the task fifos
|
|
|
- Dir string
|
|
|
- // In, Out, and Err fifo paths
|
|
|
- In, Out, Err string
|
|
|
- // Terminal returns true if a terminal is being used for the task
|
|
|
- Terminal bool
|
|
|
+// cio is a basic container IO implementation.
|
|
|
+type cio struct {
|
|
|
+ config Config
|
|
|
+ wg *sync.WaitGroup
|
|
|
+ closers []io.Closer
|
|
|
+ cancel context.CancelFunc
|
|
|
}
|
|
|
|
|
|
-type ioSet struct {
|
|
|
- in io.Reader
|
|
|
- out, err io.Writer
|
|
|
+func (c *cio) Config() Config {
|
|
|
+ return c.config
|
|
|
}
|
|
|
|
|
|
-type wgCloser struct {
|
|
|
- wg *sync.WaitGroup
|
|
|
- dir string
|
|
|
- set []io.Closer
|
|
|
- cancel context.CancelFunc
|
|
|
+func (c *cio) Wait() {
|
|
|
+ if c.wg != nil {
|
|
|
+ c.wg.Wait()
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
-func (g *wgCloser) Wait() {
|
|
|
- g.wg.Wait()
|
|
|
+func (c *cio) Close() error {
|
|
|
+ var lastErr error
|
|
|
+ for _, closer := range c.closers {
|
|
|
+ if closer == nil {
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ if err := closer.Close(); err != nil {
|
|
|
+ lastErr = err
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return lastErr
|
|
|
}
|
|
|
|
|
|
-func (g *wgCloser) Close() error {
|
|
|
- for _, f := range g.set {
|
|
|
- f.Close()
|
|
|
- }
|
|
|
- if g.dir != "" {
|
|
|
- return os.RemoveAll(g.dir)
|
|
|
+func (c *cio) Cancel() {
|
|
|
+ if c.cancel != nil {
|
|
|
+ c.cancel()
|
|
|
}
|
|
|
- return nil
|
|
|
}
|
|
|
|
|
|
-func (g *wgCloser) Cancel() {
|
|
|
- g.cancel()
|
|
|
+type pipes struct {
|
|
|
+ Stdin io.WriteCloser
|
|
|
+ Stdout io.ReadCloser
|
|
|
+ Stderr io.ReadCloser
|
|
|
}
|
|
|
+
|
|
|
+// DirectIO allows task IO to be handled externally by the caller
|
|
|
+type DirectIO struct {
|
|
|
+ pipes
|
|
|
+ cio
|
|
|
+}
|
|
|
+
|
|
|
+var _ IO = &DirectIO{}
|