diff --git a/daemon/logs.go b/daemon/logs.go index a5fac2c3d9..6c9373f737 100644 --- a/daemon/logs.go +++ b/daemon/logs.go @@ -7,6 +7,7 @@ import ( "io" "os" "strconv" + "sync" log "github.com/Sirupsen/logrus" "github.com/docker/docker/engine" @@ -112,24 +113,36 @@ func (daemon *Daemon) ContainerLogs(job *engine.Job) engine.Status { } if follow && container.IsRunning() { errors := make(chan error, 2) + wg := sync.WaitGroup{} + if stdout { + wg.Add(1) stdoutPipe := container.StdoutLogPipe() defer stdoutPipe.Close() go func() { errors <- jsonlog.WriteLog(stdoutPipe, job.Stdout, format) + wg.Done() }() } if stderr { + wg.Add(1) stderrPipe := container.StderrLogPipe() defer stderrPipe.Close() go func() { errors <- jsonlog.WriteLog(stderrPipe, job.Stderr, format) + wg.Done() }() } - err := <-errors - if err != nil { - log.Errorf("%s", err) + + wg.Wait() + close(errors) + + for err := range errors { + if err != nil { + log.Errorf("%s", err) + } } + } return engine.StatusOK } diff --git a/integration-cli/docker_cli_logs_test.go b/integration-cli/docker_cli_logs_test.go index d6d3f9320f..b86a50480d 100644 --- a/integration-cli/docker_cli_logs_test.go +++ b/integration-cli/docker_cli_logs_test.go @@ -284,3 +284,54 @@ func TestLogsFollowStopped(t *testing.T) { deleteContainer(cleanedContainerID) logDone("logs - logs follow stopped container") } + +// Regression test for #8832 +func TestLogsFollowSlowStdoutConsumer(t *testing.T) { + runCmd := exec.Command(dockerBinary, "run", "-d", "busybox", "/bin/sh", "-c", `usleep 200000;yes X | head -c 200000`) + + out, _, _, err := runCommandWithStdoutStderr(runCmd) + if err != nil { + t.Fatalf("run failed with errors: %s, %v", out, err) + } + + cleanedContainerID := stripTrailingCharacters(out) + defer deleteContainer(cleanedContainerID) + + stopSlowRead := make(chan bool) + + go func() { + exec.Command(dockerBinary, "wait", cleanedContainerID).Run() + stopSlowRead <- true + }() + + logCmd := exec.Command(dockerBinary, "logs", "-f", cleanedContainerID) + + stdout, err := logCmd.StdoutPipe() + if err != nil { + t.Fatal(err) + } + + if err := logCmd.Start(); err != nil { + t.Fatal(err) + } + + // First read slowly + bytes1, err := consumeWithSpeed(stdout, 10, 50*time.Millisecond, stopSlowRead) + if err != nil { + t.Fatal(err) + } + + // After the container has finished we can continue reading fast + bytes2, err := consumeWithSpeed(stdout, 32*1024, 0, nil) + if err != nil { + t.Fatal(err) + } + + actual := bytes1 + bytes2 + expected := 200000 + if actual != expected { + t.Fatalf("Invalid bytes read: %d, expected %d", actual, expected) + } + + logDone("logs - follow slow consumer") +} diff --git a/integration-cli/docker_cli_run_test.go b/integration-cli/docker_cli_run_test.go index 072c6f6b44..95cb0c86d1 100644 --- a/integration-cli/docker_cli_run_test.go +++ b/integration-cli/docker_cli_run_test.go @@ -4,7 +4,6 @@ import ( "bufio" "bytes" "fmt" - "io" "io/ioutil" "net" "os" @@ -2462,7 +2461,7 @@ func TestRunSlowStdoutConsumer(t *testing.T) { if err := c.Start(); err != nil { t.Fatal(err) } - n, err := consumeSlow(stdout, 10000, 5*time.Millisecond) + n, err := consumeWithSpeed(stdout, 10000, 5*time.Millisecond, nil) if err != nil { t.Fatal(err) } @@ -2474,19 +2473,3 @@ func TestRunSlowStdoutConsumer(t *testing.T) { logDone("run - slow consumer") } - -func consumeSlow(reader io.Reader, chunkSize int, interval time.Duration) (n int, err error) { - buffer := make([]byte, chunkSize) - for { - var readBytes int - readBytes, err = reader.Read(buffer) - n += readBytes - if err != nil { - if err == io.EOF { - err = nil - } - return - } - time.Sleep(interval) - } -} diff --git a/integration-cli/utils.go b/integration-cli/utils.go index e99e45591b..05c27dc5ac 100644 --- a/integration-cli/utils.go +++ b/integration-cli/utils.go @@ -253,3 +253,26 @@ func makeRandomString(n int) string { } return string(b) } + +// Reads chunkSize bytes from reader after every interval. +// Returns total read bytes. +func consumeWithSpeed(reader io.Reader, chunkSize int, interval time.Duration, stop chan bool) (n int, err error) { + buffer := make([]byte, chunkSize) + for { + select { + case <-stop: + return + default: + var readBytes int + readBytes, err = reader.Read(buffer) + n += readBytes + if err != nil { + if err == io.EOF { + err = nil + } + return + } + time.Sleep(interval) + } + } +}