Преглед изворни кода

Merge pull request #19372 from cloudflare/fix-log-copier

only close LogDriver after LogCopier is done
Brian Goff пре 9 година
родитељ
комит
3044a08326
3 измењених фајлова са 71 додато и 17 уклоњено
  1. 3 0
      container/monitor.go
  2. 31 16
      daemon/logger/copier.go
  3. 37 1
      daemon/logger/copier_test.go

+ 3 - 0
container/monitor.go

@@ -369,6 +369,9 @@ func (m *containerMonitor) resetContainer(lock bool) {
 			select {
 			case <-time.After(loggerCloseTimeout):
 				logrus.Warnf("Logger didn't exit in time: logs may be truncated")
+				container.LogCopier.Close()
+				// always waits for the LogCopier to finished before closing
+				<-exit
 			case <-exit:
 			}
 		}

+ 31 - 16
daemon/logger/copier.go

@@ -20,14 +20,16 @@ type Copier struct {
 	srcs     map[string]io.Reader
 	dst      Logger
 	copyJobs sync.WaitGroup
+	closed   chan struct{}
 }
 
 // NewCopier creates a new Copier
 func NewCopier(cid string, srcs map[string]io.Reader, dst Logger) *Copier {
 	return &Copier{
-		cid:  cid,
-		srcs: srcs,
-		dst:  dst,
+		cid:    cid,
+		srcs:   srcs,
+		dst:    dst,
+		closed: make(chan struct{}),
 	}
 }
 
@@ -44,24 +46,28 @@ func (c *Copier) copySrc(name string, src io.Reader) {
 	reader := bufio.NewReader(src)
 
 	for {
-		line, err := reader.ReadBytes('\n')
-		line = bytes.TrimSuffix(line, []byte{'\n'})
+		select {
+		case <-c.closed:
+			return
+		default:
+			line, err := reader.ReadBytes('\n')
+			line = bytes.TrimSuffix(line, []byte{'\n'})
 
-		// ReadBytes can return full or partial output even when it failed.
-		// e.g. it can return a full entry and EOF.
-		if err == nil || len(line) > 0 {
-			if logErr := c.dst.Log(&Message{ContainerID: c.cid, Line: line, Source: name, Timestamp: time.Now().UTC()}); logErr != nil {
-				logrus.Errorf("Failed to log msg %q for logger %s: %s", line, c.dst.Name(), logErr)
+			// ReadBytes can return full or partial output even when it failed.
+			// e.g. it can return a full entry and EOF.
+			if err == nil || len(line) > 0 {
+				if logErr := c.dst.Log(&Message{ContainerID: c.cid, Line: line, Source: name, Timestamp: time.Now().UTC()}); logErr != nil {
+					logrus.Errorf("Failed to log msg %q for logger %s: %s", line, c.dst.Name(), logErr)
+				}
 			}
-		}
 
-		if err != nil {
-			if err != io.EOF {
-				logrus.Errorf("Error scanning log stream: %s", err)
+			if err != nil {
+				if err != io.EOF {
+					logrus.Errorf("Error scanning log stream: %s", err)
+				}
+				return
 			}
-			return
 		}
-
 	}
 }
 
@@ -69,3 +75,12 @@ func (c *Copier) copySrc(name string, src io.Reader) {
 func (c *Copier) Wait() {
 	c.copyJobs.Wait()
 }
+
+// Close closes the copier
+func (c *Copier) Close() {
+	select {
+	case <-c.closed:
+	default:
+		close(c.closed)
+	}
+}

+ 37 - 1
daemon/logger/copier_test.go

@@ -10,9 +10,15 @@ import (
 
 type TestLoggerJSON struct {
 	*json.Encoder
+	delay time.Duration
 }
 
-func (l *TestLoggerJSON) Log(m *Message) error { return l.Encode(m) }
+func (l *TestLoggerJSON) Log(m *Message) error {
+	if l.delay > 0 {
+		time.Sleep(l.delay)
+	}
+	return l.Encode(m)
+}
 
 func (l *TestLoggerJSON) Close() error { return nil }
 
@@ -94,3 +100,33 @@ func TestCopier(t *testing.T) {
 		}
 	}
 }
+
+func TestCopierSlow(t *testing.T) {
+	stdoutLine := "Line that thinks that it is log line from docker stdout"
+	var stdout bytes.Buffer
+	for i := 0; i < 30; i++ {
+		if _, err := stdout.WriteString(stdoutLine + "\n"); err != nil {
+			t.Fatal(err)
+		}
+	}
+
+	var jsonBuf bytes.Buffer
+	//encoder := &encodeCloser{Encoder: json.NewEncoder(&jsonBuf)}
+	jsonLog := &TestLoggerJSON{Encoder: json.NewEncoder(&jsonBuf), delay: 100 * time.Millisecond}
+
+	cid := "a7317399f3f857173c6179d44823594f8294678dea9999662e5c625b5a1c7657"
+	c := NewCopier(cid, map[string]io.Reader{"stdout": &stdout}, jsonLog)
+	c.Run()
+	wait := make(chan struct{})
+	go func() {
+		c.Wait()
+		close(wait)
+	}()
+	<-time.After(150 * time.Millisecond)
+	c.Close()
+	select {
+	case <-time.After(200 * time.Millisecond):
+		t.Fatalf("failed to exit in time after the copier is closed")
+	case <-wait:
+	}
+}