Просмотр исходного кода

Merge pull request #13000 from runcom/refactor-server-to-use-daemon-service-followup

Refactor server to use the daemon as a service
Alexander Morozov 10 лет назад
Родитель
Сommit
3b13e56fd7
4 измененных файлов с 290 добавлено и 246 удалено
  1. 21 39
      api/server/server.go
  2. 39 207
      daemon/attach.go
  3. 218 0
      daemon/container.go
  4. 12 0
      daemon/wait.go

+ 21 - 39
api/server/server.go

@@ -1068,14 +1068,11 @@ func (s *Server) postContainersWait(version version.Version, w http.ResponseWrit
 		return fmt.Errorf("Missing parameter")
 	}
 
-	name := vars["name"]
-	cont, err := s.daemon.Get(name)
+	status, err := s.daemon.ContainerWait(vars["name"], -1*time.Second)
 	if err != nil {
 		return err
 	}
 
-	status, _ := cont.WaitStop(-1 * time.Second)
-
 	return writeJSON(w, http.StatusOK, &types.ContainerWaitResponse{
 		StatusCode: status,
 	})
@@ -1109,50 +1106,33 @@ func (s *Server) postContainersAttach(version version.Version, w http.ResponseWr
 		return fmt.Errorf("Missing parameter")
 	}
 
-	cont, err := s.daemon.Get(vars["name"])
-	if err != nil {
-		return err
-	}
-
 	inStream, outStream, err := hijackServer(w)
 	if err != nil {
 		return err
 	}
 	defer closeStreams(inStream, outStream)
 
-	var errStream io.Writer
-
 	if _, ok := r.Header["Upgrade"]; ok {
 		fmt.Fprintf(outStream, "HTTP/1.1 101 UPGRADED\r\nContent-Type: application/vnd.docker.raw-stream\r\nConnection: Upgrade\r\nUpgrade: tcp\r\n\r\n")
 	} else {
 		fmt.Fprintf(outStream, "HTTP/1.1 200 OK\r\nContent-Type: application/vnd.docker.raw-stream\r\n\r\n")
 	}
 
