Browse Source

Move stdio attach from libcontainerd backend to callback

Signed-off-by: Tonis Tiigi <tonistiigi@gmail.com>
(cherry picked from commit 37a3be2449d2a314305615ffcc287a598a829dba)
Tonis Tiigi 8 years ago
parent
commit
e136d3ef93

+ 44 - 0
container/container.go

@@ -22,6 +22,7 @@ import (
 	"github.com/docker/docker/daemon/network"
 	"github.com/docker/docker/daemon/network"
 	"github.com/docker/docker/image"
 	"github.com/docker/docker/image"
 	"github.com/docker/docker/layer"
 	"github.com/docker/docker/layer"
+	"github.com/docker/docker/libcontainerd"
 	"github.com/docker/docker/pkg/idtools"
 	"github.com/docker/docker/pkg/idtools"
 	"github.com/docker/docker/pkg/ioutils"
 	"github.com/docker/docker/pkg/ioutils"
 	"github.com/docker/docker/pkg/promise"
 	"github.com/docker/docker/pkg/promise"
@@ -972,3 +973,46 @@ func (container *Container) CancelAttachContext() {
 	}
 	}
 	container.attachContext.mu.Unlock()
 	container.attachContext.mu.Unlock()
 }
 }
+
+func (container *Container) startLogging() error {
+	if container.HostConfig.LogConfig.Type == "none" {
+		return nil // do not start logging routines
+	}
+
+	l, err := container.StartLogger(container.HostConfig.LogConfig)
+	if err != nil {
+		return fmt.Errorf("Failed to initialize logging driver: %v", err)
+	}
+
+	copier := logger.NewCopier(map[string]io.Reader{"stdout": container.StdoutPipe(), "stderr": container.StderrPipe()}, l)
+	container.LogCopier = copier
+	copier.Run()
+	container.LogDriver = l
+
+	// set LogPath field only for json-file logdriver
+	if jl, ok := l.(*jsonfilelog.JSONFileLogger); ok {
+		container.LogPath = jl.LogPath()
+	}
+
+	return nil
+}
+
+// InitializeStdio is called by libcontainerd to connect the stdio.
+func (container *Container) InitializeStdio(iop libcontainerd.IOPipe) error {
+	if err := container.startLogging(); err != nil {
+		container.Reset(false)
+		return err
+	}
+
+	container.StreamConfig.CopyToPipe(iop)
+
+	if container.Stdin() == nil && !container.Config.Tty {
+		if iop.Stdin != nil {
+			if err := iop.Stdin.Close(); err != nil {
+				logrus.Error("error closing stdin: %+v", err)
+			}
+		}
+	}
+
+	return nil
+}

+ 1 - 1
daemon/daemon.go

