浏览代码

move attach to a job

Docker-DCO-1.1-Signed-off-by: Victor Vieux <victor.vieux@docker.com> (github: vieux)
Victor Vieux 11 年之前
父节点
当前提交
e1d8543c78
共有 4 个文件被更改,包括 78 次插入74 次删除
  1. 11 11
      REMOTE_TODO.md
  2. 23 50
      api.go
  3. 11 0
      engine/streams.go
  4. 33 13
      server.go

+ 11 - 11
REMOTE_TODO.md

@@ -4,26 +4,26 @@
 TODO    "/events":                              getEvents,              N
 ok      "/info":                                getInfo,                1
 ok      "/version":                             getVersion,             1
-ok     "/images/json":                         getImagesJSON,          N
+ok      "/images/json":                         getImagesJSON,          N
 ok      "/images/viz":                          getImagesViz,           0                       yes
-#3615   "/images/search":                       getImagesSearch,        N
-ok    "/images/{name:.*}/get":                getImagesGet,           0
-ok    "/images/{name:.*}/history":            getImagesHistory,       N
-TODO    "/images/{name:.*}/json":               getImagesByName,        1
+ok      "/images/search":                       getImagesSearch,        N
+ok      "/images/{name:.*}/get":                getImagesGet,           0
+ok      "/images/{name:.*}/history":            getImagesHistory,       N
+...     "/images/{name:.*}/json":               getImagesByName,        1
 TODO    "/containers/ps":                       getContainersJSON,      N
 TODO    "/containers/json":                     getContainersJSON,      1
 ok      "/containers/{name:.*}/export":         getContainersExport,    0
-...     "/containers/{name:.*}/changes":        getContainersChanges,   N
-TODO    "/containers/{name:.*}/json":           getContainersByName,    1
+#3616   "/containers/{name:.*}/changes":        getContainersChanges,   N
+...     "/containers/{name:.*}/json":           getContainersByName,    1
 TODO    "/containers/{name:.*}/top":            getContainersTop,       N
-#3512   "/containers/{name:.*}/attach/ws":      wsContainersAttach,     0                                       yes
+ok      "/containers/{name:.*}/attach/ws":      wsContainersAttach,     0                                       yes
 
 **POST**
 TODO    "/auth":                                postAuth,               0                       yes
 ok      "/commit":                              postCommit,             0
 TODO    "/build":                               postBuild,              0                       yes
 TODO    "/images/create":                       postImagesCreate,       N                       yes             yes (pull)
-#3559   "/images/{name:.*}/insert":             postImagesInsert,       N                       yes             yes
+ok      "/images/{name:.*}/insert":             postImagesInsert,       N                       yes             yes
 TODO    "/images/load":                         postImagesLoad,         1                                       yes (stdin)
 TODO    "/images/{name:.*}/push":               postImagesPush,         N                                       yes
 ok      "/images/{name:.*}/tag":                postImagesTag,          0
@@ -34,8 +34,8 @@ ok      "/containers/{name:.*}/start":          postContainersStart,    0
 ok      "/containers/{name:.*}/stop":           postContainersStop,     0
 ok      "/containers/{name:.*}/wait":           postContainersWait,     0
 ok      "/containers/{name:.*}/resize":         postContainersResize,   0
-#3512   "/containers/{name:.*}/attach":         postContainersAttach,   0                                       yes
-#3560   "/containers/{name:.*}/copy":           postContainersCopy,     0                       yes
+ok      "/containers/{name:.*}/attach":         postContainersAttach,   0                                       yes
+ok   "/containers/{name:.*}/copy":           postContainersCopy,     0                       yes
 
 **DELETE**
 ok      "/containers/{name:.*}":                deleteContainers,       0

+ 23 - 50
api.go

