Bläddra i källkod

Merge pull request #20729 from estesp/pipework

Add synchronization and closure to IO pipes in userns path
Alexander Morozov 9 år sedan
förälder
incheckning
51302c29ed
2 ändrade filer med 54 tillägg och 21 borttagningar
  1. 44 20
      daemon/execdriver/native/driver.go
  2. 10 1
      daemon/execdriver/native/exec.go

+ 44 - 20
daemon/execdriver/native/driver.go

@@ -152,7 +152,9 @@ func (d *Driver) Run(c *execdriver.Command, pipes *execdriver.Pipes, hooks execd
 		User: c.ProcessConfig.User,
 	}
 
-	if err := setupPipes(container, &c.ProcessConfig, p, pipes); err != nil {
+	wg := sync.WaitGroup{}
+	writers, err := setupPipes(container, &c.ProcessConfig, p, pipes, &wg)
+	if err != nil {
 		return execdriver.ExitStatus{ExitCode: -1}, err
 	}
 
@@ -174,6 +176,10 @@ func (d *Driver) Run(c *execdriver.Command, pipes *execdriver.Pipes, hooks execd
 		return execdriver.ExitStatus{ExitCode: -1}, err
 	}
 
+	//close the write end of any opened pipes now that they are dup'ed into the container
+	for _, writer := range writers {
+		writer.Close()
+	}
 	// 'oom' is used to emit 'oom' events to the eventstream, 'oomKilled' is used
 	// to set the 'OOMKilled' flag in state
 	oom := notifyOnOOM(cont)
@@ -202,6 +208,9 @@ func (d *Driver) Run(c *execdriver.Command, pipes *execdriver.Pipes, hooks execd
 		}
 		ps = execErr.ProcessState
 	}
+	// wait for all IO goroutine copiers to finish
+	wg.Wait()
+
 	cont.Destroy()
 	destroyed = true
 	// oomKilled will have an oom event if any process within the container was
@@ -480,24 +489,26 @@ func (t *TtyConsole) Close() error {
 	return t.console.Close()
 }
 
