Merge pull request #1842 from dotcloud/split_stdout_stderr

* Runtime: Split stdout stderr
This commit is contained in:
Guillaume J. Charmes 2013-09-26 18:05:24 -07:00
commit f435970695
7 changed files with 1541 additions and 40 deletions

41
api.go
View file

@ -21,10 +21,12 @@ import (
"strings"
)
const APIVERSION = 1.5
const DEFAULTHTTPHOST = "127.0.0.1"
const DEFAULTHTTPPORT = 4243
const DEFAULTUNIXSOCKET = "/var/run/docker.sock"
const (
APIVERSION = 1.6
DEFAULTHTTPHOST = "127.0.0.1"
DEFAULTHTTPPORT = 4243
DEFAULTUNIXSOCKET = "/var/run/docker.sock"
)
type HttpApiFunc func(srv *Server, version float64, w http.ResponseWriter, r *http.Request, vars map[string]string) error
@ -710,32 +712,43 @@ func postContainersAttach(srv *Server, version float64, w http.ResponseWriter, r
}
name := vars["name"]
if _, err := srv.ContainerInspect(name); err != nil {
c, err := srv.ContainerInspect(name)
if err != nil {
return err
}
in, out, err := hijackServer(w)
inStream, outStream, err := hijackServer(w)
if err != nil {
return err
}
defer func() {
if tcpc, ok := in.(*net.TCPConn); ok {
if tcpc, ok := inStream.(*net.TCPConn); ok {
tcpc.CloseWrite()
} else {
in.Close()
inStream.Close()
}
}()
defer func() {
if tcpc, ok := out.(*net.TCPConn); ok {
if tcpc, ok := outStream.(*net.TCPConn); ok {
tcpc.CloseWrite()
} else if closer, ok := out.(io.Closer); ok {
} else if closer, ok := outStream.(io.Closer); ok {
closer.Close()
}
}()
fmt.Fprintf(out, "HTTP/1.1 200 OK\r\nContent-Type: application/vnd.docker.raw-stream\r\n\r\n")
if err := srv.ContainerAttach(name, logs, stream, stdin, stdout, stderr, in, out); err != nil {
fmt.Fprintf(out, "Error: %s\n", err)
var errStream io.Writer
fmt.Fprintf(outStream, "HTTP/1.1 200 OK\r\nContent-Type: application/vnd.docker.raw-stream\r\n\r\n")
if !c.Config.Tty && version >= 1.6 {
errStream = utils.NewStdWriter(outStream, utils.Stderr)
outStream = utils.NewStdWriter(outStream, utils.Stdout)
} else {
errStream = outStream
}
if err := srv.ContainerAttach(name, logs, stream, stdin, stdout, stderr, inStream, outStream, errStream); err != nil {
fmt.Fprintf(outStream, "Error: %s\n", err)
}
return nil
}
@ -778,7 +791,7 @@ func wsContainersAttach(srv *Server, version float64, w http.ResponseWriter, r *
h := websocket.Handler(func(ws *websocket.Conn) {
defer ws.Close()
if err := srv.ContainerAttach(name, logs, stream, stdin, stdout, stderr, ws, ws); err != nil {
if err := srv.ContainerAttach(name, logs, stream, stdin, stdout, stderr, ws, ws, ws); err != nil {
utils.Debugf("Error: %s", err)
}
})

View file

@ -566,7 +566,6 @@ func TestPostCommit(t *testing.T) {
srv := &Server{runtime: runtime}
// Create a container and remove a file
container, err := runtime.Create(
&Config{
@ -952,7 +951,96 @@ func TestPostContainersAttach(t *testing.T) {
})
setTimeout(t, "read/write assertion timed out", 2*time.Second, func() {
if err := assertPipe("hello\n", "hello", stdout, stdinPipe, 15); err != nil {
if err := assertPipe("hello\n", string([]byte{1, 0, 0, 0, 0, 0, 0, 6})+"hello", stdout, stdinPipe, 15); err != nil {
t.Fatal(err)
}
})
// Close pipes (client disconnects)
if err := closeWrap(stdin, stdinPipe, stdout, stdoutPipe); err != nil {
t.Fatal(err)
}
// Wait for attach to finish, the client disconnected, therefore, Attach finished his job
setTimeout(t, "Waiting for CmdAttach timed out", 10*time.Second, func() {
<-c1
})
// We closed stdin, expect /bin/cat to still be running
// Wait a little bit to make sure container.monitor() did his thing
err = container.WaitTimeout(500 * time.Millisecond)
if err == nil || !container.State.Running {
t.Fatalf("/bin/cat is not running after closing stdin")
}
// Try to avoid the timeout in destroy. Best effort, don't check error
cStdin, _ := container.StdinPipe()
cStdin.Close()
container.Wait()
}
func TestPostContainersAttachStderr(t *testing.T) {
runtime := mkRuntime(t)
defer nuke(runtime)
srv := &Server{runtime: runtime}
container, err := runtime.Create(
&Config{
Image: GetTestImage(runtime).ID,
Cmd: []string{"/bin/sh", "-c", "/bin/cat >&2"},
OpenStdin: true,
},
)
if err != nil {
t.Fatal(err)
}
defer runtime.Destroy(container)
// Start the process
hostConfig := &HostConfig{}
if err := container.Start(hostConfig); err != nil {
t.Fatal(err)
}
stdin, stdinPipe := io.Pipe()
stdout, stdoutPipe := io.Pipe()
// Try to avoid the timeout in destroy. Best effort, don't check error
defer func() {
closeWrap(stdin, stdinPipe, stdout, stdoutPipe)
container.Kill()
}()
// Attach to it
c1 := make(chan struct{})
go func() {
defer close(c1)
r := &hijackTester{
ResponseRecorder: httptest.NewRecorder(),
in: stdin,
out: stdoutPipe,
}
req, err := http.NewRequest("POST", "/containers/"+container.ID+"/attach?stream=1&stdin=1&stdout=1&stderr=1", bytes.NewReader([]byte{}))
if err != nil {
t.Fatal(err)
}
if err := postContainersAttach(srv, APIVERSION, r, req, map[string]string{"name": container.ID}); err != nil {
t.Fatal(err)
}
}()
// Acknowledge hijack
setTimeout(t, "hijack acknowledge timed out", 2*time.Second, func() {
stdout.Read([]byte{})
stdout.Read(make([]byte, 4096))
})
setTimeout(t, "read/write assertion timed out", 2*time.Second, func() {
if err := assertPipe("hello\n", string([]byte{2, 0, 0, 0, 0, 0, 0, 6})+"hello", stdout, stdinPipe, 15); err != nil {
t.Fatal(err)
}
})

View file

@ -1230,7 +1230,7 @@ func (cli *DockerCli) CmdLogs(args ...string) error {
return nil
}
if err := cli.hijack("POST", "/containers/"+cmd.Arg(0)+"/attach?logs=1&stdout=1&stderr=1", false, nil, cli.out); err != nil {
if err := cli.hijack("POST", "/containers/"+cmd.Arg(0)+"/attach?logs=1&stdout=1&stderr=1", false, nil, cli.out, cli.err); err != nil {
return err
}
return nil
@ -1273,7 +1273,7 @@ func (cli *DockerCli) CmdAttach(args ...string) error {
v.Set("stdout", "1")
v.Set("stderr", "1")
if err := cli.hijack("POST", "/containers/"+cmd.Arg(0)+"/attach?"+v.Encode(), container.Config.Tty, cli.in, cli.out); err != nil {
if err := cli.hijack("POST", "/containers/"+cmd.Arg(0)+"/attach?"+v.Encode(), container.Config.Tty, cli.in, cli.out, cli.err); err != nil {
return err
}
return nil
@ -1537,7 +1537,7 @@ func (cli *DockerCli) CmdRun(args ...string) error {
v := url.Values{}
v.Set("logs", "1")
v.Set("stream", "1")
var out io.Writer
var out, stderr io.Writer
if config.AttachStdin {
v.Set("stdin", "1")
@ -1548,7 +1548,11 @@ func (cli *DockerCli) CmdRun(args ...string) error {
}
if config.AttachStderr {
v.Set("stderr", "1")
out = cli.out
if config.Tty {
stderr = cli.out
} else {
stderr = cli.err
}
}
signals := make(chan os.Signal, 1)
@ -1562,7 +1566,7 @@ func (cli *DockerCli) CmdRun(args ...string) error {
}
}()
if err := cli.hijack("POST", "/containers/"+runResult.ID+"/attach?"+v.Encode(), config.Tty, cli.in, out); err != nil {
if err := cli.hijack("POST", "/containers/"+runResult.ID+"/attach?"+v.Encode(), config.Tty, cli.in, out, stderr); err != nil {
utils.Debugf("Error hijack: %s", err)
return err
}
@ -1729,7 +1733,7 @@ func (cli *DockerCli) stream(method, path string, in io.Reader, out io.Writer, h
return nil
}
func (cli *DockerCli) hijack(method, path string, setRawTerminal bool, in io.ReadCloser, out io.Writer) error {
func (cli *DockerCli) hijack(method, path string, setRawTerminal bool, in io.ReadCloser, stdout, stderr io.Writer) error {
req, err := http.NewRequest(method, fmt.Sprintf("/v%g%s", APIVERSION, path), nil)
if err != nil {
@ -1755,10 +1759,16 @@ func (cli *DockerCli) hijack(method, path string, setRawTerminal bool, in io.Rea
rwc, br := clientconn.Hijack()
defer rwc.Close()
var receiveStdout (chan error)
if out != nil {
receiveStdout = utils.Go(func() error {
_, err := io.Copy(out, br)
var receiveStdout chan error
if stdout != nil {
receiveStdout = utils.Go(func() (err error) {
// When TTY is ON, use regular copy
if setRawTerminal {
_, err = io.Copy(stdout, br)
} else {
_, err = utils.StdCopy(stdout, stderr, br)
}
utils.Debugf("[hijack] End of stdout")
return err
})
@ -1790,7 +1800,7 @@ func (cli *DockerCli) hijack(method, path string, setRawTerminal bool, in io.Rea
return nil
})
if out != nil {
if stdout != nil {
if err := <-receiveStdout; err != nil {
utils.Debugf("Error receiveStdout: %s", err)
return err

View file

@ -27,14 +27,29 @@ Docker Remote API
2. Versions
===========
The current version of the API is 1.5
The current version of the API is 1.6
Calling /images/<name>/insert is the same as calling
/v1.5/images/<name>/insert
/v1.6/images/<name>/insert
You can still call an old version of the api using
/v1.0/images/<name>/insert
:doc:`docker_remote_api_v1.6`
*****************************
What's new
----------
.. http:post:: /containers/(id)/attach
**New!** You can now split stderr from stdout. This is done by prefixing
a header to each transmition. See :http:post:`/containers/(id)/attach`.
The WebSocket attach is unchanged.
Note that attach calls on the previous API version didn't change. Stdout and
stderr are merged.
:doc:`docker_remote_api_v1.5`
*****************************

File diff suppressed because it is too large Load diff

View file

@ -1177,11 +1177,12 @@ func (srv *Server) ContainerResize(name string, h, w int) error {
return fmt.Errorf("No such container: %s", name)
}
func (srv *Server) ContainerAttach(name string, logs, stream, stdin, stdout, stderr bool, in io.ReadCloser, out io.Writer) error {
func (srv *Server) ContainerAttach(name string, logs, stream, stdin, stdout, stderr bool, inStream io.ReadCloser, outStream, errStream io.Writer) error {
container := srv.runtime.Get(name)
if container == nil {
return fmt.Errorf("No such container: %s", name)
}
//logs
if logs {
cLog, err := container.ReadLog("json")
@ -1192,7 +1193,7 @@ func (srv *Server) ContainerAttach(name string, logs, stream, stdin, stdout, std
cLog, err := container.ReadLog("stdout")
if err != nil {
utils.Debugf("Error reading logs (stdout): %s", err)
} else if _, err := io.Copy(out, cLog); err != nil {
} else if _, err := io.Copy(outStream, cLog); err != nil {
utils.Debugf("Error streaming logs (stdout): %s", err)
}
}
@ -1200,7 +1201,7 @@ func (srv *Server) ContainerAttach(name string, logs, stream, stdin, stdout, std
cLog, err := container.ReadLog("stderr")
if err != nil {
utils.Debugf("Error reading logs (stderr): %s", err)
} else if _, err := io.Copy(out, cLog); err != nil {
} else if _, err := io.Copy(errStream, cLog); err != nil {
utils.Debugf("Error streaming logs (stderr): %s", err)
}
}
@ -1209,15 +1210,19 @@ func (srv *Server) ContainerAttach(name string, logs, stream, stdin, stdout, std
} else {
dec := json.NewDecoder(cLog)
for {
var l utils.JSONLog
if err := dec.Decode(&l); err == io.EOF {
l := &utils.JSONLog{}
if err := dec.Decode(l); err == io.EOF {
break
} else if err != nil {
utils.Debugf("Error streaming logs: %s", err)
break
}
if (l.Stream == "stdout" && stdout) || (l.Stream == "stderr" && stderr) {
fmt.Fprintf(out, "%s", l.Log)
if l.Stream == "stdout" && stdout {
fmt.Fprintf(outStream, "%s", l.Log)
}
if l.Stream == "stderr" && stderr {
fmt.Fprintf(errStream, "%s", l.Log)
}
}
}
@ -1240,16 +1245,16 @@ func (srv *Server) ContainerAttach(name string, logs, stream, stdin, stdout, std
go func() {
defer w.Close()
defer utils.Debugf("Closing buffered stdin pipe")
io.Copy(w, in)
io.Copy(w, inStream)
}()
cStdin = r
cStdinCloser = in
cStdinCloser = inStream
}
if stdout {
cStdout = out
cStdout = outStream
}
if stderr {
cStderr = out
cStderr = errStream
}
<-container.Attach(cStdin, cStdinCloser, cStdout, cStderr)

152
utils/stdcopy.go Normal file
View file

@ -0,0 +1,152 @@
package utils
import (
"encoding/binary"
"errors"
"io"
)
const (
StdWriterPrefixLen = 8
StdWriterFdIndex = 0
StdWriterSizeIndex = 4
)
type StdType [StdWriterPrefixLen]byte
var (
Stdin StdType = StdType{0: 0}
Stdout StdType = StdType{0: 1}
Stderr StdType = StdType{0: 2}
)
type StdWriter struct {
io.Writer
prefix StdType
sizeBuf []byte
}
func (w *StdWriter) Write(buf []byte) (n int, err error) {
if w == nil || w.Writer == nil {
return 0, errors.New("Writer not instanciated")
}
binary.BigEndian.PutUint32(w.prefix[4:], uint32(len(buf)))
buf = append(w.prefix[:], buf...)
n, err = w.Writer.Write(buf)
return n - StdWriterPrefixLen, err
}
// NewStdWriter instanciate a new Writer based on the given type `t`.
// the utils package contains the valid parametres for `t`:
func NewStdWriter(w io.Writer, t StdType) *StdWriter {
if len(t) != StdWriterPrefixLen {
return nil
}
return &StdWriter{
Writer: w,
prefix: t,
sizeBuf: make([]byte, 4),
}
}
var ErrInvalidStdHeader = errors.New("Unrecognized input header")
// StdCopy is a modified version of io.Copy.
//
// StdCopy copies from src to dstout or dsterr until either EOF is reached
// on src or an error occurs. It returns the number of bytes
// copied and the first error encountered while copying, if any.
//
// A successful Copy returns err == nil, not err == EOF.
// Because Copy is defined to read from src until EOF, it does
// not treat an EOF from Read as an error to be reported.
//
// The source needs to be writter via StdWriter, dstout or dsterr is selected
// based on the prefix added by StdWriter
func StdCopy(dstout, dsterr io.Writer, src io.Reader) (written int64, err error) {
var (
buf = make([]byte, 32*1024+StdWriterPrefixLen+1)
bufLen = len(buf)
nr, nw int
er, ew error
out io.Writer
frameSize int
)
for {
// Make sure we have at least a full header
for nr < StdWriterPrefixLen {
var nr2 int
nr2, er = src.Read(buf[nr:])
if er == io.EOF {
return written, nil
}
if er != nil {
return 0, er
}
nr += nr2
}
// Check the first byte to know where to write
switch buf[StdWriterFdIndex] {
case 0:
fallthrough
case 1:
// Write on stdout
out = dstout
case 2:
// Write on stderr
out = dsterr
default:
Debugf("Error selecting output fd: (%d)", buf[StdWriterFdIndex])
return 0, ErrInvalidStdHeader
}
// Retrieve the size of the frame
frameSize = int(binary.BigEndian.Uint32(buf[StdWriterSizeIndex : StdWriterSizeIndex+4]))
// Check if the buffer is big enough to read the frame.
// Extend it if necessary.
if frameSize+StdWriterPrefixLen > bufLen {
Debugf("Extending buffer cap.")
buf = append(buf, make([]byte, frameSize-len(buf)+1)...)
bufLen = len(buf)
}
// While the amount of bytes read is less than the size of the frame + header, we keep reading
for nr < frameSize+StdWriterPrefixLen {
var nr2 int
nr2, er = src.Read(buf[nr:])
if er == io.EOF {
return written, nil
}
if er != nil {
Debugf("Error reading frame: %s", er)
return 0, er
}
nr += nr2
}
// Write the retrieved frame (without header)
nw, ew = out.Write(buf[StdWriterPrefixLen : frameSize+StdWriterPrefixLen])
if nw > 0 {
written += int64(nw)
}
if ew != nil {
Debugf("Error writing frame: %s", ew)
return 0, ew
}
// If the frame has not been fully written: error
if nw != frameSize {
Debugf("Error Short Write: (%d on %d)", nw, frameSize)
return 0, io.ErrShortWrite
}
// Move the rest of the buffer to the beginning
copy(buf, buf[frameSize+StdWriterPrefixLen:])
// Move the index
nr -= frameSize + StdWriterPrefixLen
}
}