@@ -769,33 +769,11 @@ func postContainersAttach(srv *Server, version float64, w http.ResponseWriter, r
 	if err := parseForm(r); err != nil {
 		return err
 	}
-	logs, err := getBoolParam(r.Form.Get("logs"))
-	if err != nil {
-		return err
-	}
-	stream, err := getBoolParam(r.Form.Get("stream"))
-	if err != nil {
-		return err
-	}
-	stdin, err := getBoolParam(r.Form.Get("stdin"))
-	if err != nil {
-		return err
-	}
-	stdout, err := getBoolParam(r.Form.Get("stdout"))
-	if err != nil {
-		return err
-	}
-	stderr, err := getBoolParam(r.Form.Get("stderr"))
-	if err != nil {
-		return err
-	}
-
 	if vars == nil {
 		return fmt.Errorf("Missing parameter")
 	}
-	name := vars["name"]
 
-	c, err := srv.ContainerInspect(name)
+	c, err := srv.ContainerInspect(vars["name"])
 	if err != nil {
 		return err
 	}
@@ -830,51 +808,46 @@ func postContainersAttach(srv *Server, version float64, w http.ResponseWriter, r
 		errStream = outStream
 	}
 
-	if err := srv.ContainerAttach(name, logs, stream, stdin, stdout, stderr, inStream, outStream, errStream); err != nil {
+	job := srv.Eng.Job("attach", vars["name"])
+	job.Setenv("logs", r.Form.Get("logs"))
+	job.Setenv("stream", r.Form.Get("stream"))
+	job.Setenv("stdin", r.Form.Get("stdin"))
+	job.Setenv("stdout", r.Form.Get("stdout"))
+	job.Setenv("stderr", r.Form.Get("stderr"))
+	job.Stdin.Add(inStream)
+	job.Stdout.Add(outStream)
+	job.Stderr.Add(errStream)
+	if err := job.Run(); err != nil {
 		fmt.Fprintf(outStream, "Error: %s\n", err)
+
 	}
 	return nil
 }
 
 func wsContainersAttach(srv *Server, version float64, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
-
 	if err := parseForm(r); err != nil {
 		return err
 	}
-	logs, err := getBoolParam(r.Form.Get("logs"))
-	if err != nil {
-		return err
-	}
-	stream, err := getBoolParam(r.Form.Get("stream"))
-	if err != nil {
-		return err
-	}
-	stdin, err := getBoolParam(r.Form.Get("stdin"))
-	if err != nil {
-		return err
-	}
-	stdout, err := getBoolParam(r.Form.Get("stdout"))
-	if err != nil {
-		return err
-	}
-	stderr, err := getBoolParam(r.Form.Get("stderr"))
-	if err != nil {
-		return err
-	}
-
 	if vars == nil {
 		return fmt.Errorf("Missing parameter")
 	}
-	name := vars["name"]
 
-	if _, err := srv.ContainerInspect(name); err != nil {
+	if _, err := srv.ContainerInspect(vars["name"]); err != nil {
 		return err
 	}
 
 	h := websocket.Handler(func(ws *websocket.Conn) {
 		defer ws.Close()
-
-		if err := srv.ContainerAttach(name, logs, stream, stdin, stdout, stderr, ws, ws, ws); err != nil {
+		job := srv.Eng.Job("attach", vars["name"])
+		job.Setenv("logs", r.Form.Get("logs"))
+		job.Setenv("stream", r.Form.Get("stream"))
+		job.Setenv("stdin", r.Form.Get("stdin"))
+		job.Setenv("stdout", r.Form.Get("stdout"))
+		job.Setenv("stderr", r.Form.Get("stderr"))
+		job.Stdin.Add(ws)
+		job.Stdout.Add(ws)
+		job.Stderr.Add(ws)
+		if err := job.Run(); err != nil {
 			utils.Errorf("Error: %s", err)
 		}
 	})

+ 11 - 0
engine/streams.go

@@ -141,6 +141,17 @@ func (i *Input) Read(p []byte) (n int, err error) {
 	return i.src.Read(p)
 }
 
+// Closes the src
+// Not thread safe on purpose
+func (i *Input) Close() error {
+	if i.src != nil {
+		if closer, ok := i.src.(io.WriteCloser); ok {
+			return closer.Close()
+		}
+	}
+	return nil
+}
+
 // Add attaches a new source to the input.
 // Add can only be called once per input. Subsequent calls will
 // return an error.

+ 33 - 13
server.go

@@ -147,6 +147,10 @@ func jobInitApi(job *engine.Job) engine.Status {
 		job.Error(err)
 		return engine.StatusErr
 	}
+	if err := job.Eng.Register("attach", srv.ContainerAttach); err != nil {
+		job.Error(err)
+		return engine.StatusErr
+	}
 	return engine.StatusOK
 }
 
@@ -1980,10 +1984,25 @@ func (srv *Server) ContainerResize(job *engine.Job) engine.Status {
 	return engine.StatusErr
 }
 
-func (srv *Server) ContainerAttach(name string, logs, stream, stdin, stdout, stderr bool, inStream io.ReadCloser, outStream, errStream io.Writer) error {
+func (srv *Server) ContainerAttach(job *engine.Job) engine.Status {
+	if len(job.Args) != 1 {
+		job.Errorf("Usage: %s CONTAINER\n", job.Name)
+		return engine.StatusErr
+	}
+
+	var (
+		name   = job.Args[0]
+		logs   = job.GetenvBool("logs")
+		stream = job.GetenvBool("stream")
+		stdin  = job.GetenvBool("stdin")
+		stdout = job.GetenvBool("stdout")
+		stderr = job.GetenvBool("stderr")
+	)
+
 	container := srv.runtime.Get(name)
 	if container == nil {
-		return fmt.Errorf("No such container: %s", name)
+		job.Errorf("No such container: %s", name)
+		return engine.StatusErr
 	}
 
 	//logs
@@ -1991,12 +2010,12 @@ func (srv *Server) ContainerAttach(name string, logs, stream, stdin, stdout, std
 		cLog, err := container.ReadLog("json")
 		if err != nil && os.IsNotExist(err) {
 			// Legacy logs
-			utils.Errorf("Old logs format")
+			utils.Debugf("Old logs format")
 			if stdout {
 				cLog, err := container.ReadLog("stdout")
 				if err != nil {
 					utils.Errorf("Error reading logs (stdout): %s", err)
-				} else if _, err := io.Copy(outStream, cLog); err != nil {
+				} else if _, err := io.Copy(job.Stdout, cLog); err != nil {
 					utils.Errorf("Error streaming logs (stdout): %s", err)
 				}
 			}
@@ -2004,7 +2023,7 @@ func (srv *Server) ContainerAttach(name string, logs, stream, stdin, stdout, std
 				cLog, err := container.ReadLog("stderr")
 				if err != nil {
 					utils.Errorf("Error reading logs (stderr): %s", err)
-				} else if _, err := io.Copy(errStream, cLog); err != nil {
+				} else if _, err := io.Copy(job.Stderr, cLog); err != nil {
 					utils.Errorf("Error streaming logs (stderr): %s", err)
 				}
 			}
@@ -2022,10 +2041,10 @@ func (srv *Server) ContainerAttach(name string, logs, stream, stdin, stdout, std
 					break
 				}
 				if l.Stream == "stdout" && stdout {
-					fmt.Fprintf(outStream, "%s", l.Log)
+					fmt.Fprintf(job.Stdout, "%s", l.Log)
 				}
 				if l.Stream == "stderr" && stderr {
-					fmt.Fprintf(errStream, "%s", l.Log)
+					fmt.Fprintf(job.Stderr, "%s", l.Log)
 				}
 			}
 		}
@@ -2034,7 +2053,8 @@ func (srv *Server) ContainerAttach(name string, logs, stream, stdin, stdout, std
 	//stream
 	if stream {
 		if container.State.IsGhost() {
-			return fmt.Errorf("Impossible to attach to a ghost container")
+			job.Errorf("Impossible to attach to a ghost container")
+			return engine.StatusErr
 		}
 
 		var (
@@ -2048,16 +2068,16 @@ func (srv *Server) ContainerAttach(name string, logs, stream, stdin, stdout, std
 			go func() {
 				defer w.Close()
 				defer utils.Debugf("Closing buffered stdin pipe")
-				io.Copy(w, inStream)
+				io.Copy(w, job.Stdin)
 			}()
 			cStdin = r
-			cStdinCloser = inStream
+			cStdinCloser = job.Stdin
 		}
 		if stdout {
-			cStdout = outStream
+			cStdout = job.Stdout
 		}
 		if stderr {
-			cStderr = errStream
+			cStderr = job.Stderr
 		}
 
 		<-container.Attach(cStdin, cStdinCloser, cStdout, cStderr)
@@ -2068,7 +2088,7 @@ func (srv *Server) ContainerAttach(name string, logs, stream, stdin, stdout, std
 			container.Wait()
 		}
 	}
-	return nil
+	return engine.StatusOK
 }
 
 func (srv *Server) ContainerInspect(name string) (*Container, error) {