-	if !cont.Config.Tty && version.GreaterThanOrEqualTo("1.6") {
-		errStream = stdcopy.NewStdWriter(outStream, stdcopy.Stderr)
-		outStream = stdcopy.NewStdWriter(outStream, stdcopy.Stdout)
-	} else {
-		errStream = outStream
-	}
-	logs := boolValue(r, "logs")
-	stream := boolValue(r, "stream")
-
-	var stdin io.ReadCloser
-	var stdout, stderr io.Writer
-
-	if boolValue(r, "stdin") {
-		stdin = inStream
-	}
-	if boolValue(r, "stdout") {
-		stdout = outStream
-	}
-	if boolValue(r, "stderr") {
-		stderr = errStream
+	attachWithLogsConfig := &daemon.ContainerAttachWithLogsConfig{
+		InStream:  inStream,
+		OutStream: outStream,
+		UseStdin:  boolValue(r, "stdin"),
+		UseStdout: boolValue(r, "stdout"),
+		UseStderr: boolValue(r, "stderr"),
+		Logs:      boolValue(r, "logs"),
+		Stream:    boolValue(r, "stream"),
+		Multiplex: version.GreaterThanOrEqualTo("1.6"),
 	}
 
-	if err := cont.AttachWithLogs(stdin, stdout, stderr, logs, stream); err != nil {
+	if err := s.daemon.ContainerAttachWithLogs(vars["name"], attachWithLogsConfig); err != nil {
 		fmt.Fprintf(outStream, "Error attaching: %s\n", err)
 	}
+
 	return nil
 }
 
@@ -1163,17 +1143,19 @@ func (s *Server) wsContainersAttach(version version.Version, w http.ResponseWrit
 	if vars == nil {
 		return fmt.Errorf("Missing parameter")
 	}
-	cont, err := s.daemon.Get(vars["name"])
-	if err != nil {
-		return err
-	}
 
 	h := websocket.Handler(func(ws *websocket.Conn) {
 		defer ws.Close()
-		logs := r.Form.Get("logs") != ""
-		stream := r.Form.Get("stream") != ""
 
-		if err := cont.AttachWithLogs(ws, ws, ws, logs, stream); err != nil {
+		wsAttachWithLogsConfig := &daemon.ContainerWsAttachWithLogsConfig{
+			InStream:  ws,
+			OutStream: ws,
+			ErrStream: ws,
+			Logs:      boolValue(r, "logs"),
+			Stream:    boolValue(r, "stream"),
+		}
+
+		if err := s.daemon.ContainerWsAttachWithLogs(vars["name"], wsAttachWithLogsConfig); err != nil {
 			logrus.Errorf("Error attaching websocket: %s", err)
 		}
 	})

+ 39 - 207
daemon/attach.go

@@ -1,229 +1,61 @@
 package daemon
 
 import (
-	"encoding/json"
 	"io"
-	"os"
-	"sync"
-	"time"
 
-	"github.com/Sirupsen/logrus"
-	"github.com/docker/docker/pkg/jsonlog"
-	"github.com/docker/docker/pkg/promise"
+	"github.com/docker/docker/pkg/stdcopy"
 )
 
-func (c *Container) AttachWithLogs(stdin io.ReadCloser, stdout, stderr io.Writer, logs, stream bool) error {
-	if logs {
-		cLog, err := c.ReadLog("json")
-		if err != nil && os.IsNotExist(err) {
-			// Legacy logs
-			logrus.Debugf("Old logs format")
-			if stdout != nil {
-				cLog, err := c.ReadLog("stdout")
-				if err != nil {
-					logrus.Errorf("Error reading logs (stdout): %s", err)
-				} else if _, err := io.Copy(stdout, cLog); err != nil {
-					logrus.Errorf("Error streaming logs (stdout): %s", err)
-				}
-			}
-			if stderr != nil {
-				cLog, err := c.ReadLog("stderr")
-				if err != nil {
-					logrus.Errorf("Error reading logs (stderr): %s", err)
-				} else if _, err := io.Copy(stderr, cLog); err != nil {
-					logrus.Errorf("Error streaming logs (stderr): %s", err)
-				}
-			}
-		} else if err != nil {
-			logrus.Errorf("Error reading logs (json): %s", err)
-		} else {
-			dec := json.NewDecoder(cLog)
-			for {
-				l := &jsonlog.JSONLog{}
-
-				if err := dec.Decode(l); err == io.EOF {
-					break
-				} else if err != nil {
-					logrus.Errorf("Error streaming logs: %s", err)
-					break
-				}
-				if l.Stream == "stdout" && stdout != nil {
-					io.WriteString(stdout, l.Log)
-				}
-				if l.Stream == "stderr" && stderr != nil {
-					io.WriteString(stderr, l.Log)
-				}
-			}
-		}
-	}
-
-	//stream
-	if stream {
-		var stdinPipe io.ReadCloser
-		if stdin != nil {
-			r, w := io.Pipe()
-			go func() {
-				defer w.Close()
-				defer logrus.Debugf("Closing buffered stdin pipe")
-				io.Copy(w, stdin)
-			}()
-			stdinPipe = r
-		}
-		<-c.Attach(stdinPipe, stdout, stderr)
-		// If we are in stdinonce mode, wait for the process to end
-		// otherwise, simply return
-		if c.Config.StdinOnce && !c.Config.Tty {
-			c.WaitStop(-1 * time.Second)
-		}
-	}
-	return nil
+type ContainerAttachWithLogsConfig struct {
+	InStream                       io.ReadCloser
+	OutStream                      io.Writer
+	UseStdin, UseStdout, UseStderr bool
+	Logs, Stream                   bool
+	Multiplex                      bool
 }
 
-func (c *Container) Attach(stdin io.ReadCloser, stdout io.Writer, stderr io.Writer) chan error {
-	return attach(&c.StreamConfig, c.Config.OpenStdin, c.Config.StdinOnce, c.Config.Tty, stdin, stdout, stderr)
-}
-
-func attach(streamConfig *StreamConfig, openStdin, stdinOnce, tty bool, stdin io.ReadCloser, stdout io.Writer, stderr io.Writer) chan error {
-	var (
-		cStdout, cStderr io.ReadCloser
-		cStdin           io.WriteCloser
-		wg               sync.WaitGroup
-		errors           = make(chan error, 3)
-	)
-
-	if stdin != nil && openStdin {
-		cStdin = streamConfig.StdinPipe()
-		wg.Add(1)
+func (daemon *Daemon) ContainerAttachWithLogs(name string, c *ContainerAttachWithLogsConfig) error {
+	container, err := daemon.Get(name)
+	if err != nil {
+		return err
 	}
 
-	if stdout != nil {
-		cStdout = streamConfig.StdoutPipe()
-		wg.Add(1)
-	}
+	var errStream io.Writer
 
-	if stderr != nil {
-		cStderr = streamConfig.StderrPipe()
-		wg.Add(1)
+	if !container.Config.Tty && c.Multiplex {
+		errStream = stdcopy.NewStdWriter(c.OutStream, stdcopy.Stderr)
+		c.OutStream = stdcopy.NewStdWriter(c.OutStream, stdcopy.Stdout)
+	} else {
+		errStream = c.OutStream
 	}
 
-	// Connect stdin of container to the http conn.
-	go func() {
-		if stdin == nil || !openStdin {
-			return
-		}
-		logrus.Debugf("attach: stdin: begin")
-		defer func() {
-			if stdinOnce && !tty {
-				cStdin.Close()
-			} else {
-				// No matter what, when stdin is closed (io.Copy unblock), close stdout and stderr
-				if cStdout != nil {
-					cStdout.Close()
-				}
-				if cStderr != nil {
-					cStderr.Close()
-				}
-			}
-			wg.Done()
-			logrus.Debugf("attach: stdin: end")
-		}()
+	var stdin io.ReadCloser
+	var stdout, stderr io.Writer
 
-		var err error
-		if tty {
-			_, err = copyEscapable(cStdin, stdin)
-		} else {
-			_, err = io.Copy(cStdin, stdin)
-
-		}
-		if err == io.ErrClosedPipe {
-			err = nil
-		}
-		if err != nil {
-			logrus.Errorf("attach: stdin: %s", err)
-			errors <- err
-			return
-		}
-	}()
-
-	attachStream := func(name string, stream io.Writer, streamPipe io.ReadCloser) {
-		if stream == nil {
-			return
-		}
-		defer func() {
-			// Make sure stdin gets closed
-			if stdin != nil {
-				stdin.Close()
-			}
-			streamPipe.Close()
-			wg.Done()
-			logrus.Debugf("attach: %s: end", name)
-		}()
-
-		logrus.Debugf("attach: %s: begin", name)
-		_, err := io.Copy(stream, streamPipe)
-		if err == io.ErrClosedPipe {
-			err = nil
-		}
-		if err != nil {
-			logrus.Errorf("attach: %s: %v", name, err)
-			errors <- err
-		}
+	if c.UseStdin {
+		stdin = c.InStream
+	}
+	if c.UseStdout {
+		stdout = c.OutStream
+	}
+	if c.UseStderr {
+		stderr = errStream
 	}
 
-	go attachStream("stdout", stdout, cStdout)
-	go attachStream("stderr", stderr, cStderr)
+	return container.AttachWithLogs(stdin, stdout, stderr, c.Logs, c.Stream)
+}
 
-	return promise.Go(func() error {
-		wg.Wait()
-		close(errors)
-		for err := range errors {
-			if err != nil {
-				return err
-			}
-		}
-		return nil
-	})
+type ContainerWsAttachWithLogsConfig struct {
+	InStream             io.ReadCloser
+	OutStream, ErrStream io.Writer
+	Logs, Stream         bool
 }
 
-// Code c/c from io.Copy() modified to handle escape sequence
-func copyEscapable(dst io.Writer, src io.ReadCloser) (written int64, err error) {
-	buf := make([]byte, 32*1024)
-	for {
-		nr, er := src.Read(buf)
-		if nr > 0 {
-			// ---- Docker addition
-			// char 16 is C-p
-			if nr == 1 && buf[0] == 16 {
-				nr, er = src.Read(buf)
-				// char 17 is C-q
-				if nr == 1 && buf[0] == 17 {
-					if err := src.Close(); err != nil {
-						return 0, err
-					}
-					return 0, nil
-				}
-			}
-			// ---- End of docker
-			nw, ew := dst.Write(buf[0:nr])
-			if nw > 0 {
-				written += int64(nw)
-			}
-			if ew != nil {
-				err = ew
-				break
-			}
-			if nr != nw {
-				err = io.ErrShortWrite
-				break
-			}
-		}
-		if er == io.EOF {
-			break
-		}
-		if er != nil {
-			err = er
-			break
-		}
+func (daemon *Daemon) ContainerWsAttachWithLogs(name string, c *ContainerWsAttachWithLogsConfig) error {
+	container, err := daemon.Get(name)
+	if err != nil {
+		return err
 	}
-	return written, err
+
+	return container.AttachWithLogs(c.InStream, c.OutStream, c.ErrStream, c.Logs, c.Stream)
 }

+ 218 - 0
daemon/container.go

@@ -11,6 +11,7 @@ import (
 	"path"
 	"path/filepath"
 	"strings"
+	"sync"
 	"syscall"
 	"time"
 
@@ -34,6 +35,7 @@ import (
 	"github.com/docker/docker/pkg/directory"
 	"github.com/docker/docker/pkg/etchosts"
 	"github.com/docker/docker/pkg/ioutils"
+	"github.com/docker/docker/pkg/jsonlog"
 	"github.com/docker/docker/pkg/promise"
 	"github.com/docker/docker/pkg/resolvconf"
 	"github.com/docker/docker/pkg/stringid"
@@ -1654,3 +1656,219 @@ func (container *Container) monitorExec(execConfig *execConfig, callback execdri
 
 	return err
 }
+
+func (c *Container) Attach(stdin io.ReadCloser, stdout io.Writer, stderr io.Writer) chan error {
+	return attach(&c.StreamConfig, c.Config.OpenStdin, c.Config.StdinOnce, c.Config.Tty, stdin, stdout, stderr)
+}
+
+func (c *Container) AttachWithLogs(stdin io.ReadCloser, stdout, stderr io.Writer, logs, stream bool) error {
+	if logs {
+		cLog, err := c.ReadLog("json")
+		if err != nil && os.IsNotExist(err) {
+			// Legacy logs
+			logrus.Debugf("Old logs format")
+			if stdout != nil {
+				cLog, err := c.ReadLog("stdout")
+				if err != nil {
+					logrus.Errorf("Error reading logs (stdout): %s", err)
+				} else if _, err := io.Copy(stdout, cLog); err != nil {
+					logrus.Errorf("Error streaming logs (stdout): %s", err)
+				}
+			}
+			if stderr != nil {
+				cLog, err := c.ReadLog("stderr")
+				if err != nil {
+					logrus.Errorf("Error reading logs (stderr): %s", err)
+				} else if _, err := io.Copy(stderr, cLog); err != nil {
+					logrus.Errorf("Error streaming logs (stderr): %s", err)
+				}
+			}
+		} else if err != nil {
+			logrus.Errorf("Error reading logs (json): %s", err)
+		} else {
+			dec := json.NewDecoder(cLog)
+			for {
+				l := &jsonlog.JSONLog{}
+
+				if err := dec.Decode(l); err == io.EOF {
+					break
+				} else if err != nil {
+					logrus.Errorf("Error streaming logs: %s", err)
+					break
+				}
+				if l.Stream == "stdout" && stdout != nil {
+					io.WriteString(stdout, l.Log)
+				}
+				if l.Stream == "stderr" && stderr != nil {
+					io.WriteString(stderr, l.Log)
+				}
+			}
+		}
+	}
+
+	//stream
+	if stream {
+		var stdinPipe io.ReadCloser
+		if stdin != nil {
+			r, w := io.Pipe()
+			go func() {
+				defer w.Close()
+				defer logrus.Debugf("Closing buffered stdin pipe")
+				io.Copy(w, stdin)
+			}()
+			stdinPipe = r
+		}
+		<-c.Attach(stdinPipe, stdout, stderr)
+		// If we are in stdinonce mode, wait for the process to end
+		// otherwise, simply return
+		if c.Config.StdinOnce && !c.Config.Tty {
+			c.WaitStop(-1 * time.Second)
+		}
+	}
+	return nil
+}
+
+func attach(streamConfig *StreamConfig, openStdin, stdinOnce, tty bool, stdin io.ReadCloser, stdout io.Writer, stderr io.Writer) chan error {
+	var (
+		cStdout, cStderr io.ReadCloser
+		cStdin           io.WriteCloser
+		wg               sync.WaitGroup
+		errors           = make(chan error, 3)
+	)
+
+	if stdin != nil && openStdin {
+		cStdin = streamConfig.StdinPipe()
+		wg.Add(1)
+	}
+
+	if stdout != nil {
+		cStdout = streamConfig.StdoutPipe()
+		wg.Add(1)
+	}
+
+	if stderr != nil {
+		cStderr = streamConfig.StderrPipe()
+		wg.Add(1)
+	}
+
+	// Connect stdin of container to the http conn.
+	go func() {
+		if stdin == nil || !openStdin {
+			return
+		}
+		logrus.Debugf("attach: stdin: begin")
+		defer func() {
+			if stdinOnce && !tty {
+				cStdin.Close()
+			} else {
+				// No matter what, when stdin is closed (io.Copy unblock), close stdout and stderr
+				if cStdout != nil {
+					cStdout.Close()
+				}
+				if cStderr != nil {
+					cStderr.Close()
+				}
+			}
+			wg.Done()
+			logrus.Debugf("attach: stdin: end")
+		}()
+
+		var err error
+		if tty {
+			_, err = copyEscapable(cStdin, stdin)
+		} else {
+			_, err = io.Copy(cStdin, stdin)
+
+		}
+		if err == io.ErrClosedPipe {
+			err = nil
+		}
+		if err != nil {
+			logrus.Errorf("attach: stdin: %s", err)
+			errors <- err
+			return
+		}
+	}()
+
+	attachStream := func(name string, stream io.Writer, streamPipe io.ReadCloser) {
+		if stream == nil {
+			return
+		}
+		defer func() {
+			// Make sure stdin gets closed
+			if stdin != nil {
+				stdin.Close()
+			}
+			streamPipe.Close()
+			wg.Done()
+			logrus.Debugf("attach: %s: end", name)
+		}()
+
+		logrus.Debugf("attach: %s: begin", name)
+		_, err := io.Copy(stream, streamPipe)
+		if err == io.ErrClosedPipe {
+			err = nil
+		}
+		if err != nil {
+			logrus.Errorf("attach: %s: %v", name, err)
+			errors <- err
+		}
+	}
+
+	go attachStream("stdout", stdout, cStdout)
+	go attachStream("stderr", stderr, cStderr)
+
+	return promise.Go(func() error {
+		wg.Wait()
+		close(errors)
+		for err := range errors {
+			if err != nil {
+				return err
+			}
+		}
+		return nil
+	})
+}
+
+// Code c/c from io.Copy() modified to handle escape sequence
+func copyEscapable(dst io.Writer, src io.ReadCloser) (written int64, err error) {
+	buf := make([]byte, 32*1024)
+	for {
+		nr, er := src.Read(buf)
+		if nr > 0 {
+			// ---- Docker addition
+			// char 16 is C-p
+			if nr == 1 && buf[0] == 16 {
+				nr, er = src.Read(buf)
+				// char 17 is C-q
+				if nr == 1 && buf[0] == 17 {
+					if err := src.Close(); err != nil {
+						return 0, err
+					}
+					return 0, nil
+				}
+			}
+			// ---- End of docker
+			nw, ew := dst.Write(buf[0:nr])
+			if nw > 0 {
+				written += int64(nw)
+			}
+			if ew != nil {
+				err = ew
+				break
+			}
+			if nr != nw {
+				err = io.ErrShortWrite
+				break
+			}
+		}
+		if er == io.EOF {
+			break
+		}
+		if er != nil {
+			err = er
+			break
+		}
+	}
+	return written, err
+}

+ 12 - 0
daemon/wait.go

@@ -0,0 +1,12 @@
+package daemon
+
+import "time"
+
+func (daemon *Daemon) ContainerWait(name string, timeout time.Duration) (int, error) {
+	container, err := daemon.Get(name)
+	if err != nil {
+		return -1, err
+	}
+
+	return container.WaitStop(timeout)
+}