Remove libcontainerd.IOPipe

replaced with cio.DirectIO

Signed-off-by: Daniel Nephin <dnephin@docker.com>
This commit is contained in:
Daniel Nephin 2017-12-07 14:26:27 -05:00
parent 4f5c47aae4
commit 3fec7c0858
13 changed files with 48 additions and 421 deletions

View file

@ -27,7 +27,6 @@ import (
"github.com/docker/docker/daemon/network"
"github.com/docker/docker/image"
"github.com/docker/docker/layer"
"github.com/docker/docker/libcontainerd"
"github.com/docker/docker/opts"
"github.com/docker/docker/pkg/containerfs"
"github.com/docker/docker/pkg/idtools"
@ -1004,7 +1003,7 @@ func (container *Container) CloseStreams() error {
}
// InitializeStdio is called by libcontainerd to connect the stdio.
func (container *Container) InitializeStdio(iop *libcontainerd.IOPipe) (cio.IO, error) {
func (container *Container) InitializeStdio(iop *cio.DirectIO) (cio.IO, error) {
if err := container.startLogging(); err != nil {
container.Reset(false)
return nil, err

View file

@ -7,7 +7,7 @@ import (
"strings"
"sync"
"github.com/docker/docker/libcontainerd"
"github.com/containerd/containerd/cio"
"github.com/docker/docker/pkg/broadcaster"
"github.com/docker/docker/pkg/ioutils"
"github.com/docker/docker/pkg/pools"
@ -114,7 +114,7 @@ func (c *Config) CloseStreams() error {
}
// CopyToPipe connects streamconfig with a libcontainerd.IOPipe
func (c *Config) CopyToPipe(iop *libcontainerd.IOPipe) {
func (c *Config) CopyToPipe(iop *cio.DirectIO) {
copyFunc := func(w io.Writer, r io.ReadCloser) {
c.Add(1)
go func() {

View file

@ -6,7 +6,6 @@ import (
"github.com/containerd/containerd/cio"
"github.com/docker/docker/container/stream"
"github.com/docker/docker/libcontainerd"
"github.com/docker/docker/pkg/stringid"
"github.com/sirupsen/logrus"
)
@ -63,7 +62,7 @@ func (i *rio) Wait() {
}
// InitializeStdio is called by libcontainerd to connect the stdio.
func (c *Config) InitializeStdio(iop *libcontainerd.IOPipe) (cio.IO, error) {
func (c *Config) InitializeStdio(iop *cio.DirectIO) (cio.IO, error) {
c.StreamConfig.CopyToPipe(iop)
if c.StreamConfig.Stdin() == nil && !c.Tty && runtime.GOOS == "windows" {

View file

@ -121,7 +121,7 @@ func (c *client) Restore(ctx context.Context, id string, attachStdio StdioCallba
c.Lock()
defer c.Unlock()
var rio cio.IO
var rio *cio.DirectIO
defer func() {
err = wrapError(err)
}()
@ -139,13 +139,12 @@ func (c *client) Restore(ctx context.Context, id string, attachStdio StdioCallba
}()
t, err := ctr.Task(ctx, func(fifos *cio.FIFOSet) (cio.IO, error) {
io, err := newIOPipe(fifos)
rio, err = cio.NewDirectIO(ctx, fifos)
if err != nil {
return nil, err
}
rio, err = attachStdio(io)
return rio, err
return attachStdio(rio)
})
if err != nil && !errdefs.IsNotFound(errors.Cause(err)) {
return false, -1, err
@ -255,7 +254,7 @@ func (c *client) Start(ctx context.Context, id, checkpointDir string, withStdin
uid, gid := getSpecUser(spec)
t, err = ctr.ctr.NewTask(ctx,
func(id string) (cio.IO, error) {
fifos := newFIFOSet(ctr.bundleDir, id, InitProcessName, withStdin, spec.Process.Terminal)
fifos := newFIFOSet(ctr.bundleDir, InitProcessName, withStdin, spec.Process.Terminal)
rio, err = c.createIO(fifos, id, InitProcessName, stdinCloseSync, attachStdio)
return rio, err
},
@ -315,7 +314,7 @@ func (c *client) Exec(ctx context.Context, containerID, processID string, spec *
stdinCloseSync = make(chan struct{})
)
fifos := newFIFOSet(ctr.bundleDir, containerID, processID, withStdin, spec.Terminal)
fifos := newFIFOSet(ctr.bundleDir, processID, withStdin, spec.Terminal)
defer func() {
if err != nil {
@ -612,7 +611,7 @@ func (c *client) getProcess(containerID, processID string) (containerd.Process,
// createIO creates the io to be used by a process
// This needs to get a pointer to interface as upon closure the process may not have yet been registered
func (c *client) createIO(fifos *cio.FIFOSet, containerID, processID string, stdinCloseSync chan struct{}, attachStdio StdioCallback) (cio.IO, error) {
io, err := newIOPipe(fifos)
io, err := cio.NewDirectIO(context.Background(), fifos)
if err != nil {
return nil, err
}
@ -687,7 +686,7 @@ func (c *client) processEvent(ctr *container, et EventType, ei EventInfo) {
"container": ei.ContainerID,
}).Error("failed to find container")
} else {
rmFIFOSet(newFIFOSet(ctr.bundleDir, ei.ContainerID, ei.ProcessID, true, false))
rmFIFOSet(newFIFOSet(ctr.bundleDir, ei.ProcessID, true, false))
}
}
})

View file

@ -80,25 +80,27 @@ func prepareBundleDir(bundleDir string, ociSpec *specs.Spec) (string, error) {
return p, nil
}
func newFIFOSet(bundleDir, containerID, processID string, withStdin, withTerminal bool) *cio.FIFOSet {
func newFIFOSet(bundleDir, processID string, withStdin, withTerminal bool) *cio.FIFOSet {
fifos := &cio.FIFOSet{
Terminal: withTerminal,
Out: filepath.Join(bundleDir, processID+"-stdout"),
Config: cio.Config{
Terminal: withTerminal,
Stdout: filepath.Join(bundleDir, processID+"-stdout"),
},
}
if withStdin {
fifos.In = filepath.Join(bundleDir, processID+"-stdin")
fifos.Stdin = filepath.Join(bundleDir, processID+"-stdin")
}
if !fifos.Terminal {
fifos.Err = filepath.Join(bundleDir, processID+"-stderr")
fifos.Stderr = filepath.Join(bundleDir, processID+"-stderr")
}
return fifos
}
func rmFIFOSet(fset *cio.FIFOSet) {
for _, fn := range []string{fset.Out, fset.In, fset.Err} {
for _, fn := range []string{fset.Stdout, fset.Stdin, fset.Stderr} {
if fn != "" {
if err := os.RemoveAll(fn); err != nil {
logrus.Warnf("libcontainerd: failed to remove fifo %v: %v", fn, err)

View file

@ -18,6 +18,7 @@ import (
"github.com/Microsoft/hcsshim"
opengcs "github.com/Microsoft/opengcs/client"
"github.com/containerd/containerd"
"github.com/containerd/containerd/cio"
"github.com/docker/docker/pkg/sysinfo"
"github.com/docker/docker/pkg/system"
specs "github.com/opencontainers/runtime-spec/specs-go"
@ -670,28 +671,12 @@ func (c *client) Start(_ context.Context, id, _ string, withStdin bool, attachSt
return p.pid, nil
}
var (
stdout, stderr io.ReadCloser
stdin io.WriteCloser
)
stdin, stdout, stderr, err = newProcess.Stdio()
dio, err := newIOFromProcess(newProcess)
if err != nil {
logger.WithError(err).Error("failed to get stdio pipes")
return -1, err
}
iopipe := &IOPipe{Terminal: ctr.ociSpec.Process.Terminal}
iopipe.Stdin = createStdInCloser(stdin, newProcess)
// Convert io.ReadClosers to io.Readers
if stdout != nil {
iopipe.Stdout = ioutil.NopCloser(&autoClosingReader{ReadCloser: stdout})
}
if stderr != nil {
iopipe.Stderr = ioutil.NopCloser(&autoClosingReader{ReadCloser: stderr})
}
_, err = attachStdio(iopipe)
_, err = attachStdio(dio)
if err != nil {
logger.WithError(err).Error("failed to attache stdio")
return -1, err
@ -727,6 +712,26 @@ func (c *client) Start(_ context.Context, id, _ string, withStdin bool, attachSt
return p.pid, nil
}
func newIOFromProcess(newProcess process) (*cio.DirectIO, error) {
stdin, stdout, stderr, err := newProcess.Stdio()
if err != nil {
return nil, err
}
dio := cio.DirectIO{
Terminal: ctr.ociSpec.Process.Terminal,
Stdin: createStdInCloser(stdin, newProcess),
}
// Convert io.ReadClosers to io.Readers
if stdout != nil {
dio.Stdout = ioutil.NopCloser(&autoClosingReader{ReadCloser: stdout})
}
if stderr != nil {
dio.Stderr = ioutil.NopCloser(&autoClosingReader{ReadCloser: stderr})
}
return dio, nil
}
// Exec adds a process in an running container
func (c *client) Exec(ctx context.Context, containerID, processID string, spec *specs.Process, withStdin bool, attachStdio StdioCallback) (int, error) {
ctr := c.getContainer(containerID)
@ -807,25 +812,14 @@ func (c *client) Exec(ctx context.Context, containerID, processID string, spec *
}
}()
stdin, stdout, stderr, err = newProcess.Stdio()
dio, err := newIOFromProcess(newProcess)
if err != nil {
logger.WithError(err).Error("getting std pipes failed")
logger.WithError(err).Error("failed to get stdio pipes")
return -1, err
}
iopipe := &IOPipe{Terminal: spec.Terminal}
iopipe.Stdin = createStdInCloser(stdin, newProcess)
// Convert io.ReadClosers to io.Readers
if stdout != nil {
iopipe.Stdout = ioutil.NopCloser(&autoClosingReader{ReadCloser: stdout})
}
if stderr != nil {
iopipe.Stderr = ioutil.NopCloser(&autoClosingReader{ReadCloser: stderr})
}
dio.Termainl = spec.Terminal
// Tell the engine to attach streams back to the client
_, err = attachStdio(iopipe)
_, err = attachStdio(dio)
if err != nil {
return -1, err
}

View file

@ -1,36 +0,0 @@
package libcontainerd
import "github.com/containerd/containerd/cio"
// Config returns the containerd.IOConfig of this pipe set
func (p *IOPipe) Config() cio.Config {
return p.config
}
// Cancel aborts ongoing operations if they have not completed yet
func (p *IOPipe) Cancel() {
p.cancel()
}
// Wait waits for io operations to finish
func (p *IOPipe) Wait() {
}
// Close closes the underlying pipes
func (p *IOPipe) Close() error {
p.cancel()
if p.Stdin != nil {
p.Stdin.Close()
}
if p.Stdout != nil {
p.Stdout.Close()
}
if p.Stderr != nil {
p.Stderr.Close()
}
return nil
}

View file

@ -1,60 +0,0 @@
// +build !windows
package libcontainerd
import (
"context"
"io"
"syscall"
"github.com/containerd/containerd/cio"
"github.com/containerd/fifo"
"github.com/pkg/errors"
)
func newIOPipe(fifos *cio.FIFOSet) (*IOPipe, error) {
var (
err error
ctx, cancel = context.WithCancel(context.Background())
f io.ReadWriteCloser
iop = &IOPipe{
Terminal: fifos.Terminal,
cancel: cancel,
config: cio.Config{
Terminal: fifos.Terminal,
Stdin: fifos.In,
Stdout: fifos.Out,
Stderr: fifos.Err,
},
}
)
defer func() {
if err != nil {
cancel()
iop.Close()
}
}()
if fifos.In != "" {
if f, err = fifo.OpenFifo(ctx, fifos.In, syscall.O_WRONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil {
return nil, errors.WithStack(err)
}
iop.Stdin = f
}
if fifos.Out != "" {
if f, err = fifo.OpenFifo(ctx, fifos.Out, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil {
return nil, errors.WithStack(err)
}
iop.Stdout = f
}
if fifos.Err != "" {
if f, err = fifo.OpenFifo(ctx, fifos.Err, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil {
return nil, errors.WithStack(err)
}
iop.Stderr = f
}
return iop, nil
}

View file

@ -1,138 +0,0 @@
package libcontainerd
import (
"context"
"io"
"net"
"sync"
winio "github.com/Microsoft/go-winio"
"github.com/containerd/containerd/cio"
"github.com/pkg/errors"
)
type winpipe struct {
sync.Mutex
ctx context.Context
listener net.Listener
readyCh chan struct{}
readyErr error
client net.Conn
}
func newWinpipe(ctx context.Context, pipe string) (*winpipe, error) {
l, err := winio.ListenPipe(pipe, nil)
if err != nil {
return nil, errors.Wrapf(err, "%q pipe creation failed", pipe)
}
wp := &winpipe{
ctx: ctx,
listener: l,
readyCh: make(chan struct{}),
}
go func() {
go func() {
defer close(wp.readyCh)
defer wp.listener.Close()
c, err := wp.listener.Accept()
if err != nil {
wp.Lock()
if wp.readyErr == nil {
wp.readyErr = err
}
wp.Unlock()
return
}
wp.client = c
}()
select {
case <-wp.readyCh:
case <-ctx.Done():
wp.Lock()
if wp.readyErr == nil {
wp.listener.Close()
wp.readyErr = ctx.Err()
}
wp.Unlock()
}
}()
return wp, nil
}
func (wp *winpipe) Read(b []byte) (int, error) {
select {
case <-wp.ctx.Done():
return 0, wp.ctx.Err()
case <-wp.readyCh:
return wp.client.Read(b)
}
}
func (wp *winpipe) Write(b []byte) (int, error) {
select {
case <-wp.ctx.Done():
return 0, wp.ctx.Err()
case <-wp.readyCh:
return wp.client.Write(b)
}
}
func (wp *winpipe) Close() error {
select {
case <-wp.readyCh:
return wp.client.Close()
default:
return nil
}
}
func newIOPipe(fifos *cio.FIFOSet) (*IOPipe, error) {
var (
err error
ctx, cancel = context.WithCancel(context.Background())
p io.ReadWriteCloser
iop = &IOPipe{
Terminal: fifos.Terminal,
cancel: cancel,
config: cio.Config{
Terminal: fifos.Terminal,
Stdin: fifos.In,
Stdout: fifos.Out,
Stderr: fifos.Err,
},
}
)
defer func() {
if err != nil {
cancel()
iop.Close()
}
}()
if fifos.In != "" {
if p, err = newWinpipe(ctx, fifos.In); err != nil {
return nil, err
}
iop.Stdin = p
}
if fifos.Out != "" {
if p, err = newWinpipe(ctx, fifos.Out); err != nil {
return nil, err
}
iop.Stdout = p
}
if fifos.Err != "" {
if p, err = newWinpipe(ctx, fifos.Err); err != nil {
return nil, err
}
iop.Stderr = p
}
return iop, nil
}

View file

@ -1,56 +0,0 @@
// +build !windows
package libcontainerd
import "github.com/pkg/errors"
// process represents the state for the main container process or an exec.
type process struct {
// id is the logical name of the process
id string
// cid is the container id to which this process belongs
cid string
// pid is the identifier of the process
pid uint32
// io holds the io reader/writer associated with the process
io *IOPipe
// root is the state directory for the process
root string
}
func (p *process) ID() string {
return p.id
}
func (p *process) Pid() uint32 {
return p.pid
}
func (p *process) SetPid(pid uint32) error {
if p.pid != 0 {
return errors.Errorf("pid is already set to %d", pid)
}
p.pid = pid
return nil
}
func (p *process) IOPipe() *IOPipe {
return p.io
}
func (p *process) CloseIO() {
if p.io.Stdin != nil {
p.io.Stdin.Close()
}
if p.io.Stdout != nil {
p.io.Stdout.Close()
}
if p.io.Stderr != nil {
p.io.Stderr.Close()
}
}

View file

@ -1,59 +0,0 @@
package libcontainerd
import (
"os"
"path/filepath"
"github.com/pkg/errors"
"golang.org/x/sys/unix"
)
var fdNames = map[int]string{
unix.Stdin: "stdin",
unix.Stdout: "stdout",
unix.Stderr: "stderr",
}
func (p *process) pipeName(index int) string {
return filepath.Join(p.root, p.id+"-"+fdNames[index])
}
func (p *process) IOPaths() (string, string, string) {
var (
stdin = p.pipeName(unix.Stdin)
stdout = p.pipeName(unix.Stdout)
stderr = p.pipeName(unix.Stderr)
)
// TODO: debug why we're having zombies when I don't unset those
if p.io.Stdin == nil {
stdin = ""
}
if p.io.Stderr == nil {
stderr = ""
}
return stdin, stdout, stderr
}
func (p *process) Cleanup() error {
var retErr error
// Ensure everything was closed
p.CloseIO()
for _, i := range [3]string{
p.pipeName(unix.Stdin),
p.pipeName(unix.Stdout),
p.pipeName(unix.Stderr),
} {
err := os.Remove(i)
if err != nil {
if retErr == nil {
retErr = errors.Wrapf(err, "failed to remove %s", i)
} else {
retErr = errors.Wrapf(retErr, "failed to remove %s", i)
}
}
}
return retErr
}

View file

@ -2,7 +2,6 @@ package libcontainerd
import (
"context"
"io"
"time"
"github.com/containerd/containerd"
@ -107,20 +106,4 @@ type Client interface {
}
// StdioCallback is called to connect a container or process stdio.
type StdioCallback func(*IOPipe) (cio.IO, error)
// IOPipe contains the stdio streams.
type IOPipe struct {
Stdin io.WriteCloser
Stdout io.ReadCloser
Stderr io.ReadCloser
Terminal bool // Whether stderr is connected on Windows
cancel context.CancelFunc
config cio.Config
}
// ServerVersion contains version information as retrieved from the
// server
type ServerVersion struct {
}
type StdioCallback func(io *cio.DirectIO) (cio.IO, error)

View file

@ -122,7 +122,7 @@ func (c *rio) Wait() {
}
func attachStreamsFunc(stdout, stderr io.WriteCloser) libcontainerd.StdioCallback {
return func(iop *libcontainerd.IOPipe) (cio.IO, error) {
return func(iop *cio.DirectIO) (cio.IO, error) {
if iop.Stdin != nil {
iop.Stdin.Close()
// closing stdin shouldn't be needed here, it should never be open