瀏覽代碼

Merge pull request #22982 from nalind/log-newlines

Improve logging of long log lines
Alexander Morozov 9 年之前
父節點
當前提交
e43033bc23

+ 2 - 1
daemon/logger/awslogs/cloudwatchlogs.go

@@ -165,7 +165,8 @@ func (l *logStream) Log(msg *logger.Message) error {
 	l.lock.RLock()
 	defer l.lock.RUnlock()
 	if !l.closed {
-		l.messages <- msg
+		// buffer up the data, making sure to copy the Line data
+		l.messages <- logger.CopyMessage(msg)
 	}
 	return nil
 }

+ 66 - 16
daemon/logger/copier.go

@@ -1,7 +1,6 @@
 package logger
 
 import (
-	"bufio"
 	"bytes"
 	"io"
 	"sync"
@@ -10,8 +9,13 @@ import (
 	"github.com/Sirupsen/logrus"
 )
 
+const (
+	bufSize  = 16 * 1024 // size of the per-source buffer allocated in copySrc; data that fills it without a newline is logged as a partial line
+	readSize = 2 * 1024 // upper bound on how many additional bytes copySrc asks for in a single Read call
+)
+
 // Copier can copy logs from specified sources to Logger and attach Timestamp.
-// Writes are concurrent, so you need implement some sync in your logger
+// Writes are concurrent, so you need to implement some synchronization in your logger.
 type Copier struct {
 	// srcs is map of name -> reader pairs, for example "stdout", "stderr"
 	srcs      map[string]io.Reader
@@ -40,30 +44,76 @@ func (c *Copier) Run() {
 
 // copySrc is the per-source copy loop.  It reads src into a bufSize buffer,
 // asking for at most readSize more bytes per Read, splits what it has buffered
 // on '\n', and passes each line (without the newline) to c.dst.Log with
 // Source set to name and Timestamp set to the current UTC time.  Data that
 // fills the whole buffer without a newline, or that is left over at EOF, is
 // logged with Partial set.  The loop ends when c.closed is readable, when src
 // reports EOF, or when a Read fails with any other error (which is logged).
 // A single Message is reused for every call and its Line points into the read
 // buffer, so its contents change once Log returns.
 func (c *Copier) copySrc(name string, src io.Reader) {
 	defer c.copyJobs.Done()
-	reader := bufio.NewReader(src)
+	buf := make([]byte, bufSize)
+	n := 0
+	eof := false
+	msg := &Message{Source: name} // reused for every Log call below; only Line, Timestamp and Partial change
 
 	for {
 		select {
 		case <-c.closed:
 			return
 		default:
-			line, err := reader.ReadBytes('\n')
-			line = bytes.TrimSuffix(line, []byte{'\n'})
-
-			// ReadBytes can return full or partial output even when it failed.
-			// e.g. it can return a full entry and EOF.
-			if err == nil || len(line) > 0 {
-				if logErr := c.dst.Log(&Message{Line: line, Source: name, Timestamp: time.Now().UTC()}); logErr != nil {
-					logrus.Errorf("Failed to log msg %q for logger %s: %s", line, c.dst.Name(), logErr)
-				}
+			// Work out how much more data we are okay with reading this time.
+			upto := n + readSize
+			if upto > cap(buf) {
+				upto = cap(buf)
 			}
-
-			if err != nil {
-				if err != io.EOF {
-					logrus.Errorf("Error scanning log stream: %s", err)
+			// Try to read that data.
+			if upto > n {
+				read, err := src.Read(buf[n:upto])
+				if err != nil {
+					if err != io.EOF {
+						logrus.Errorf("Error scanning log stream: %s", err)
+						return
+					}
+					eof = true
 				}
+				n += read
+			}
+			// If we have no data to log, and there's no more coming, we're done.
+			if n == 0 && eof {
 				return
 			}
+			// Break up the data that we've buffered up into lines, and log each in turn.
+			p := 0
+			for q := bytes.Index(buf[p:n], []byte{'\n'}); q >= 0; q = bytes.Index(buf[p:n], []byte{'\n'}) {
+				msg.Line = buf[p : p+q]
+				msg.Timestamp = time.Now().UTC()
+				msg.Partial = false
+				select {
+				case <-c.closed:
+					return
+				default:
+					if logErr := c.dst.Log(msg); logErr != nil {
+						logrus.Errorf("Failed to log msg %q for logger %s: %s", msg.Line, c.dst.Name(), logErr)
+					}
+				}
+				p += q + 1
+			}
+			// If there's no more coming, or the buffer is full but
+			// has no newlines, log whatever we haven't logged yet,
+			// noting that it's a partial log line.
+			if eof || (p == 0 && n == len(buf)) {
+				if p < n {
+					msg.Line = buf[p:n]
+					msg.Timestamp = time.Now().UTC()
+					msg.Partial = true
+					if logErr := c.dst.Log(msg); logErr != nil {
+						logrus.Errorf("Failed to log msg %q for logger %s: %s", msg.Line, c.dst.Name(), logErr)
+					}
+					p = 0 // everything buffered has been logged, so the next read starts from an empty buffer
+					n = 0
+				}
+				if eof {
+					return
+				}
+			}
+			// Move any unlogged data to the front of the buffer in preparation for another read.
+			if p > 0 {
+				copy(buf[0:], buf[p:n])
+				n -= p
+			}
 		}
 	}
 }

+ 92 - 0
daemon/logger/copier_test.go

@@ -3,7 +3,9 @@ package logger
 import (
 	"bytes"
 	"encoding/json"
+	"fmt"
 	"io"
+	"os"
 	"sync"
 	"testing"
 	"time"
@@ -116,3 +118,93 @@ func TestCopierSlow(t *testing.T) {
 	case <-wait:
 	}
 }
+
+// BenchmarkLoggerDummy is a log destination whose methods do nothing, so the
+// Copier benchmarks are not charged for any driver work.
+type BenchmarkLoggerDummy struct{}
+
+// Log ignores the message and reports success.
+func (*BenchmarkLoggerDummy) Log(*Message) error {
+	return nil
+}
+
+// Close has nothing to release and reports success.
+func (*BenchmarkLoggerDummy) Close() error {
+	return nil
+}
+
+// Name returns the fixed driver name "dummy".
+func (*BenchmarkLoggerDummy) Name() string {
+	return "dummy"
+}
+
+// The BenchmarkCopier* functions each run benchmarkCopier with a fixed line
+// length in bytes, doubling from 64 bytes up to 256 KiB.
+
+func BenchmarkCopier64(b *testing.B)   { benchmarkCopier(b, 64) }
+func BenchmarkCopier128(b *testing.B)  { benchmarkCopier(b, 128) }
+func BenchmarkCopier256(b *testing.B)  { benchmarkCopier(b, 256) }
+func BenchmarkCopier512(b *testing.B)  { benchmarkCopier(b, 512) }
+func BenchmarkCopier1K(b *testing.B)   { benchmarkCopier(b, 1*1024) }
+func BenchmarkCopier2K(b *testing.B)   { benchmarkCopier(b, 2*1024) }
+func BenchmarkCopier4K(b *testing.B)   { benchmarkCopier(b, 4*1024) }
+func BenchmarkCopier8K(b *testing.B)   { benchmarkCopier(b, 8*1024) }
+func BenchmarkCopier16K(b *testing.B)  { benchmarkCopier(b, 16*1024) }
+func BenchmarkCopier32K(b *testing.B)  { benchmarkCopier(b, 32*1024) }
+func BenchmarkCopier64K(b *testing.B)  { benchmarkCopier(b, 64*1024) }
+func BenchmarkCopier128K(b *testing.B) { benchmarkCopier(b, 128*1024) }
+func BenchmarkCopier256K(b *testing.B) { benchmarkCopier(b, 256*1024) }
+
+// piped returns the read end of an OS pipe and starts a goroutine that writes
+// buf to the write end iterations times, sleeping for delay before each write.
+// The write end is closed when the goroutine finishes, so the reader sees EOF.
+//
+// Write failures are reported with b.Error instead of b.Fatal: Fatal (FailNow)
+// must only be called from the goroutine running the benchmark, not from the
+// writer goroutine started here.
+func piped(b *testing.B, iterations int, delay time.Duration, buf []byte) io.Reader {
+	r, w, err := os.Pipe()
+	if err != nil {
+		b.Fatal(err)
+	}
+	go func() {
+		// Close the write end on every exit path; otherwise a failed write
+		// would leave the reader waiting for an EOF that never arrives.
+		defer w.Close()
+		for i := 0; i < iterations; i++ {
+			time.Sleep(delay)
+			n, err := w.Write(buf)
+			if err != nil {
+				b.Error(err)
+				return
+			}
+			if n != len(buf) {
+				b.Error(fmt.Errorf("short write"))
+				return
+			}
+		}
+	}()
+	return r
+}
+
+// benchmarkCopier times a Copier draining a single piped source into a no-op
+// logger.  Each iteration feeds the Copier ten writes of one length-byte line
+// (length-1 'A' bytes followed by a newline) and waits for it to finish.
+func benchmarkCopier(b *testing.B, length int) {
+	// Build the payload once, outside the timed region.
+	buf := bytes.Repeat([]byte{'A'}, length)
+	buf[length-1] = '\n'
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		src := piped(b, 10, time.Nanosecond, buf)
+		c := NewCopier(
+			map[string]io.Reader{
+				"buffer": src,
+			},
+			&BenchmarkLoggerDummy{})
+		c.Run()
+		c.Wait()
+		c.Close()
+		// Release the pipe's read end now that the Copier is done with it;
+		// otherwise every iteration leaves a file descriptor open.
+		if rc, ok := src.(io.Closer); ok {
+			if err := rc.Close(); err != nil {
+				b.Error(err)
+			}
+		}
+	}
+}

+ 9 - 2
daemon/logger/journald/journald.go

@@ -84,10 +84,17 @@ func validateLogOpt(cfg map[string]string) error {
 }
 
 // Log sends msg.Line to the journal through journal.Send, using priority
 // journal.PriErr when msg.Source is "stderr" and journal.PriInfo otherwise.
 // The fields passed along are a per-call copy of s.vars, with
 // CONTAINER_PARTIAL_MESSAGE=true added when msg.Partial is set, so s.vars
 // itself is never modified here.  It returns whatever journal.Send returns.
 func (s *journald) Log(msg *logger.Message) error {
+	vars := map[string]string{} // per-call copy, so the partial marker below never lands in s.vars
+	for k, v := range s.vars {
+		vars[k] = v
+	}
+	if msg.Partial {
+		vars["CONTAINER_PARTIAL_MESSAGE"] = "true"
+	}
 	if msg.Source == "stderr" {
-		return journal.Send(string(msg.Line), journal.PriErr, s.vars)
+		return journal.Send(string(msg.Line), journal.PriErr, vars)
 	}
-	return journal.Send(string(msg.Line), journal.PriInfo, s.vars)
+	return journal.Send(string(msg.Line), journal.PriInfo, vars)
 }
 
 func (s *journald) Name() string {

+ 11 - 4
daemon/logger/journald/read.go

@@ -12,11 +12,15 @@ package journald
 // #include <time.h>
 // #include <unistd.h>
 //
-//static int get_message(sd_journal *j, const char **msg, size_t *length)
+//static int get_message(sd_journal *j, const char **msg, size_t *length, int *partial)
 //{
 //	int rc;
+//	size_t plength;
 //	*msg = NULL;
 //	*length = 0;
+//	plength = strlen("CONTAINER_PARTIAL_MESSAGE=true");
+//	rc = sd_journal_get_data(j, "CONTAINER_PARTIAL_MESSAGE", (const void **) msg, length);
+//	*partial = ((rc == 0) && (*length == plength) && (memcmp(*msg, "CONTAINER_PARTIAL_MESSAGE=true", plength) == 0));
 //	rc = sd_journal_get_data(j, "MESSAGE", (const void **) msg, length);
 //	if (rc == 0) {
 //		if (*length > 8) {
@@ -167,7 +171,7 @@ func (s *journald) drainJournal(logWatcher *logger.LogWatcher, config logger.Rea
 	var msg, data, cursor *C.char
 	var length C.size_t
 	var stamp C.uint64_t
-	var priority C.int
+	var priority, partial C.int
 
 	// Walk the journal from here forward until we run out of new entries.
 drain:
@@ -183,7 +187,7 @@ drain:
 			}
 		}
 		// Read and send the logged message, if there is one to read.
-		i := C.get_message(j, &msg, &length)
+		i := C.get_message(j, &msg, &length, &partial)
 		if i != -C.ENOENT && i != -C.EADDRNOTAVAIL {
 			// Read the entry's timestamp.
 			if C.sd_journal_get_realtime_usec(j, &stamp) != 0 {
@@ -191,7 +195,10 @@ drain:
 			}
 			// Set up the time and text of the entry.
 			timestamp := time.Unix(int64(stamp)/1000000, (int64(stamp)%1000000)*1000)
-			line := append(C.GoBytes(unsafe.Pointer(msg), C.int(length)), "\n"...)
+			line := C.GoBytes(unsafe.Pointer(msg), C.int(length))
+			if partial == 0 {
+				line = append(line, "\n"...)
+			}
 			// Recover the stream name by mapping
 			// from the journal priority back to
 			// the stream that we would have

+ 5 - 1
daemon/logger/jsonfilelog/jsonfilelog.go

@@ -90,8 +90,12 @@ func (l *JSONFileLogger) Log(msg *logger.Message) error {
 		return err
 	}
 	l.mu.Lock()
+	logline := msg.Line
+	if !msg.Partial {
+		logline = append(msg.Line, '\n')
+	}
 	err = (&jsonlog.JSONLogs{
-		Log:      append(msg.Line, '\n'),
+		Log:      logline,
 		Stream:   msg.Source,
 		Created:  timestamp,
 		RawAttrs: l.extra,

+ 22 - 1
daemon/logger/logger.go

@@ -26,12 +26,33 @@ const (
 	logWatcherBufferSize = 4096
 )
 
-// Message is datastructure that represents record from some container.
+// Message is a data structure that represents a piece of output produced by
+// some container.  The Line member is a slice of an array whose contents can
+// be changed after a log driver's Log() method returns.
 type Message struct {
 	Line      []byte // the logged bytes; may point into a buffer that is reused after Log returns
 	Source    string // name of the stream the data was read from, e.g. "stdout" or "stderr"
 	Timestamp time.Time // when the data was handed to the log driver
 	Attrs     LogAttributes // extra attributes attached to the message
+	Partial   bool // true when Line is a fragment that was not terminated by a newline
 }
+
+// CopyMessage creates a copy of the passed-in Message which will remain
+// unchanged if the original is changed: Line is duplicated into a fresh slice
+// and Attrs into a fresh map, while Source, Timestamp and Partial are copied
+// by value.  Log drivers which buffer Messages rather than dispatching them
+// during their Log() method should use this function to obtain a Message
+// whose contents won't change.
+func CopyMessage(msg *Message) *Message {
+	m := new(Message)
+	m.Line = make([]byte, len(msg.Line))
+	copy(m.Line, msg.Line)
+	m.Source = msg.Source
+	m.Timestamp = msg.Timestamp
+	m.Partial = msg.Partial
+	m.Attrs = make(LogAttributes, len(msg.Attrs))
+	// Range over the source message's attributes; ranging over the new,
+	// still-empty map would copy nothing.
+	for k, v := range msg.Attrs {
+		m.Attrs[k] = v
+	}
+	return m
+}
 
 // LogAttributes is used to hold the extra attributes available in the log message