diff --git a/hack/vendor.sh b/hack/vendor.sh index 30cbbe7e3d..c47b1f17c0 100755 --- a/hack/vendor.sh +++ b/hack/vendor.sh @@ -144,6 +144,7 @@ clone git github.com/docker/docker-credential-helpers v0.3.0 # containerd clone git github.com/docker/containerd 52ef1ceb4b660c42cf4ea9013180a5663968d4c7 +clone git github.com/tonistiigi/fifo 8c56881ce5e63e19e2dfc495c8af0fb90916467d # cluster clone git github.com/docker/swarmkit 3b221eb0391d34ae0b9dac65df02b5b64de6dff2 diff --git a/libcontainerd/client_linux.go b/libcontainerd/client_linux.go index 39d6296dac..a315e9a577 100644 --- a/libcontainerd/client_linux.go +++ b/libcontainerd/client_linux.go @@ -13,6 +13,7 @@ import ( "github.com/Sirupsen/logrus" containerd "github.com/docker/containerd/api/grpc/types" "github.com/docker/docker/pkg/idtools" + "github.com/docker/docker/pkg/ioutils" "github.com/docker/docker/pkg/mount" "github.com/golang/protobuf/ptypes" "github.com/golang/protobuf/ptypes/timestamp" @@ -100,12 +101,26 @@ func (clnt *client) AddProcess(ctx context.Context, containerID, processFriendly return -1, err } + var stdinOnce sync.Once + stdin := iopipe.Stdin + iopipe.Stdin = ioutils.NewWriteCloserWrapper(stdin, func() error { + var err error + stdinOnce.Do(func() { // on error from attach we don't know if stdin was already closed + err = stdin.Close() + if err2 := p.sendCloseStdin(); err == nil { + err = err2 + } + }) + return err + }) + container.processes[processFriendlyName] = p clnt.unlock(containerID) if err := clnt.backend.AttachStreams(processFriendlyName, *iopipe); err != nil { clnt.lock(containerID) + p.closeFifos(iopipe) return -1, err } clnt.lock(containerID) @@ -420,8 +435,18 @@ func (clnt *client) restore(cont *containerd.Container, lastEvent *containerd.Ev if err != nil { return err } + var stdinOnce sync.Once + stdin := iopipe.Stdin + iopipe.Stdin = ioutils.NewWriteCloserWrapper(stdin, func() error { + var err error + stdinOnce.Do(func() { // on error from attach we don't know if stdin was already closed + err = stdin.Close() + }) + return err + }) if err := clnt.backend.AttachStreams(containerID, *iopipe); err != nil { + container.closeFifos(iopipe) return err } @@ -434,6 +459,7 @@ func (clnt *client) restore(cont *containerd.Container, lastEvent *containerd.Ev }}) if err != nil { + container.closeFifos(iopipe) return err } diff --git a/libcontainerd/client_windows.go b/libcontainerd/client_windows.go index 979806e599..9e3e1043a0 100644 --- a/libcontainerd/client_windows.go +++ b/libcontainerd/client_windows.go @@ -4,6 +4,7 @@ import ( "errors" "fmt" "io" + "io/ioutil" "os" "path/filepath" "strings" @@ -322,10 +323,10 @@ func (clnt *client) AddProcess(ctx context.Context, containerID, processFriendly // Convert io.ReadClosers to io.Readers if stdout != nil { - iopipe.Stdout = openReaderFromPipe(stdout) + iopipe.Stdout = ioutil.NopCloser(&autoClosingReader{ReadCloser: stdout}) } if stderr != nil { - iopipe.Stderr = openReaderFromPipe(stderr) + iopipe.Stderr = ioutil.NopCloser(&autoClosingReader{ReadCloser: stderr}) } proc := &process{ diff --git a/libcontainerd/container_linux.go b/libcontainerd/container_linux.go index 67db34d7a4..16ca72f297 100644 --- a/libcontainerd/container_linux.go +++ b/libcontainerd/container_linux.go @@ -6,12 +6,15 @@ import ( "io/ioutil" "os" "path/filepath" + "sync" "syscall" + "time" "github.com/Sirupsen/logrus" containerd "github.com/docker/containerd/api/grpc/types" "github.com/docker/docker/pkg/ioutils" "github.com/opencontainers/runtime-spec/specs-go" + "github.com/tonistiigi/fifo" "golang.org/x/net/context" ) @@ -90,22 +93,37 @@ func (ctr *container) start(checkpoint string, checkpointDir string) error { if err != nil { return nil } - createChan := make(chan struct{}) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + ready := make(chan struct{}) + iopipe, err := ctr.openFifos(spec.Process.Terminal) if err != nil { return err } + var stdinOnce sync.Once + // we need to delay stdin closure after container start or else "stdin close" // event will be rejected by containerd. // stdin closure happens in AttachStreams stdin := iopipe.Stdin iopipe.Stdin = ioutils.NewWriteCloserWrapper(stdin, func() error { - go func() { - <-createChan - stdin.Close() - }() - return nil + var err error + stdinOnce.Do(func() { // on error from attach we don't know if stdin was already closed + err = stdin.Close() + go func() { + select { + case <-ready: + if err := ctr.sendCloseStdin(); err != nil { + logrus.Warnf("failed to close stdin: %+v") + } + case <-ctx.Done(): + } + }() + }) + return err }) r := &containerd.CreateContainerRequest{ @@ -124,19 +142,17 @@ func (ctr *container) start(checkpoint string, checkpointDir string) error { ctr.client.appendContainer(ctr) if err := ctr.client.backend.AttachStreams(ctr.containerID, *iopipe); err != nil { - close(createChan) ctr.closeFifos(iopipe) return err } resp, err := ctr.client.remote.apiClient.CreateContainer(context.Background(), r) if err != nil { - close(createChan) ctr.closeFifos(iopipe) return err } ctr.systemPid = systemPid(resp.Container) - close(createChan) + close(ready) return ctr.client.backend.StateChanged(ctr.containerID, StateInfo{ CommonStateInfo: CommonStateInfo{ @@ -207,15 +223,15 @@ func (ctr *container) handleEvent(e *containerd.Event) error { // discardFifos attempts to fully read the container fifos to unblock processes // that may be blocked on the writer side. func (ctr *container) discardFifos() { + ctx, _ := context.WithTimeout(context.Background(), 3*time.Second) for _, i := range []int{syscall.Stdout, syscall.Stderr} { - f := ctr.fifo(i) - c := make(chan struct{}) + f, err := fifo.OpenFifo(ctx, ctr.fifo(i), syscall.O_RDONLY|syscall.O_NONBLOCK, 0) + if err != nil { + logrus.Warnf("error opening fifo %v for discarding: %+v", f, err) + continue + } go func() { - r := openReaderFromFifo(f) - close(c) // this channel is used to not close the writer too early, before readonly open has been called. - io.Copy(ioutil.Discard, r) + io.Copy(ioutil.Discard, f) }() - <-c - closeReaderFifo(f) // avoid blocking permanently on open if there is no writer side } } diff --git a/libcontainerd/container_windows.go b/libcontainerd/container_windows.go index cf058f689d..5f7ddb4939 100644 --- a/libcontainerd/container_windows.go +++ b/libcontainerd/container_windows.go @@ -3,6 +3,7 @@ package libcontainerd import ( "fmt" "io" + "io/ioutil" "strings" "syscall" "time" @@ -131,10 +132,10 @@ func (ctr *container) start() error { // Convert io.ReadClosers to io.Readers if stdout != nil { - iopipe.Stdout = openReaderFromPipe(stdout) + iopipe.Stdout = ioutil.NopCloser(&autoClosingReader{ReadCloser: stdout}) } if stderr != nil { - iopipe.Stderr = openReaderFromPipe(stderr) + iopipe.Stderr = ioutil.NopCloser(&autoClosingReader{ReadCloser: stderr}) } // Save the PID diff --git a/libcontainerd/process_linux.go b/libcontainerd/process_linux.go index 3c48576fe2..135a11e754 100644 --- a/libcontainerd/process_linux.go +++ b/libcontainerd/process_linux.go @@ -1,14 +1,15 @@ package libcontainerd import ( - "fmt" "io" + "io/ioutil" "os" "path/filepath" "syscall" + "time" containerd "github.com/docker/containerd/api/grpc/types" - "github.com/docker/docker/pkg/ioutils" + "github.com/tonistiigi/fifo" "golang.org/x/net/context" ) @@ -26,49 +27,67 @@ type process struct { dir string } -func (p *process) openFifos(terminal bool) (*IOPipe, error) { - bundleDir := p.dir - if err := os.MkdirAll(bundleDir, 0700); err != nil { +func (p *process) openFifos(terminal bool) (pipe *IOPipe, err error) { + if err := os.MkdirAll(p.dir, 0700); err != nil { return nil, err } - for i := 0; i < 3; i++ { - f := p.fifo(i) - if err := syscall.Mkfifo(f, 0700); err != nil && !os.IsExist(err) { - return nil, fmt.Errorf("mkfifo: %s %v", f, err) - } - } + ctx, _ := context.WithTimeout(context.Background(), 15*time.Second) io := &IOPipe{} - stdinf, err := os.OpenFile(p.fifo(syscall.Stdin), syscall.O_RDWR, 0) + + io.Stdin, err = fifo.OpenFifo(ctx, p.fifo(syscall.Stdin), syscall.O_WRONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700) if err != nil { return nil, err } - io.Stdout = openReaderFromFifo(p.fifo(syscall.Stdout)) - if !terminal { - io.Stderr = openReaderFromFifo(p.fifo(syscall.Stderr)) - } else { - io.Stderr = emptyReader{} + defer func() { + if err != nil { + io.Stdin.Close() + } + }() + + io.Stdout, err = fifo.OpenFifo(ctx, p.fifo(syscall.Stdout), syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700) + if err != nil { + return nil, err } - io.Stdin = ioutils.NewWriteCloserWrapper(stdinf, func() error { - stdinf.Close() - _, err := p.client.remote.apiClient.UpdateProcess(context.Background(), &containerd.UpdateProcessRequest{ - Id: p.containerID, - Pid: p.friendlyName, - CloseStdin: true, - }) - return err - }) + defer func() { + if err != nil { + io.Stdout.Close() + } + }() + + if !terminal { + io.Stderr, err = fifo.OpenFifo(ctx, p.fifo(syscall.Stderr), syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700) + if err != nil { + return nil, err + } + defer func() { + if err != nil { + io.Stderr.Close() + } + }() + } else { + io.Stderr = ioutil.NopCloser(emptyReader{}) + } return io, nil } +func (p *process) sendCloseStdin() error { + _, err := p.client.remote.apiClient.UpdateProcess(context.Background(), &containerd.UpdateProcessRequest{ + Id: p.containerID, + Pid: p.friendlyName, + CloseStdin: true, + }) + return err +} + func (p *process) closeFifos(io *IOPipe) { io.Stdin.Close() - closeReaderFifo(p.fifo(syscall.Stdout)) - closeReaderFifo(p.fifo(syscall.Stderr)) + io.Stdout.Close() + io.Stderr.Close() } type emptyReader struct{} @@ -77,34 +96,6 @@ func (r emptyReader) Read(b []byte) (int, error) { return 0, io.EOF } -func openReaderFromFifo(fn string) io.Reader { - r, w := io.Pipe() - c := make(chan struct{}) - go func() { - close(c) - stdoutf, err := os.OpenFile(fn, syscall.O_RDONLY, 0) - if err != nil { - r.CloseWithError(err) - } - if _, err := io.Copy(w, stdoutf); err != nil { - r.CloseWithError(err) - } - w.Close() - stdoutf.Close() - }() - <-c // wait for the goroutine to get scheduled and syscall to block - return r -} - -// closeReaderFifo closes fifo that may be blocked on open by opening the write side. -func closeReaderFifo(fn string) { - f, err := os.OpenFile(fn, syscall.O_WRONLY|syscall.O_NONBLOCK, 0) - if err != nil { - return - } - f.Close() -} - func (p *process) fifo(index int) string { return filepath.Join(p.dir, p.friendlyName+"-"+fdNames[index]) } diff --git a/libcontainerd/process_windows.go b/libcontainerd/process_windows.go index c18dab3ff7..e435b3d22a 100644 --- a/libcontainerd/process_windows.go +++ b/libcontainerd/process_windows.go @@ -2,6 +2,7 @@ package libcontainerd import ( "io" + "sync" "github.com/Microsoft/hcsshim" "github.com/docker/docker/pkg/ioutils" @@ -18,16 +19,17 @@ type process struct { hcsProcess hcsshim.Process } -func openReaderFromPipe(p io.ReadCloser) io.Reader { - r, w := io.Pipe() - go func() { - if _, err := io.Copy(w, p); err != nil { - r.CloseWithError(err) - } - w.Close() - p.Close() - }() - return r +type autoClosingReader struct { + io.ReadCloser + sync.Once +} + +func (r *autoClosingReader) Read(b []byte) (n int, err error) { + n, err = r.ReadCloser.Read(b) + if err == io.EOF { + r.Once.Do(func() { r.ReadCloser.Close() }) + } + return } func createStdInCloser(pipe io.WriteCloser, process hcsshim.Process) io.WriteCloser { diff --git a/libcontainerd/types.go b/libcontainerd/types.go index 0efc4154f5..f808a08151 100644 --- a/libcontainerd/types.go +++ b/libcontainerd/types.go @@ -61,7 +61,7 @@ type CreateOption interface { // IOPipe contains the stdio streams. type IOPipe struct { Stdin io.WriteCloser - Stdout io.Reader - Stderr io.Reader + Stdout io.ReadCloser + Stderr io.ReadCloser Terminal bool // Whether stderr is connected on Windows } diff --git a/vendor/src/github.com/tonistiigi/fifo/LICENSE b/vendor/src/github.com/tonistiigi/fifo/LICENSE new file mode 100644 index 0000000000..8d318c1c0e --- /dev/null +++ b/vendor/src/github.com/tonistiigi/fifo/LICENSE @@ -0,0 +1,21 @@ +MIT + +Copyright (C) 2016 Tõnis Tiigi + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. \ No newline at end of file diff --git a/vendor/src/github.com/tonistiigi/fifo/Makefile b/vendor/src/github.com/tonistiigi/fifo/Makefile new file mode 100644 index 0000000000..c1c1cd58ce --- /dev/null +++ b/vendor/src/github.com/tonistiigi/fifo/Makefile @@ -0,0 +1,13 @@ +.PHONY: fmt vet test deps + +test: deps + go test -v ./... + +deps: + go get -d -t ./... + +fmt: + gofmt -s -l . + +vet: + go vet ./... diff --git a/vendor/src/github.com/tonistiigi/fifo/fifo.go b/vendor/src/github.com/tonistiigi/fifo/fifo.go new file mode 100644 index 0000000000..a173e814cf --- /dev/null +++ b/vendor/src/github.com/tonistiigi/fifo/fifo.go @@ -0,0 +1,215 @@ +package fifo + +import ( + "context" + "io" + "os" + "runtime" + "sync" + "syscall" + + "github.com/pkg/errors" +) + +type fifo struct { + flag int + opened chan struct{} + closed chan struct{} + closing chan struct{} + err error + file *os.File + closingOnce sync.Once // close has been called + closedOnce sync.Once // fifo is closed + handle *handle +} + +var leakCheckWg *sync.WaitGroup + +// OpenFifo opens a fifo. Returns io.ReadWriteCloser. +// Context can be used to cancel this function until open(2) has not returned. +// Accepted flags: +// - syscall.O_CREAT - create new fifo if one doesn't exist +// - syscall.O_RDONLY - open fifo only from reader side +// - syscall.O_WRONLY - open fifo only from writer side +// - syscall.O_RDWR - open fifo from both sides, never block on syscall level +// - syscall.O_NONBLOCK - return io.ReadWriteCloser even if other side of the +// fifo isn't open. read/write will be connected after the actual fifo is +// open or after fifo is closed. +func OpenFifo(ctx context.Context, fn string, flag int, perm os.FileMode) (io.ReadWriteCloser, error) { + if _, err := os.Stat(fn); err != nil { + if os.IsNotExist(err) && flag&syscall.O_CREAT != 0 { + if err := syscall.Mkfifo(fn, uint32(perm&os.ModePerm)); err != nil && !os.IsExist(err) { + return nil, errors.Wrapf(err, "error creating fifo %v", fn) + } + } else { + return nil, err + } + } + + block := flag&syscall.O_NONBLOCK == 0 || flag&syscall.O_RDWR != 0 + + flag &= ^syscall.O_CREAT + flag &= ^syscall.O_NONBLOCK + + h, err := getHandle(fn) + if err != nil { + return nil, err + } + + f := &fifo{ + handle: h, + flag: flag, + opened: make(chan struct{}), + closed: make(chan struct{}), + closing: make(chan struct{}), + } + + wg := leakCheckWg + if wg != nil { + wg.Add(2) + } + + go func() { + if wg != nil { + defer wg.Done() + } + select { + case <-ctx.Done(): + f.Close() + case <-f.opened: + case <-f.closed: + } + }() + go func() { + if wg != nil { + defer wg.Done() + } + var file *os.File + fn, err := h.Path() + if err == nil { + file, err = os.OpenFile(fn, flag, 0) + } + select { + case <-f.closing: + if err == nil { + select { + case <-ctx.Done(): + err = ctx.Err() + default: + err = errors.Errorf("fifo %v was closed before opening", fn) + } + if file != nil { + file.Close() + } + } + default: + } + if err != nil { + f.closedOnce.Do(func() { + f.err = err + close(f.closed) + }) + return + } + f.file = file + close(f.opened) + }() + if block { + select { + case <-f.opened: + case <-f.closed: + return nil, f.err + } + } + return f, nil +} + +// Read from a fifo to a byte array. +func (f *fifo) Read(b []byte) (int, error) { + if f.flag&syscall.O_WRONLY > 0 { + return 0, errors.New("reading from write-only fifo") + } + select { + case <-f.opened: + return f.file.Read(b) + default: + } + select { + case <-f.opened: + return f.file.Read(b) + case <-f.closed: + return 0, errors.New("reading from a closed fifo") + } +} + +// Write from byte array to a fifo. +func (f *fifo) Write(b []byte) (int, error) { + if f.flag&(syscall.O_WRONLY|syscall.O_RDWR) == 0 { + return 0, errors.New("writing to read-only fifo") + } + select { + case <-f.opened: + return f.file.Write(b) + default: + } + select { + case <-f.opened: + return f.file.Write(b) + case <-f.closed: + return 0, errors.New("writing to a closed fifo") + } +} + +// Close the fifo. Next reads/writes will error. This method can also be used +// before open(2) has returned and fifo was never opened. +func (f *fifo) Close() error { + for { + select { + case <-f.closed: + f.handle.Close() + return f.err + default: + select { + case <-f.opened: + f.closedOnce.Do(func() { + f.err = f.file.Close() + close(f.closed) + }) + default: + if f.flag&syscall.O_RDWR != 0 { + runtime.Gosched() + break + } + f.closingOnce.Do(func() { + close(f.closing) + }) + reverseMode := syscall.O_WRONLY + if f.flag&syscall.O_WRONLY > 0 { + reverseMode = syscall.O_RDONLY + } + fn, err := f.handle.Path() + // if Close() is called concurrently(shouldn't) it may cause error + // because handle is closed + select { + case <-f.closed: + default: + if err != nil { + // Path has become invalid. We will leak a goroutine. + // This case should not happen in linux. + f.closedOnce.Do(func() { + f.err = err + close(f.closed) + }) + <-f.closed + break + } + f, err := os.OpenFile(fn, reverseMode|syscall.O_NONBLOCK, 0) + if err == nil { + f.Close() + } + runtime.Gosched() + } + } + } + } +} diff --git a/vendor/src/github.com/tonistiigi/fifo/handle_linux.go b/vendor/src/github.com/tonistiigi/fifo/handle_linux.go new file mode 100644 index 0000000000..7a3a57c655 --- /dev/null +++ b/vendor/src/github.com/tonistiigi/fifo/handle_linux.go @@ -0,0 +1,70 @@ +// +build linux + +package fifo + +import ( + "fmt" + "os" + "sync" + "syscall" + + "github.com/pkg/errors" +) + +const O_PATH = 010000000 + +type handle struct { + f *os.File + dev uint64 + ino uint64 + closeOnce sync.Once +} + +func getHandle(fn string) (*handle, error) { + f, err := os.OpenFile(fn, O_PATH, 0) + if err != nil { + return nil, errors.Wrapf(err, "failed to open %v with O_PATH", fn) + } + + var stat syscall.Stat_t + if err := syscall.Fstat(int(f.Fd()), &stat); err != nil { + f.Close() + return nil, errors.Wrapf(err, "failed to stat handle %v", f.Fd()) + } + + h := &handle{ + f: f, + dev: stat.Dev, + ino: stat.Ino, + } + + // check /proc just in case + if _, err := os.Stat(h.procPath()); err != nil { + f.Close() + return nil, errors.Wrapf(err, "couldn't stat %v", h.procPath()) + } + + return h, nil +} + +func (h *handle) procPath() string { + return fmt.Sprintf("/proc/self/fd/%d", h.f.Fd()) +} + +func (h *handle) Path() (string, error) { + var stat syscall.Stat_t + if err := syscall.Stat(h.procPath(), &stat); err != nil { + return "", errors.Wrapf(err, "path %v could not be statted", h.procPath()) + } + if stat.Dev != h.dev || stat.Ino != h.ino { + return "", errors.Errorf("failed to verify handle %v/%v %v/%v", stat.Dev, h.dev, stat.Ino, h.ino) + } + return h.procPath(), nil +} + +func (h *handle) Close() error { + h.closeOnce.Do(func() { + h.f.Close() + }) + return nil +} diff --git a/vendor/src/github.com/tonistiigi/fifo/handle_nolinux.go b/vendor/src/github.com/tonistiigi/fifo/handle_nolinux.go new file mode 100644 index 0000000000..8c20e0746a --- /dev/null +++ b/vendor/src/github.com/tonistiigi/fifo/handle_nolinux.go @@ -0,0 +1,45 @@ +// +build !linux + +package fifo + +import ( + "syscall" + + "github.com/pkg/errors" +) + +type handle struct { + fn string + dev uint64 + ino uint64 +} + +func getHandle(fn string) (*handle, error) { + var stat syscall.Stat_t + if err := syscall.Stat(fn, &stat); err != nil { + return nil, errors.Wrapf(err, "failed to stat %v", fn) + } + + h := &handle{ + fn: fn, + dev: uint64(stat.Dev), + ino: stat.Ino, + } + + return h, nil +} + +func (h *handle) Path() (string, error) { + var stat syscall.Stat_t + if err := syscall.Stat(h.fn, &stat); err != nil { + return "", errors.Wrapf(err, "path %v could not be statted", h.fn) + } + if uint64(stat.Dev) != h.dev || stat.Ino != h.ino { + return "", errors.Errorf("failed to verify handle %v/%v %v/%v", stat.Dev, h.dev, stat.Ino, h.ino) + } + return h.fn, nil +} + +func (h *handle) Close() error { + return nil +} diff --git a/vendor/src/github.com/tonistiigi/fifo/readme.md b/vendor/src/github.com/tonistiigi/fifo/readme.md new file mode 100644 index 0000000000..1c82669060 --- /dev/null +++ b/vendor/src/github.com/tonistiigi/fifo/readme.md @@ -0,0 +1,30 @@ +### fifo + +Go package for handling fifos in a sane way. + +``` +// OpenFifo opens a fifo. Returns io.ReadWriteCloser. +// Context can be used to cancel this function until open(2) has not returned. +// Accepted flags: +// - syscall.O_CREAT - create new fifo if one doesn't exist +// - syscall.O_RDONLY - open fifo only from reader side +// - syscall.O_WRONLY - open fifo only from writer side +// - syscall.O_RDWR - open fifo from both sides, never block on syscall level +// - syscall.O_NONBLOCK - return io.ReadWriteCloser even if other side of the +// fifo isn't open. read/write will be connected after the actual fifo is +// open or after fifo is closed. +func OpenFifo(ctx context.Context, fn string, flag int, perm os.FileMode) (io.ReadWriteCloser, error) + + +// Read from a fifo to a byte array. +func (f *fifo) Read(b []byte) (int, error) + + +// Write from byte array to a fifo. +func (f *fifo) Write(b []byte) (int, error) + + +// Close the fifo. Next reads/writes will error. This method can also be used +// before open(2) has returned and fifo was never opened. +func (f *fifo) Close() error +``` \ No newline at end of file