diff --git a/container/container.go b/container/container.go index d34226922a..9e0f2558bb 100644 --- a/container/container.go +++ b/container/container.go @@ -22,6 +22,7 @@ import ( "github.com/docker/docker/daemon/network" "github.com/docker/docker/image" "github.com/docker/docker/layer" + "github.com/docker/docker/libcontainerd" "github.com/docker/docker/pkg/idtools" "github.com/docker/docker/pkg/ioutils" "github.com/docker/docker/pkg/promise" @@ -972,3 +973,46 @@ func (container *Container) CancelAttachContext() { } container.attachContext.mu.Unlock() } + +func (container *Container) startLogging() error { + if container.HostConfig.LogConfig.Type == "none" { + return nil // do not start logging routines + } + + l, err := container.StartLogger(container.HostConfig.LogConfig) + if err != nil { + return fmt.Errorf("Failed to initialize logging driver: %v", err) + } + + copier := logger.NewCopier(map[string]io.Reader{"stdout": container.StdoutPipe(), "stderr": container.StderrPipe()}, l) + container.LogCopier = copier + copier.Run() + container.LogDriver = l + + // set LogPath field only for json-file logdriver + if jl, ok := l.(*jsonfilelog.JSONFileLogger); ok { + container.LogPath = jl.LogPath() + } + + return nil +} + +// InitializeStdio is called by libcontainerd to connect the stdio. +func (container *Container) InitializeStdio(iop libcontainerd.IOPipe) error { + if err := container.startLogging(); err != nil { + container.Reset(false) + return err + } + + container.StreamConfig.CopyToPipe(iop) + + if container.Stdin() == nil && !container.Config.Tty { + if iop.Stdin != nil { + if err := iop.Stdin.Close(); err != nil { + logrus.Warnf("error closing stdin: %+v", err) + } + } + } + + return nil +} diff --git a/daemon/daemon.go b/daemon/daemon.go index 8c8d6dafc2..7fa200530f 100644 --- a/daemon/daemon.go +++ b/daemon/daemon.go @@ -175,7 +175,7 @@ func (daemon *Daemon) restore() error { defer wg.Done() rm := c.RestartManager(false) if c.IsRunning() || c.IsPaused() { - if err := daemon.containerd.Restore(c.ID, libcontainerd.WithRestartManager(rm)); err != nil { + if err := daemon.containerd.Restore(c.ID, c.InitializeStdio, libcontainerd.WithRestartManager(rm)); err != nil { logrus.Errorf("Failed to restore %s with containerd: %s", c.ID, err) return } diff --git a/daemon/exec.go b/daemon/exec.go index d57b6875d8..5584f3b5da 100644 --- a/daemon/exec.go +++ b/daemon/exec.go @@ -204,7 +204,7 @@ func (d *Daemon) ContainerExecStart(ctx context.Context, name string, stdin io.R attachErr := container.AttachStreams(ctx, ec.StreamConfig, ec.OpenStdin, true, ec.Tty, cStdin, cStdout, cStderr, ec.DetachKeys) - if err := d.containerd.AddProcess(ctx, c.ID, name, p); err != nil { + if err := d.containerd.AddProcess(ctx, c.ID, name, p, ec.InitializeStdio); err != nil { return err } diff --git a/daemon/exec/exec.go b/daemon/exec/exec.go index bbeb1c16a6..da160d3bb2 100644 --- a/daemon/exec/exec.go +++ b/daemon/exec/exec.go @@ -1,8 +1,11 @@ package exec import ( + "runtime" "sync" + "github.com/Sirupsen/logrus" + "github.com/docker/docker/libcontainerd" "github.com/docker/docker/pkg/stringid" "github.com/docker/docker/runconfig" ) @@ -37,6 +40,21 @@ func NewConfig() *Config { } } +// InitializeStdio is called by libcontainerd to connect the stdio. +func (c *Config) InitializeStdio(iop libcontainerd.IOPipe) error { + c.StreamConfig.CopyToPipe(iop) + + if c.Stdin() == nil && !c.Tty && runtime.GOOS == "windows" { + if iop.Stdin != nil { + if err := iop.Stdin.Close(); err != nil { + logrus.Errorf("error closing exec stdin: %+v", err) + } + } + } + + return nil +} + // Store keeps track of the exec configurations. type Store struct { commands map[string]*Config diff --git a/daemon/logs.go b/daemon/logs.go index 1b285c691d..e171c1508e 100644 --- a/daemon/logs.go +++ b/daemon/logs.go @@ -12,7 +12,6 @@ import ( "github.com/docker/docker/api/types/backend" "github.com/docker/docker/container" "github.com/docker/docker/daemon/logger" - "github.com/docker/docker/daemon/logger/jsonfilelog" "github.com/docker/docker/pkg/ioutils" "github.com/docker/docker/pkg/stdcopy" containertypes "github.com/docker/engine-api/types/container" @@ -120,30 +119,6 @@ func (daemon *Daemon) getLogger(container *container.Container) (logger.Logger, return container.StartLogger(container.HostConfig.LogConfig) } -// StartLogging initializes and starts the container logging stream. -func (daemon *Daemon) StartLogging(container *container.Container) error { - if container.HostConfig.LogConfig.Type == "none" { - return nil // do not start logging routines - } - - l, err := container.StartLogger(container.HostConfig.LogConfig) - if err != nil { - return fmt.Errorf("Failed to initialize logging driver: %v", err) - } - - copier := logger.NewCopier(map[string]io.Reader{"stdout": container.StdoutPipe(), "stderr": container.StderrPipe()}, l) - container.LogCopier = copier - copier.Run() - container.LogDriver = l - - // set LogPath field only for json-file logdriver - if jl, ok := l.(*jsonfilelog.JSONFileLogger); ok { - container.LogPath = jl.LogPath() - } - - return nil -} - // mergeLogConfig merges the daemon log config to the container's log config if the container's log driver is not specified. func (daemon *Daemon) mergeAndVerifyLogConfig(cfg *containertypes.LogConfig) error { if cfg.Type == "" { diff --git a/daemon/monitor.go b/daemon/monitor.go index 52d333f8a8..542d6b3b9e 100644 --- a/daemon/monitor.go +++ b/daemon/monitor.go @@ -3,13 +3,11 @@ package daemon import ( "errors" "fmt" - "io" "runtime" "strconv" "github.com/Sirupsen/logrus" "github.com/docker/docker/libcontainerd" - "github.com/docker/docker/runconfig" ) // StateChanged updates daemon state changes from containerd @@ -100,57 +98,3 @@ func (daemon *Daemon) StateChanged(id string, e libcontainerd.StateInfo) error { return nil } - -// AttachStreams is called by libcontainerd to connect the stdio. -func (daemon *Daemon) AttachStreams(id string, iop libcontainerd.IOPipe) error { - var s *runconfig.StreamConfig - c := daemon.containers.Get(id) - if c == nil { - ec, err := daemon.getExecConfig(id) - if err != nil { - return fmt.Errorf("no such exec/container: %s", id) - } - s = ec.StreamConfig - } else { - s = c.StreamConfig - if err := daemon.StartLogging(c); err != nil { - c.Reset(false) - return err - } - } - - copyFunc := func(w io.Writer, r io.Reader) { - s.Add(1) - go func() { - if _, err := io.Copy(w, r); err != nil { - logrus.Errorf("%v stream copy error: %v", id, err) - } - s.Done() - }() - } - - if iop.Stdout != nil { - copyFunc(s.Stdout(), iop.Stdout) - } - if iop.Stderr != nil { - copyFunc(s.Stderr(), iop.Stderr) - } - - if stdin := s.Stdin(); stdin != nil { - if iop.Stdin != nil { - go func() { - io.Copy(iop.Stdin, stdin) - iop.Stdin.Close() - }() - } - } else { - if c != nil && !c.Config.Tty { - // tty is enabled, so dont close containerd's iopipe stdin. - if iop.Stdin != nil { - iop.Stdin.Close() - } - } - } - - return nil -} diff --git a/daemon/monitor_windows.go b/daemon/monitor_windows.go index b500ee60b9..9748de9201 100644 --- a/daemon/monitor_windows.go +++ b/daemon/monitor_windows.go @@ -28,7 +28,7 @@ func (daemon *Daemon) postRunProcessing(container *container.Container, e libcon // Create a new servicing container, which will start, complete the update, and merge back the // results if it succeeded, all as part of the below function call. - if err := daemon.containerd.Create((container.ID + "_servicing"), *spec, servicingOption); err != nil { + if err := daemon.containerd.Create((container.ID + "_servicing"), *spec, container.InitializeStdio, servicingOption); err != nil { container.SetExitCode(-1) return fmt.Errorf("Post-run update servicing failed: %s", err) } diff --git a/daemon/start.go b/daemon/start.go index 9af41f8915..470cb80a1c 100644 --- a/daemon/start.go +++ b/daemon/start.go @@ -141,7 +141,7 @@ func (daemon *Daemon) containerStart(container *container.Container) (err error) createOptions = append(createOptions, *copts...) } - if err := daemon.containerd.Create(container.ID, *spec, createOptions...); err != nil { + if err := daemon.containerd.Create(container.ID, *spec, container.InitializeStdio, createOptions...); err != nil { errDesc := grpc.ErrorDesc(err) logrus.Errorf("Create container failed with error: %s", errDesc) // if we receive an internal error from the initial start of a container then lets diff --git a/hack/vendor.sh b/hack/vendor.sh index 4679584957..23861046ba 100755 --- a/hack/vendor.sh +++ b/hack/vendor.sh @@ -137,6 +137,7 @@ clone git github.com/docker/docker-credential-helpers v0.3.0 # containerd clone git github.com/docker/containerd 0366d7e9693c930cf18c0f50cc16acec064e96c5 +clone git github.com/tonistiigi/fifo fe870ccf293940774c2b44e23f6c71fff8f7547d # cluster clone git github.com/docker/swarmkit 0cf248feec033f46dc09db40d69fd5128082b79a diff --git a/integration-cli/docker_cli_run_test.go b/integration-cli/docker_cli_run_test.go index 21f1faddbb..3a16abff11 100644 --- a/integration-cli/docker_cli_run_test.go +++ b/integration-cli/docker_cli_run_test.go @@ -4455,3 +4455,15 @@ func (s *DockerSuite) TestRunAddHostInHostMode(c *check.C) { out, _ := dockerCmd(c, "run", "--add-host=extra:1.2.3.4", "--net=host", "busybox", "cat", "/etc/hosts") c.Assert(out, checker.Contains, expectedOutput, check.Commentf("Expected '%s', but got %q", expectedOutput, out)) } + +func (s *DockerSuite) TestRunStoppedLoggingDriverNoLeak(c *check.C) { + nroutines, err := getGoroutineNumber() + c.Assert(err, checker.IsNil) + + out, _, err := dockerCmdWithError("run", "--name=fail", "--log-driver=splunk", "busybox", "true") + c.Assert(err, checker.NotNil) + c.Assert(out, checker.Contains, "Failed to initialize logging driver", check.Commentf("error should be about logging driver, got output %s", out)) + + // NGoroutines is not updated right away, so we need to wait before failing + c.Assert(waitForGoroutines(nroutines), checker.IsNil) +} diff --git a/libcontainerd/client_linux.go b/libcontainerd/client_linux.go index ef504cad59..39b0999d3f 100644 --- a/libcontainerd/client_linux.go +++ b/libcontainerd/client_linux.go @@ -13,6 +13,7 @@ import ( "github.com/Sirupsen/logrus" containerd "github.com/docker/containerd/api/grpc/types" "github.com/docker/docker/pkg/idtools" + "github.com/docker/docker/pkg/ioutils" "github.com/docker/docker/pkg/mount" "github.com/golang/protobuf/ptypes" "github.com/golang/protobuf/ptypes/timestamp" @@ -30,7 +31,10 @@ type client struct { liveRestore bool } -func (clnt *client) AddProcess(ctx context.Context, containerID, processFriendlyName string, specp Process) error { +// AddProcess is the handler for adding a process to an already running +// container. It's called through docker exec. It returns the system pid of the +// exec'd process. +func (clnt *client) AddProcess(ctx context.Context, containerID, processFriendlyName string, specp Process, attachStdio StdioCallback) error { clnt.lock(containerID) defer clnt.unlock(containerID) container, err := clnt.getContainer(containerID) @@ -96,15 +100,25 @@ func (clnt *client) AddProcess(ctx context.Context, containerID, processFriendly return err } + var stdinOnce sync.Once + stdin := iopipe.Stdin + iopipe.Stdin = ioutils.NewWriteCloserWrapper(stdin, func() error { + var err error + stdinOnce.Do(func() { // on error from attach we don't know if stdin was already closed + err = stdin.Close() + if err2 := p.sendCloseStdin(); err == nil { + err = err2 + } + }) + return err + }) + container.processes[processFriendlyName] = p - clnt.unlock(containerID) - - if err := clnt.backend.AttachStreams(processFriendlyName, *iopipe); err != nil { - clnt.lock(containerID) + if err := attachStdio(*iopipe); err != nil { + p.closeFifos(iopipe) return err } - clnt.lock(containerID) return nil } @@ -134,7 +148,7 @@ func (clnt *client) prepareBundleDir(uid, gid int) (string, error) { return p, nil } -func (clnt *client) Create(containerID string, spec Spec, options ...CreateOption) (err error) { +func (clnt *client) Create(containerID string, spec Spec, attachStdio StdioCallback, options ...CreateOption) (err error) { clnt.lock(containerID) defer clnt.unlock(containerID) @@ -160,6 +174,7 @@ func (clnt *client) Create(containerID string, spec Spec, options ...CreateOptio if err := container.clean(); err != nil { return err } + container.attachStdio = attachStdio // hack for v1.12 backport defer func() { if err != nil { @@ -181,7 +196,7 @@ func (clnt *client) Create(containerID string, spec Spec, options ...CreateOptio return err } - return container.start() + return container.start(attachStdio) } func (clnt *client) Signal(containerID string, sig int) error { @@ -390,7 +405,7 @@ func (clnt *client) getOrCreateExitNotifier(containerID string) *exitNotifier { return w } -func (clnt *client) restore(cont *containerd.Container, lastEvent *containerd.Event, options ...CreateOption) (err error) { +func (clnt *client) restore(cont *containerd.Container, lastEvent *containerd.Event, attachStdio StdioCallback, options ...CreateOption) (err error) { clnt.lock(cont.Id) defer clnt.unlock(cont.Id) @@ -421,8 +436,18 @@ func (clnt *client) restore(cont *containerd.Container, lastEvent *containerd.Ev if err != nil { return err } + var stdinOnce sync.Once + stdin := iopipe.Stdin + iopipe.Stdin = ioutils.NewWriteCloserWrapper(stdin, func() error { + var err error + stdinOnce.Do(func() { // on error from attach we don't know if stdin was already closed + err = stdin.Close() + }) + return err + }) - if err := clnt.backend.AttachStreams(containerID, *iopipe); err != nil { + if err := attachStdio(*iopipe); err != nil { + container.closeFifos(iopipe) return err } @@ -435,6 +460,7 @@ func (clnt *client) restore(cont *containerd.Container, lastEvent *containerd.Ev }}) if err != nil { + container.closeFifos(iopipe) return err } @@ -512,7 +538,7 @@ func (clnt *client) getContainerLastEvent(id string) (*containerd.Event, error) return ev, err } -func (clnt *client) Restore(containerID string, options ...CreateOption) error { +func (clnt *client) Restore(containerID string, attachStdio StdioCallback, options ...CreateOption) error { // Synchronize with live events clnt.remote.Lock() defer clnt.remote.Unlock() @@ -560,7 +586,7 @@ func (clnt *client) Restore(containerID string, options ...CreateOption) error { // container is still alive if clnt.liveRestore { - if err := clnt.restore(cont, ev, options...); err != nil { + if err := clnt.restore(cont, ev, attachStdio, options...); err != nil { logrus.Errorf("libcontainerd: error restoring %s: %v", containerID, err) } return nil diff --git a/libcontainerd/client_solaris.go b/libcontainerd/client_solaris.go index 1c14d301b5..78087ebc54 100644 --- a/libcontainerd/client_solaris.go +++ b/libcontainerd/client_solaris.go @@ -8,11 +8,11 @@ type client struct { // Platform specific properties below here. } -func (clnt *client) AddProcess(ctx context.Context, containerID, processFriendlyName string, specp Process) error { +func (clnt *client) AddProcess(ctx context.Context, containerID, processFriendlyName string, specp Process, attachStdio StdioCallback) error { return nil } -func (clnt *client) Create(containerID string, spec Spec, options ...CreateOption) (err error) { +func (clnt *client) Create(containerID string, spec Spec, attachStdio StdioCallback, options ...CreateOption) (err error) { return nil } @@ -37,7 +37,7 @@ func (clnt *client) Stats(containerID string) (*Stats, error) { } // Restore is the handler for restoring a container -func (clnt *client) Restore(containerID string, unusedOnWindows ...CreateOption) error { +func (clnt *client) Restore(containerID string, attachStdio StdioCallback, unusedOnWindows ...CreateOption) error { return nil } diff --git a/libcontainerd/client_windows.go b/libcontainerd/client_windows.go index 431574a4d3..562dacf5b1 100644 --- a/libcontainerd/client_windows.go +++ b/libcontainerd/client_windows.go @@ -4,6 +4,7 @@ import ( "errors" "fmt" "io" + "io/ioutil" "path/filepath" "strings" "syscall" @@ -37,7 +38,7 @@ const defaultOwner = "docker" // Create is the entrypoint to create a container from a spec, and if successfully // created, start it too. -func (clnt *client) Create(containerID string, spec Spec, options ...CreateOption) error { +func (clnt *client) Create(containerID string, spec Spec, attachStdio StdioCallback, options ...CreateOption) error { logrus.Debugln("libcontainerd: client.Create() with spec", spec) configuration := &hcsshim.ContainerConfig{ @@ -142,7 +143,8 @@ func (clnt *client) Create(containerID string, spec Spec, options ...CreateOptio }, commandLine: strings.Join(spec.Process.Args, " "), }, - processes: make(map[string]*process), + processes: make(map[string]*process), + attachStdio: attachStdio, }, ociSpec: spec, hcsContainer: hcsContainer, @@ -159,7 +161,7 @@ func (clnt *client) Create(containerID string, spec Spec, options ...CreateOptio // internal structure, start will keep HCS in sync by deleting the // container there. logrus.Debugf("libcontainerd: Create() id=%s, Calling start()", containerID) - if err := container.start(); err != nil { + if err := container.start(attachStdio); err != nil { clnt.deleteContainer(containerID) return err } @@ -171,7 +173,7 @@ func (clnt *client) Create(containerID string, spec Spec, options ...CreateOptio // AddProcess is the handler for adding a process to an already running // container. It's called through docker exec. -func (clnt *client) AddProcess(ctx context.Context, containerID, processFriendlyName string, procToAdd Process) error { +func (clnt *client) AddProcess(ctx context.Context, containerID, processFriendlyName string, procToAdd Process, attachStdio StdioCallback) error { clnt.lock(containerID) defer clnt.unlock(containerID) container, err := clnt.getContainer(containerID) @@ -228,10 +230,10 @@ func (clnt *client) AddProcess(ctx context.Context, containerID, processFriendly // Convert io.ReadClosers to io.Readers if stdout != nil { - iopipe.Stdout = openReaderFromPipe(stdout) + iopipe.Stdout = ioutil.NopCloser(&autoClosingReader{ReadCloser: stdout}) } if stderr != nil { - iopipe.Stderr = openReaderFromPipe(stderr) + iopipe.Stderr = ioutil.NopCloser(&autoClosingReader{ReadCloser: stderr}) } pid := newProcess.Pid() @@ -250,18 +252,11 @@ func (clnt *client) AddProcess(ctx context.Context, containerID, processFriendly // Add the process to the container's list of processes container.processes[processFriendlyName] = proc - // Make sure the lock is not held while calling back into the daemon - clnt.unlock(containerID) - // Tell the engine to attach streams back to the client - if err := clnt.backend.AttachStreams(processFriendlyName, *iopipe); err != nil { - clnt.lock(containerID) + if err := attachStdio(*iopipe); err != nil { return err } - // Lock again so that the defer unlock doesn't fail. (I really don't like this code) - clnt.lock(containerID) - // Spin up a go routine waiting for exit to handle cleanup go container.waitExit(proc, false) @@ -370,7 +365,7 @@ func (clnt *client) Stats(containerID string) (*Stats, error) { } // Restore is the handler for restoring a container -func (clnt *client) Restore(containerID string, unusedOnWindows ...CreateOption) error { +func (clnt *client) Restore(containerID string, _ StdioCallback, unusedOnWindows ...CreateOption) error { // TODO Windows: Implement this. For now, just tell the backend the container exited. logrus.Debugf("libcontainerd: Restore(%s)", containerID) return clnt.backend.StateChanged(containerID, StateInfo{ diff --git a/libcontainerd/container.go b/libcontainerd/container.go index 30bc95028c..f5d29f42cf 100644 --- a/libcontainerd/container.go +++ b/libcontainerd/container.go @@ -20,6 +20,7 @@ type containerCommon struct { restarting bool processes map[string]*process startedAt time.Time + attachStdio StdioCallback // hack for v1.12 backport } // WithRestartManager sets the restartmanager to be used with the container. diff --git a/libcontainerd/container_linux.go b/libcontainerd/container_linux.go index 454478b5c2..3c587139ea 100644 --- a/libcontainerd/container_linux.go +++ b/libcontainerd/container_linux.go @@ -6,13 +6,16 @@ import ( "io/ioutil" "os" "path/filepath" + "sync" "syscall" "time" "github.com/Sirupsen/logrus" containerd "github.com/docker/containerd/api/grpc/types" + "github.com/docker/docker/pkg/ioutils" "github.com/docker/docker/restartmanager" - "github.com/opencontainers/specs/specs-go" + specs "github.com/opencontainers/specs/specs-go" + "github.com/tonistiigi/fifo" "golang.org/x/net/context" ) @@ -66,7 +69,7 @@ func (ctr *container) clean() error { func (ctr *container) cleanProcess(id string) { if p, ok := ctr.processes[id]; ok { for _, i := range []int{syscall.Stdin, syscall.Stdout, syscall.Stderr} { - if err := os.Remove(p.fifo(i)); err != nil { + if err := os.Remove(p.fifo(i)); err != nil && !os.IsNotExist(err) { logrus.Warnf("libcontainerd: failed to remove %v for process %v: %v", p.fifo(i), id, err) } } @@ -86,16 +89,44 @@ func (ctr *container) spec() (*specs.Spec, error) { return &spec, nil } -func (ctr *container) start() error { +func (ctr *container) start(attachStdio StdioCallback) error { spec, err := ctr.spec() if err != nil { return nil } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + ready := make(chan struct{}) + iopipe, err := ctr.openFifos(spec.Process.Terminal) if err != nil { return err } + var stdinOnce sync.Once + + // we need to delay stdin closure after container start or else "stdin close" + // event will be rejected by containerd. + // stdin closure happens in attachStdio + stdin := iopipe.Stdin + iopipe.Stdin = ioutils.NewWriteCloserWrapper(stdin, func() error { + var err error + stdinOnce.Do(func() { // on error from attach we don't know if stdin was already closed + err = stdin.Close() + go func() { + select { + case <-ready: + if err := ctr.sendCloseStdin(); err != nil { + logrus.Warnf("failed to close stdin: %+v", err) + } + case <-ctx.Done(): + } + }() + }) + return err + }) + r := &containerd.CreateContainerRequest{ Id: ctr.containerID, BundlePath: ctr.dir, @@ -109,17 +140,19 @@ func (ctr *container) start() error { } ctr.client.appendContainer(ctr) + if err := attachStdio(*iopipe); err != nil { + ctr.closeFifos(iopipe) + return err + } + resp, err := ctr.client.remote.apiClient.CreateContainer(context.Background(), r) if err != nil { ctr.closeFifos(iopipe) return err } ctr.startedAt = time.Now() - - if err := ctr.client.backend.AttachStreams(ctr.containerID, *iopipe); err != nil { - return err - } ctr.systemPid = systemPid(resp.Container) + close(ready) return ctr.client.backend.StateChanged(ctr.containerID, StateInfo{ CommonStateInfo: CommonStateInfo{ @@ -191,7 +224,7 @@ func (ctr *container) handleEvent(e *containerd.Event) error { defer ctr.client.unlock(ctr.containerID) ctr.restarting = false if err == nil { - if err = ctr.start(); err != nil { + if err = ctr.start(ctr.attachStdio); err != nil { logrus.Errorf("libcontainerd: error restarting %v", err) } } @@ -229,15 +262,15 @@ func (ctr *container) handleEvent(e *containerd.Event) error { // discardFifos attempts to fully read the container fifos to unblock processes // that may be blocked on the writer side. func (ctr *container) discardFifos() { + ctx, _ := context.WithTimeout(context.Background(), 3*time.Second) for _, i := range []int{syscall.Stdout, syscall.Stderr} { - f := ctr.fifo(i) - c := make(chan struct{}) + f, err := fifo.OpenFifo(ctx, ctr.fifo(i), syscall.O_RDONLY|syscall.O_NONBLOCK, 0) + if err != nil { + logrus.Warnf("error opening fifo %v for discarding: %+v", f, err) + continue + } go func() { - r := openReaderFromFifo(f) - close(c) // this channel is used to not close the writer too early, before readonly open has been called. - io.Copy(ioutil.Discard, r) + io.Copy(ioutil.Discard, f) }() - <-c - closeReaderFifo(f) // avoid blocking permanently on open if there is no writer side } } diff --git a/libcontainerd/container_windows.go b/libcontainerd/container_windows.go index 31c2227df0..b27437626f 100644 --- a/libcontainerd/container_windows.go +++ b/libcontainerd/container_windows.go @@ -2,6 +2,7 @@ package libcontainerd import ( "io" + "io/ioutil" "strings" "syscall" "time" @@ -35,7 +36,7 @@ func (ctr *container) newProcess(friendlyName string) *process { } } -func (ctr *container) start() error { +func (ctr *container) start(attachStdio StdioCallback) error { var err error isServicing := false @@ -127,10 +128,10 @@ func (ctr *container) start() error { // Convert io.ReadClosers to io.Readers if stdout != nil { - iopipe.Stdout = openReaderFromPipe(stdout) + iopipe.Stdout = ioutil.NopCloser(&autoClosingReader{ReadCloser: stdout}) } if stderr != nil { - iopipe.Stderr = openReaderFromPipe(stderr) + iopipe.Stderr = ioutil.NopCloser(&autoClosingReader{ReadCloser: stderr}) } // Save the PID @@ -142,7 +143,7 @@ func (ctr *container) start() error { ctr.client.appendContainer(ctr) - if err := ctr.client.backend.AttachStreams(ctr.containerID, *iopipe); err != nil { + if err := attachStdio(*iopipe); err != nil { // OK to return the error here, as waitExit will handle tear-down in HCS return err } @@ -257,7 +258,7 @@ func (ctr *container) waitExit(process *process, isFirstProcessToStart bool) err ctr.restarting = false ctr.client.deleteContainer(ctr.friendlyName) if err == nil { - if err = ctr.client.Create(ctr.containerID, ctr.ociSpec, ctr.options...); err != nil { + if err = ctr.client.Create(ctr.containerID, ctr.ociSpec, ctr.attachStdio, ctr.options...); err != nil { logrus.Errorf("libcontainerd: error restarting %v", err) } } diff --git a/libcontainerd/process_linux.go b/libcontainerd/process_linux.go index 3c48576fe2..135a11e754 100644 --- a/libcontainerd/process_linux.go +++ b/libcontainerd/process_linux.go @@ -1,14 +1,15 @@ package libcontainerd import ( - "fmt" "io" + "io/ioutil" "os" "path/filepath" "syscall" + "time" containerd "github.com/docker/containerd/api/grpc/types" - "github.com/docker/docker/pkg/ioutils" + "github.com/tonistiigi/fifo" "golang.org/x/net/context" ) @@ -26,49 +27,67 @@ type process struct { dir string } -func (p *process) openFifos(terminal bool) (*IOPipe, error) { - bundleDir := p.dir - if err := os.MkdirAll(bundleDir, 0700); err != nil { +func (p *process) openFifos(terminal bool) (pipe *IOPipe, err error) { + if err := os.MkdirAll(p.dir, 0700); err != nil { return nil, err } - for i := 0; i < 3; i++ { - f := p.fifo(i) - if err := syscall.Mkfifo(f, 0700); err != nil && !os.IsExist(err) { - return nil, fmt.Errorf("mkfifo: %s %v", f, err) - } - } + ctx, _ := context.WithTimeout(context.Background(), 15*time.Second) io := &IOPipe{} - stdinf, err := os.OpenFile(p.fifo(syscall.Stdin), syscall.O_RDWR, 0) + + io.Stdin, err = fifo.OpenFifo(ctx, p.fifo(syscall.Stdin), syscall.O_WRONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700) if err != nil { return nil, err } - io.Stdout = openReaderFromFifo(p.fifo(syscall.Stdout)) - if !terminal { - io.Stderr = openReaderFromFifo(p.fifo(syscall.Stderr)) - } else { - io.Stderr = emptyReader{} + defer func() { + if err != nil { + io.Stdin.Close() + } + }() + + io.Stdout, err = fifo.OpenFifo(ctx, p.fifo(syscall.Stdout), syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700) + if err != nil { + return nil, err } - io.Stdin = ioutils.NewWriteCloserWrapper(stdinf, func() error { - stdinf.Close() - _, err := p.client.remote.apiClient.UpdateProcess(context.Background(), &containerd.UpdateProcessRequest{ - Id: p.containerID, - Pid: p.friendlyName, - CloseStdin: true, - }) - return err - }) + defer func() { + if err != nil { + io.Stdout.Close() + } + }() + + if !terminal { + io.Stderr, err = fifo.OpenFifo(ctx, p.fifo(syscall.Stderr), syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700) + if err != nil { + return nil, err + } + defer func() { + if err != nil { + io.Stderr.Close() + } + }() + } else { + io.Stderr = ioutil.NopCloser(emptyReader{}) + } return io, nil } +func (p *process) sendCloseStdin() error { + _, err := p.client.remote.apiClient.UpdateProcess(context.Background(), &containerd.UpdateProcessRequest{ + Id: p.containerID, + Pid: p.friendlyName, + CloseStdin: true, + }) + return err +} + func (p *process) closeFifos(io *IOPipe) { io.Stdin.Close() - closeReaderFifo(p.fifo(syscall.Stdout)) - closeReaderFifo(p.fifo(syscall.Stderr)) + io.Stdout.Close() + io.Stderr.Close() } type emptyReader struct{} @@ -77,34 +96,6 @@ func (r emptyReader) Read(b []byte) (int, error) { return 0, io.EOF } -func openReaderFromFifo(fn string) io.Reader { - r, w := io.Pipe() - c := make(chan struct{}) - go func() { - close(c) - stdoutf, err := os.OpenFile(fn, syscall.O_RDONLY, 0) - if err != nil { - r.CloseWithError(err) - } - if _, err := io.Copy(w, stdoutf); err != nil { - r.CloseWithError(err) - } - w.Close() - stdoutf.Close() - }() - <-c // wait for the goroutine to get scheduled and syscall to block - return r -} - -// closeReaderFifo closes fifo that may be blocked on open by opening the write side. -func closeReaderFifo(fn string) { - f, err := os.OpenFile(fn, syscall.O_WRONLY|syscall.O_NONBLOCK, 0) - if err != nil { - return - } - f.Close() -} - func (p *process) fifo(index int) string { return filepath.Join(p.dir, p.friendlyName+"-"+fdNames[index]) } diff --git a/libcontainerd/process_windows.go b/libcontainerd/process_windows.go index ad6143e1de..f62e93f983 100644 --- a/libcontainerd/process_windows.go +++ b/libcontainerd/process_windows.go @@ -4,6 +4,7 @@ import ( "io" "strconv" "strings" + "sync" "github.com/Microsoft/hcsshim" ) @@ -19,16 +20,17 @@ type process struct { hcsProcess hcsshim.Process } -func openReaderFromPipe(p io.ReadCloser) io.Reader { - r, w := io.Pipe() - go func() { - if _, err := io.Copy(w, p); err != nil { - r.CloseWithError(err) - } - w.Close() - p.Close() - }() - return r +type autoClosingReader struct { + io.ReadCloser + sync.Once +} + +func (r *autoClosingReader) Read(b []byte) (n int, err error) { + n, err = r.ReadCloser.Read(b) + if err == io.EOF { + r.Once.Do(func() { r.ReadCloser.Close() }) + } + return } // fixStdinBackspaceBehavior works around a bug in Windows before build 14350 diff --git a/libcontainerd/types.go b/libcontainerd/types.go index 6f452c1c3b..0c11c41012 100644 --- a/libcontainerd/types.go +++ b/libcontainerd/types.go @@ -31,19 +31,18 @@ type CommonStateInfo struct { // FIXME: event? // Backend defines callbacks that the client of the library needs to implement. type Backend interface { StateChanged(containerID string, state StateInfo) error - AttachStreams(processFriendlyName string, io IOPipe) error } // Client provides access to containerd features. type Client interface { - Create(containerID string, spec Spec, options ...CreateOption) error + Create(containerID string, spec Spec, attachStdio StdioCallback, options ...CreateOption) error Signal(containerID string, sig int) error SignalProcess(containerID string, processFriendlyName string, sig int) error - AddProcess(ctx context.Context, containerID, processFriendlyName string, process Process) error + AddProcess(ctx context.Context, containerID, processFriendlyName string, process Process, attachStdio StdioCallback) error Resize(containerID, processFriendlyName string, width, height int) error Pause(containerID string) error Resume(containerID string) error - Restore(containerID string, options ...CreateOption) error + Restore(containerID string, attachStdio StdioCallback, options ...CreateOption) error Stats(containerID string) (*Stats, error) GetPidsForContainer(containerID string) ([]int, error) Summary(containerID string) ([]Summary, error) @@ -55,10 +54,13 @@ type CreateOption interface { Apply(interface{}) error } +// StdioCallback is called to connect a container or process stdio. +type StdioCallback func(IOPipe) error + // IOPipe contains the stdio streams. type IOPipe struct { Stdin io.WriteCloser - Stdout io.Reader - Stderr io.Reader + Stdout io.ReadCloser + Stderr io.ReadCloser Terminal bool // Whether stderr is connected on Windows } diff --git a/plugin/manager.go b/plugin/manager.go index 254db32f40..d351487259 100644 --- a/plugin/manager.go +++ b/plugin/manager.go @@ -273,24 +273,6 @@ func (pm *Manager) StateChanged(id string, e libcontainerd.StateInfo) error { return nil } -// AttachStreams attaches io streams to the plugin -func (pm *Manager) AttachStreams(id string, iop libcontainerd.IOPipe) error { - iop.Stdin.Close() - - logger := logrus.New() - logger.Hooks.Add(logHook{id}) - // TODO: cache writer per id - w := logger.Writer() - go func() { - io.Copy(w, iop.Stdout) - }() - go func() { - // TODO: update logrus and use logger.WriterLevel - io.Copy(w, iop.Stderr) - }() - return nil -} - func (pm *Manager) init() error { dt, err := os.Open(filepath.Join(pm.libRoot, "plugins.json")) if err != nil { @@ -447,3 +429,22 @@ func computePrivileges(m *types.PluginManifest) types.PluginPrivileges { } return privileges } + +func attachToLog(id string) func(libcontainerd.IOPipe) error { + return func(iop libcontainerd.IOPipe) error { + iop.Stdin.Close() + + logger := logrus.New() + logger.Hooks.Add(logHook{id}) + // TODO: cache writer per id + w := logger.Writer() + go func() { + io.Copy(w, iop.Stdout) + }() + go func() { + // TODO: update logrus and use logger.WriterLevel + io.Copy(w, iop.Stderr) + }() + return nil + } +} diff --git a/plugin/manager_linux.go b/plugin/manager_linux.go index d18874d603..8e21157fcd 100644 --- a/plugin/manager_linux.go +++ b/plugin/manager_linux.go @@ -30,7 +30,7 @@ func (pm *Manager) enable(p *plugin, force bool) error { } p.restartManager = restartmanager.New(container.RestartPolicy{Name: "always"}, 0) - if err := pm.containerdClient.Create(p.PluginObj.ID, libcontainerd.Spec(*spec), libcontainerd.WithRestartManager(p.restartManager)); err != nil { // POC-only + if err := pm.containerdClient.Create(p.PluginObj.ID, libcontainerd.Spec(*spec), attachToLog(p.PluginObj.ID), libcontainerd.WithRestartManager(p.restartManager)); err != nil { // POC-only if err := p.restartManager.Cancel(); err != nil { logrus.Errorf("enable: restartManager.Cancel failed due to %v", err) } @@ -62,7 +62,7 @@ func (pm *Manager) enable(p *plugin, force bool) error { func (pm *Manager) restore(p *plugin) error { p.restartManager = restartmanager.New(container.RestartPolicy{Name: "always"}, 0) - return pm.containerdClient.Restore(p.PluginObj.ID, libcontainerd.WithRestartManager(p.restartManager)) + return pm.containerdClient.Restore(p.PluginObj.ID, attachToLog(p.PluginObj.ID), libcontainerd.WithRestartManager(p.restartManager)) } func (pm *Manager) initSpec(p *plugin) (*specs.Spec, error) { diff --git a/runconfig/streams.go b/runconfig/streams.go index 117fd89aee..36d0810b00 100644 --- a/runconfig/streams.go +++ b/runconfig/streams.go @@ -7,8 +7,11 @@ import ( "strings" "sync" + "github.com/Sirupsen/logrus" + "github.com/docker/docker/libcontainerd" "github.com/docker/docker/pkg/broadcaster" "github.com/docker/docker/pkg/ioutils" + "github.com/docker/docker/pkg/pools" ) // StreamConfig holds information about I/O streams managed together. @@ -107,3 +110,34 @@ func (streamConfig *StreamConfig) CloseStreams() error { return nil } + +// CopyToPipe connects streamconfig with a libcontainerd.IOPipe +func (streamConfig *StreamConfig) CopyToPipe(iop libcontainerd.IOPipe) { + copyFunc := func(w io.Writer, r io.Reader) { + streamConfig.Add(1) + go func() { + if _, err := pools.Copy(w, r); err != nil { + logrus.Errorf("stream copy error: %+v", err) + } + streamConfig.Done() + }() + } + + if iop.Stdout != nil { + copyFunc(streamConfig.Stdout(), iop.Stdout) + } + if iop.Stderr != nil { + copyFunc(streamConfig.Stderr(), iop.Stderr) + } + + if stdin := streamConfig.Stdin(); stdin != nil { + if iop.Stdin != nil { + go func() { + pools.Copy(iop.Stdin, stdin) + if err := iop.Stdin.Close(); err != nil { + logrus.Errorf("failed to close stdin: %+v", err) + } + }() + } + } +} diff --git a/vendor/src/github.com/tonistiigi/fifo/LICENSE b/vendor/src/github.com/tonistiigi/fifo/LICENSE new file mode 100644 index 0000000000..8d318c1c0e --- /dev/null +++ b/vendor/src/github.com/tonistiigi/fifo/LICENSE @@ -0,0 +1,21 @@ +MIT + +Copyright (C) 2016 Tõnis Tiigi + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. \ No newline at end of file diff --git a/vendor/src/github.com/tonistiigi/fifo/Makefile b/vendor/src/github.com/tonistiigi/fifo/Makefile new file mode 100644 index 0000000000..c1c1cd58ce --- /dev/null +++ b/vendor/src/github.com/tonistiigi/fifo/Makefile @@ -0,0 +1,13 @@ +.PHONY: fmt vet test deps + +test: deps + go test -v ./... + +deps: + go get -d -t ./... + +fmt: + gofmt -s -l . + +vet: + go vet ./... diff --git a/vendor/src/github.com/tonistiigi/fifo/fifo.go b/vendor/src/github.com/tonistiigi/fifo/fifo.go new file mode 100644 index 0000000000..355944e244 --- /dev/null +++ b/vendor/src/github.com/tonistiigi/fifo/fifo.go @@ -0,0 +1,216 @@ +package fifo + +import ( + "io" + "os" + "runtime" + "sync" + "syscall" + + "github.com/pkg/errors" + "golang.org/x/net/context" +) + +type fifo struct { + flag int + opened chan struct{} + closed chan struct{} + closing chan struct{} + err error + file *os.File + closingOnce sync.Once // close has been called + closedOnce sync.Once // fifo is closed + handle *handle +} + +var leakCheckWg *sync.WaitGroup + +// OpenFifo opens a fifo. Returns io.ReadWriteCloser. +// Context can be used to cancel this function until open(2) has not returned. +// Accepted flags: +// - syscall.O_CREAT - create new fifo if one doesn't exist +// - syscall.O_RDONLY - open fifo only from reader side +// - syscall.O_WRONLY - open fifo only from writer side +// - syscall.O_RDWR - open fifo from both sides, never block on syscall level +// - syscall.O_NONBLOCK - return io.ReadWriteCloser even if other side of the +// fifo isn't open. read/write will be connected after the actual fifo is +// open or after fifo is closed. +func OpenFifo(ctx context.Context, fn string, flag int, perm os.FileMode) (io.ReadWriteCloser, error) { + if _, err := os.Stat(fn); err != nil { + if os.IsNotExist(err) && flag&syscall.O_CREAT != 0 { + if err := mkfifo(fn, uint32(perm&os.ModePerm)); err != nil && !os.IsExist(err) { + return nil, errors.Wrapf(err, "error creating fifo %v", fn) + } + } else { + return nil, err + } + } + + block := flag&syscall.O_NONBLOCK == 0 || flag&syscall.O_RDWR != 0 + + flag &= ^syscall.O_CREAT + flag &= ^syscall.O_NONBLOCK + + h, err := getHandle(fn) + if err != nil { + return nil, err + } + + f := &fifo{ + handle: h, + flag: flag, + opened: make(chan struct{}), + closed: make(chan struct{}), + closing: make(chan struct{}), + } + + wg := leakCheckWg + if wg != nil { + wg.Add(2) + } + + go func() { + if wg != nil { + defer wg.Done() + } + select { + case <-ctx.Done(): + f.Close() + case <-f.opened: + case <-f.closed: + } + }() + go func() { + if wg != nil { + defer wg.Done() + } + var file *os.File + fn, err := h.Path() + if err == nil { + file, err = os.OpenFile(fn, flag, 0) + } + select { + case <-f.closing: + if err == nil { + select { + case <-ctx.Done(): + err = ctx.Err() + default: + err = errors.Errorf("fifo %v was closed before opening", h.Name()) + } + if file != nil { + file.Close() + } + } + default: + } + if err != nil { + f.closedOnce.Do(func() { + f.err = err + close(f.closed) + }) + return + } + f.file = file + close(f.opened) + }() + if block { + select { + case <-f.opened: + case <-f.closed: + return nil, f.err + } + } + return f, nil +} + +// Read from a fifo to a byte array. +func (f *fifo) Read(b []byte) (int, error) { + if f.flag&syscall.O_WRONLY > 0 { + return 0, errors.New("reading from write-only fifo") + } + select { + case <-f.opened: + return f.file.Read(b) + default: + } + select { + case <-f.opened: + return f.file.Read(b) + case <-f.closed: + return 0, errors.New("reading from a closed fifo") + } +} + +// Write from byte array to a fifo. +func (f *fifo) Write(b []byte) (int, error) { + if f.flag&(syscall.O_WRONLY|syscall.O_RDWR) == 0 { + return 0, errors.New("writing to read-only fifo") + } + select { + case <-f.opened: + return f.file.Write(b) + default: + } + select { + case <-f.opened: + return f.file.Write(b) + case <-f.closed: + return 0, errors.New("writing to a closed fifo") + } +} + +// Close the fifo. Next reads/writes will error. This method can also be used +// before open(2) has returned and fifo was never opened. +func (f *fifo) Close() (retErr error) { + for { + select { + case <-f.closed: + f.handle.Close() + return + default: + select { + case <-f.opened: + f.closedOnce.Do(func() { + retErr = f.file.Close() + f.err = retErr + close(f.closed) + }) + default: + if f.flag&syscall.O_RDWR != 0 { + runtime.Gosched() + break + } + f.closingOnce.Do(func() { + close(f.closing) + }) + reverseMode := syscall.O_WRONLY + if f.flag&syscall.O_WRONLY > 0 { + reverseMode = syscall.O_RDONLY + } + fn, err := f.handle.Path() + // if Close() is called concurrently(shouldn't) it may cause error + // because handle is closed + select { + case <-f.closed: + default: + if err != nil { + // Path has become invalid. We will leak a goroutine. + // This case should not happen in linux. + f.closedOnce.Do(func() { + f.err = err + close(f.closed) + }) + <-f.closed + break + } + f, err := os.OpenFile(fn, reverseMode|syscall.O_NONBLOCK, 0) + if err == nil { + f.Close() + } + runtime.Gosched() + } + } + } + } +} diff --git a/vendor/src/github.com/tonistiigi/fifo/handle_linux.go b/vendor/src/github.com/tonistiigi/fifo/handle_linux.go new file mode 100644 index 0000000000..7bda64ca67 --- /dev/null +++ b/vendor/src/github.com/tonistiigi/fifo/handle_linux.go @@ -0,0 +1,76 @@ +// +build linux + +package fifo + +import ( + "fmt" + "os" + "sync" + "syscall" + + "github.com/pkg/errors" +) + +const O_PATH = 010000000 + +type handle struct { + f *os.File + dev uint64 + ino uint64 + closeOnce sync.Once + name string +} + +func getHandle(fn string) (*handle, error) { + f, err := os.OpenFile(fn, O_PATH, 0) + if err != nil { + return nil, errors.Wrapf(err, "failed to open %v with O_PATH", fn) + } + + var stat syscall.Stat_t + if err := syscall.Fstat(int(f.Fd()), &stat); err != nil { + f.Close() + return nil, errors.Wrapf(err, "failed to stat handle %v", f.Fd()) + } + + h := &handle{ + f: f, + name: fn, + dev: stat.Dev, + ino: stat.Ino, + } + + // check /proc just in case + if _, err := os.Stat(h.procPath()); err != nil { + f.Close() + return nil, errors.Wrapf(err, "couldn't stat %v", h.procPath()) + } + + return h, nil +} + +func (h *handle) procPath() string { + return fmt.Sprintf("/proc/self/fd/%d", h.f.Fd()) +} + +func (h *handle) Name() string { + return h.name +} + +func (h *handle) Path() (string, error) { + var stat syscall.Stat_t + if err := syscall.Stat(h.procPath(), &stat); err != nil { + return "", errors.Wrapf(err, "path %v could not be statted", h.procPath()) + } + if stat.Dev != h.dev || stat.Ino != h.ino { + return "", errors.Errorf("failed to verify handle %v/%v %v/%v", stat.Dev, h.dev, stat.Ino, h.ino) + } + return h.procPath(), nil +} + +func (h *handle) Close() error { + h.closeOnce.Do(func() { + h.f.Close() + }) + return nil +} diff --git a/vendor/src/github.com/tonistiigi/fifo/handle_nolinux.go b/vendor/src/github.com/tonistiigi/fifo/handle_nolinux.go new file mode 100644 index 0000000000..d9648d8bfa --- /dev/null +++ b/vendor/src/github.com/tonistiigi/fifo/handle_nolinux.go @@ -0,0 +1,49 @@ +// +build !linux + +package fifo + +import ( + "syscall" + + "github.com/pkg/errors" +) + +type handle struct { + fn string + dev uint64 + ino uint64 +} + +func getHandle(fn string) (*handle, error) { + var stat syscall.Stat_t + if err := syscall.Stat(fn, &stat); err != nil { + return nil, errors.Wrapf(err, "failed to stat %v", fn) + } + + h := &handle{ + fn: fn, + dev: uint64(stat.Dev), + ino: stat.Ino, + } + + return h, nil +} + +func (h *handle) Path() (string, error) { + var stat syscall.Stat_t + if err := syscall.Stat(h.fn, &stat); err != nil { + return "", errors.Wrapf(err, "path %v could not be statted", h.fn) + } + if uint64(stat.Dev) != h.dev || stat.Ino != h.ino { + return "", errors.Errorf("failed to verify handle %v/%v %v/%v for %v", stat.Dev, h.dev, stat.Ino, h.ino, h.fn) + } + return h.fn, nil +} + +func (h *handle) Name() string { + return h.fn +} + +func (h *handle) Close() error { + return nil +} diff --git a/vendor/src/github.com/tonistiigi/fifo/mkfifo_nosolaris.go b/vendor/src/github.com/tonistiigi/fifo/mkfifo_nosolaris.go new file mode 100644 index 0000000000..8c6ea45ebc --- /dev/null +++ b/vendor/src/github.com/tonistiigi/fifo/mkfifo_nosolaris.go @@ -0,0 +1,9 @@ +// +build !solaris + +package fifo + +import "syscall" + +func mkfifo(path string, mode uint32) (err error) { + return syscall.Mkfifo(path, mode) +} diff --git a/vendor/src/github.com/tonistiigi/fifo/mkfifo_solaris.go b/vendor/src/github.com/tonistiigi/fifo/mkfifo_solaris.go new file mode 100644 index 0000000000..8d588a45af --- /dev/null +++ b/vendor/src/github.com/tonistiigi/fifo/mkfifo_solaris.go @@ -0,0 +1,11 @@ +// +build solaris + +package fifo + +import ( + "golang.org/x/sys/unix" +) + +func mkfifo(path string, mode uint32) (err error) { + return unix.Mkfifo(path, mode) +} diff --git a/vendor/src/github.com/tonistiigi/fifo/readme.md b/vendor/src/github.com/tonistiigi/fifo/readme.md new file mode 100644 index 0000000000..1c82669060 --- /dev/null +++ b/vendor/src/github.com/tonistiigi/fifo/readme.md @@ -0,0 +1,30 @@ +### fifo + +Go package for handling fifos in a sane way. + +``` +// OpenFifo opens a fifo. Returns io.ReadWriteCloser. +// Context can be used to cancel this function until open(2) has not returned. +// Accepted flags: +// - syscall.O_CREAT - create new fifo if one doesn't exist +// - syscall.O_RDONLY - open fifo only from reader side +// - syscall.O_WRONLY - open fifo only from writer side +// - syscall.O_RDWR - open fifo from both sides, never block on syscall level +// - syscall.O_NONBLOCK - return io.ReadWriteCloser even if other side of the +// fifo isn't open. read/write will be connected after the actual fifo is +// open or after fifo is closed. +func OpenFifo(ctx context.Context, fn string, flag int, perm os.FileMode) (io.ReadWriteCloser, error) + + +// Read from a fifo to a byte array. +func (f *fifo) Read(b []byte) (int, error) + + +// Write from byte array to a fifo. +func (f *fifo) Write(b []byte) (int, error) + + +// Close the fifo. Next reads/writes will error. This method can also be used +// before open(2) has returned and fifo was never opened. +func (f *fifo) Close() error +``` \ No newline at end of file