
Refactor container monitor into type

Signed-off-by: Michael Crosby <michael@docker.com>
Michael Crosby, 11 years ago
commit 2b0776c883
2 changed files with 204 additions and 127 deletions
  1. daemon/container.go (+15, -127)
  2. daemon/monitor.go (+189, -0)

+ 15 - 127
daemon/container.go

@@ -7,7 +7,6 @@ import (
 	"io"
 	"io/ioutil"
 	"os"
-	"os/exec"
 	"path"
 	"path/filepath"
 	"strings"
@@ -84,8 +83,8 @@ type Container struct {
 	VolumesRW  map[string]bool
 	hostConfig *runconfig.HostConfig
 
-	activeLinks   map[string]*links.Link
-	requestedStop bool
+	activeLinks map[string]*links.Link
+	monitor     *containerMonitor
 }
 
 func (container *Container) FromDisk() error {
@@ -496,110 +495,6 @@ func (container *Container) releaseNetwork() {
 	container.NetworkSettings = &NetworkSettings{}
 }
 
-func (container *Container) monitor(callback execdriver.StartCallback) error {
-	var (
-		err       error
-		exitCode  int
-		failCount int
-		exit      bool
-
-		policy = container.hostConfig.RestartPolicy
-	)
-
-	if err := container.startLoggingToDisk(); err != nil {
-		// TODO: crosbymichael cleanup IO, network, and mounts
-		return err
-	}
-
-	// reset the restart count
-	container.RestartCount = -1
-	container.requestedStop = false
-
-	for {
-		container.RestartCount++
-
-		pipes := execdriver.NewPipes(container.stdin, container.stdout, container.stderr, container.Config.OpenStdin)
-
-		if exitCode, err = container.daemon.Run(container, pipes, callback); err != nil {
-			failCount++
-
-			if failCount == policy.MaximumRetryCount {
-				exit = true
-			}
-
-			utils.Errorf("Error running container: %s", err)
-		}
-
-		// We still wait to set the state as stopped and ensure that the locks were released
-		container.State.SetStopped(exitCode)
-
-		if container.Config.OpenStdin {
-			if err := container.stdin.Close(); err != nil {
-				utils.Errorf("%s: Error close stdin: %s", container.ID, err)
-			}
-		}
-
-		if err := container.stdout.Clean(); err != nil {
-			utils.Errorf("%s: Error close stdout: %s", container.ID, err)
-		}
-
-		if err := container.stderr.Clean(); err != nil {
-			utils.Errorf("%s: Error close stderr: %s", container.ID, err)
-		}
-
-		if container.command != nil && container.command.Terminal != nil {
-			if err := container.command.Terminal.Close(); err != nil {
-				utils.Errorf("%s: Error closing terminal: %s", container.ID, err)
-			}
-		}
-
-		// Re-create a brand new stdin pipe once the container exited
-		if container.Config.OpenStdin {
-			container.stdin, container.stdinPipe = io.Pipe()
-		}
-
-		if container.daemon != nil && container.daemon.srv != nil {
-			container.daemon.srv.LogEvent("die", container.ID, container.daemon.repositories.ImageName(container.Image))
-		}
-
-		if (policy.Name == "always" || (policy.Name == "on-failure" && exitCode != 0)) && !container.requestedStop || !exit {
-			container.command.Cmd = copyCmd(&container.command.Cmd)
-
-			time.Sleep(1 * time.Second)
-
-			continue
-		}
-
-		// Cleanup networking and mounts
-		container.cleanup()
-
-		if container.daemon != nil && container.daemon.srv != nil && container.daemon.srv.IsRunning() {
-			// FIXME: here is race condition between two RUN instructions in Dockerfile
-			// because they share same runconfig and change image. Must be fixed
-			// in builder/builder.go
-			if err := container.toDisk(); err != nil {
-				utils.Errorf("Error dumping container %s state to disk: %s\n", container.ID, err)
-			}
-		}
-
-		return err
-	}
-}
-
-func copyCmd(c *exec.Cmd) exec.Cmd {
-	return exec.Cmd{
-		Stdin:       c.Stdin,
-		Stdout:      c.Stdout,
-		Stderr:      c.Stderr,
-		Path:        c.Path,
-		Env:         c.Env,
-		ExtraFiles:  c.ExtraFiles,
-		Args:        c.Args,
-		Dir:         c.Dir,
-		SysProcAttr: c.SysProcAttr,
-	}
-}
-
 // cleanup releases any network resources allocated to the container along with any rules
 // around how containers are linked together.  It also unmounts the container's root filesystem.
 func (container *Container) cleanup() {
@@ -630,7 +525,10 @@ func (container *Container) KillSig(sig int) error {
 	if !container.State.IsRunning() {
 		return nil
 	}
-	container.requestedStop = true
+
+	// signal to the monitor that it should not restart the container
+	// after we send the kill signal
+	container.monitor.ExitOnNext()
 
 	return container.daemon.Kill(container, sig)
 }
@@ -1174,27 +1072,17 @@ func (container *Container) startLoggingToDisk() error {
 }
 
 func (container *Container) waitForStart() error {
-	waitStart := make(chan struct{})
-	callback := func(command *execdriver.Command) {
-		if command.Tty {
-			// The callback is called after the process Start()
-			// so we are in the parent process. In TTY mode, stdin/out/err is the PtySlace
-			// which we close here.
-			if c, ok := command.Stdout.(io.Closer); ok {
-				c.Close()
-			}
-		}
+	container.monitor = newContainerMonitor(container, container.hostConfig.RestartPolicy)
 
-		container.State.SetRunning(command.Pid())
-		if err := container.toDisk(); err != nil {
-			log.Debugf("%s", err)
-		}
-		close(waitStart)
-	}
+	var (
+		cErr      = utils.Go(container.monitor.Start)
+		waitStart = make(chan struct{})
+	)
 
-	// We use a callback here instead of a goroutine and an chan for
-	// syncronization purposes
-	cErr := utils.Go(func() error { return container.monitor(callback) })
+	go func() {
+		container.State.WaitRunning(-1 * time.Second)
+		close(waitStart)
+	}()
 
 	// Start should not return until the process is actually running
 	select {

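The waitForStart change above replaces the old callback-and-channel handshake with a monitor goroutine plus a wait on the container state. Below is a minimal, self-contained sketch of that synchronization pattern, under stand-in types: fakeState plays the role of container.State and startMonitor the role of container.monitor.Start; neither is a real daemon API. The select mirrors the truncated hunk above, so Start does not return until either the process is reported running or the monitor fails first.

// Minimal sketch of the start-synchronization pattern used by the new waitForStart.
// fakeState and startMonitor are illustrative stand-ins, not Docker daemon types.
package main

import (
	"fmt"
	"time"
)

// fakeState stands in for container.State; WaitRunning blocks until SetRunning is called.
type fakeState struct {
	running chan struct{}
}

func (s *fakeState) SetRunning() { close(s.running) }

// WaitRunning ignores its timeout here; in the diff, -1 * time.Second means "no timeout".
func (s *fakeState) WaitRunning(_ time.Duration) { <-s.running }

// waitForStart blocks until the process is reported running or the monitor errors out first.
func waitForStart(state *fakeState, startMonitor func() error) error {
	cErr := make(chan error, 1) // plays the role of utils.Go(container.monitor.Start)
	go func() { cErr <- startMonitor() }()

	waitStart := make(chan struct{})
	go func() {
		state.WaitRunning(-1 * time.Second)
		close(waitStart)
	}()

	// Start should not return until the process is actually running.
	select {
	case err := <-cErr:
		return err
	case <-waitStart:
		return nil
	}
}

func main() {
	s := &fakeState{running: make(chan struct{})}
	err := waitForStart(s, func() error {
		time.Sleep(10 * time.Millisecond)
		s.SetRunning()        // the monitor's callback does this for real
		time.Sleep(time.Hour) // keep "monitoring" after start in this toy example
		return nil
	})
	fmt.Println("started, err =", err)
}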
+ 189 - 0
daemon/monitor.go

@@ -0,0 +1,189 @@
+package daemon
+
+import (
+	"io"
+	"os/exec"
+	"sync"
+	"time"
+
+	"github.com/docker/docker/daemon/execdriver"
+	"github.com/docker/docker/runconfig"
+	"github.com/docker/docker/utils"
+)
+
+// containerMonitor monitors the execution of a container's main process.
+// If a restart policy is specified for the container, the monitor will ensure that the
+// process is restarted based on the rules of the policy. When the container is finally stopped,
+// the monitor will reset and clean up any of the container's resources, such as networking
+// allocations and the rootfs.
+type containerMonitor struct {
+	mux sync.Mutex
+
+	container     *Container
+	restartPolicy runconfig.RestartPolicy
+	failureCount  int
+	shouldStop    bool
+}
+
+func newContainerMonitor(container *Container, policy runconfig.RestartPolicy) *containerMonitor {
+	return &containerMonitor{
+		container:     container,
+		restartPolicy: policy,
+	}
+}
+
+// ExitOnNext signals to the container monitor that it should not restart the container
+// after the next time the process exits
+func (m *containerMonitor) ExitOnNext() {
+	m.mux.Lock()
+	m.shouldStop = true
+	m.mux.Unlock()
+}
+
+// Close closes the container's resources such as networking allocations and
+// unmounts the container's root filesystem
+func (m *containerMonitor) Close() error {
+	// Cleanup networking and mounts
+	m.container.cleanup()
+
+	if m.container.daemon != nil && m.container.daemon.srv != nil && m.container.daemon.srv.IsRunning() {
+		// FIXME: here is race condition between two RUN instructions in Dockerfile
+		// because they share same runconfig and change image. Must be fixed
+		// in builder/builder.go
+		if err := m.container.toDisk(); err != nil {
+			utils.Errorf("Error dumping container %s state to disk: %s\n", m.container.ID, err)
+
+			return err
+		}
+	}
+
+	return nil
+}
+
+// reset resets the container's IO and ensures that the command is able to be executed again
+// by copying the data into a new struct
+func (m *containerMonitor) reset() {
+	container := m.container
+
+	if container.Config.OpenStdin {
+		if err := container.stdin.Close(); err != nil {
+			utils.Errorf("%s: Error close stdin: %s", container.ID, err)
+		}
+	}
+
+	if err := container.stdout.Clean(); err != nil {
+		utils.Errorf("%s: Error close stdout: %s", container.ID, err)
+	}
+
+	if err := container.stderr.Clean(); err != nil {
+		utils.Errorf("%s: Error close stderr: %s", container.ID, err)
+	}
+
+	if container.command != nil && container.command.Terminal != nil {
+		if err := container.command.Terminal.Close(); err != nil {
+			utils.Errorf("%s: Error closing terminal: %s", container.ID, err)
+		}
+	}
+
+	// Re-create a brand new stdin pipe once the container exited
+	if container.Config.OpenStdin {
+		container.stdin, container.stdinPipe = io.Pipe()
+	}
+
+	if container.daemon != nil && container.daemon.srv != nil {
+		container.daemon.srv.LogEvent("die", container.ID, container.daemon.repositories.ImageName(container.Image))
+	}
+
+	c := container.command.Cmd
+
+	container.command.Cmd = exec.Cmd{
+		Stdin:       c.Stdin,
+		Stdout:      c.Stdout,
+		Stderr:      c.Stderr,
+		Path:        c.Path,
+		Env:         c.Env,
+		ExtraFiles:  c.ExtraFiles,
+		Args:        c.Args,
+		Dir:         c.Dir,
+		SysProcAttr: c.SysProcAttr,
+	}
+}
+
+// Start starts the container's process and monitors it according to the restart policy
+func (m *containerMonitor) Start() error {
+	var (
+		err      error
+		exitCode int
+	)
+	defer m.Close()
+
+	// reset the restart count
+	m.container.RestartCount = -1
+
+	for !m.shouldStop {
+		m.container.RestartCount++
+		if err := m.container.startLoggingToDisk(); err != nil {
+			m.reset()
+
+			return err
+		}
+
+		pipes := execdriver.NewPipes(m.container.stdin, m.container.stdout, m.container.stderr, m.container.Config.OpenStdin)
+
+		if exitCode, err = m.container.daemon.Run(m.container, pipes, m.callback); err != nil {
+			m.failureCount++
+
+			if m.failureCount == m.restartPolicy.MaximumRetryCount {
+				m.ExitOnNext()
+			}
+
+			utils.Errorf("Error running container: %s", err)
+		}
+
+		// We still wait to set the state as stopped and ensure that the locks were released
+		m.container.State.SetStopped(exitCode)
+
+		m.reset()
+
+		if m.shouldRestart(exitCode) {
+			time.Sleep(1 * time.Second)
+
+			continue
+		}
+
+		break
+	}
+
+	return err
+}
+
+func (m *containerMonitor) shouldRestart(exitCode int) bool {
+	m.mux.Lock()
+
+	shouldRestart := (m.restartPolicy.Name == "always" ||
+		(m.restartPolicy.Name == "on-failure" && exitCode != 0)) &&
+		!m.shouldStop
+
+	m.mux.Unlock()
+
+	return shouldRestart
+}
+
+// callback ensures that the container's state is properly updated after we
+// receive an ack from the execution driver
+func (m *containerMonitor) callback(command *execdriver.Command) {
+	if command.Tty {
+		// The callback is called after the process Start()
+		// so we are in the parent process. In TTY mode, stdin/out/err is the PtySlave
+		// which we close here.
+		if c, ok := command.Stdout.(io.Closer); ok {
+			c.Close()
+		}
+	}
+
+	m.container.State.SetRunning(command.Pid())
+
+	if err := m.container.ToDisk(); err != nil {
+		utils.Debugf("%s", err)
+	}
+}