Переглянути джерело

Merge pull request #27467 from tonistiigi/attach-cb

Move stdio attach from libcontainerd backend to callback
Antonio Murdaca 8 роки тому
батько
коміт
8ed31089c0

+ 44 - 0
container/container.go

@@ -25,6 +25,7 @@ import (
 	"github.com/docker/docker/daemon/network"
 	"github.com/docker/docker/image"
 	"github.com/docker/docker/layer"
+	"github.com/docker/docker/libcontainerd"
 	"github.com/docker/docker/pkg/idtools"
 	"github.com/docker/docker/pkg/ioutils"
 	"github.com/docker/docker/pkg/promise"
@@ -1012,3 +1013,46 @@ func (container *Container) CancelAttachContext() {
 	}
 	container.attachContext.mu.Unlock()
 }
+
+func (container *Container) startLogging() error {
+	if container.HostConfig.LogConfig.Type == "none" {
+		return nil // do not start logging routines
+	}
+
+	l, err := container.StartLogger(container.HostConfig.LogConfig)
+	if err != nil {
+		return fmt.Errorf("Failed to initialize logging driver: %v", err)
+	}
+
+	copier := logger.NewCopier(map[string]io.Reader{"stdout": container.StdoutPipe(), "stderr": container.StderrPipe()}, l)
+	container.LogCopier = copier
+	copier.Run()
+	container.LogDriver = l
+
+	// set LogPath field only for json-file logdriver
+	if jl, ok := l.(*jsonfilelog.JSONFileLogger); ok {
+		container.LogPath = jl.LogPath()
+	}
+
+	return nil
+}
+
+// InitializeStdio is called by libcontainerd to connect the stdio.
+func (container *Container) InitializeStdio(iop libcontainerd.IOPipe) error {
+	if err := container.startLogging(); err != nil {
+		container.Reset(false)
+		return err
+	}
+
+	container.StreamConfig.CopyToPipe(iop)
+
+	if container.Stdin() == nil && !container.Config.Tty {
+		if iop.Stdin != nil {
+			if err := iop.Stdin.Close(); err != nil {
+				logrus.Error("error closing stdin: %+v", err)
+			}
+		}
+	}
+
+	return nil
+}

+ 1 - 1
daemon/daemon.go