@@ -175,7 +175,7 @@ func (daemon *Daemon) restore() error {
 			defer wg.Done()
 			defer wg.Done()
 			rm := c.RestartManager(false)
 			rm := c.RestartManager(false)
 			if c.IsRunning() || c.IsPaused() {
 			if c.IsRunning() || c.IsPaused() {
-				if err := daemon.containerd.Restore(c.ID, libcontainerd.WithRestartManager(rm)); err != nil {
+				if err := daemon.containerd.Restore(c.ID, c.InitializeStdio, libcontainerd.WithRestartManager(rm)); err != nil {
 					logrus.Errorf("Failed to restore %s with containerd: %s", c.ID, err)
 					logrus.Errorf("Failed to restore %s with containerd: %s", c.ID, err)
 					return
 					return
 				}
 				}

+ 1 - 1
daemon/exec.go

@@ -204,7 +204,7 @@ func (d *Daemon) ContainerExecStart(ctx context.Context, name string, stdin io.R
 
 
 	attachErr := container.AttachStreams(ctx, ec.StreamConfig, ec.OpenStdin, true, ec.Tty, cStdin, cStdout, cStderr, ec.DetachKeys)
 	attachErr := container.AttachStreams(ctx, ec.StreamConfig, ec.OpenStdin, true, ec.Tty, cStdin, cStdout, cStderr, ec.DetachKeys)
 
 
-	if err := d.containerd.AddProcess(ctx, c.ID, name, p); err != nil {
+	if err := d.containerd.AddProcess(ctx, c.ID, name, p, ec.InitializeStdio); err != nil {
 		return err
 		return err
 	}
 	}
 
 

+ 18 - 0
daemon/exec/exec.go

@@ -1,8 +1,11 @@
 package exec
 package exec
 
 
 import (
 import (
+	"runtime"
 	"sync"
 	"sync"
 
 
+	"github.com/Sirupsen/logrus"
+	"github.com/docker/docker/libcontainerd"
 	"github.com/docker/docker/pkg/stringid"
 	"github.com/docker/docker/pkg/stringid"
 	"github.com/docker/docker/runconfig"
 	"github.com/docker/docker/runconfig"
 )
 )
@@ -37,6 +40,21 @@ func NewConfig() *Config {
 	}
 	}
 }
 }
 
 
+// InitializeStdio is called by libcontainerd to connect the stdio.
+func (c *Config) InitializeStdio(iop libcontainerd.IOPipe) error {
+	c.StreamConfig.CopyToPipe(iop)
+
+	if c.Stdin() == nil && !c.Tty && runtime.GOOS == "windows" {
+		if iop.Stdin != nil {
+			if err := iop.Stdin.Close(); err != nil {
+				logrus.Error("error closing exec stdin: %+v", err)
+			}
+		}
+	}
+
+	return nil
+}
+
 // Store keeps track of the exec configurations.
 // Store keeps track of the exec configurations.
 type Store struct {
 type Store struct {
 	commands map[string]*Config
 	commands map[string]*Config

+ 0 - 25
daemon/logs.go

@@ -12,7 +12,6 @@ import (
 	"github.com/docker/docker/api/types/backend"
 	"github.com/docker/docker/api/types/backend"
 	"github.com/docker/docker/container"
 	"github.com/docker/docker/container"
 	"github.com/docker/docker/daemon/logger"
 	"github.com/docker/docker/daemon/logger"
-	"github.com/docker/docker/daemon/logger/jsonfilelog"
 	"github.com/docker/docker/pkg/ioutils"
 	"github.com/docker/docker/pkg/ioutils"
 	"github.com/docker/docker/pkg/stdcopy"
 	"github.com/docker/docker/pkg/stdcopy"
 	containertypes "github.com/docker/engine-api/types/container"
 	containertypes "github.com/docker/engine-api/types/container"
@@ -120,30 +119,6 @@ func (daemon *Daemon) getLogger(container *container.Container) (logger.Logger,
 	return container.StartLogger(container.HostConfig.LogConfig)
 	return container.StartLogger(container.HostConfig.LogConfig)
 }
 }
 
 
-// StartLogging initializes and starts the container logging stream.
-func (daemon *Daemon) StartLogging(container *container.Container) error {
-	if container.HostConfig.LogConfig.Type == "none" {
-		return nil // do not start logging routines
-	}
-
-	l, err := container.StartLogger(container.HostConfig.LogConfig)
-	if err != nil {
-		return fmt.Errorf("Failed to initialize logging driver: %v", err)
-	}
-
-	copier := logger.NewCopier(map[string]io.Reader{"stdout": container.StdoutPipe(), "stderr": container.StderrPipe()}, l)
-	container.LogCopier = copier
-	copier.Run()
-	container.LogDriver = l
-
-	// set LogPath field only for json-file logdriver
-	if jl, ok := l.(*jsonfilelog.JSONFileLogger); ok {
-		container.LogPath = jl.LogPath()
-	}
-
-	return nil
-}
-
 // mergeLogConfig merges the daemon log config to the container's log config if the container's log driver is not specified.
 // mergeLogConfig merges the daemon log config to the container's log config if the container's log driver is not specified.
 func (daemon *Daemon) mergeAndVerifyLogConfig(cfg *containertypes.LogConfig) error {
 func (daemon *Daemon) mergeAndVerifyLogConfig(cfg *containertypes.LogConfig) error {
 	if cfg.Type == "" {
 	if cfg.Type == "" {

+ 0 - 56
daemon/monitor.go

@@ -3,13 +3,11 @@ package daemon
 import (
 import (
 	"errors"
 	"errors"
 	"fmt"
 	"fmt"
-	"io"
 	"runtime"
 	"runtime"
 	"strconv"
 	"strconv"
 
 
 	"github.com/Sirupsen/logrus"
 	"github.com/Sirupsen/logrus"
 	"github.com/docker/docker/libcontainerd"
 	"github.com/docker/docker/libcontainerd"
-	"github.com/docker/docker/runconfig"
 )
 )
 
 
 // StateChanged updates daemon state changes from containerd
 // StateChanged updates daemon state changes from containerd
@@ -100,57 +98,3 @@ func (daemon *Daemon) StateChanged(id string, e libcontainerd.StateInfo) error {
 
 
 	return nil
 	return nil
 }
 }
-
-// AttachStreams is called by libcontainerd to connect the stdio.
-func (daemon *Daemon) AttachStreams(id string, iop libcontainerd.IOPipe) error {
-	var s *runconfig.StreamConfig
-	c := daemon.containers.Get(id)
-	if c == nil {
-		ec, err := daemon.getExecConfig(id)
-		if err != nil {
-			return fmt.Errorf("no such exec/container: %s", id)
-		}
-		s = ec.StreamConfig
-	} else {
-		s = c.StreamConfig
-		if err := daemon.StartLogging(c); err != nil {
-			c.Reset(false)
-			return err
-		}
-	}
-
-	copyFunc := func(w io.Writer, r io.Reader) {
-		s.Add(1)
-		go func() {
-			if _, err := io.Copy(w, r); err != nil {
-				logrus.Errorf("%v stream copy error: %v", id, err)
-			}
-			s.Done()
-		}()
-	}
-
-	if iop.Stdout != nil {
-		copyFunc(s.Stdout(), iop.Stdout)
-	}
-	if iop.Stderr != nil {
-		copyFunc(s.Stderr(), iop.Stderr)
-	}
-
-	if stdin := s.Stdin(); stdin != nil {
-		if iop.Stdin != nil {
-			go func() {
-				io.Copy(iop.Stdin, stdin)
-				iop.Stdin.Close()
-			}()
-		}
-	} else {
-		if c != nil && !c.Config.Tty {
-			// tty is enabled, so dont close containerd's iopipe stdin.
-			if iop.Stdin != nil {
-				iop.Stdin.Close()
-			}
-		}
-	}
-
-	return nil
-}

+ 1 - 1
daemon/monitor_windows.go

@@ -28,7 +28,7 @@ func (daemon *Daemon) postRunProcessing(container *container.Container, e libcon
 
 
 		// Create a new servicing container, which will start, complete the update, and merge back the
 		// Create a new servicing container, which will start, complete the update, and merge back the
 		// results if it succeeded, all as part of the below function call.
 		// results if it succeeded, all as part of the below function call.
-		if err := daemon.containerd.Create((container.ID + "_servicing"), *spec, servicingOption); err != nil {
+		if err := daemon.containerd.Create((container.ID + "_servicing"), *spec, container.InitializeStdio, servicingOption); err != nil {
 			container.SetExitCode(-1)
 			container.SetExitCode(-1)
 			return fmt.Errorf("Post-run update servicing failed: %s", err)
 			return fmt.Errorf("Post-run update servicing failed: %s", err)
 		}
 		}

+ 1 - 1
daemon/start.go

@@ -141,7 +141,7 @@ func (daemon *Daemon) containerStart(container *container.Container) (err error)
 		createOptions = append(createOptions, *copts...)
 		createOptions = append(createOptions, *copts...)
 	}
 	}
 
 
-	if err := daemon.containerd.Create(container.ID, *spec, createOptions...); err != nil {
+	if err := daemon.containerd.Create(container.ID, *spec, container.InitializeStdio, createOptions...); err != nil {
 		errDesc := grpc.ErrorDesc(err)
 		errDesc := grpc.ErrorDesc(err)
 		logrus.Errorf("Create container failed with error: %s", errDesc)
 		logrus.Errorf("Create container failed with error: %s", errDesc)
 		// if we receive an internal error from the initial start of a container then lets
 		// if we receive an internal error from the initial start of a container then lets

+ 12 - 12
libcontainerd/client_linux.go

@@ -31,7 +31,10 @@ type client struct {
 	liveRestore   bool
 	liveRestore   bool
 }
 }
 
 
-func (clnt *client) AddProcess(ctx context.Context, containerID, processFriendlyName string, specp Process) error {
+// AddProcess is the handler for adding a process to an already running
+// container. It's called through docker exec. It returns the system pid of the
+// exec'd process.
+func (clnt *client) AddProcess(ctx context.Context, containerID, processFriendlyName string, specp Process, attachStdio StdioCallback) error {
 	clnt.lock(containerID)
 	clnt.lock(containerID)
 	defer clnt.unlock(containerID)
 	defer clnt.unlock(containerID)
 	container, err := clnt.getContainer(containerID)
 	container, err := clnt.getContainer(containerID)
@@ -112,14 +115,10 @@ func (clnt *client) AddProcess(ctx context.Context, containerID, processFriendly
 
 
 	container.processes[processFriendlyName] = p
 	container.processes[processFriendlyName] = p
 
 
-	clnt.unlock(containerID)
-
-	if err := clnt.backend.AttachStreams(processFriendlyName, *iopipe); err != nil {
-		clnt.lock(containerID)
+	if err := attachStdio(*iopipe); err != nil {
 		p.closeFifos(iopipe)
 		p.closeFifos(iopipe)
 		return err
 		return err
 	}
 	}
-	clnt.lock(containerID)
 
 
 	return nil
 	return nil
 }
 }
@@ -149,7 +148,7 @@ func (clnt *client) prepareBundleDir(uid, gid int) (string, error) {
 	return p, nil
 	return p, nil
 }
 }
 
 
-func (clnt *client) Create(containerID string, spec Spec, options ...CreateOption) (err error) {
+func (clnt *client) Create(containerID string, spec Spec, attachStdio StdioCallback, options ...CreateOption) (err error) {
 	clnt.lock(containerID)
 	clnt.lock(containerID)
 	defer clnt.unlock(containerID)
 	defer clnt.unlock(containerID)
 
 
@@ -175,6 +174,7 @@ func (clnt *client) Create(containerID string, spec Spec, options ...CreateOptio
 	if err := container.clean(); err != nil {
 	if err := container.clean(); err != nil {
 		return err
 		return err
 	}
 	}
+	container.attachStdio = attachStdio // hack for v1.12 backport
 
 
 	defer func() {
 	defer func() {
 		if err != nil {
 		if err != nil {
@@ -196,7 +196,7 @@ func (clnt *client) Create(containerID string, spec Spec, options ...CreateOptio
 		return err
 		return err
 	}
 	}
 
 
-	return container.start()
+	return container.start(attachStdio)
 }
 }
 
 
 func (clnt *client) Signal(containerID string, sig int) error {
 func (clnt *client) Signal(containerID string, sig int) error {
@@ -405,7 +405,7 @@ func (clnt *client) getOrCreateExitNotifier(containerID string) *exitNotifier {
 	return w
 	return w
 }
 }
 
 
-func (clnt *client) restore(cont *containerd.Container, lastEvent *containerd.Event, options ...CreateOption) (err error) {
+func (clnt *client) restore(cont *containerd.Container, lastEvent *containerd.Event, attachStdio StdioCallback, options ...CreateOption) (err error) {
 	clnt.lock(cont.Id)
 	clnt.lock(cont.Id)
 	defer clnt.unlock(cont.Id)
 	defer clnt.unlock(cont.Id)
 
 
@@ -446,7 +446,7 @@ func (clnt *client) restore(cont *containerd.Container, lastEvent *containerd.Ev
 		return err
 		return err
 	})
 	})
 
 
-	if err := clnt.backend.AttachStreams(containerID, *iopipe); err != nil {
+	if err := attachStdio(*iopipe); err != nil {
 		container.closeFifos(iopipe)
 		container.closeFifos(iopipe)
 		return err
 		return err
 	}
 	}
@@ -538,7 +538,7 @@ func (clnt *client) getContainerLastEvent(id string) (*containerd.Event, error)
 	return ev, err
 	return ev, err
 }
 }
 
 
-func (clnt *client) Restore(containerID string, options ...CreateOption) error {
+func (clnt *client) Restore(containerID string, attachStdio StdioCallback, options ...CreateOption) error {
 	// Synchronize with live events
 	// Synchronize with live events
 	clnt.remote.Lock()
 	clnt.remote.Lock()
 	defer clnt.remote.Unlock()
 	defer clnt.remote.Unlock()
@@ -586,7 +586,7 @@ func (clnt *client) Restore(containerID string, options ...CreateOption) error {
 
 
 	// container is still alive
 	// container is still alive
 	if clnt.liveRestore {
 	if clnt.liveRestore {
-		if err := clnt.restore(cont, ev, options...); err != nil {
+		if err := clnt.restore(cont, ev, attachStdio, options...); err != nil {
 			logrus.Errorf("libcontainerd: error restoring %s: %v", containerID, err)
 			logrus.Errorf("libcontainerd: error restoring %s: %v", containerID, err)
 		}
 		}
 		return nil
 		return nil

+ 3 - 3
libcontainerd/client_solaris.go

@@ -8,11 +8,11 @@ type client struct {
 	// Platform specific properties below here.
 	// Platform specific properties below here.
 }
 }
 
 
-func (clnt *client) AddProcess(ctx context.Context, containerID, processFriendlyName string, specp Process) error {
+func (clnt *client) AddProcess(ctx context.Context, containerID, processFriendlyName string, specp Process, attachStdio StdioCallback) error {
 	return nil
 	return nil
 }
 }
 
 
-func (clnt *client) Create(containerID string, spec Spec, options ...CreateOption) (err error) {
+func (clnt *client) Create(containerID string, spec Spec, attachStdio StdioCallback, options ...CreateOption) (err error) {
 	return nil
 	return nil
 }
 }
 
 
@@ -37,7 +37,7 @@ func (clnt *client) Stats(containerID string) (*Stats, error) {
 }
 }
 
 
 // Restore is the handler for restoring a container
 // Restore is the handler for restoring a container
-func (clnt *client) Restore(containerID string, unusedOnWindows ...CreateOption) error {
+func (clnt *client) Restore(containerID string, attachStdio StdioCallback, unusedOnWindows ...CreateOption) error {
 	return nil
 	return nil
 }
 }
 
 

+ 7 - 13
libcontainerd/client_windows.go

@@ -38,7 +38,7 @@ const defaultOwner = "docker"
 
 
 // Create is the entrypoint to create a container from a spec, and if successfully
 // Create is the entrypoint to create a container from a spec, and if successfully
 // created, start it too.
 // created, start it too.
-func (clnt *client) Create(containerID string, spec Spec, options ...CreateOption) error {
+func (clnt *client) Create(containerID string, spec Spec, attachStdio StdioCallback, options ...CreateOption) error {
 	logrus.Debugln("libcontainerd: client.Create() with spec", spec)
 	logrus.Debugln("libcontainerd: client.Create() with spec", spec)
 
 
 	configuration := &hcsshim.ContainerConfig{
 	configuration := &hcsshim.ContainerConfig{
@@ -143,7 +143,8 @@ func (clnt *client) Create(containerID string, spec Spec, options ...CreateOptio
 				},
 				},
 				commandLine: strings.Join(spec.Process.Args, " "),
 				commandLine: strings.Join(spec.Process.Args, " "),
 			},
 			},
-			processes: make(map[string]*process),
+			processes:   make(map[string]*process),
+			attachStdio: attachStdio,
 		},
 		},
 		ociSpec:      spec,
 		ociSpec:      spec,
 		hcsContainer: hcsContainer,
 		hcsContainer: hcsContainer,
@@ -160,7 +161,7 @@ func (clnt *client) Create(containerID string, spec Spec, options ...CreateOptio
 	// internal structure, start will keep HCS in sync by deleting the
 	// internal structure, start will keep HCS in sync by deleting the
 	// container there.
 	// container there.
 	logrus.Debugf("libcontainerd: Create() id=%s, Calling start()", containerID)
 	logrus.Debugf("libcontainerd: Create() id=%s, Calling start()", containerID)
-	if err := container.start(); err != nil {
+	if err := container.start(attachStdio); err != nil {
 		clnt.deleteContainer(containerID)
 		clnt.deleteContainer(containerID)
 		return err
 		return err
 	}
 	}
@@ -172,7 +173,7 @@ func (clnt *client) Create(containerID string, spec Spec, options ...CreateOptio
 
 
 // AddProcess is the handler for adding a process to an already running
 // AddProcess is the handler for adding a process to an already running
 // container. It's called through docker exec.
 // container. It's called through docker exec.
-func (clnt *client) AddProcess(ctx context.Context, containerID, processFriendlyName string, procToAdd Process) error {
+func (clnt *client) AddProcess(ctx context.Context, containerID, processFriendlyName string, procToAdd Process, attachStdio StdioCallback) error {
 	clnt.lock(containerID)
 	clnt.lock(containerID)
 	defer clnt.unlock(containerID)
 	defer clnt.unlock(containerID)
 	container, err := clnt.getContainer(containerID)
 	container, err := clnt.getContainer(containerID)
@@ -251,18 +252,11 @@ func (clnt *client) AddProcess(ctx context.Context, containerID, processFriendly
 	// Add the process to the container's list of processes
 	// Add the process to the container's list of processes
 	container.processes[processFriendlyName] = proc
 	container.processes[processFriendlyName] = proc
 
 
-	// Make sure the lock is not held while calling back into the daemon
-	clnt.unlock(containerID)
-
 	// Tell the engine to attach streams back to the client
 	// Tell the engine to attach streams back to the client
-	if err := clnt.backend.AttachStreams(processFriendlyName, *iopipe); err != nil {
-		clnt.lock(containerID)
+	if err := attachStdio(*iopipe); err != nil {
 		return err
 		return err
 	}
 	}
 
 
-	// Lock again so that the defer unlock doesn't fail. (I really don't like this code)
-	clnt.lock(containerID)
-
 	// Spin up a go routine waiting for exit to handle cleanup
 	// Spin up a go routine waiting for exit to handle cleanup
 	go container.waitExit(proc, false)
 	go container.waitExit(proc, false)
 
 
@@ -371,7 +365,7 @@ func (clnt *client) Stats(containerID string) (*Stats, error) {
 }
 }
 
 
 // Restore is the handler for restoring a container
 // Restore is the handler for restoring a container
-func (clnt *client) Restore(containerID string, unusedOnWindows ...CreateOption) error {
+func (clnt *client) Restore(containerID string, _ StdioCallback, unusedOnWindows ...CreateOption) error {
 	// TODO Windows: Implement this. For now, just tell the backend the container exited.
 	// TODO Windows: Implement this. For now, just tell the backend the container exited.
 	logrus.Debugf("libcontainerd: Restore(%s)", containerID)
 	logrus.Debugf("libcontainerd: Restore(%s)", containerID)
 	return clnt.backend.StateChanged(containerID, StateInfo{
 	return clnt.backend.StateChanged(containerID, StateInfo{

+ 1 - 0
libcontainerd/container.go

@@ -20,6 +20,7 @@ type containerCommon struct {
 	restarting     bool
 	restarting     bool
 	processes      map[string]*process
 	processes      map[string]*process
 	startedAt      time.Time
 	startedAt      time.Time
+	attachStdio    StdioCallback // hack for v1.12 backport
 }
 }
 
 
 // WithRestartManager sets the restartmanager to be used with the container.
 // WithRestartManager sets the restartmanager to be used with the container.

+ 4 - 4
libcontainerd/container_linux.go

@@ -89,7 +89,7 @@ func (ctr *container) spec() (*specs.Spec, error) {
 	return &spec, nil
 	return &spec, nil
 }
 }
 
 
-func (ctr *container) start() error {
+func (ctr *container) start(attachStdio StdioCallback) error {
 	spec, err := ctr.spec()
 	spec, err := ctr.spec()
 	if err != nil {
 	if err != nil {
 		return nil
 		return nil
@@ -108,7 +108,7 @@ func (ctr *container) start() error {
 
 
 	// we need to delay stdin closure after container start or else "stdin close"
 	// we need to delay stdin closure after container start or else "stdin close"
 	// event will be rejected by containerd.
 	// event will be rejected by containerd.
-	// stdin closure happens in AttachStreams
+	// stdin closure happens in attachStdio
 	stdin := iopipe.Stdin
 	stdin := iopipe.Stdin
 	iopipe.Stdin = ioutils.NewWriteCloserWrapper(stdin, func() error {
 	iopipe.Stdin = ioutils.NewWriteCloserWrapper(stdin, func() error {
 		var err error
 		var err error
@@ -140,7 +140,7 @@ func (ctr *container) start() error {
 	}
 	}
 	ctr.client.appendContainer(ctr)
 	ctr.client.appendContainer(ctr)
 
 
-	if err := ctr.client.backend.AttachStreams(ctr.containerID, *iopipe); err != nil {
+	if err := attachStdio(*iopipe); err != nil {
 		ctr.closeFifos(iopipe)
 		ctr.closeFifos(iopipe)
 		return err
 		return err
 	}
 	}
@@ -224,7 +224,7 @@ func (ctr *container) handleEvent(e *containerd.Event) error {
 					defer ctr.client.unlock(ctr.containerID)
 					defer ctr.client.unlock(ctr.containerID)
 					ctr.restarting = false
 					ctr.restarting = false
 					if err == nil {
 					if err == nil {
-						if err = ctr.start(); err != nil {
+						if err = ctr.start(ctr.attachStdio); err != nil {
 							logrus.Errorf("libcontainerd: error restarting %v", err)
 							logrus.Errorf("libcontainerd: error restarting %v", err)
 						}
 						}
 					}
 					}

+ 3 - 3
libcontainerd/container_windows.go

@@ -36,7 +36,7 @@ func (ctr *container) newProcess(friendlyName string) *process {
 	}
 	}
 }
 }
 
 
-func (ctr *container) start() error {
+func (ctr *container) start(attachStdio StdioCallback) error {
 	var err error
 	var err error
 	isServicing := false
 	isServicing := false
 
 
@@ -143,7 +143,7 @@ func (ctr *container) start() error {
 
 
 	ctr.client.appendContainer(ctr)
 	ctr.client.appendContainer(ctr)
 
 
-	if err := ctr.client.backend.AttachStreams(ctr.containerID, *iopipe); err != nil {
+	if err := attachStdio(*iopipe); err != nil {
 		// OK to return the error here, as waitExit will handle tear-down in HCS
 		// OK to return the error here, as waitExit will handle tear-down in HCS
 		return err
 		return err
 	}
 	}
@@ -258,7 +258,7 @@ func (ctr *container) waitExit(process *process, isFirstProcessToStart bool) err
 			ctr.restarting = false
 			ctr.restarting = false
 			ctr.client.deleteContainer(ctr.friendlyName)
 			ctr.client.deleteContainer(ctr.friendlyName)
 			if err == nil {
 			if err == nil {
-				if err = ctr.client.Create(ctr.containerID, ctr.ociSpec, ctr.options...); err != nil {
+				if err = ctr.client.Create(ctr.containerID, ctr.ociSpec, ctr.attachStdio, ctr.options...); err != nil {
 					logrus.Errorf("libcontainerd: error restarting %v", err)
 					logrus.Errorf("libcontainerd: error restarting %v", err)
 				}
 				}
 			}
 			}

+ 6 - 4
libcontainerd/types.go

@@ -31,19 +31,18 @@ type CommonStateInfo struct { // FIXME: event?
 // Backend defines callbacks that the client of the library needs to implement.
 // Backend defines callbacks that the client of the library needs to implement.
 type Backend interface {
 type Backend interface {
 	StateChanged(containerID string, state StateInfo) error
 	StateChanged(containerID string, state StateInfo) error
-	AttachStreams(processFriendlyName string, io IOPipe) error
 }
 }
 
 
 // Client provides access to containerd features.
 // Client provides access to containerd features.
 type Client interface {
 type Client interface {
-	Create(containerID string, spec Spec, options ...CreateOption) error
+	Create(containerID string, spec Spec, attachStdio StdioCallback, options ...CreateOption) error
 	Signal(containerID string, sig int) error
 	Signal(containerID string, sig int) error
 	SignalProcess(containerID string, processFriendlyName string, sig int) error
 	SignalProcess(containerID string, processFriendlyName string, sig int) error
-	AddProcess(ctx context.Context, containerID, processFriendlyName string, process Process) error
+	AddProcess(ctx context.Context, containerID, processFriendlyName string, process Process, attachStdio StdioCallback) error
 	Resize(containerID, processFriendlyName string, width, height int) error
 	Resize(containerID, processFriendlyName string, width, height int) error
 	Pause(containerID string) error
 	Pause(containerID string) error
 	Resume(containerID string) error
 	Resume(containerID string) error
-	Restore(containerID string, options ...CreateOption) error
+	Restore(containerID string, attachStdio StdioCallback, options ...CreateOption) error
 	Stats(containerID string) (*Stats, error)
 	Stats(containerID string) (*Stats, error)
 	GetPidsForContainer(containerID string) ([]int, error)
 	GetPidsForContainer(containerID string) ([]int, error)
 	Summary(containerID string) ([]Summary, error)
 	Summary(containerID string) ([]Summary, error)
@@ -55,6 +54,9 @@ type CreateOption interface {
 	Apply(interface{}) error
 	Apply(interface{}) error
 }
 }
 
 
+// StdioCallback is called to connect a container or process stdio.
+type StdioCallback func(IOPipe) error
+
 // IOPipe contains the stdio streams.
 // IOPipe contains the stdio streams.
 type IOPipe struct {
 type IOPipe struct {
 	Stdin    io.WriteCloser
 	Stdin    io.WriteCloser

+ 19 - 18
plugin/manager.go

@@ -273,24 +273,6 @@ func (pm *Manager) StateChanged(id string, e libcontainerd.StateInfo) error {
 	return nil
 	return nil
 }
 }
 
 
-// AttachStreams attaches io streams to the plugin
-func (pm *Manager) AttachStreams(id string, iop libcontainerd.IOPipe) error {
-	iop.Stdin.Close()
-
-	logger := logrus.New()
-	logger.Hooks.Add(logHook{id})
-	// TODO: cache writer per id
-	w := logger.Writer()
-	go func() {
-		io.Copy(w, iop.Stdout)
-	}()
-	go func() {
-		// TODO: update logrus and use logger.WriterLevel
-		io.Copy(w, iop.Stderr)
-	}()
-	return nil
-}
-
 func (pm *Manager) init() error {
 func (pm *Manager) init() error {
 	dt, err := os.Open(filepath.Join(pm.libRoot, "plugins.json"))
 	dt, err := os.Open(filepath.Join(pm.libRoot, "plugins.json"))
 	if err != nil {
 	if err != nil {
@@ -447,3 +429,22 @@ func computePrivileges(m *types.PluginManifest) types.PluginPrivileges {
 	}
 	}
 	return privileges
 	return privileges
 }
 }
+
+func attachToLog(id string) func(libcontainerd.IOPipe) error {
+	return func(iop libcontainerd.IOPipe) error {
+		iop.Stdin.Close()
+
+		logger := logrus.New()
+		logger.Hooks.Add(logHook{id})
+		// TODO: cache writer per id
+		w := logger.Writer()
+		go func() {
+			io.Copy(w, iop.Stdout)
+		}()
+		go func() {
+			// TODO: update logrus and use logger.WriterLevel
+			io.Copy(w, iop.Stderr)
+		}()
+		return nil
+	}
+}

+ 2 - 2
plugin/manager_linux.go

@@ -30,7 +30,7 @@ func (pm *Manager) enable(p *plugin, force bool) error {
 	}
 	}
 
 
 	p.restartManager = restartmanager.New(container.RestartPolicy{Name: "always"}, 0)
 	p.restartManager = restartmanager.New(container.RestartPolicy{Name: "always"}, 0)
-	if err := pm.containerdClient.Create(p.PluginObj.ID, libcontainerd.Spec(*spec), libcontainerd.WithRestartManager(p.restartManager)); err != nil { // POC-only
+	if err := pm.containerdClient.Create(p.PluginObj.ID, libcontainerd.Spec(*spec), attachToLog(p.PluginObj.ID), libcontainerd.WithRestartManager(p.restartManager)); err != nil { // POC-only
 		if err := p.restartManager.Cancel(); err != nil {
 		if err := p.restartManager.Cancel(); err != nil {
 			logrus.Errorf("enable: restartManager.Cancel failed due to %v", err)
 			logrus.Errorf("enable: restartManager.Cancel failed due to %v", err)
 		}
 		}
@@ -62,7 +62,7 @@ func (pm *Manager) enable(p *plugin, force bool) error {
 
 
 func (pm *Manager) restore(p *plugin) error {
 func (pm *Manager) restore(p *plugin) error {
 	p.restartManager = restartmanager.New(container.RestartPolicy{Name: "always"}, 0)
 	p.restartManager = restartmanager.New(container.RestartPolicy{Name: "always"}, 0)
-	return pm.containerdClient.Restore(p.PluginObj.ID, libcontainerd.WithRestartManager(p.restartManager))
+	return pm.containerdClient.Restore(p.PluginObj.ID, attachToLog(p.PluginObj.ID), libcontainerd.WithRestartManager(p.restartManager))
 }
 }
 
 
 func (pm *Manager) initSpec(p *plugin) (*specs.Spec, error) {
 func (pm *Manager) initSpec(p *plugin) (*specs.Spec, error) {

+ 34 - 0
runconfig/streams.go

@@ -7,8 +7,11 @@ import (
 	"strings"
 	"strings"
 	"sync"
 	"sync"
 
 
+	"github.com/Sirupsen/logrus"
+	"github.com/docker/docker/libcontainerd"
 	"github.com/docker/docker/pkg/broadcaster"
 	"github.com/docker/docker/pkg/broadcaster"
 	"github.com/docker/docker/pkg/ioutils"
 	"github.com/docker/docker/pkg/ioutils"
+	"github.com/docker/docker/pkg/pools"
 )
 )
 
 
 // StreamConfig holds information about I/O streams managed together.
 // StreamConfig holds information about I/O streams managed together.
@@ -107,3 +110,34 @@ func (streamConfig *StreamConfig) CloseStreams() error {
 
 
 	return nil
 	return nil
 }
 }
+
+// CopyToPipe connects streamconfig with a libcontainerd.IOPipe
+func (streamConfig *StreamConfig) CopyToPipe(iop libcontainerd.IOPipe) {
+	copyFunc := func(w io.Writer, r io.Reader) {
+		streamConfig.Add(1)
+		go func() {
+			if _, err := pools.Copy(w, r); err != nil {
+				logrus.Errorf("stream copy error: %+v", err)
+			}
+			streamConfig.Done()
+		}()
+	}
+
+	if iop.Stdout != nil {
+		copyFunc(streamConfig.Stdout(), iop.Stdout)
+	}
+	if iop.Stderr != nil {
+		copyFunc(streamConfig.Stderr(), iop.Stderr)
+	}
+
+	if stdin := streamConfig.Stdin(); stdin != nil {
+		if iop.Stdin != nil {
+			go func() {
+				pools.Copy(iop.Stdin, stdin)
+				if err := iop.Stdin.Close(); err != nil {
+					logrus.Error("failed to clise stdin: %+v", err)
+				}
+			}()
+		}
+	}
+}