Bläddra i källkod

Merge pull request #8866 from tonistiigi/fix-8832-logs-panic

Fix panic on slow log consumer.
Jessie Frazelle 10 år sedan
förälder
incheckning
aca253d6d0

+ 16 - 3
daemon/logs.go

@@ -7,6 +7,7 @@ import (
 	"io"
 	"io"
 	"os"
 	"os"
 	"strconv"
 	"strconv"
+	"sync"
 
 
 	log "github.com/Sirupsen/logrus"
 	log "github.com/Sirupsen/logrus"
 	"github.com/docker/docker/engine"
 	"github.com/docker/docker/engine"
@@ -112,24 +113,36 @@ func (daemon *Daemon) ContainerLogs(job *engine.Job) engine.Status {
 	}
 	}
 	if follow && container.IsRunning() {
 	if follow && container.IsRunning() {
 		errors := make(chan error, 2)
 		errors := make(chan error, 2)
+		wg := sync.WaitGroup{}
+
 		if stdout {
 		if stdout {
+			wg.Add(1)
 			stdoutPipe := container.StdoutLogPipe()
 			stdoutPipe := container.StdoutLogPipe()
 			defer stdoutPipe.Close()
 			defer stdoutPipe.Close()
 			go func() {
 			go func() {
 				errors <- jsonlog.WriteLog(stdoutPipe, job.Stdout, format)
 				errors <- jsonlog.WriteLog(stdoutPipe, job.Stdout, format)
+				wg.Done()
 			}()
 			}()
 		}
 		}
 		if stderr {
 		if stderr {
+			wg.Add(1)
 			stderrPipe := container.StderrLogPipe()
 			stderrPipe := container.StderrLogPipe()
 			defer stderrPipe.Close()
 			defer stderrPipe.Close()
 			go func() {
 			go func() {
 				errors <- jsonlog.WriteLog(stderrPipe, job.Stderr, format)
 				errors <- jsonlog.WriteLog(stderrPipe, job.Stderr, format)
+				wg.Done()
 			}()
 			}()
 		}
 		}
-		err := <-errors
-		if err != nil {
-			log.Errorf("%s", err)
+
+		wg.Wait()
+		close(errors)
+
+		for err := range errors {
+			if err != nil {
+				log.Errorf("%s", err)
+			}
 		}
 		}
+
 	}
 	}
 	return engine.StatusOK
 	return engine.StatusOK
 }
 }

+ 51 - 0
integration-cli/docker_cli_logs_test.go

@@ -284,3 +284,54 @@ func TestLogsFollowStopped(t *testing.T) {
 	deleteContainer(cleanedContainerID)
 	deleteContainer(cleanedContainerID)
 	logDone("logs - logs follow stopped container")
 	logDone("logs - logs follow stopped container")
 }
 }
+
+// Regression test for #8832
+func TestLogsFollowSlowStdoutConsumer(t *testing.T) {
+	runCmd := exec.Command(dockerBinary, "run", "-d", "busybox", "/bin/sh", "-c", `usleep 200000;yes X | head -c 200000`)
+
+	out, _, _, err := runCommandWithStdoutStderr(runCmd)
+	if err != nil {
+		t.Fatalf("run failed with errors: %s, %v", out, err)
+	}
+
+	cleanedContainerID := stripTrailingCharacters(out)
+	defer deleteContainer(cleanedContainerID)
+
+	stopSlowRead := make(chan bool)
+
+	go func() {
+		exec.Command(dockerBinary, "wait", cleanedContainerID).Run()
+		stopSlowRead <- true
+	}()
+
+	logCmd := exec.Command(dockerBinary, "logs", "-f", cleanedContainerID)
+
+	stdout, err := logCmd.StdoutPipe()
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	if err := logCmd.Start(); err != nil {
+		t.Fatal(err)
+	}
+
+	// First read slowly
+	bytes1, err := consumeWithSpeed(stdout, 10, 50*time.Millisecond, stopSlowRead)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// After the container has finished we can continue reading fast
+	bytes2, err := consumeWithSpeed(stdout, 32*1024, 0, nil)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	actual := bytes1 + bytes2
+	expected := 200000
+	if actual != expected {
+		t.Fatalf("Invalid bytes read: %d, expected %d", actual, expected)
+	}
+
+	logDone("logs - follow slow consumer")
+}

+ 1 - 18
integration-cli/docker_cli_run_test.go

@@ -4,7 +4,6 @@ import (
 	"bufio"
 	"bufio"
 	"bytes"
 	"bytes"
 	"fmt"
 	"fmt"
-	"io"
 	"io/ioutil"
 	"io/ioutil"
 	"net"
 	"net"
 	"os"
 	"os"
@@ -2462,7 +2461,7 @@ func TestRunSlowStdoutConsumer(t *testing.T) {
 	if err := c.Start(); err != nil {
 	if err := c.Start(); err != nil {
 		t.Fatal(err)
 		t.Fatal(err)
 	}
 	}
-	n, err := consumeSlow(stdout, 10000, 5*time.Millisecond)
+	n, err := consumeWithSpeed(stdout, 10000, 5*time.Millisecond, nil)
 	if err != nil {
 	if err != nil {
 		t.Fatal(err)
 		t.Fatal(err)
 	}
 	}
@@ -2474,19 +2473,3 @@ func TestRunSlowStdoutConsumer(t *testing.T) {
 
 
 	logDone("run - slow consumer")
 	logDone("run - slow consumer")
 }
 }
-
-func consumeSlow(reader io.Reader, chunkSize int, interval time.Duration) (n int, err error) {
-	buffer := make([]byte, chunkSize)
-	for {
-		var readBytes int
-		readBytes, err = reader.Read(buffer)
-		n += readBytes
-		if err != nil {
-			if err == io.EOF {
-				err = nil
-			}
-			return
-		}
-		time.Sleep(interval)
-	}
-}

+ 23 - 0
integration-cli/utils.go

@@ -253,3 +253,26 @@ func makeRandomString(n int) string {
 	}
 	}
 	return string(b)
 	return string(b)
 }
 }
+
+// Reads chunkSize bytes from reader after every interval.
+// Returns total read bytes.
+func consumeWithSpeed(reader io.Reader, chunkSize int, interval time.Duration, stop chan bool) (n int, err error) {
+	buffer := make([]byte, chunkSize)
+	for {
+		select {
+		case <-stop:
+			return
+		default:
+			var readBytes int
+			readBytes, err = reader.Read(buffer)
+			n += readBytes
+			if err != nil {
+				if err == io.EOF {
+					err = nil
+				}
+				return
+			}
+			time.Sleep(interval)
+		}
+	}
+}