@@ -193,7 +193,7 @@ func (daemon *Daemon) restore() error {
 
 			if c.IsRunning() || c.IsPaused() {
 				c.RestartManager().Cancel() // manually start containers because some need to wait for swarm networking
-				if err := daemon.containerd.Restore(c.ID); err != nil {
+				if err := daemon.containerd.Restore(c.ID, c.InitializeStdio); err != nil {
 					logrus.Errorf("Failed to restore %s with containerd: %s", c.ID, err)
 					return
 				}

+ 1 - 1
daemon/exec.go

@@ -212,7 +212,7 @@ func (d *Daemon) ContainerExecStart(ctx context.Context, name string, stdin io.R
 
 	attachErr := container.AttachStreams(ctx, ec.StreamConfig, ec.OpenStdin, true, ec.Tty, cStdin, cStdout, cStderr, ec.DetachKeys)
 
-	systemPid, err := d.containerd.AddProcess(ctx, c.ID, name, p)
+	systemPid, err := d.containerd.AddProcess(ctx, c.ID, name, p, ec.InitializeStdio)
 	if err != nil {
 		return err
 	}

+ 18 - 0
daemon/exec/exec.go

@@ -1,8 +1,11 @@
 package exec
 
 import (
+	"runtime"
 	"sync"
 
+	"github.com/Sirupsen/logrus"
+	"github.com/docker/docker/libcontainerd"
 	"github.com/docker/docker/pkg/stringid"
 	"github.com/docker/docker/runconfig"
 )
@@ -39,6 +42,21 @@ func NewConfig() *Config {
 	}
 }
 
+// InitializeStdio is called by libcontainerd to connect the stdio.
+func (c *Config) InitializeStdio(iop libcontainerd.IOPipe) error {
+	c.StreamConfig.CopyToPipe(iop)
+
+	if c.Stdin() == nil && !c.Tty && runtime.GOOS == "windows" {
+		if iop.Stdin != nil {
+			if err := iop.Stdin.Close(); err != nil {
+				logrus.Error("error closing exec stdin: %+v", err)
+			}
+		}
+	}
+
+	return nil
+}
+
 // Store keeps track of the exec configurations.
 type Store struct {
 	commands map[string]*Config

+ 0 - 25
daemon/logs.go

@@ -14,7 +14,6 @@ import (
 	timetypes "github.com/docker/docker/api/types/time"
 	"github.com/docker/docker/container"
 	"github.com/docker/docker/daemon/logger"
-	"github.com/docker/docker/daemon/logger/jsonfilelog"
 	"github.com/docker/docker/pkg/ioutils"
 	"github.com/docker/docker/pkg/stdcopy"
 )
@@ -121,30 +120,6 @@ func (daemon *Daemon) getLogger(container *container.Container) (logger.Logger,
 	return container.StartLogger(container.HostConfig.LogConfig)
 }
 
-// StartLogging initializes and starts the container logging stream.
-func (daemon *Daemon) StartLogging(container *container.Container) error {
-	if container.HostConfig.LogConfig.Type == "none" {
-		return nil // do not start logging routines
-	}
-
-	l, err := container.StartLogger(container.HostConfig.LogConfig)
-	if err != nil {
-		return fmt.Errorf("Failed to initialize logging driver: %v", err)
-	}
-
-	copier := logger.NewCopier(map[string]io.Reader{"stdout": container.StdoutPipe(), "stderr": container.StderrPipe()}, l)
-	container.LogCopier = copier
-	copier.Run()
-	container.LogDriver = l
-
-	// set LogPath field only for json-file logdriver
-	if jl, ok := l.(*jsonfilelog.JSONFileLogger); ok {
-		container.LogPath = jl.LogPath()
-	}
-
-	return nil
-}
-
 // mergeLogConfig merges the daemon log config to the container's log config if the container's log driver is not specified.
 func (daemon *Daemon) mergeAndVerifyLogConfig(cfg *containertypes.LogConfig) error {
 	if cfg.Type == "" {

+ 0 - 69
daemon/monitor.go

@@ -3,17 +3,14 @@ package daemon
 import (
 	"errors"
 	"fmt"
-	"io"
 	"runtime"
 	"strconv"
 	"time"
 
 	"github.com/Sirupsen/logrus"
 	"github.com/docker/docker/api/types"
-	"github.com/docker/docker/daemon/exec"
 	"github.com/docker/docker/libcontainerd"
 	"github.com/docker/docker/restartmanager"
-	"github.com/docker/docker/runconfig"
 )
 
 // StateChanged updates daemon state changes from containerd
@@ -133,69 +130,3 @@ func (daemon *Daemon) StateChanged(id string, e libcontainerd.StateInfo) error {
 
 	return nil
 }
-
-// AttachStreams is called by libcontainerd to connect the stdio.
-func (daemon *Daemon) AttachStreams(id string, iop libcontainerd.IOPipe) error {
-	var (
-		s  *runconfig.StreamConfig
-		ec *exec.Config
-	)
-
-	c := daemon.containers.Get(id)
-	if c == nil {
-		var err error
-		ec, err = daemon.getExecConfig(id)
-		if err != nil {
-			return fmt.Errorf("no such exec/container: %s", id)
-		}
-		s = ec.StreamConfig
-	} else {
-		s = c.StreamConfig
-		if err := daemon.StartLogging(c); err != nil {
-			c.Reset(false)
-			return err
-		}
-	}
-
-	copyFunc := func(w io.Writer, r io.Reader) {
-		s.Add(1)
-		go func() {
-			if _, err := io.Copy(w, r); err != nil {
-				logrus.Errorf("%v stream copy error: %v", id, err)
-			}
-			s.Done()
-		}()
-	}
-
-	if iop.Stdout != nil {
-		copyFunc(s.Stdout(), iop.Stdout)
-	}
-	if iop.Stderr != nil {
-		copyFunc(s.Stderr(), iop.Stderr)
-	}
-
-	if stdin := s.Stdin(); stdin != nil {
-		if iop.Stdin != nil {
-			go func() {
-				io.Copy(iop.Stdin, stdin)
-				if err := iop.Stdin.Close(); err != nil {
-					logrus.Error(err)
-				}
-			}()
-		}
-	} else {
-		//TODO(swernli): On Windows, not closing stdin when no tty is requested by the exec Config
-		// results in a hang. We should re-evaluate generalizing this fix for all OSes if
-		// we can determine that is the right thing to do more generally.
-		if (c != nil && !c.Config.Tty) || (ec != nil && !ec.Tty && runtime.GOOS == "windows") {
-			// tty is enabled, so dont close containerd's iopipe stdin.
-			if iop.Stdin != nil {
-				if err := iop.Stdin.Close(); err != nil {
-					logrus.Error(err)
-				}
-			}
-		}
-	}
-
-	return nil
-}

+ 1 - 1
daemon/monitor_windows.go

@@ -37,7 +37,7 @@ func (daemon *Daemon) postRunProcessing(container *container.Container, e libcon
 
 		// Create a new servicing container, which will start, complete the update, and merge back the
 		// results if it succeeded, all as part of the below function call.
-		if err := daemon.containerd.Create((container.ID + "_servicing"), "", "", *spec, newOpts...); err != nil {
+		if err := daemon.containerd.Create((container.ID + "_servicing"), "", "", *spec, container.InitializeStdio, newOpts...); err != nil {
 			container.SetExitCode(-1)
 			return fmt.Errorf("Post-run update servicing failed: %s", err)
 		}

+ 1 - 1
daemon/start.go

@@ -149,7 +149,7 @@ func (daemon *Daemon) containerStart(container *container.Container, checkpoint
 		container.ResetRestartManager(true)
 	}
 
-	if err := daemon.containerd.Create(container.ID, checkpoint, container.CheckpointDir(), *spec, createOptions...); err != nil {
+	if err := daemon.containerd.Create(container.ID, checkpoint, container.CheckpointDir(), *spec, container.InitializeStdio, createOptions...); err != nil {
 		errDesc := grpc.ErrorDesc(err)
 		logrus.Errorf("Create container failed with error: %s", errDesc)
 		// if we receive an internal error from the initial start of a container then lets

+ 8 - 12
libcontainerd/client_linux.go

@@ -34,7 +34,7 @@ type client struct {
 // AddProcess is the handler for adding a process to an already running
 // container. It's called through docker exec. It returns the system pid of the
 // exec'd process.
-func (clnt *client) AddProcess(ctx context.Context, containerID, processFriendlyName string, specp Process) (int, error) {
+func (clnt *client) AddProcess(ctx context.Context, containerID, processFriendlyName string, specp Process, attachStdio StdioCallback) (int, error) {
 	clnt.lock(containerID)
 	defer clnt.unlock(containerID)
 	container, err := clnt.getContainer(containerID)
@@ -116,14 +116,10 @@ func (clnt *client) AddProcess(ctx context.Context, containerID, processFriendly
 
 	container.processes[processFriendlyName] = p
 
-	clnt.unlock(containerID)
-
-	if err := clnt.backend.AttachStreams(processFriendlyName, *iopipe); err != nil {
-		clnt.lock(containerID)
+	if err := attachStdio(*iopipe); err != nil {
 		p.closeFifos(iopipe)
 		return -1, err
 	}
-	clnt.lock(containerID)
 
 	return int(resp.SystemPid), nil
 }
@@ -153,7 +149,7 @@ func (clnt *client) prepareBundleDir(uid, gid int) (string, error) {
 	return p, nil
 }
 
-func (clnt *client) Create(containerID string, checkpoint string, checkpointDir string, spec specs.Spec, options ...CreateOption) (err error) {
+func (clnt *client) Create(containerID string, checkpoint string, checkpointDir string, spec specs.Spec, attachStdio StdioCallback, options ...CreateOption) (err error) {
 	clnt.lock(containerID)
 	defer clnt.unlock(containerID)
 
@@ -195,7 +191,7 @@ func (clnt *client) Create(containerID string, checkpoint string, checkpointDir
 		return err
 	}
 
-	return container.start(checkpoint, checkpointDir)
+	return container.start(checkpoint, checkpointDir, attachStdio)
 }
 
 func (clnt *client) Signal(containerID string, sig int) error {
@@ -404,7 +400,7 @@ func (clnt *client) getOrCreateExitNotifier(containerID string) *exitNotifier {
 	return w
 }
 
-func (clnt *client) restore(cont *containerd.Container, lastEvent *containerd.Event, options ...CreateOption) (err error) {
+func (clnt *client) restore(cont *containerd.Container, lastEvent *containerd.Event, attachStdio StdioCallback, options ...CreateOption) (err error) {
 	clnt.lock(cont.Id)
 	defer clnt.unlock(cont.Id)
 
@@ -445,7 +441,7 @@ func (clnt *client) restore(cont *containerd.Container, lastEvent *containerd.Ev
 		return err
 	})
 
-	if err := clnt.backend.AttachStreams(containerID, *iopipe); err != nil {
+	if err := attachStdio(*iopipe); err != nil {
 		container.closeFifos(iopipe)
 		return err
 	}
@@ -537,7 +533,7 @@ func (clnt *client) getContainerLastEvent(id string) (*containerd.Event, error)
 	return ev, err
 }
 
-func (clnt *client) Restore(containerID string, options ...CreateOption) error {
+func (clnt *client) Restore(containerID string, attachStdio StdioCallback, options ...CreateOption) error {
 	// Synchronize with live events
 	clnt.remote.Lock()
 	defer clnt.remote.Unlock()
@@ -585,7 +581,7 @@ func (clnt *client) Restore(containerID string, options ...CreateOption) error {
 
 	// container is still alive
 	if clnt.liveRestore {
-		if err := clnt.restore(cont, ev, options...); err != nil {
+		if err := clnt.restore(cont, ev, attachStdio, options...); err != nil {
 			logrus.Errorf("libcontainerd: error restoring %s: %v", containerID, err)
 		}
 		return nil

+ 5 - 12
libcontainerd/client_windows.go

@@ -94,7 +94,7 @@ const defaultOwner = "docker"
 //	},
 //	"Servicing": false
 //}
-func (clnt *client) Create(containerID string, checkpoint string, checkpointDir string, spec specs.Spec, options ...CreateOption) error {
+func (clnt *client) Create(containerID string, checkpoint string, checkpointDir string, spec specs.Spec, attachStdio StdioCallback, options ...CreateOption) error {
 	clnt.lock(containerID)
 	defer clnt.unlock(containerID)
 	logrus.Debugln("libcontainerd: client.Create() with spec", spec)
@@ -253,7 +253,7 @@ func (clnt *client) Create(containerID string, checkpoint string, checkpointDir
 	// internal structure, start will keep HCS in sync by deleting the
 	// container there.
 	logrus.Debugf("libcontainerd: Create() id=%s, Calling start()", containerID)
-	if err := container.start(); err != nil {
+	if err := container.start(attachStdio); err != nil {
 		clnt.deleteContainer(containerID)
 		return err
 	}
@@ -266,7 +266,7 @@ func (clnt *client) Create(containerID string, checkpoint string, checkpointDir
 // AddProcess is the handler for adding a process to an already running
 // container. It's called through docker exec. It returns the system pid of the
 // exec'd process.
-func (clnt *client) AddProcess(ctx context.Context, containerID, processFriendlyName string, procToAdd Process) (int, error) {
+func (clnt *client) AddProcess(ctx context.Context, containerID, processFriendlyName string, procToAdd Process, attachStdio StdioCallback) (int, error) {
 	clnt.lock(containerID)
 	defer clnt.unlock(containerID)
 	container, err := clnt.getContainer(containerID)
@@ -343,18 +343,11 @@ func (clnt *client) AddProcess(ctx context.Context, containerID, processFriendly
 	// Add the process to the container's list of processes
 	container.processes[processFriendlyName] = proc
 
-	// Make sure the lock is not held while calling back into the daemon
-	clnt.unlock(containerID)
-
 	// Tell the engine to attach streams back to the client
-	if err := clnt.backend.AttachStreams(processFriendlyName, *iopipe); err != nil {
-		clnt.lock(containerID)
+	if err := attachStdio(*iopipe); err != nil {
 		return -1, err
 	}
 
-	// Lock again so that the defer unlock doesn't fail. (I really don't like this code)
-	clnt.lock(containerID)
-
 	// Spin up a go routine waiting for exit to handle cleanup
 	go container.waitExit(proc, false)
 
@@ -544,7 +537,7 @@ func (clnt *client) Stats(containerID string) (*Stats, error) {
 }
 
 // Restore is the handler for restoring a container
-func (clnt *client) Restore(containerID string, unusedOnWindows ...CreateOption) error {
+func (clnt *client) Restore(containerID string, _ StdioCallback, unusedOnWindows ...CreateOption) error {
 	// TODO Windows: Implement this. For now, just tell the backend the container exited.
 	logrus.Debugf("libcontainerd: Restore(%s)", containerID)
 	return clnt.backend.StateChanged(containerID, StateInfo{

+ 3 - 3
libcontainerd/container_linux.go

@@ -88,7 +88,7 @@ func (ctr *container) spec() (*specs.Spec, error) {
 	return &spec, nil
 }
 
-func (ctr *container) start(checkpoint string, checkpointDir string) error {
+func (ctr *container) start(checkpoint string, checkpointDir string, attachStdio StdioCallback) error {
 	spec, err := ctr.spec()
 	if err != nil {
 		return nil
@@ -107,7 +107,7 @@ func (ctr *container) start(checkpoint string, checkpointDir string) error {
 
 	// we need to delay stdin closure after container start or else "stdin close"
 	// event will be rejected by containerd.
-	// stdin closure happens in AttachStreams
+	// stdin closure happens in attachStdio
 	stdin := iopipe.Stdin
 	iopipe.Stdin = ioutils.NewWriteCloserWrapper(stdin, func() error {
 		var err error
@@ -141,7 +141,7 @@ func (ctr *container) start(checkpoint string, checkpointDir string) error {
 	}
 	ctr.client.appendContainer(ctr)
 
-	if err := ctr.client.backend.AttachStreams(ctr.containerID, *iopipe); err != nil {
+	if err := attachStdio(*iopipe); err != nil {
 		ctr.closeFifos(iopipe)
 		return err
 	}

+ 2 - 2
libcontainerd/container_windows.go

@@ -40,7 +40,7 @@ func (ctr *container) newProcess(friendlyName string) *process {
 
 // start starts a created container.
 // Caller needs to lock container ID before calling this method.
-func (ctr *container) start() error {
+func (ctr *container) start(attachStdio StdioCallback) error {
 	var err error
 	isServicing := false
 
@@ -147,7 +147,7 @@ func (ctr *container) start() error {
 
 	ctr.client.appendContainer(ctr)
 
-	if err := ctr.client.backend.AttachStreams(ctr.containerID, *iopipe); err != nil {
+	if err := attachStdio(*iopipe); err != nil {
 		// OK to return the error here, as waitExit will handle tear-down in HCS
 		return err
 	}

+ 6 - 4
libcontainerd/types.go

@@ -31,19 +31,18 @@ type CommonStateInfo struct { // FIXME: event?
 // Backend defines callbacks that the client of the library needs to implement.
 type Backend interface {
 	StateChanged(containerID string, state StateInfo) error
-	AttachStreams(processFriendlyName string, io IOPipe) error
 }
 
 // Client provides access to containerd features.
 type Client interface {
-	Create(containerID string, checkpoint string, checkpointDir string, spec specs.Spec, options ...CreateOption) error
+	Create(containerID string, checkpoint string, checkpointDir string, spec specs.Spec, attachStdio StdioCallback, options ...CreateOption) error
 	Signal(containerID string, sig int) error
 	SignalProcess(containerID string, processFriendlyName string, sig int) error
-	AddProcess(ctx context.Context, containerID, processFriendlyName string, process Process) (int, error)
+	AddProcess(ctx context.Context, containerID, processFriendlyName string, process Process, attachStdio StdioCallback) (int, error)
 	Resize(containerID, processFriendlyName string, width, height int) error
 	Pause(containerID string) error
 	Resume(containerID string) error
-	Restore(containerID string, options ...CreateOption) error
+	Restore(containerID string, attachStdio StdioCallback, options ...CreateOption) error
 	Stats(containerID string) (*Stats, error)
 	GetPidsForContainer(containerID string) ([]int, error)
 	Summary(containerID string) ([]Summary, error)
@@ -58,6 +57,9 @@ type CreateOption interface {
 	Apply(interface{}) error
 }
 
+// StdioCallback is called to connect a container or process stdio.
+type StdioCallback func(IOPipe) error
+
 // IOPipe contains the stdio streams.
 type IOPipe struct {
 	Stdin    io.WriteCloser

+ 19 - 18
plugin/manager.go

@@ -98,24 +98,6 @@ func (pm *Manager) StateChanged(id string, e libcontainerd.StateInfo) error {
 	return nil
 }
 
-// AttachStreams attaches io streams to the plugin
-func (pm *Manager) AttachStreams(id string, iop libcontainerd.IOPipe) error {
-	iop.Stdin.Close()
-
-	logger := logrus.New()
-	logger.Hooks.Add(logHook{id})
-	// TODO: cache writer per id
-	w := logger.Writer()
-	go func() {
-		io.Copy(w, iop.Stdout)
-	}()
-	go func() {
-		// TODO: update logrus and use logger.WriterLevel
-		io.Copy(w, iop.Stderr)
-	}()
-	return nil
-}
-
 func (pm *Manager) init() error {
 	dt, err := os.Open(filepath.Join(pm.libRoot, "plugins.json"))
 	if err != nil {
@@ -167,3 +149,22 @@ func (l logHook) Fire(entry *logrus.Entry) error {
 	entry.Data = logrus.Fields{"plugin": l.id}
 	return nil
 }
+
+func attachToLog(id string) func(libcontainerd.IOPipe) error {
+	return func(iop libcontainerd.IOPipe) error {
+		iop.Stdin.Close()
+
+		logger := logrus.New()
+		logger.Hooks.Add(logHook{id})
+		// TODO: cache writer per id
+		w := logger.Writer()
+		go func() {
+			io.Copy(w, iop.Stdout)
+		}()
+		go func() {
+			// TODO: update logrus and use logger.WriterLevel
+			io.Copy(w, iop.Stderr)
+		}()
+		return nil
+	}
+}

+ 2 - 2
plugin/manager_linux.go

@@ -26,7 +26,7 @@ func (pm *Manager) enable(p *v2.Plugin, force bool) error {
 	p.Lock()
 	p.Restart = true
 	p.Unlock()
-	if err := pm.containerdClient.Create(p.GetID(), "", "", specs.Spec(*spec)); err != nil {
+	if err := pm.containerdClient.Create(p.GetID(), "", "", specs.Spec(*spec), attachToLog(p.GetID())); err != nil {
 		return err
 	}
 
@@ -45,7 +45,7 @@ func (pm *Manager) enable(p *v2.Plugin, force bool) error {
 }
 
 func (pm *Manager) restore(p *v2.Plugin) error {
-	return pm.containerdClient.Restore(p.GetID())
+	return pm.containerdClient.Restore(p.GetID(), attachToLog(p.GetID()))
 }
 
 func (pm *Manager) disable(p *v2.Plugin) error {

+ 34 - 0
runconfig/streams.go

@@ -7,8 +7,11 @@ import (
 	"strings"
 	"sync"
 
+	"github.com/Sirupsen/logrus"
+	"github.com/docker/docker/libcontainerd"
 	"github.com/docker/docker/pkg/broadcaster"
 	"github.com/docker/docker/pkg/ioutils"
+	"github.com/docker/docker/pkg/pools"
 )
 
 // StreamConfig holds information about I/O streams managed together.
@@ -107,3 +110,34 @@ func (streamConfig *StreamConfig) CloseStreams() error {
 
 	return nil
 }
+
+// CopyToPipe connects streamconfig with a libcontainerd.IOPipe
+func (streamConfig *StreamConfig) CopyToPipe(iop libcontainerd.IOPipe) {
+	copyFunc := func(w io.Writer, r io.Reader) {
+		streamConfig.Add(1)
+		go func() {
+			if _, err := pools.Copy(w, r); err != nil {
+				logrus.Errorf("stream copy error: %+v", err)
+			}
+			streamConfig.Done()
+		}()
+	}
+
+	if iop.Stdout != nil {
+		copyFunc(streamConfig.Stdout(), iop.Stdout)
+	}
+	if iop.Stderr != nil {
+		copyFunc(streamConfig.Stderr(), iop.Stderr)
+	}
+
+	if stdin := streamConfig.Stdin(); stdin != nil {
+		if iop.Stdin != nil {
+			go func() {
+				pools.Copy(iop.Stdin, stdin)
+				if err := iop.Stdin.Close(); err != nil {
+					logrus.Error("failed to clise stdin: %+v", err)
+				}
+			}()
+		}
+	}
+}