Merge pull request #1211 from dotcloud/new_logs
*Runtime: Logs are now synchronised
This commit is contained in:
commit
0bd534adcf
6 changed files with 88 additions and 45 deletions
|
@ -1105,10 +1105,7 @@ func (cli *DockerCli) CmdLogs(args ...string) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := cli.hijack("POST", "/containers/"+cmd.Arg(0)+"/attach?logs=1&stdout=1", false, nil, cli.out); err != nil {
|
if err := cli.hijack("POST", "/containers/"+cmd.Arg(0)+"/attach?logs=1&stdout=1&stderr=1", false, nil, cli.out); err != nil {
|
||||||
return err
|
|
||||||
}
|
|
||||||
if err := cli.hijack("POST", "/containers/"+cmd.Arg(0)+"/attach?logs=1&stderr=1", false, nil, cli.err); err != nil {
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
|
|
|
@ -59,7 +59,6 @@ func assertPipe(input, output string, r io.Reader, w io.Writer, count int) error
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
// TestRunHostname checks that 'docker run -h' correctly sets a custom hostname
|
// TestRunHostname checks that 'docker run -h' correctly sets a custom hostname
|
||||||
func TestRunHostname(t *testing.T) {
|
func TestRunHostname(t *testing.T) {
|
||||||
stdout, stdoutPipe := io.Pipe()
|
stdout, stdoutPipe := io.Pipe()
|
||||||
|
@ -91,7 +90,6 @@ func TestRunHostname(t *testing.T) {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
// TestAttachStdin checks attaching to stdin without stdout and stderr.
|
// TestAttachStdin checks attaching to stdin without stdout and stderr.
|
||||||
// 'docker run -i -a stdin' should sends the client's stdin to the command,
|
// 'docker run -i -a stdin' should sends the client's stdin to the command,
|
||||||
// then detach from it and print the container id.
|
// then detach from it and print the container id.
|
||||||
|
@ -144,15 +142,17 @@ func TestRunAttachStdin(t *testing.T) {
|
||||||
})
|
})
|
||||||
|
|
||||||
// Check logs
|
// Check logs
|
||||||
if cmdLogs, err := container.ReadLog("stdout"); err != nil {
|
if cmdLogs, err := container.ReadLog("json"); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
} else {
|
} else {
|
||||||
if output, err := ioutil.ReadAll(cmdLogs); err != nil {
|
if output, err := ioutil.ReadAll(cmdLogs); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
} else {
|
} else {
|
||||||
expectedLog := "hello\nhi there\n"
|
expectedLogs := []string{"{\"log\":\"hello\\n\",\"stream\":\"stdout\"", "{\"log\":\"hi there\\n\",\"stream\":\"stdout\""}
|
||||||
if string(output) != expectedLog {
|
for _, expectedLog := range expectedLogs {
|
||||||
t.Fatalf("Unexpected logs: should be '%s', not '%s'\n", expectedLog, output)
|
if !strings.Contains(string(output), expectedLog) {
|
||||||
|
t.Fatalf("Unexpected logs: should contains '%s', it is not '%s'\n", expectedLog, output)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -653,10 +653,10 @@ func (container *Container) Start(hostConfig *HostConfig) error {
|
||||||
container.cmd = exec.Command("lxc-start", params...)
|
container.cmd = exec.Command("lxc-start", params...)
|
||||||
|
|
||||||
// Setup logging of stdout and stderr to disk
|
// Setup logging of stdout and stderr to disk
|
||||||
if err := container.runtime.LogToDisk(container.stdout, container.logPath("stdout")); err != nil {
|
if err := container.runtime.LogToDisk(container.stdout, container.logPath("json"), "stdout"); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if err := container.runtime.LogToDisk(container.stderr, container.logPath("stderr")); err != nil {
|
if err := container.runtime.LogToDisk(container.stderr, container.logPath("json"), "stderr"); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -715,13 +715,13 @@ func (container *Container) StdinPipe() (io.WriteCloser, error) {
|
||||||
|
|
||||||
func (container *Container) StdoutPipe() (io.ReadCloser, error) {
|
func (container *Container) StdoutPipe() (io.ReadCloser, error) {
|
||||||
reader, writer := io.Pipe()
|
reader, writer := io.Pipe()
|
||||||
container.stdout.AddWriter(writer)
|
container.stdout.AddWriter(writer, "")
|
||||||
return utils.NewBufReader(reader), nil
|
return utils.NewBufReader(reader), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (container *Container) StderrPipe() (io.ReadCloser, error) {
|
func (container *Container) StderrPipe() (io.ReadCloser, error) {
|
||||||
reader, writer := io.Pipe()
|
reader, writer := io.Pipe()
|
||||||
container.stderr.AddWriter(writer)
|
container.stderr.AddWriter(writer, "")
|
||||||
return utils.NewBufReader(reader), nil
|
return utils.NewBufReader(reader), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -167,12 +167,12 @@ func (runtime *Runtime) Register(container *Container) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (runtime *Runtime) LogToDisk(src *utils.WriteBroadcaster, dst string) error {
|
func (runtime *Runtime) LogToDisk(src *utils.WriteBroadcaster, dst, stream string) error {
|
||||||
log, err := os.OpenFile(dst, os.O_RDWR|os.O_APPEND|os.O_CREATE, 0600)
|
log, err := os.OpenFile(dst, os.O_RDWR|os.O_APPEND|os.O_CREATE, 0600)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
src.AddWriter(log)
|
src.AddWriter(log, stream)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
48
server.go
48
server.go
|
@ -2,6 +2,7 @@ package docker
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bufio"
|
"bufio"
|
||||||
|
"encoding/json"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"github.com/dotcloud/docker/auth"
|
"github.com/dotcloud/docker/auth"
|
||||||
|
@ -1055,20 +1056,41 @@ func (srv *Server) ContainerAttach(name string, logs, stream, stdin, stdout, std
|
||||||
}
|
}
|
||||||
//logs
|
//logs
|
||||||
if logs {
|
if logs {
|
||||||
if stdout {
|
cLog, err := container.ReadLog("json")
|
||||||
cLog, err := container.ReadLog("stdout")
|
if err != nil && os.IsNotExist(err) {
|
||||||
if err != nil {
|
// Legacy logs
|
||||||
utils.Debugf("Error reading logs (stdout): %s", err)
|
utils.Debugf("Old logs format")
|
||||||
} else if _, err := io.Copy(out, cLog); err != nil {
|
if stdout {
|
||||||
utils.Debugf("Error streaming logs (stdout): %s", err)
|
cLog, err := container.ReadLog("stdout")
|
||||||
|
if err != nil {
|
||||||
|
utils.Debugf("Error reading logs (stdout): %s", err)
|
||||||
|
} else if _, err := io.Copy(out, cLog); err != nil {
|
||||||
|
utils.Debugf("Error streaming logs (stdout): %s", err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
if stderr {
|
||||||
if stderr {
|
cLog, err := container.ReadLog("stderr")
|
||||||
cLog, err := container.ReadLog("stderr")
|
if err != nil {
|
||||||
if err != nil {
|
utils.Debugf("Error reading logs (stderr): %s", err)
|
||||||
utils.Debugf("Error reading logs (stderr): %s", err)
|
} else if _, err := io.Copy(out, cLog); err != nil {
|
||||||
} else if _, err := io.Copy(out, cLog); err != nil {
|
utils.Debugf("Error streaming logs (stderr): %s", err)
|
||||||
utils.Debugf("Error streaming logs (stderr): %s", err)
|
}
|
||||||
|
}
|
||||||
|
} else if err != nil {
|
||||||
|
utils.Debugf("Error reading logs (json): %s", err)
|
||||||
|
} else {
|
||||||
|
dec := json.NewDecoder(cLog)
|
||||||
|
for {
|
||||||
|
var l utils.JSONLog
|
||||||
|
if err := dec.Decode(&l); err == io.EOF {
|
||||||
|
break
|
||||||
|
} else if err != nil {
|
||||||
|
utils.Debugf("Error streaming logs: %s", err)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
if (l.Stream == "stdout" && stdout) || (l.Stream == "stderr" && stderr) {
|
||||||
|
fmt.Fprintf(out, "%s", l.Log)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -248,30 +248,54 @@ func (r *bufReader) Close() error {
|
||||||
|
|
||||||
type WriteBroadcaster struct {
|
type WriteBroadcaster struct {
|
||||||
sync.Mutex
|
sync.Mutex
|
||||||
writers map[io.WriteCloser]struct{}
|
buf *bytes.Buffer
|
||||||
|
writers map[StreamWriter]bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *WriteBroadcaster) AddWriter(writer io.WriteCloser) {
|
type StreamWriter struct {
|
||||||
|
wc io.WriteCloser
|
||||||
|
stream string
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *WriteBroadcaster) AddWriter(writer io.WriteCloser, stream string) {
|
||||||
w.Lock()
|
w.Lock()
|
||||||
w.writers[writer] = struct{}{}
|
sw := StreamWriter{wc: writer, stream: stream}
|
||||||
|
w.writers[sw] = true
|
||||||
w.Unlock()
|
w.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
// FIXME: Is that function used?
|
type JSONLog struct {
|
||||||
// FIXME: This relies on the concrete writer type used having equality operator
|
Log string `json:"log,omitempty"`
|
||||||
func (w *WriteBroadcaster) RemoveWriter(writer io.WriteCloser) {
|
Stream string `json:"stream,omitempty"`
|
||||||
w.Lock()
|
Created time.Time `json:"time"`
|
||||||
delete(w.writers, writer)
|
|
||||||
w.Unlock()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *WriteBroadcaster) Write(p []byte) (n int, err error) {
|
func (w *WriteBroadcaster) Write(p []byte) (n int, err error) {
|
||||||
w.Lock()
|
w.Lock()
|
||||||
defer w.Unlock()
|
defer w.Unlock()
|
||||||
for writer := range w.writers {
|
w.buf.Write(p)
|
||||||
if n, err := writer.Write(p); err != nil || n != len(p) {
|
for sw := range w.writers {
|
||||||
|
lp := p
|
||||||
|
if sw.stream != "" {
|
||||||
|
lp = nil
|
||||||
|
for {
|
||||||
|
line, err := w.buf.ReadString('\n')
|
||||||
|
if err != nil {
|
||||||
|
w.buf.Write([]byte(line))
|
||||||
|
break
|
||||||
|
}
|
||||||
|
b, err := json.Marshal(&JSONLog{Log: line, Stream: sw.stream, Created: time.Now()})
|
||||||
|
if err != nil {
|
||||||
|
// On error, evict the writer
|
||||||
|
delete(w.writers, sw)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
lp = append(lp, b...)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if n, err := sw.wc.Write(lp); err != nil || n != len(lp) {
|
||||||
// On error, evict the writer
|
// On error, evict the writer
|
||||||
delete(w.writers, writer)
|
delete(w.writers, sw)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return len(p), nil
|
return len(p), nil
|
||||||
|
@ -280,15 +304,15 @@ func (w *WriteBroadcaster) Write(p []byte) (n int, err error) {
|
||||||
func (w *WriteBroadcaster) CloseWriters() error {
|
func (w *WriteBroadcaster) CloseWriters() error {
|
||||||
w.Lock()
|
w.Lock()
|
||||||
defer w.Unlock()
|
defer w.Unlock()
|
||||||
for writer := range w.writers {
|
for sw := range w.writers {
|
||||||
writer.Close()
|
sw.wc.Close()
|
||||||
}
|
}
|
||||||
w.writers = make(map[io.WriteCloser]struct{})
|
w.writers = make(map[StreamWriter]bool)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewWriteBroadcaster() *WriteBroadcaster {
|
func NewWriteBroadcaster() *WriteBroadcaster {
|
||||||
return &WriteBroadcaster{writers: make(map[io.WriteCloser]struct{})}
|
return &WriteBroadcaster{writers: make(map[StreamWriter]bool), buf: bytes.NewBuffer(nil)}
|
||||||
}
|
}
|
||||||
|
|
||||||
func GetTotalUsedFds() int {
|
func GetTotalUsedFds() int {
|
||||||
|
|
Loading…
Reference in a new issue