-func setupPipes(container *configs.Config, processConfig *execdriver.ProcessConfig, p *libcontainer.Process, pipes *execdriver.Pipes) error {
+func setupPipes(container *configs.Config, processConfig *execdriver.ProcessConfig, p *libcontainer.Process, pipes *execdriver.Pipes, wg *sync.WaitGroup) ([]io.WriteCloser, error) {
+
+	writers := []io.WriteCloser{}
 
 	rootuid, err := container.HostUID()
 	if err != nil {
-		return err
+		return writers, err
 	}
 
 	if processConfig.Tty {
 		cons, err := p.NewConsole(rootuid)
 		if err != nil {
-			return err
+			return writers, err
 		}
 		term, err := NewTtyConsole(cons, pipes)
 		if err != nil {
-			return err
+			return writers, err
 		}
 		processConfig.Terminal = term
-		return nil
+		return writers, nil
 	}
 	// not a tty--set up stdio pipes
 	term := &execdriver.StdConsole{}
@@ -512,7 +523,7 @@ func setupPipes(container *configs.Config, processConfig *execdriver.ProcessConf
 
 		r, w, err := os.Pipe()
 		if err != nil {
-			return err
+			return writers, err
 		}
 		if pipes.Stdin != nil {
 			go func() {
@@ -521,23 +532,32 @@ func setupPipes(container *configs.Config, processConfig *execdriver.ProcessConf
 			}()
 			p.Stdin = r
 		}
-		return nil
+		return writers, nil
 	}
 
 	// if we have user namespaces enabled (rootuid != 0), we will set
 	// up os pipes for stderr, stdout, stdin so we can chown them to
 	// the proper ownership to allow for proper access to the underlying
 	// fds
-	var fds []int
+	var fds []uintptr
+
+	copyPipes := func(out io.Writer, in io.ReadCloser) {
+		defer wg.Done()
+		io.Copy(out, in)
+		in.Close()
+	}
 
 	//setup stdout
 	r, w, err := os.Pipe()
 	if err != nil {
-		return err
+		w.Close()
+		return writers, err
 	}
-	fds = append(fds, int(r.Fd()), int(w.Fd()))
+	writers = append(writers, w)
+	fds = append(fds, r.Fd(), w.Fd())
 	if pipes.Stdout != nil {
-		go io.Copy(pipes.Stdout, r)
+		wg.Add(1)
+		go copyPipes(pipes.Stdout, r)
 	}
 	term.Closers = append(term.Closers, r)
 	p.Stdout = w
@@ -545,11 +565,14 @@ func setupPipes(container *configs.Config, processConfig *execdriver.ProcessConf
 	//setup stderr
 	r, w, err = os.Pipe()
 	if err != nil {
-		return err
+		w.Close()
+		return writers, err
 	}
-	fds = append(fds, int(r.Fd()), int(w.Fd()))
+	writers = append(writers, w)
+	fds = append(fds, r.Fd(), w.Fd())
 	if pipes.Stderr != nil {
-		go io.Copy(pipes.Stderr, r)
+		wg.Add(1)
+		go copyPipes(pipes.Stderr, r)
 	}
 	term.Closers = append(term.Closers, r)
 	p.Stderr = w
@@ -557,9 +580,10 @@ func setupPipes(container *configs.Config, processConfig *execdriver.ProcessConf
 	//setup stdin
 	r, w, err = os.Pipe()
 	if err != nil {
-		return err
+		r.Close()
+		return writers, err
 	}
-	fds = append(fds, int(r.Fd()), int(w.Fd()))
+	fds = append(fds, r.Fd(), w.Fd())
 	if pipes.Stdin != nil {
 		go func() {
 			io.Copy(w, pipes.Stdin)
@@ -568,11 +592,11 @@ func setupPipes(container *configs.Config, processConfig *execdriver.ProcessConf
 		p.Stdin = r
 	}
 	for _, fd := range fds {
-		if err := syscall.Fchown(fd, rootuid, rootuid); err != nil {
-			return fmt.Errorf("Failed to chown pipes fd: %v", err)
+		if err := syscall.Fchown(int(fd), rootuid, rootuid); err != nil {
+			return writers, fmt.Errorf("Failed to chown pipes fd: %v", err)
 		}
 	}
-	return nil
+	return writers, nil
 }
 
 // SupportsHooks implements the execdriver Driver interface.

+ 10 - 1
daemon/execdriver/native/exec.go

@@ -7,6 +7,7 @@ import (
 	"os"
 	"os/exec"
 	"strings"
+	"sync"
 	"syscall"
 
 	"github.com/docker/docker/daemon/execdriver"
@@ -52,13 +53,19 @@ func (d *Driver) Exec(c *execdriver.Command, processConfig *execdriver.ProcessCo
 	}
 
 	config := active.Config()
-	if err := setupPipes(&config, processConfig, p, pipes); err != nil {
+	wg := sync.WaitGroup{}
+	writers, err := setupPipes(&config, processConfig, p, pipes, &wg)
+	if err != nil {
 		return -1, err
 	}
 
 	if err := active.Start(p); err != nil {
 		return -1, err
 	}
+	//close the write end of any opened pipes now that they are dup'ed into the container
+	for _, writer := range writers {
+		writer.Close()
+	}
 
 	if hooks.Start != nil {
 		pid, err := p.Pid()
@@ -83,5 +90,7 @@ func (d *Driver) Exec(c *execdriver.Command, processConfig *execdriver.ProcessCo
 		}
 		ps = exitErr.ProcessState
 	}
+	// wait for all IO goroutine copiers to finish
+	wg.Wait()
 	return utils.ExitStatus(ps.Sys().(syscall.WaitStatus)), nil
 }