Merge pull request #29095 from tonistiigi/1.12-io-fixes

[v1.12] backported IO fixes for v1.12
Victor Vieux authored on 2016-12-05 00:45:03 -08:00, committed by GitHub
Commit 7b9deca3e0
30 changed files with 732 additions and 227 deletions


@ -22,6 +22,7 @@ import (
"github.com/docker/docker/daemon/network"
"github.com/docker/docker/image"
"github.com/docker/docker/layer"
"github.com/docker/docker/libcontainerd"
"github.com/docker/docker/pkg/idtools"
"github.com/docker/docker/pkg/ioutils"
"github.com/docker/docker/pkg/promise"
@ -972,3 +973,46 @@ func (container *Container) CancelAttachContext() {
}
container.attachContext.mu.Unlock()
}
func (container *Container) startLogging() error {
if container.HostConfig.LogConfig.Type == "none" {
return nil // do not start logging routines
}
l, err := container.StartLogger(container.HostConfig.LogConfig)
if err != nil {
return fmt.Errorf("Failed to initialize logging driver: %v", err)
}
copier := logger.NewCopier(map[string]io.Reader{"stdout": container.StdoutPipe(), "stderr": container.StderrPipe()}, l)
container.LogCopier = copier
copier.Run()
container.LogDriver = l
// set LogPath field only for json-file logdriver
if jl, ok := l.(*jsonfilelog.JSONFileLogger); ok {
container.LogPath = jl.LogPath()
}
return nil
}
// InitializeStdio is called by libcontainerd to connect the stdio.
func (container *Container) InitializeStdio(iop libcontainerd.IOPipe) error {
if err := container.startLogging(); err != nil {
container.Reset(false)
return err
}
container.StreamConfig.CopyToPipe(iop)
if container.Stdin() == nil && !container.Config.Tty {
if iop.Stdin != nil {
if err := iop.Stdin.Close(); err != nil {
logrus.Warnf("error closing stdin: %+v", err)
}
}
}
return nil
}
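The hunks above are the heart of the backport: instead of libcontainerd calling a daemon-wide AttachStreams backend, every Create/Restore/AddProcess caller now passes its own attach function, and the client invokes it with the IOPipe once the pipes exist. Below is a minimal, self-contained sketch of that callback shape; the types are simplified stand-ins for the real libcontainerd definitions, not the actual Docker code.

```go
package main

import (
	"fmt"
	"io"
	"io/ioutil"
	"os"
	"strings"
)

// Simplified stand-ins for libcontainerd.IOPipe and StdioCallback; the real
// types live in libcontainerd's types.go (see the hunk further down).
type IOPipe struct {
	Stdin          io.WriteCloser
	Stdout, Stderr io.ReadCloser
}

type StdioCallback func(IOPipe) error

// create stands in for client.Create: it opens the process pipes and hands
// them straight to the caller-supplied callback instead of calling back into
// a daemon-wide AttachStreams backend.
func create(attachStdio StdioCallback) error {
	iopipe := IOPipe{
		Stdout: ioutil.NopCloser(strings.NewReader("hello from the container\n")),
		Stderr: ioutil.NopCloser(strings.NewReader("")),
	}
	if err := attachStdio(iopipe); err != nil {
		// the caller failed to attach; it is responsible for resetting itself,
		// we only report the error
		return fmt.Errorf("attach failed: %v", err)
	}
	return nil
}

func main() {
	// The container (or exec) side supplies the callback; here it just copies
	// stdout to the terminal, roughly what CopyToPipe does for real streams.
	err := create(func(iop IOPipe) error {
		_, err := io.Copy(os.Stdout, iop.Stdout)
		return err
	})
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
	}
}
```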


@ -175,7 +175,7 @@ func (daemon *Daemon) restore() error {
defer wg.Done()
rm := c.RestartManager(false)
if c.IsRunning() || c.IsPaused() {
if err := daemon.containerd.Restore(c.ID, libcontainerd.WithRestartManager(rm)); err != nil {
if err := daemon.containerd.Restore(c.ID, c.InitializeStdio, libcontainerd.WithRestartManager(rm)); err != nil {
logrus.Errorf("Failed to restore %s with containerd: %s", c.ID, err)
return
}


@ -204,7 +204,7 @@ func (d *Daemon) ContainerExecStart(ctx context.Context, name string, stdin io.R
attachErr := container.AttachStreams(ctx, ec.StreamConfig, ec.OpenStdin, true, ec.Tty, cStdin, cStdout, cStderr, ec.DetachKeys)
if err := d.containerd.AddProcess(ctx, c.ID, name, p); err != nil {
if err := d.containerd.AddProcess(ctx, c.ID, name, p, ec.InitializeStdio); err != nil {
return err
}


@ -1,8 +1,11 @@
package exec
import (
"runtime"
"sync"
"github.com/Sirupsen/logrus"
"github.com/docker/docker/libcontainerd"
"github.com/docker/docker/pkg/stringid"
"github.com/docker/docker/runconfig"
)
@ -37,6 +40,21 @@ func NewConfig() *Config {
}
}
// InitializeStdio is called by libcontainerd to connect the stdio.
func (c *Config) InitializeStdio(iop libcontainerd.IOPipe) error {
c.StreamConfig.CopyToPipe(iop)
if c.Stdin() == nil && !c.Tty && runtime.GOOS == "windows" {
if iop.Stdin != nil {
if err := iop.Stdin.Close(); err != nil {
logrus.Errorf("error closing exec stdin: %+v", err)
}
}
}
return nil
}
// Store keeps track of the exec configurations.
type Store struct {
commands map[string]*Config


@ -12,7 +12,6 @@ import (
"github.com/docker/docker/api/types/backend"
"github.com/docker/docker/container"
"github.com/docker/docker/daemon/logger"
"github.com/docker/docker/daemon/logger/jsonfilelog"
"github.com/docker/docker/pkg/ioutils"
"github.com/docker/docker/pkg/stdcopy"
containertypes "github.com/docker/engine-api/types/container"
@ -120,30 +119,6 @@ func (daemon *Daemon) getLogger(container *container.Container) (logger.Logger,
return container.StartLogger(container.HostConfig.LogConfig)
}
// StartLogging initializes and starts the container logging stream.
func (daemon *Daemon) StartLogging(container *container.Container) error {
if container.HostConfig.LogConfig.Type == "none" {
return nil // do not start logging routines
}
l, err := container.StartLogger(container.HostConfig.LogConfig)
if err != nil {
return fmt.Errorf("Failed to initialize logging driver: %v", err)
}
copier := logger.NewCopier(map[string]io.Reader{"stdout": container.StdoutPipe(), "stderr": container.StderrPipe()}, l)
container.LogCopier = copier
copier.Run()
container.LogDriver = l
// set LogPath field only for json-file logdriver
if jl, ok := l.(*jsonfilelog.JSONFileLogger); ok {
container.LogPath = jl.LogPath()
}
return nil
}
// mergeLogConfig merges the daemon log config to the container's log config if the container's log driver is not specified.
func (daemon *Daemon) mergeAndVerifyLogConfig(cfg *containertypes.LogConfig) error {
if cfg.Type == "" {


@ -3,13 +3,11 @@ package daemon
import (
"errors"
"fmt"
"io"
"runtime"
"strconv"
"github.com/Sirupsen/logrus"
"github.com/docker/docker/libcontainerd"
"github.com/docker/docker/runconfig"
)
// StateChanged updates daemon state changes from containerd
@ -100,57 +98,3 @@ func (daemon *Daemon) StateChanged(id string, e libcontainerd.StateInfo) error {
return nil
}
// AttachStreams is called by libcontainerd to connect the stdio.
func (daemon *Daemon) AttachStreams(id string, iop libcontainerd.IOPipe) error {
var s *runconfig.StreamConfig
c := daemon.containers.Get(id)
if c == nil {
ec, err := daemon.getExecConfig(id)
if err != nil {
return fmt.Errorf("no such exec/container: %s", id)
}
s = ec.StreamConfig
} else {
s = c.StreamConfig
if err := daemon.StartLogging(c); err != nil {
c.Reset(false)
return err
}
}
copyFunc := func(w io.Writer, r io.Reader) {
s.Add(1)
go func() {
if _, err := io.Copy(w, r); err != nil {
logrus.Errorf("%v stream copy error: %v", id, err)
}
s.Done()
}()
}
if iop.Stdout != nil {
copyFunc(s.Stdout(), iop.Stdout)
}
if iop.Stderr != nil {
copyFunc(s.Stderr(), iop.Stderr)
}
if stdin := s.Stdin(); stdin != nil {
if iop.Stdin != nil {
go func() {
io.Copy(iop.Stdin, stdin)
iop.Stdin.Close()
}()
}
} else {
if c != nil && !c.Config.Tty {
// tty is enabled, so dont close containerd's iopipe stdin.
if iop.Stdin != nil {
iop.Stdin.Close()
}
}
}
return nil
}


@ -28,7 +28,7 @@ func (daemon *Daemon) postRunProcessing(container *container.Container, e libcon
// Create a new servicing container, which will start, complete the update, and merge back the
// results if it succeeded, all as part of the below function call.
if err := daemon.containerd.Create((container.ID + "_servicing"), *spec, servicingOption); err != nil {
if err := daemon.containerd.Create((container.ID + "_servicing"), *spec, container.InitializeStdio, servicingOption); err != nil {
container.SetExitCode(-1)
return fmt.Errorf("Post-run update servicing failed: %s", err)
}


@ -141,7 +141,7 @@ func (daemon *Daemon) containerStart(container *container.Container) (err error)
createOptions = append(createOptions, *copts...)
}
if err := daemon.containerd.Create(container.ID, *spec, createOptions...); err != nil {
if err := daemon.containerd.Create(container.ID, *spec, container.InitializeStdio, createOptions...); err != nil {
errDesc := grpc.ErrorDesc(err)
logrus.Errorf("Create container failed with error: %s", errDesc)
// if we receive an internal error from the initial start of a container then lets


@ -137,6 +137,7 @@ clone git github.com/docker/docker-credential-helpers v0.3.0
# containerd
clone git github.com/docker/containerd 0366d7e9693c930cf18c0f50cc16acec064e96c5
clone git github.com/tonistiigi/fifo fe870ccf293940774c2b44e23f6c71fff8f7547d
# cluster
clone git github.com/docker/swarmkit 0cf248feec033f46dc09db40d69fd5128082b79a


@ -4455,3 +4455,15 @@ func (s *DockerSuite) TestRunAddHostInHostMode(c *check.C) {
out, _ := dockerCmd(c, "run", "--add-host=extra:1.2.3.4", "--net=host", "busybox", "cat", "/etc/hosts")
c.Assert(out, checker.Contains, expectedOutput, check.Commentf("Expected '%s', but got %q", expectedOutput, out))
}
func (s *DockerSuite) TestRunStoppedLoggingDriverNoLeak(c *check.C) {
nroutines, err := getGoroutineNumber()
c.Assert(err, checker.IsNil)
out, _, err := dockerCmdWithError("run", "--name=fail", "--log-driver=splunk", "busybox", "true")
c.Assert(err, checker.NotNil)
c.Assert(out, checker.Contains, "Failed to initialize logging driver", check.Commentf("error should be about logging driver, got output %s", out))
// NGoroutines is not updated right away, so we need to wait before failing
c.Assert(waitForGoroutines(nroutines), checker.IsNil)
}
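The new regression test leans on the suite's getGoroutineNumber and waitForGoroutines helpers, which are not shown in this diff. The idea is a polling wait, because the daemon's goroutine count only settles a little while after the failed run. A rough sketch of such a helper follows, assuming the suite's getGoroutineNumber reports the daemon's current goroutine count; the real helper may differ in detail.

```go
// waitForGoroutines polls until the goroutine count drops back to the given
// baseline or a timeout expires. Sketch only; needs "fmt" and "time" and the
// suite's getGoroutineNumber helper.
func waitForGoroutines(expected int) error {
	deadline := time.Now().Add(30 * time.Second)
	for {
		n, err := getGoroutineNumber()
		if err != nil {
			return err
		}
		if n <= expected {
			return nil
		}
		if time.Now().After(deadline) {
			return fmt.Errorf("leaked goroutines: expected at most %d, got %d", expected, n)
		}
		time.Sleep(200 * time.Millisecond)
	}
}
```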


@ -13,6 +13,7 @@ import (
"github.com/Sirupsen/logrus"
containerd "github.com/docker/containerd/api/grpc/types"
"github.com/docker/docker/pkg/idtools"
"github.com/docker/docker/pkg/ioutils"
"github.com/docker/docker/pkg/mount"
"github.com/golang/protobuf/ptypes"
"github.com/golang/protobuf/ptypes/timestamp"
@ -30,7 +31,10 @@ type client struct {
liveRestore bool
}
func (clnt *client) AddProcess(ctx context.Context, containerID, processFriendlyName string, specp Process) error {
// AddProcess is the handler for adding a process to an already running
// container. It's called through docker exec. It returns the system pid of the
// exec'd process.
func (clnt *client) AddProcess(ctx context.Context, containerID, processFriendlyName string, specp Process, attachStdio StdioCallback) error {
clnt.lock(containerID)
defer clnt.unlock(containerID)
container, err := clnt.getContainer(containerID)
@ -96,15 +100,25 @@ func (clnt *client) AddProcess(ctx context.Context, containerID, processFriendly
return err
}
var stdinOnce sync.Once
stdin := iopipe.Stdin
iopipe.Stdin = ioutils.NewWriteCloserWrapper(stdin, func() error {
var err error
stdinOnce.Do(func() { // on error from attach we don't know if stdin was already closed
err = stdin.Close()
if err2 := p.sendCloseStdin(); err == nil {
err = err2
}
})
return err
})
container.processes[processFriendlyName] = p
clnt.unlock(containerID)
if err := clnt.backend.AttachStreams(processFriendlyName, *iopipe); err != nil {
clnt.lock(containerID)
if err := attachStdio(*iopipe); err != nil {
p.closeFifos(iopipe)
return err
}
clnt.lock(containerID)
return nil
}
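Both the exec and restore paths now wrap iopipe.Stdin so that Close is safe to call more than once: the first Close closes the real pipe (and, for exec, also asks containerd to close the process's stdin), and any later call is a no-op. A self-contained sketch of that pattern, using a local writeCloserWrapper in place of docker's pkg/ioutils helper:

```go
package main

import (
	"fmt"
	"io"
	"os"
	"sync"
)

// writeCloserWrapper mirrors ioutils.NewWriteCloserWrapper: writes go to the
// wrapped writer, Close runs the supplied closer.
type writeCloserWrapper struct {
	io.Writer
	closer func() error
}

func (w *writeCloserWrapper) Close() error { return w.closer() }

// wrapStdin makes Close idempotent and piggybacks an extra notification
// (standing in for containerd's "close stdin" request) on the first Close.
func wrapStdin(stdin io.WriteCloser, notify func() error) io.WriteCloser {
	var once sync.Once
	return &writeCloserWrapper{Writer: stdin, closer: func() error {
		var err error
		once.Do(func() {
			err = stdin.Close()
			if err2 := notify(); err == nil {
				err = err2
			}
		})
		return err
	}}
}

func main() {
	pr, pw := io.Pipe()
	done := make(chan struct{})
	go func() {
		io.Copy(os.Stdout, pr)
		close(done)
	}()

	stdin := wrapStdin(pw, func() error {
		fmt.Println("told containerd: stdin closed") // stands in for p.sendCloseStdin()
		return nil
	})
	fmt.Fprintln(stdin, "hello")
	stdin.Close()
	stdin.Close() // second Close is a no-op thanks to sync.Once
	<-done
}
```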
@ -134,7 +148,7 @@ func (clnt *client) prepareBundleDir(uid, gid int) (string, error) {
return p, nil
}
func (clnt *client) Create(containerID string, spec Spec, options ...CreateOption) (err error) {
func (clnt *client) Create(containerID string, spec Spec, attachStdio StdioCallback, options ...CreateOption) (err error) {
clnt.lock(containerID)
defer clnt.unlock(containerID)
@ -160,6 +174,7 @@ func (clnt *client) Create(containerID string, spec Spec, options ...CreateOptio
if err := container.clean(); err != nil {
return err
}
container.attachStdio = attachStdio // hack for v1.12 backport
defer func() {
if err != nil {
@ -181,7 +196,7 @@ func (clnt *client) Create(containerID string, spec Spec, options ...CreateOptio
return err
}
return container.start()
return container.start(attachStdio)
}
func (clnt *client) Signal(containerID string, sig int) error {
@ -390,7 +405,7 @@ func (clnt *client) getOrCreateExitNotifier(containerID string) *exitNotifier {
return w
}
func (clnt *client) restore(cont *containerd.Container, lastEvent *containerd.Event, options ...CreateOption) (err error) {
func (clnt *client) restore(cont *containerd.Container, lastEvent *containerd.Event, attachStdio StdioCallback, options ...CreateOption) (err error) {
clnt.lock(cont.Id)
defer clnt.unlock(cont.Id)
@ -421,8 +436,18 @@ func (clnt *client) restore(cont *containerd.Container, lastEvent *containerd.Ev
if err != nil {
return err
}
var stdinOnce sync.Once
stdin := iopipe.Stdin
iopipe.Stdin = ioutils.NewWriteCloserWrapper(stdin, func() error {
var err error
stdinOnce.Do(func() { // on error from attach we don't know if stdin was already closed
err = stdin.Close()
})
return err
})
if err := clnt.backend.AttachStreams(containerID, *iopipe); err != nil {
if err := attachStdio(*iopipe); err != nil {
container.closeFifos(iopipe)
return err
}
@ -435,6 +460,7 @@ func (clnt *client) restore(cont *containerd.Container, lastEvent *containerd.Ev
}})
if err != nil {
container.closeFifos(iopipe)
return err
}
@ -512,7 +538,7 @@ func (clnt *client) getContainerLastEvent(id string) (*containerd.Event, error)
return ev, err
}
func (clnt *client) Restore(containerID string, options ...CreateOption) error {
func (clnt *client) Restore(containerID string, attachStdio StdioCallback, options ...CreateOption) error {
// Synchronize with live events
clnt.remote.Lock()
defer clnt.remote.Unlock()
@ -560,7 +586,7 @@ func (clnt *client) Restore(containerID string, options ...CreateOption) error {
// container is still alive
if clnt.liveRestore {
if err := clnt.restore(cont, ev, options...); err != nil {
if err := clnt.restore(cont, ev, attachStdio, options...); err != nil {
logrus.Errorf("libcontainerd: error restoring %s: %v", containerID, err)
}
return nil


@ -8,11 +8,11 @@ type client struct {
// Platform specific properties below here.
}
func (clnt *client) AddProcess(ctx context.Context, containerID, processFriendlyName string, specp Process) error {
func (clnt *client) AddProcess(ctx context.Context, containerID, processFriendlyName string, specp Process, attachStdio StdioCallback) error {
return nil
}
func (clnt *client) Create(containerID string, spec Spec, options ...CreateOption) (err error) {
func (clnt *client) Create(containerID string, spec Spec, attachStdio StdioCallback, options ...CreateOption) (err error) {
return nil
}
@ -37,7 +37,7 @@ func (clnt *client) Stats(containerID string) (*Stats, error) {
}
// Restore is the handler for restoring a container
func (clnt *client) Restore(containerID string, unusedOnWindows ...CreateOption) error {
func (clnt *client) Restore(containerID string, attachStdio StdioCallback, unusedOnWindows ...CreateOption) error {
return nil
}


@ -4,6 +4,7 @@ import (
"errors"
"fmt"
"io"
"io/ioutil"
"path/filepath"
"strings"
"syscall"
@ -37,7 +38,7 @@ const defaultOwner = "docker"
// Create is the entrypoint to create a container from a spec, and if successfully
// created, start it too.
func (clnt *client) Create(containerID string, spec Spec, options ...CreateOption) error {
func (clnt *client) Create(containerID string, spec Spec, attachStdio StdioCallback, options ...CreateOption) error {
logrus.Debugln("libcontainerd: client.Create() with spec", spec)
configuration := &hcsshim.ContainerConfig{
@ -142,7 +143,8 @@ func (clnt *client) Create(containerID string, spec Spec, options ...CreateOptio
},
commandLine: strings.Join(spec.Process.Args, " "),
},
processes: make(map[string]*process),
processes: make(map[string]*process),
attachStdio: attachStdio,
},
ociSpec: spec,
hcsContainer: hcsContainer,
@ -159,7 +161,7 @@ func (clnt *client) Create(containerID string, spec Spec, options ...CreateOptio
// internal structure, start will keep HCS in sync by deleting the
// container there.
logrus.Debugf("libcontainerd: Create() id=%s, Calling start()", containerID)
if err := container.start(); err != nil {
if err := container.start(attachStdio); err != nil {
clnt.deleteContainer(containerID)
return err
}
@ -171,7 +173,7 @@ func (clnt *client) Create(containerID string, spec Spec, options ...CreateOptio
// AddProcess is the handler for adding a process to an already running
// container. It's called through docker exec.
func (clnt *client) AddProcess(ctx context.Context, containerID, processFriendlyName string, procToAdd Process) error {
func (clnt *client) AddProcess(ctx context.Context, containerID, processFriendlyName string, procToAdd Process, attachStdio StdioCallback) error {
clnt.lock(containerID)
defer clnt.unlock(containerID)
container, err := clnt.getContainer(containerID)
@ -228,10 +230,10 @@ func (clnt *client) AddProcess(ctx context.Context, containerID, processFriendly
// Convert io.ReadClosers to io.Readers
if stdout != nil {
iopipe.Stdout = openReaderFromPipe(stdout)
iopipe.Stdout = ioutil.NopCloser(&autoClosingReader{ReadCloser: stdout})
}
if stderr != nil {
iopipe.Stderr = openReaderFromPipe(stderr)
iopipe.Stderr = ioutil.NopCloser(&autoClosingReader{ReadCloser: stderr})
}
pid := newProcess.Pid()
@ -250,18 +252,11 @@ func (clnt *client) AddProcess(ctx context.Context, containerID, processFriendly
// Add the process to the container's list of processes
container.processes[processFriendlyName] = proc
// Make sure the lock is not held while calling back into the daemon
clnt.unlock(containerID)
// Tell the engine to attach streams back to the client
if err := clnt.backend.AttachStreams(processFriendlyName, *iopipe); err != nil {
clnt.lock(containerID)
if err := attachStdio(*iopipe); err != nil {
return err
}
// Lock again so that the defer unlock doesn't fail. (I really don't like this code)
clnt.lock(containerID)
// Spin up a go routine waiting for exit to handle cleanup
go container.waitExit(proc, false)
@ -370,7 +365,7 @@ func (clnt *client) Stats(containerID string) (*Stats, error) {
}
// Restore is the handler for restoring a container
func (clnt *client) Restore(containerID string, unusedOnWindows ...CreateOption) error {
func (clnt *client) Restore(containerID string, _ StdioCallback, unusedOnWindows ...CreateOption) error {
// TODO Windows: Implement this. For now, just tell the backend the container exited.
logrus.Debugf("libcontainerd: Restore(%s)", containerID)
return clnt.backend.StateChanged(containerID, StateInfo{


@ -20,6 +20,7 @@ type containerCommon struct {
restarting bool
processes map[string]*process
startedAt time.Time
attachStdio StdioCallback // hack for v1.12 backport
}
// WithRestartManager sets the restartmanager to be used with the container.


@ -6,13 +6,16 @@ import (
"io/ioutil"
"os"
"path/filepath"
"sync"
"syscall"
"time"
"github.com/Sirupsen/logrus"
containerd "github.com/docker/containerd/api/grpc/types"
"github.com/docker/docker/pkg/ioutils"
"github.com/docker/docker/restartmanager"
"github.com/opencontainers/specs/specs-go"
specs "github.com/opencontainers/specs/specs-go"
"github.com/tonistiigi/fifo"
"golang.org/x/net/context"
)
@ -66,7 +69,7 @@ func (ctr *container) clean() error {
func (ctr *container) cleanProcess(id string) {
if p, ok := ctr.processes[id]; ok {
for _, i := range []int{syscall.Stdin, syscall.Stdout, syscall.Stderr} {
if err := os.Remove(p.fifo(i)); err != nil {
if err := os.Remove(p.fifo(i)); err != nil && !os.IsNotExist(err) {
logrus.Warnf("libcontainerd: failed to remove %v for process %v: %v", p.fifo(i), id, err)
}
}
@ -86,16 +89,44 @@ func (ctr *container) spec() (*specs.Spec, error) {
return &spec, nil
}
func (ctr *container) start() error {
func (ctr *container) start(attachStdio StdioCallback) error {
spec, err := ctr.spec()
if err != nil {
return nil
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ready := make(chan struct{})
iopipe, err := ctr.openFifos(spec.Process.Terminal)
if err != nil {
return err
}
var stdinOnce sync.Once
// we need to delay stdin closure until after container start, or else the
// "stdin close" event will be rejected by containerd.
// the stdin Close itself happens in attachStdio
stdin := iopipe.Stdin
iopipe.Stdin = ioutils.NewWriteCloserWrapper(stdin, func() error {
var err error
stdinOnce.Do(func() { // on error from attach we don't know if stdin was already closed
err = stdin.Close()
go func() {
select {
case <-ready:
if err := ctr.sendCloseStdin(); err != nil {
logrus.Warnf("failed to close stdin: %+v", err)
}
case <-ctx.Done():
}
}()
})
return err
})
r := &containerd.CreateContainerRequest{
Id: ctr.containerID,
BundlePath: ctr.dir,
@ -109,17 +140,19 @@ func (ctr *container) start() error {
}
ctr.client.appendContainer(ctr)
if err := attachStdio(*iopipe); err != nil {
ctr.closeFifos(iopipe)
return err
}
resp, err := ctr.client.remote.apiClient.CreateContainer(context.Background(), r)
if err != nil {
ctr.closeFifos(iopipe)
return err
}
ctr.startedAt = time.Now()
if err := ctr.client.backend.AttachStreams(ctr.containerID, *iopipe); err != nil {
return err
}
ctr.systemPid = systemPid(resp.Container)
close(ready)
return ctr.client.backend.StateChanged(ctr.containerID, StateInfo{
CommonStateInfo: CommonStateInfo{
@ -191,7 +224,7 @@ func (ctr *container) handleEvent(e *containerd.Event) error {
defer ctr.client.unlock(ctr.containerID)
ctr.restarting = false
if err == nil {
if err = ctr.start(); err != nil {
if err = ctr.start(ctr.attachStdio); err != nil {
logrus.Errorf("libcontainerd: error restarting %v", err)
}
}
@ -229,15 +262,15 @@ func (ctr *container) handleEvent(e *containerd.Event) error {
// discardFifos attempts to fully read the container fifos to unblock processes
// that may be blocked on the writer side.
func (ctr *container) discardFifos() {
ctx, _ := context.WithTimeout(context.Background(), 3*time.Second)
for _, i := range []int{syscall.Stdout, syscall.Stderr} {
f := ctr.fifo(i)
c := make(chan struct{})
f, err := fifo.OpenFifo(ctx, ctr.fifo(i), syscall.O_RDONLY|syscall.O_NONBLOCK, 0)
if err != nil {
logrus.Warnf("error opening fifo %v for discarding: %+v", f, err)
continue
}
go func() {
r := openReaderFromFifo(f)
close(c) // this channel is used to not close the writer too early, before readonly open has been called.
io.Copy(ioutil.Discard, r)
io.Copy(ioutil.Discard, f)
}()
<-c
closeReaderFifo(f) // avoid blocking permanently on open if there is no writer side
}
}
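The rewritten start() above also shows why closing stdin is split in two steps: the fifo is closed right away, but the UpdateProcess(CloseStdin) notification to containerd is held back until the container is actually running (the ready channel), because containerd rejects the event for a container that has not started. A small sketch of that "close now, notify when ready" shape; notify here is a hypothetical stand-in for ctr.sendCloseStdin().

```go
package stdioclose

import (
	"io"
	"log"

	"golang.org/x/net/context"
)

// closeStdinLater closes the local write side immediately but defers the
// remote notification until ready fires, giving up if ctx is cancelled first.
func closeStdinLater(ctx context.Context, stdin io.Closer, ready <-chan struct{}, notify func() error) error {
	err := stdin.Close()
	go func() {
		select {
		case <-ready:
			if nerr := notify(); nerr != nil {
				log.Printf("failed to notify close stdin: %v", nerr)
			}
		case <-ctx.Done():
			// the container never became ready; nothing to notify
		}
	}()
	return err
}
```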


@ -2,6 +2,7 @@ package libcontainerd
import (
"io"
"io/ioutil"
"strings"
"syscall"
"time"
@ -35,7 +36,7 @@ func (ctr *container) newProcess(friendlyName string) *process {
}
}
func (ctr *container) start() error {
func (ctr *container) start(attachStdio StdioCallback) error {
var err error
isServicing := false
@ -127,10 +128,10 @@ func (ctr *container) start() error {
// Convert io.ReadClosers to io.Readers
if stdout != nil {
iopipe.Stdout = openReaderFromPipe(stdout)
iopipe.Stdout = ioutil.NopCloser(&autoClosingReader{ReadCloser: stdout})
}
if stderr != nil {
iopipe.Stderr = openReaderFromPipe(stderr)
iopipe.Stderr = ioutil.NopCloser(&autoClosingReader{ReadCloser: stderr})
}
// Save the PID
@ -142,7 +143,7 @@ func (ctr *container) start() error {
ctr.client.appendContainer(ctr)
if err := ctr.client.backend.AttachStreams(ctr.containerID, *iopipe); err != nil {
if err := attachStdio(*iopipe); err != nil {
// OK to return the error here, as waitExit will handle tear-down in HCS
return err
}
@ -257,7 +258,7 @@ func (ctr *container) waitExit(process *process, isFirstProcessToStart bool) err
ctr.restarting = false
ctr.client.deleteContainer(ctr.friendlyName)
if err == nil {
if err = ctr.client.Create(ctr.containerID, ctr.ociSpec, ctr.options...); err != nil {
if err = ctr.client.Create(ctr.containerID, ctr.ociSpec, ctr.attachStdio, ctr.options...); err != nil {
logrus.Errorf("libcontainerd: error restarting %v", err)
}
}


@ -1,14 +1,15 @@
package libcontainerd
import (
"fmt"
"io"
"io/ioutil"
"os"
"path/filepath"
"syscall"
"time"
containerd "github.com/docker/containerd/api/grpc/types"
"github.com/docker/docker/pkg/ioutils"
"github.com/tonistiigi/fifo"
"golang.org/x/net/context"
)
@ -26,49 +27,67 @@ type process struct {
dir string
}
func (p *process) openFifos(terminal bool) (*IOPipe, error) {
bundleDir := p.dir
if err := os.MkdirAll(bundleDir, 0700); err != nil {
func (p *process) openFifos(terminal bool) (pipe *IOPipe, err error) {
if err := os.MkdirAll(p.dir, 0700); err != nil {
return nil, err
}
for i := 0; i < 3; i++ {
f := p.fifo(i)
if err := syscall.Mkfifo(f, 0700); err != nil && !os.IsExist(err) {
return nil, fmt.Errorf("mkfifo: %s %v", f, err)
}
}
ctx, _ := context.WithTimeout(context.Background(), 15*time.Second)
io := &IOPipe{}
stdinf, err := os.OpenFile(p.fifo(syscall.Stdin), syscall.O_RDWR, 0)
io.Stdin, err = fifo.OpenFifo(ctx, p.fifo(syscall.Stdin), syscall.O_WRONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700)
if err != nil {
return nil, err
}
io.Stdout = openReaderFromFifo(p.fifo(syscall.Stdout))
if !terminal {
io.Stderr = openReaderFromFifo(p.fifo(syscall.Stderr))
} else {
io.Stderr = emptyReader{}
defer func() {
if err != nil {
io.Stdin.Close()
}
}()
io.Stdout, err = fifo.OpenFifo(ctx, p.fifo(syscall.Stdout), syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700)
if err != nil {
return nil, err
}
io.Stdin = ioutils.NewWriteCloserWrapper(stdinf, func() error {
stdinf.Close()
_, err := p.client.remote.apiClient.UpdateProcess(context.Background(), &containerd.UpdateProcessRequest{
Id: p.containerID,
Pid: p.friendlyName,
CloseStdin: true,
})
return err
})
defer func() {
if err != nil {
io.Stdout.Close()
}
}()
if !terminal {
io.Stderr, err = fifo.OpenFifo(ctx, p.fifo(syscall.Stderr), syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700)
if err != nil {
return nil, err
}
defer func() {
if err != nil {
io.Stderr.Close()
}
}()
} else {
io.Stderr = ioutil.NopCloser(emptyReader{})
}
return io, nil
}
func (p *process) sendCloseStdin() error {
_, err := p.client.remote.apiClient.UpdateProcess(context.Background(), &containerd.UpdateProcessRequest{
Id: p.containerID,
Pid: p.friendlyName,
CloseStdin: true,
})
return err
}
func (p *process) closeFifos(io *IOPipe) {
io.Stdin.Close()
closeReaderFifo(p.fifo(syscall.Stdout))
closeReaderFifo(p.fifo(syscall.Stderr))
io.Stdout.Close()
io.Stderr.Close()
}
type emptyReader struct{}
@ -77,34 +96,6 @@ func (r emptyReader) Read(b []byte) (int, error) {
return 0, io.EOF
}
func openReaderFromFifo(fn string) io.Reader {
r, w := io.Pipe()
c := make(chan struct{})
go func() {
close(c)
stdoutf, err := os.OpenFile(fn, syscall.O_RDONLY, 0)
if err != nil {
r.CloseWithError(err)
}
if _, err := io.Copy(w, stdoutf); err != nil {
r.CloseWithError(err)
}
w.Close()
stdoutf.Close()
}()
<-c // wait for the goroutine to get scheduled and syscall to block
return r
}
// closeReaderFifo closes fifo that may be blocked on open by opening the write side.
func closeReaderFifo(fn string) {
f, err := os.OpenFile(fn, syscall.O_WRONLY|syscall.O_NONBLOCK, 0)
if err != nil {
return
}
f.Close()
}
func (p *process) fifo(index int) string {
return filepath.Join(p.dir, p.friendlyName+"-"+fdNames[index])
}
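openFifos now goes through github.com/tonistiigi/fifo with O_NONBLOCK, so the daemon no longer blocks waiting for containerd to open the other end, and every pipe opened so far is closed again if a later open fails. A condensed sketch of that open-with-cleanup pattern, using the OpenFifo API documented in the vendored package below; paths and permissions here are illustrative.

```go
package fifopipes

import (
	"io"
	"path/filepath"
	"syscall"

	"github.com/tonistiigi/fifo"
	"golang.org/x/net/context"
)

// openFifos opens the three process fifos without blocking on the other side
// and closes whatever was already opened if a later open fails. Condensed
// sketch; the real openFifos also creates the bundle dir and handles the
// terminal case for stderr.
func openFifos(ctx context.Context, dir string) (_ io.WriteCloser, _, _ io.ReadCloser, retErr error) {
	stdin, err := fifo.OpenFifo(ctx, filepath.Join(dir, "stdin"),
		syscall.O_WRONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700)
	if err != nil {
		return nil, nil, nil, err
	}
	defer func() {
		if retErr != nil {
			stdin.Close() // don't leak the write side if a later open fails
		}
	}()

	stdout, err := fifo.OpenFifo(ctx, filepath.Join(dir, "stdout"),
		syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700)
	if err != nil {
		return nil, nil, nil, err
	}
	defer func() {
		if retErr != nil {
			stdout.Close()
		}
	}()

	stderr, err := fifo.OpenFifo(ctx, filepath.Join(dir, "stderr"),
		syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700)
	if err != nil {
		return nil, nil, nil, err
	}
	return stdin, stdout, stderr, nil
}
```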


@ -4,6 +4,7 @@ import (
"io"
"strconv"
"strings"
"sync"
"github.com/Microsoft/hcsshim"
)
@ -19,16 +20,17 @@ type process struct {
hcsProcess hcsshim.Process
}
func openReaderFromPipe(p io.ReadCloser) io.Reader {
r, w := io.Pipe()
go func() {
if _, err := io.Copy(w, p); err != nil {
r.CloseWithError(err)
}
w.Close()
p.Close()
}()
return r
type autoClosingReader struct {
io.ReadCloser
sync.Once
}
func (r *autoClosingReader) Read(b []byte) (n int, err error) {
n, err = r.ReadCloser.Read(b)
if err == io.EOF {
r.Once.Do(func() { r.ReadCloser.Close() })
}
return
}
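autoClosingReader closes the underlying ReadCloser exactly once, when the first EOF is observed, which lets the Windows client hand the HCS pipes to plain io.Reader consumers without tracking who closes them. A tiny usage sketch reusing the autoClosingReader type defined above; the file path is illustrative.

```go
f, err := os.Open(`C:\logs\container.log`)
if err != nil {
	log.Fatal(err)
}
// Reading to EOF closes f automatically; no explicit Close is needed, and the
// sync.Once makes a second EOF harmless.
r := ioutil.NopCloser(&autoClosingReader{ReadCloser: f})
if _, err := io.Copy(os.Stdout, r); err != nil {
	log.Fatal(err)
}
```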
// fixStdinBackspaceBehavior works around a bug in Windows before build 14350


@ -31,19 +31,18 @@ type CommonStateInfo struct { // FIXME: event?
// Backend defines callbacks that the client of the library needs to implement.
type Backend interface {
StateChanged(containerID string, state StateInfo) error
AttachStreams(processFriendlyName string, io IOPipe) error
}
// Client provides access to containerd features.
type Client interface {
Create(containerID string, spec Spec, options ...CreateOption) error
Create(containerID string, spec Spec, attachStdio StdioCallback, options ...CreateOption) error
Signal(containerID string, sig int) error
SignalProcess(containerID string, processFriendlyName string, sig int) error
AddProcess(ctx context.Context, containerID, processFriendlyName string, process Process) error
AddProcess(ctx context.Context, containerID, processFriendlyName string, process Process, attachStdio StdioCallback) error
Resize(containerID, processFriendlyName string, width, height int) error
Pause(containerID string) error
Resume(containerID string) error
Restore(containerID string, options ...CreateOption) error
Restore(containerID string, attachStdio StdioCallback, options ...CreateOption) error
Stats(containerID string) (*Stats, error)
GetPidsForContainer(containerID string) ([]int, error)
Summary(containerID string) ([]Summary, error)
@ -55,10 +54,13 @@ type CreateOption interface {
Apply(interface{}) error
}
// StdioCallback is called to connect a container or process stdio.
type StdioCallback func(IOPipe) error
// IOPipe contains the stdio streams.
type IOPipe struct {
Stdin io.WriteCloser
Stdout io.Reader
Stderr io.Reader
Stdout io.ReadCloser
Stderr io.ReadCloser
Terminal bool // Whether stderr is connected on Windows
}


@ -273,24 +273,6 @@ func (pm *Manager) StateChanged(id string, e libcontainerd.StateInfo) error {
return nil
}
// AttachStreams attaches io streams to the plugin
func (pm *Manager) AttachStreams(id string, iop libcontainerd.IOPipe) error {
iop.Stdin.Close()
logger := logrus.New()
logger.Hooks.Add(logHook{id})
// TODO: cache writer per id
w := logger.Writer()
go func() {
io.Copy(w, iop.Stdout)
}()
go func() {
// TODO: update logrus and use logger.WriterLevel
io.Copy(w, iop.Stderr)
}()
return nil
}
func (pm *Manager) init() error {
dt, err := os.Open(filepath.Join(pm.libRoot, "plugins.json"))
if err != nil {
@ -447,3 +429,22 @@ func computePrivileges(m *types.PluginManifest) types.PluginPrivileges {
}
return privileges
}
func attachToLog(id string) func(libcontainerd.IOPipe) error {
return func(iop libcontainerd.IOPipe) error {
iop.Stdin.Close()
logger := logrus.New()
logger.Hooks.Add(logHook{id})
// TODO: cache writer per id
w := logger.Writer()
go func() {
io.Copy(w, iop.Stdout)
}()
go func() {
// TODO: update logrus and use logger.WriterLevel
io.Copy(w, iop.Stderr)
}()
return nil
}
}


@ -30,7 +30,7 @@ func (pm *Manager) enable(p *plugin, force bool) error {
}
p.restartManager = restartmanager.New(container.RestartPolicy{Name: "always"}, 0)
if err := pm.containerdClient.Create(p.PluginObj.ID, libcontainerd.Spec(*spec), libcontainerd.WithRestartManager(p.restartManager)); err != nil { // POC-only
if err := pm.containerdClient.Create(p.PluginObj.ID, libcontainerd.Spec(*spec), attachToLog(p.PluginObj.ID), libcontainerd.WithRestartManager(p.restartManager)); err != nil { // POC-only
if err := p.restartManager.Cancel(); err != nil {
logrus.Errorf("enable: restartManager.Cancel failed due to %v", err)
}
@ -62,7 +62,7 @@ func (pm *Manager) enable(p *plugin, force bool) error {
func (pm *Manager) restore(p *plugin) error {
p.restartManager = restartmanager.New(container.RestartPolicy{Name: "always"}, 0)
return pm.containerdClient.Restore(p.PluginObj.ID, libcontainerd.WithRestartManager(p.restartManager))
return pm.containerdClient.Restore(p.PluginObj.ID, attachToLog(p.PluginObj.ID), libcontainerd.WithRestartManager(p.restartManager))
}
func (pm *Manager) initSpec(p *plugin) (*specs.Spec, error) {


@ -7,8 +7,11 @@ import (
"strings"
"sync"
"github.com/Sirupsen/logrus"
"github.com/docker/docker/libcontainerd"
"github.com/docker/docker/pkg/broadcaster"
"github.com/docker/docker/pkg/ioutils"
"github.com/docker/docker/pkg/pools"
)
// StreamConfig holds information about I/O streams managed together.
@ -107,3 +110,34 @@ func (streamConfig *StreamConfig) CloseStreams() error {
return nil
}
// CopyToPipe connects streamconfig with a libcontainerd.IOPipe
func (streamConfig *StreamConfig) CopyToPipe(iop libcontainerd.IOPipe) {
copyFunc := func(w io.Writer, r io.Reader) {
streamConfig.Add(1)
go func() {
if _, err := pools.Copy(w, r); err != nil {
logrus.Errorf("stream copy error: %+v", err)
}
streamConfig.Done()
}()
}
if iop.Stdout != nil {
copyFunc(streamConfig.Stdout(), iop.Stdout)
}
if iop.Stderr != nil {
copyFunc(streamConfig.Stderr(), iop.Stderr)
}
if stdin := streamConfig.Stdin(); stdin != nil {
if iop.Stdin != nil {
go func() {
pools.Copy(iop.Stdin, stdin)
if err := iop.Stdin.Close(); err != nil {
logrus.Errorf("failed to close stdin: %+v", err)
}
}()
}
}
}


@ -0,0 +1,21 @@
MIT
Copyright (C) 2016 Tõnis Tiigi <tonistiigi@gmail.com>
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.


@ -0,0 +1,13 @@
.PHONY: fmt vet test deps
test: deps
go test -v ./...
deps:
go get -d -t ./...
fmt:
gofmt -s -l .
vet:
go vet ./...


@ -0,0 +1,216 @@
package fifo
import (
"io"
"os"
"runtime"
"sync"
"syscall"
"github.com/pkg/errors"
"golang.org/x/net/context"
)
type fifo struct {
flag int
opened chan struct{}
closed chan struct{}
closing chan struct{}
err error
file *os.File
closingOnce sync.Once // close has been called
closedOnce sync.Once // fifo is closed
handle *handle
}
var leakCheckWg *sync.WaitGroup
// OpenFifo opens a fifo. Returns io.ReadWriteCloser.
// Context can be used to cancel this function while open(2) has not yet returned.
// Accepted flags:
// - syscall.O_CREAT - create new fifo if one doesn't exist
// - syscall.O_RDONLY - open fifo only from reader side
// - syscall.O_WRONLY - open fifo only from writer side
// - syscall.O_RDWR - open fifo from both sides, never block on syscall level
// - syscall.O_NONBLOCK - return io.ReadWriteCloser even if other side of the
// fifo isn't open. read/write will be connected after the actual fifo is
// open or after fifo is closed.
func OpenFifo(ctx context.Context, fn string, flag int, perm os.FileMode) (io.ReadWriteCloser, error) {
if _, err := os.Stat(fn); err != nil {
if os.IsNotExist(err) && flag&syscall.O_CREAT != 0 {
if err := mkfifo(fn, uint32(perm&os.ModePerm)); err != nil && !os.IsExist(err) {
return nil, errors.Wrapf(err, "error creating fifo %v", fn)
}
} else {
return nil, err
}
}
block := flag&syscall.O_NONBLOCK == 0 || flag&syscall.O_RDWR != 0
flag &= ^syscall.O_CREAT
flag &= ^syscall.O_NONBLOCK
h, err := getHandle(fn)
if err != nil {
return nil, err
}
f := &fifo{
handle: h,
flag: flag,
opened: make(chan struct{}),
closed: make(chan struct{}),
closing: make(chan struct{}),
}
wg := leakCheckWg
if wg != nil {
wg.Add(2)
}
go func() {
if wg != nil {
defer wg.Done()
}
select {
case <-ctx.Done():
f.Close()
case <-f.opened:
case <-f.closed:
}
}()
go func() {
if wg != nil {
defer wg.Done()
}
var file *os.File
fn, err := h.Path()
if err == nil {
file, err = os.OpenFile(fn, flag, 0)
}
select {
case <-f.closing:
if err == nil {
select {
case <-ctx.Done():
err = ctx.Err()
default:
err = errors.Errorf("fifo %v was closed before opening", h.Name())
}
if file != nil {
file.Close()
}
}
default:
}
if err != nil {
f.closedOnce.Do(func() {
f.err = err
close(f.closed)
})
return
}
f.file = file
close(f.opened)
}()
if block {
select {
case <-f.opened:
case <-f.closed:
return nil, f.err
}
}
return f, nil
}
// Read from a fifo to a byte array.
func (f *fifo) Read(b []byte) (int, error) {
if f.flag&syscall.O_WRONLY > 0 {
return 0, errors.New("reading from write-only fifo")
}
select {
case <-f.opened:
return f.file.Read(b)
default:
}
select {
case <-f.opened:
return f.file.Read(b)
case <-f.closed:
return 0, errors.New("reading from a closed fifo")
}
}
// Write from byte array to a fifo.
func (f *fifo) Write(b []byte) (int, error) {
if f.flag&(syscall.O_WRONLY|syscall.O_RDWR) == 0 {
return 0, errors.New("writing to read-only fifo")
}
select {
case <-f.opened:
return f.file.Write(b)
default:
}
select {
case <-f.opened:
return f.file.Write(b)
case <-f.closed:
return 0, errors.New("writing to a closed fifo")
}
}
// Close the fifo. Next reads/writes will error. This method can also be used
// before open(2) has returned and fifo was never opened.
func (f *fifo) Close() (retErr error) {
for {
select {
case <-f.closed:
f.handle.Close()
return
default:
select {
case <-f.opened:
f.closedOnce.Do(func() {
retErr = f.file.Close()
f.err = retErr
close(f.closed)
})
default:
if f.flag&syscall.O_RDWR != 0 {
runtime.Gosched()
break
}
f.closingOnce.Do(func() {
close(f.closing)
})
reverseMode := syscall.O_WRONLY
if f.flag&syscall.O_WRONLY > 0 {
reverseMode = syscall.O_RDONLY
}
fn, err := f.handle.Path()
// if Close() is called concurrently(shouldn't) it may cause error
// because handle is closed
select {
case <-f.closed:
default:
if err != nil {
// Path has become invalid. We will leak a goroutine.
// This case should not happen in linux.
f.closedOnce.Do(func() {
f.err = err
close(f.closed)
})
<-f.closed
break
}
f, err := os.OpenFile(fn, reverseMode|syscall.O_NONBLOCK, 0)
if err == nil {
f.Close()
}
runtime.Gosched()
}
}
}
}
}


@ -0,0 +1,76 @@
// +build linux
package fifo
import (
"fmt"
"os"
"sync"
"syscall"
"github.com/pkg/errors"
)
const O_PATH = 010000000
type handle struct {
f *os.File
dev uint64
ino uint64
closeOnce sync.Once
name string
}
func getHandle(fn string) (*handle, error) {
f, err := os.OpenFile(fn, O_PATH, 0)
if err != nil {
return nil, errors.Wrapf(err, "failed to open %v with O_PATH", fn)
}
var stat syscall.Stat_t
if err := syscall.Fstat(int(f.Fd()), &stat); err != nil {
f.Close()
return nil, errors.Wrapf(err, "failed to stat handle %v", f.Fd())
}
h := &handle{
f: f,
name: fn,
dev: stat.Dev,
ino: stat.Ino,
}
// check /proc just in case
if _, err := os.Stat(h.procPath()); err != nil {
f.Close()
return nil, errors.Wrapf(err, "couldn't stat %v", h.procPath())
}
return h, nil
}
func (h *handle) procPath() string {
return fmt.Sprintf("/proc/self/fd/%d", h.f.Fd())
}
func (h *handle) Name() string {
return h.name
}
func (h *handle) Path() (string, error) {
var stat syscall.Stat_t
if err := syscall.Stat(h.procPath(), &stat); err != nil {
return "", errors.Wrapf(err, "path %v could not be statted", h.procPath())
}
if stat.Dev != h.dev || stat.Ino != h.ino {
return "", errors.Errorf("failed to verify handle %v/%v %v/%v", stat.Dev, h.dev, stat.Ino, h.ino)
}
return h.procPath(), nil
}
func (h *handle) Close() error {
h.closeOnce.Do(func() {
h.f.Close()
})
return nil
}
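handle_linux.go pins the fifo with an O_PATH descriptor and then reopens it through /proc/self/fd, verifying device and inode so a renamed or replaced path is detected instead of silently opening the wrong file. A stripped-down sketch of that pin-and-reopen idea, with error handling abbreviated:

```go
// +build linux

package procfd

import (
	"fmt"
	"os"
	"syscall"
)

const oPATH = 010000000 // O_PATH; not exported by the syscall package used here

// reopenReadOnly pins name with an O_PATH descriptor, then opens the same
// inode for reading via /proc/self/fd, refusing to proceed if the path no
// longer resolves to the pinned file. Sketch of the idea only.
func reopenReadOnly(name string) (*os.File, error) {
	h, err := os.OpenFile(name, oPATH, 0)
	if err != nil {
		return nil, err
	}
	defer h.Close()

	var want, got syscall.Stat_t
	if err := syscall.Fstat(int(h.Fd()), &want); err != nil {
		return nil, err
	}
	proc := fmt.Sprintf("/proc/self/fd/%d", h.Fd())
	if err := syscall.Stat(proc, &got); err != nil {
		return nil, err
	}
	if want.Dev != got.Dev || want.Ino != got.Ino {
		return nil, fmt.Errorf("%s changed identity while pinned", name)
	}
	return os.OpenFile(proc, os.O_RDONLY, 0)
}
```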


@ -0,0 +1,49 @@
// +build !linux
package fifo
import (
"syscall"
"github.com/pkg/errors"
)
type handle struct {
fn string
dev uint64
ino uint64
}
func getHandle(fn string) (*handle, error) {
var stat syscall.Stat_t
if err := syscall.Stat(fn, &stat); err != nil {
return nil, errors.Wrapf(err, "failed to stat %v", fn)
}
h := &handle{
fn: fn,
dev: uint64(stat.Dev),
ino: stat.Ino,
}
return h, nil
}
func (h *handle) Path() (string, error) {
var stat syscall.Stat_t
if err := syscall.Stat(h.fn, &stat); err != nil {
return "", errors.Wrapf(err, "path %v could not be statted", h.fn)
}
if uint64(stat.Dev) != h.dev || stat.Ino != h.ino {
return "", errors.Errorf("failed to verify handle %v/%v %v/%v for %v", stat.Dev, h.dev, stat.Ino, h.ino, h.fn)
}
return h.fn, nil
}
func (h *handle) Name() string {
return h.fn
}
func (h *handle) Close() error {
return nil
}


@ -0,0 +1,9 @@
// +build !solaris
package fifo
import "syscall"
func mkfifo(path string, mode uint32) (err error) {
return syscall.Mkfifo(path, mode)
}


@ -0,0 +1,11 @@
// +build solaris
package fifo
import (
"golang.org/x/sys/unix"
)
func mkfifo(path string, mode uint32) (err error) {
return unix.Mkfifo(path, mode)
}


@ -0,0 +1,30 @@
### fifo
Go package for handling fifos in a sane way.
```
// OpenFifo opens a fifo. Returns io.ReadWriteCloser.
// Context can be used to cancel this function while open(2) has not yet returned.
// Accepted flags:
// - syscall.O_CREAT - create new fifo if one doesn't exist
// - syscall.O_RDONLY - open fifo only from reader side
// - syscall.O_WRONLY - open fifo only from writer side
// - syscall.O_RDWR - open fifo from both sides, never block on syscall level
// - syscall.O_NONBLOCK - return io.ReadWriteCloser even if other side of the
// fifo isn't open. read/write will be connected after the actual fifo is
// open or after fifo is closed.
func OpenFifo(ctx context.Context, fn string, flag int, perm os.FileMode) (io.ReadWriteCloser, error)
// Read from a fifo to a byte array.
func (f *fifo) Read(b []byte) (int, error)
// Write from byte array to a fifo.
func (f *fifo) Write(b []byte) (int, error)
// Close the fifo. Next reads/writes will error. This method can also be used
// before open(2) has returned and fifo was never opened.
func (f *fifo) Close() error
```
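
The README only lists the API; a short usage sketch follows, assuming the package is imported from github.com/tonistiigi/fifo (the path Docker vendors above) and using an illustrative fifo path.

```go
package main

import (
	"fmt"
	"io/ioutil"
	"os"
	"syscall"
	"time"

	"github.com/tonistiigi/fifo"
	"golang.org/x/net/context"
)

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	defer os.Remove("/tmp/example.fifo")

	// Writer side: create the fifo if needed and return without waiting for a
	// reader, thanks to O_NONBLOCK.
	w, err := fifo.OpenFifo(ctx, "/tmp/example.fifo",
		syscall.O_WRONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700)
	if err != nil {
		panic(err)
	}

	// Reader side, also non-blocking; reads connect once both ends are open.
	r, err := fifo.OpenFifo(ctx, "/tmp/example.fifo",
		syscall.O_RDONLY|syscall.O_NONBLOCK, 0700)
	if err != nil {
		panic(err)
	}

	go func() {
		fmt.Fprintln(w, "hello over a fifo")
		w.Close() // the reader sees EOF
	}()

	data, err := ioutil.ReadAll(r)
	if err != nil {
		panic(err)
	}
	fmt.Print(string(data))
	r.Close()
}
```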
```