Bläddra i källkod

Merge pull request #18135 from calavera/exec_store

Move exec store to its own package inside the daemon.
Alexander Morozov 9 år sedan
förälder
incheckning
afe6c1f30c
7 ändrade filer med 174 tillägg och 135 borttagningar
  1. 3 2
      daemon/container.go
  2. 3 2
      daemon/daemon.go
  3. 48 127
      daemon/exec.go
  4. 115 0
      daemon/exec/exec.go
  5. 2 1
      daemon/inspect.go
  6. 1 1
      daemon/resize.go
  7. 2 2
      daemon/start.go

+ 3 - 2
daemon/container.go

@@ -14,6 +14,7 @@ import (
 	"github.com/opencontainers/runc/libcontainer/label"
 
 	"github.com/Sirupsen/logrus"
+	"github.com/docker/docker/daemon/exec"
 	"github.com/docker/docker/daemon/execdriver"
 	"github.com/docker/docker/daemon/logger"
 	"github.com/docker/docker/daemon/logger/jsonfilelog"
@@ -62,7 +63,7 @@ type CommonContainer struct {
 	hostConfig             *runconfig.HostConfig
 	command                *execdriver.Command
 	monitor                *containerMonitor
-	execCommands           *execStore
+	execCommands           *exec.Store
 	// logDriver for closing
 	logDriver logger.Logger
 	logCopier *logger.Copier
@@ -75,7 +76,7 @@ func newBaseContainer(id, root string) *Container {
 		CommonContainer: CommonContainer{
 			ID:           id,
 			State:        NewState(),
-			execCommands: newExecStore(),
+			execCommands: exec.NewStore(),
 			root:         root,
 			MountPoints:  make(map[string]*volume.MountPoint),
 			StreamConfig: runconfig.NewStreamConfig(),

+ 3 - 2
daemon/daemon.go

@@ -22,6 +22,7 @@ import (
 	"github.com/docker/docker/api/types"
 	"github.com/docker/docker/cliconfig"
 	"github.com/docker/docker/daemon/events"
+	"github.com/docker/docker/daemon/exec"
 	"github.com/docker/docker/daemon/execdriver"
 	"github.com/docker/docker/daemon/execdriver/execdrivers"
 	"github.com/docker/docker/daemon/graphdriver"
@@ -106,7 +107,7 @@ type Daemon struct {
 	repository       string
 	sysInitPath      string
 	containers       *contStore
-	execCommands     *execStore
+	execCommands     *exec.Store
 	graph            *graph.Graph
 	repositories     *graph.TagStore
 	idIndex          *truncindex.TruncIndex
@@ -790,7 +791,7 @@ func NewDaemon(config *Config, registryService *registry.Service) (daemon *Daemo
 	d.ID = trustKey.PublicKey().KeyID()
 	d.repository = daemonRepo
 	d.containers = &contStore{s: make(map[string]*Container)}
-	d.execCommands = newExecStore()
+	d.execCommands = exec.NewStore()
 	d.graph = g
 	d.repositories = repositories
 	d.idIndex = truncindex.NewTruncIndex([]string{})

+ 48 - 127
daemon/exec.go

@@ -3,91 +3,23 @@ package daemon
 import (
 	"io"
 	"strings"
-	"sync"
 	"time"
 
 	"github.com/Sirupsen/logrus"
+	"github.com/docker/docker/daemon/exec"
 	"github.com/docker/docker/daemon/execdriver"
 	derr "github.com/docker/docker/errors"
 	"github.com/docker/docker/pkg/pools"
 	"github.com/docker/docker/pkg/promise"
-	"github.com/docker/docker/pkg/stringid"
 	"github.com/docker/docker/pkg/stringutils"
 	"github.com/docker/docker/runconfig"
 )
 
-// ExecConfig holds the configurations for execs. The Daemon keeps
-// track of both running and finished execs so that they can be
-// examined both during and after completion.
-type ExecConfig struct {
-	sync.Mutex
-	ID            string
-	Running       bool
-	ExitCode      int
-	ProcessConfig *execdriver.ProcessConfig
-	OpenStdin     bool
-	OpenStderr    bool
-	OpenStdout    bool
-	streamConfig  *runconfig.StreamConfig
-	Container     *Container
-	canRemove     bool
-
-	// waitStart will be closed immediately after the exec is really started.
-	waitStart chan struct{}
-}
-
-type execStore struct {
-	s map[string]*ExecConfig
-	sync.RWMutex
-}
-
-func newExecStore() *execStore {
-	return &execStore{s: make(map[string]*ExecConfig, 0)}
-}
-
-func (e *execStore) Add(id string, ExecConfig *ExecConfig) {
-	e.Lock()
-	e.s[id] = ExecConfig
-	e.Unlock()
-}
-
-func (e *execStore) Get(id string) *ExecConfig {
-	e.RLock()
-	res := e.s[id]
-	e.RUnlock()
-	return res
-}
-
-func (e *execStore) Delete(id string) {
-	e.Lock()
-	delete(e.s, id)
-	e.Unlock()
-}
-
-func (e *execStore) List() []string {
-	var IDs []string
-	e.RLock()
-	for id := range e.s {
-		IDs = append(IDs, id)
-	}
-	e.RUnlock()
-	return IDs
-}
-
-func (ExecConfig *ExecConfig) resize(h, w int) error {
-	select {
-	case <-ExecConfig.waitStart:
-	case <-time.After(time.Second):
-		return derr.ErrorCodeExecResize.WithArgs(ExecConfig.ID)
-	}
-	return ExecConfig.ProcessConfig.Terminal.Resize(h, w)
-}
-
-func (d *Daemon) registerExecCommand(ExecConfig *ExecConfig) {
+func (d *Daemon) registerExecCommand(container *Container, config *exec.Config) {
 	// Storing execs in container in order to kill them gracefully whenever the container is stopped or removed.
-	ExecConfig.Container.execCommands.Add(ExecConfig.ID, ExecConfig)
+	container.execCommands.Add(config.ID, config)
 	// Storing execs in daemon for easy access via remote API.
-	d.execCommands.Add(ExecConfig.ID, ExecConfig)
+	d.execCommands.Add(config.ID, config)
 }
 
 // ExecExists looks up the exec instance and returns a bool if it exists or not.
@@ -101,7 +33,7 @@ func (d *Daemon) ExecExists(name string) (bool, error) {
 
 // getExecConfig looks up the exec instance by name. If the container associated
 // with the exec instance is stopped or paused, it will return an error.
-func (d *Daemon) getExecConfig(name string) (*ExecConfig, error) {
+func (d *Daemon) getExecConfig(name string) (*exec.Config, error) {
 	ec := d.execCommands.Get(name)
 
 	// If the exec is found but its container is not in the daemon's list of
@@ -110,22 +42,24 @@ func (d *Daemon) getExecConfig(name string) (*ExecConfig, error) {
 	// the user sees the same error now that they will after the
 	// 5 minute clean-up loop is run which erases old/dead execs.
 
-	if ec != nil && d.containers.Get(ec.Container.ID) != nil {
-		if !ec.Container.IsRunning() {
-			return nil, derr.ErrorCodeContainerNotRunning.WithArgs(ec.Container.ID, ec.Container.State.String())
-		}
-		if ec.Container.isPaused() {
-			return nil, derr.ErrorCodeExecPaused.WithArgs(ec.Container.ID)
+	if ec != nil {
+		if container := d.containers.Get(ec.ContainerID); container != nil {
+			if !container.IsRunning() {
+				return nil, derr.ErrorCodeContainerNotRunning.WithArgs(container.ID, container.State.String())
+			}
+			if container.isPaused() {
+				return nil, derr.ErrorCodeExecPaused.WithArgs(container.ID)
+			}
+			return ec, nil
 		}
-		return ec, nil
 	}
 
 	return nil, derr.ErrorCodeNoExecID.WithArgs(name)
 }
 
-func (d *Daemon) unregisterExecCommand(ExecConfig *ExecConfig) {
-	ExecConfig.Container.execCommands.Delete(ExecConfig.ID)
-	d.execCommands.Delete(ExecConfig.ID)
+func (d *Daemon) unregisterExecCommand(container *Container, execConfig *exec.Config) {
+	container.execCommands.Delete(execConfig.ID)
+	d.execCommands.Delete(execConfig.ID)
 }
 
 func (d *Daemon) getActiveContainer(name string) (*Container, error) {
@@ -162,23 +96,18 @@ func (d *Daemon) ContainerExecCreate(config *runconfig.ExecConfig) (string, erro
 	}
 	setPlatformSpecificExecProcessConfig(config, container, processConfig)
 
-	ExecConfig := &ExecConfig{
-		ID:            stringid.GenerateNonCryptoID(),
-		OpenStdin:     config.AttachStdin,
-		OpenStdout:    config.AttachStdout,
-		OpenStderr:    config.AttachStderr,
-		streamConfig:  runconfig.NewStreamConfig(),
-		ProcessConfig: processConfig,
-		Container:     container,
-		Running:       false,
-		waitStart:     make(chan struct{}),
-	}
+	execConfig := exec.NewConfig()
+	execConfig.OpenStdin = config.AttachStdin
+	execConfig.OpenStdout = config.AttachStdout
+	execConfig.OpenStderr = config.AttachStderr
+	execConfig.ProcessConfig = processConfig
+	execConfig.ContainerID = container.ID
 
-	d.registerExecCommand(ExecConfig)
+	d.registerExecCommand(container, execConfig)
 
-	d.LogContainerEvent(container, "exec_create: "+ExecConfig.ProcessConfig.Entrypoint+" "+strings.Join(ExecConfig.ProcessConfig.Arguments, " "))
+	d.LogContainerEvent(container, "exec_create: "+execConfig.ProcessConfig.Entrypoint+" "+strings.Join(execConfig.ProcessConfig.Arguments, " "))
 
-	return ExecConfig.ID, nil
+	return execConfig.ID, nil
 }
 
 // ContainerExecStart starts a previously set up exec instance. The
@@ -202,8 +131,8 @@ func (d *Daemon) ContainerExecStart(name string, stdin io.ReadCloser, stdout io.
 	ec.Running = true
 	ec.Unlock()
 
-	logrus.Debugf("starting exec command %s in container %s", ec.ID, ec.Container.ID)
-	container := ec.Container
+	container := d.containers.Get(ec.ContainerID)
+	logrus.Debugf("starting exec command %s in container %s", ec.ID, container.ID)
 	d.LogContainerEvent(container, "exec_start: "+ec.ProcessConfig.Entrypoint+" "+strings.Join(ec.ProcessConfig.Arguments, " "))
 
 	if ec.OpenStdin {
@@ -223,12 +152,12 @@ func (d *Daemon) ContainerExecStart(name string, stdin io.ReadCloser, stdout io.
 	}
 
 	if ec.OpenStdin {
-		ec.streamConfig.NewInputPipes()
+		ec.NewInputPipes()
 	} else {
-		ec.streamConfig.NewNopInputPipe()
+		ec.NewNopInputPipe()
 	}
 
-	attachErr := attach(ec.streamConfig, ec.OpenStdin, true, ec.ProcessConfig.Tty, cStdin, cStdout, cStderr)
+	attachErr := attach(ec.StreamConfig, ec.OpenStdin, true, ec.ProcessConfig.Tty, cStdin, cStdout, cStderr)
 
 	execErr := make(chan error)
 
@@ -263,19 +192,19 @@ func (d *Daemon) ContainerExecStart(name string, stdin io.ReadCloser, stdout io.
 }
 
 // Exec calls the underlying exec driver to run
-func (d *Daemon) Exec(c *Container, ExecConfig *ExecConfig, pipes *execdriver.Pipes, startCallback execdriver.DriverCallback) (int, error) {
+func (d *Daemon) Exec(c *Container, execConfig *exec.Config, pipes *execdriver.Pipes, startCallback execdriver.DriverCallback) (int, error) {
 	hooks := execdriver.Hooks{
 		Start: startCallback,
 	}
-	exitStatus, err := d.execDriver.Exec(c.command, ExecConfig.ProcessConfig, pipes, hooks)
+	exitStatus, err := d.execDriver.Exec(c.command, execConfig.ProcessConfig, pipes, hooks)
 
 	// On err, make sure we don't leave ExitCode at zero
 	if err != nil && exitStatus == 0 {
 		exitStatus = 128
 	}
 
-	ExecConfig.ExitCode = exitStatus
-	ExecConfig.Running = false
+	execConfig.ExitCode = exitStatus
+	execConfig.Running = false
 
 	return exitStatus, err
 }
@@ -288,13 +217,13 @@ func (d *Daemon) execCommandGC() {
 			cleaned          int
 			liveExecCommands = d.containerExecIds()
 		)
-		for id, config := range d.execCommands.s {
-			if config.canRemove {
+		for id, config := range d.execCommands.Commands() {
+			if config.CanRemove {
 				cleaned++
 				d.execCommands.Delete(id)
 			} else {
 				if _, exists := liveExecCommands[id]; !exists {
-					config.canRemove = true
+					config.CanRemove = true
 				}
 			}
 		}
@@ -316,7 +245,7 @@ func (d *Daemon) containerExecIds() map[string]struct{} {
 	return ids
 }
 
-func (d *Daemon) containerExec(container *Container, ec *ExecConfig) error {
+func (d *Daemon) containerExec(container *Container, ec *exec.Config) error {
 	container.Lock()
 	defer container.Unlock()
 
@@ -329,43 +258,35 @@ func (d *Daemon) containerExec(container *Container, ec *ExecConfig) error {
 				c.Close()
 			}
 		}
-		close(ec.waitStart)
+		ec.Close()
 		return nil
 	}
 
 	// We use a callback here instead of a goroutine and an chan for
 	// synchronization purposes
 	cErr := promise.Go(func() error { return d.monitorExec(container, ec, callback) })
-
-	// Exec should not return until the process is actually running
-	select {
-	case <-ec.waitStart:
-	case err := <-cErr:
-		return err
-	}
-
-	return nil
+	return ec.Wait(cErr)
 }
 
-func (d *Daemon) monitorExec(container *Container, ExecConfig *ExecConfig, callback execdriver.DriverCallback) error {
-	pipes := execdriver.NewPipes(ExecConfig.streamConfig.Stdin(), ExecConfig.streamConfig.Stdout(), ExecConfig.streamConfig.Stderr(), ExecConfig.OpenStdin)
-	exitCode, err := d.Exec(container, ExecConfig, pipes, callback)
+func (d *Daemon) monitorExec(container *Container, execConfig *exec.Config, callback execdriver.DriverCallback) error {
+	pipes := execdriver.NewPipes(execConfig.Stdin(), execConfig.Stdout(), execConfig.Stderr(), execConfig.OpenStdin)
+	exitCode, err := d.Exec(container, execConfig, pipes, callback)
 	if err != nil {
 		logrus.Errorf("Error running command in existing container %s: %s", container.ID, err)
 	}
 	logrus.Debugf("Exec task in container %s exited with code %d", container.ID, exitCode)
 
-	if err := ExecConfig.streamConfig.CloseStreams(); err != nil {
+	if err := execConfig.CloseStreams(); err != nil {
 		logrus.Errorf("%s: %s", container.ID, err)
 	}
 
-	if ExecConfig.ProcessConfig.Terminal != nil {
-		if err := ExecConfig.ProcessConfig.Terminal.Close(); err != nil {
+	if execConfig.ProcessConfig.Terminal != nil {
+		if err := execConfig.ProcessConfig.Terminal.Close(); err != nil {
 			logrus.Errorf("Error closing terminal while running in container %s: %s", container.ID, err)
 		}
 	}
 	// remove the exec command from the container's store only and not the
 	// daemon's store so that the exec command can be inspected.
-	container.execCommands.Delete(ExecConfig.ID)
+	container.execCommands.Delete(execConfig.ID)
 	return err
 }

+ 115 - 0
daemon/exec/exec.go

@@ -0,0 +1,115 @@
+package exec
+
+import (
+	"sync"
+	"time"
+
+	"github.com/docker/docker/daemon/execdriver"
+	derr "github.com/docker/docker/errors"
+	"github.com/docker/docker/pkg/stringid"
+	"github.com/docker/docker/runconfig"
+)
+
+// Config holds the configurations for execs. The Daemon keeps
+// track of both running and finished execs so that they can be
+// examined both during and after completion.
+type Config struct {
+	sync.Mutex
+	*runconfig.StreamConfig
+	ID            string
+	Running       bool
+	ExitCode      int
+	ProcessConfig *execdriver.ProcessConfig
+	OpenStdin     bool
+	OpenStderr    bool
+	OpenStdout    bool
+	CanRemove     bool
+	ContainerID   string
+
+	// waitStart will be closed immediately after the exec is really started.
+	waitStart chan struct{}
+}
+
+// NewConfig initializes a new exec configuration.
+func NewConfig() *Config {
+	return &Config{
+		ID:           stringid.GenerateNonCryptoID(),
+		StreamConfig: runconfig.NewStreamConfig(),
+		waitStart:    make(chan struct{}),
+	}
+}
+
+// Store keeps track of the exec configurations.
+type Store struct {
+	commands map[string]*Config
+	sync.RWMutex
+}
+
+// NewStore initializes a new exec store.
+func NewStore() *Store {
+	return &Store{commands: make(map[string]*Config, 0)}
+}
+
+// Commands returns the exec configurations in the store.
+func (e *Store) Commands() map[string]*Config {
+	return e.commands
+}
+
+// Add adds a new exec configuration to the store.
+func (e *Store) Add(id string, Config *Config) {
+	e.Lock()
+	e.commands[id] = Config
+	e.Unlock()
+}
+
+// Get returns an exec configuration by its id.
+func (e *Store) Get(id string) *Config {
+	e.RLock()
+	res := e.commands[id]
+	e.RUnlock()
+	return res
+}
+
+// Delete removes an exec configuration from the store.
+func (e *Store) Delete(id string) {
+	e.Lock()
+	delete(e.commands, id)
+	e.Unlock()
+}
+
+// List returns the list of exec ids in the store.
+func (e *Store) List() []string {
+	var IDs []string
+	e.RLock()
+	for id := range e.commands {
+		IDs = append(IDs, id)
+	}
+	e.RUnlock()
+	return IDs
+}
+
+// Wait waits until the exec process finishes or there is an error in the error channel.
+func (c *Config) Wait(cErr chan error) error {
+	// Exec should not return until the process is actually running
+	select {
+	case <-c.waitStart:
+	case err := <-cErr:
+		return err
+	}
+	return nil
+}
+
+// Close closes the wait channel for the process.
+func (c *Config) Close() {
+	close(c.waitStart)
+}
+
+// Resize changes the size of the terminal for the exec process.
+func (c *Config) Resize(h, w int) error {
+	select {
+	case <-c.waitStart:
+	case <-time.After(time.Second):
+		return derr.ErrorCodeExecResize.WithArgs(c.ID)
+	}
+	return c.ProcessConfig.Terminal.Resize(h, w)
+}

+ 2 - 1
daemon/inspect.go

@@ -6,6 +6,7 @@ import (
 
 	"github.com/docker/docker/api/types"
 	"github.com/docker/docker/api/types/versions/v1p20"
+	"github.com/docker/docker/daemon/exec"
 	"github.com/docker/docker/daemon/network"
 )
 
@@ -159,7 +160,7 @@ func (daemon *Daemon) getInspectData(container *Container, size bool) (*types.Co
 
 // ContainerExecInspect returns low-level information about the exec
 // command. An error is returned if the exec cannot be found.
-func (daemon *Daemon) ContainerExecInspect(id string) (*ExecConfig, error) {
+func (daemon *Daemon) ContainerExecInspect(id string) (*exec.Config, error) {
 	eConfig, err := daemon.getExecConfig(id)
 	if err != nil {
 		return nil, err

+ 1 - 1
daemon/resize.go

@@ -29,5 +29,5 @@ func (daemon *Daemon) ContainerExecResize(name string, height, width int) error
 		return err
 	}
 
-	return ExecConfig.resize(height, width)
+	return ExecConfig.Resize(height, width)
 }

+ 2 - 2
daemon/start.go

@@ -160,8 +160,8 @@ func (daemon *Daemon) Cleanup(container *Container) {
 
 	daemon.conditionalUnmountOnCleanup(container)
 
-	for _, eConfig := range container.execCommands.s {
-		daemon.unregisterExecCommand(eConfig)
+	for _, eConfig := range container.execCommands.Commands() {
+		daemon.unregisterExecCommand(container, eConfig)
 	}
 
 	if err := container.unmountVolumes(false); err != nil {