Jelajahi Sumber

Merge pull request #35831 from anusha-ragunathan/splunk_partial

 Improve partial message support in logger
Anusha Ragunathan 7 tahun lalu
induk
melakukan
9689059b56

+ 15 - 5
api/types/backend/backend.go

@@ -25,17 +25,27 @@ type ContainerAttachConfig struct {
 	MuxStreams bool
 	MuxStreams bool
 }
 }
 
 
+// PartialLogMetaData provides meta data for a partial log message. Messages
+// exceeding a predefined size are split into chunks with this metadata. The
+// expectation is for the logger endpoints to assemble the chunks using this
+// metadata.
+type PartialLogMetaData struct {
+	Last    bool   //true if this message is last of a partial
+	ID      string // identifies group of messages comprising a single record
+	Ordinal int    // ordering of message in partial group
+}
+
 // LogMessage is datastructure that represents piece of output produced by some
 // LogMessage is datastructure that represents piece of output produced by some
 // container.  The Line member is a slice of an array whose contents can be
 // container.  The Line member is a slice of an array whose contents can be
 // changed after a log driver's Log() method returns.
 // changed after a log driver's Log() method returns.
 // changes to this struct need to be reflect in the reset method in
 // changes to this struct need to be reflect in the reset method in
 // daemon/logger/logger.go
 // daemon/logger/logger.go
 type LogMessage struct {
 type LogMessage struct {
-	Line      []byte
-	Source    string
-	Timestamp time.Time
-	Attrs     []LogAttr
-	Partial   bool
+	Line         []byte
+	Source       string
+	Timestamp    time.Time
+	Attrs        []LogAttr
+	PLogMetaData *PartialLogMetaData
 
 
 	// Err is an error associated with a message. Completeness of a message
 	// Err is an error associated with a message. Completeness of a message
 	// with Err is not expected, tho it may be partially complete (fields may
 	// with Err is not expected, tho it may be partially complete (fields may

+ 1 - 1
daemon/logger/adapter.go

@@ -37,7 +37,7 @@ func (a *pluginAdapter) Log(msg *Message) error {
 
 
 	a.buf.Line = msg.Line
 	a.buf.Line = msg.Line
 	a.buf.TimeNano = msg.Timestamp.UnixNano()
 	a.buf.TimeNano = msg.Timestamp.UnixNano()
-	a.buf.Partial = msg.Partial
+	a.buf.Partial = (msg.PLogMetaData != nil)
 	a.buf.Source = msg.Source
 	a.buf.Source = msg.Source
 
 
 	err := a.enc.Encode(&a.buf)
 	err := a.enc.Encode(&a.buf)

+ 39 - 3
daemon/logger/copier.go

@@ -6,6 +6,8 @@ import (
 	"sync"
 	"sync"
 	"time"
 	"time"
 
 
+	types "github.com/docker/docker/api/types/backend"
+	"github.com/docker/docker/pkg/stringid"
 	"github.com/sirupsen/logrus"
 	"github.com/sirupsen/logrus"
 )
 )
 
 
@@ -58,6 +60,11 @@ func (c *Copier) copySrc(name string, src io.Reader) {
 
 
 	n := 0
 	n := 0
 	eof := false
 	eof := false
+	var partialid string
+	var partialTS time.Time
+	var ordinal int
+	firstPartial := true
+	hasMorePartial := false
 
 
 	for {
 	for {
 		select {
 		select {
@@ -87,6 +94,7 @@ func (c *Copier) copySrc(name string, src io.Reader) {
 			}
 			}
 			// Break up the data that we've buffered up into lines, and log each in turn.
 			// Break up the data that we've buffered up into lines, and log each in turn.
 			p := 0
 			p := 0
+
 			for q := bytes.IndexByte(buf[p:n], '\n'); q >= 0; q = bytes.IndexByte(buf[p:n], '\n') {
 			for q := bytes.IndexByte(buf[p:n], '\n'); q >= 0; q = bytes.IndexByte(buf[p:n], '\n') {
 				select {
 				select {
 				case <-c.closed:
 				case <-c.closed:
@@ -94,9 +102,23 @@ func (c *Copier) copySrc(name string, src io.Reader) {
 				default:
 				default:
 					msg := NewMessage()
 					msg := NewMessage()
 					msg.Source = name
 					msg.Source = name
-					msg.Timestamp = time.Now().UTC()
 					msg.Line = append(msg.Line, buf[p:p+q]...)
 					msg.Line = append(msg.Line, buf[p:p+q]...)
 
 
+					if hasMorePartial {
+						msg.PLogMetaData = &types.PartialLogMetaData{ID: partialid, Ordinal: ordinal, Last: true}
+
+						// reset
+						partialid = ""
+						ordinal = 0
+						firstPartial = true
+						hasMorePartial = false
+					}
+					if msg.PLogMetaData == nil {
+						msg.Timestamp = time.Now().UTC()
+					} else {
+						msg.Timestamp = partialTS
+					}
+
 					if logErr := c.dst.Log(msg); logErr != nil {
 					if logErr := c.dst.Log(msg); logErr != nil {
 						logrus.Errorf("Failed to log msg %q for logger %s: %s", msg.Line, c.dst.Name(), logErr)
 						logrus.Errorf("Failed to log msg %q for logger %s: %s", msg.Line, c.dst.Name(), logErr)
 					}
 					}
@@ -110,9 +132,23 @@ func (c *Copier) copySrc(name string, src io.Reader) {
 				if p < n {
 				if p < n {
 					msg := NewMessage()
 					msg := NewMessage()
 					msg.Source = name
 					msg.Source = name
-					msg.Timestamp = time.Now().UTC()
 					msg.Line = append(msg.Line, buf[p:n]...)
 					msg.Line = append(msg.Line, buf[p:n]...)
-					msg.Partial = true
+
+					// Generate unique partialID for first partial. Use it across partials.
+					// Record timestamp for first partial. Use it across partials.
+					// Initialize Ordinal for first partial. Increment it across partials.
+					if firstPartial {
+						msg.Timestamp = time.Now().UTC()
+						partialTS = msg.Timestamp
+						partialid = stringid.GenerateRandomID()
+						ordinal = 1
+						firstPartial = false
+					} else {
+						msg.Timestamp = partialTS
+					}
+					msg.PLogMetaData = &types.PartialLogMetaData{ID: partialid, Ordinal: ordinal, Last: false}
+					ordinal++
+					hasMorePartial = true
 
 
 					if logErr := c.dst.Log(msg); logErr != nil {
 					if logErr := c.dst.Log(msg); logErr != nil {
 						logrus.Errorf("Failed to log msg %q for logger %s: %s", msg.Line, c.dst.Name(), logErr)
 						logrus.Errorf("Failed to log msg %q for logger %s: %s", msg.Line, c.dst.Name(), logErr)

+ 135 - 0
daemon/logger/copier_test.go

@@ -258,6 +258,141 @@ func TestCopierWithSized(t *testing.T) {
 	}
 	}
 }
 }
 
 
+func checkIdentical(t *testing.T, msg Message, expectedID string, expectedTS time.Time) {
+	if msg.PLogMetaData.ID != expectedID {
+		t.Fatalf("IDs are not he same across partials. Expected: %s Received: %s",
+			expectedID, msg.PLogMetaData.ID)
+	}
+	if msg.Timestamp != expectedTS {
+		t.Fatalf("Timestamps are not the same across partials. Expected: %v Received: %v",
+			expectedTS.Format(time.UnixDate), msg.Timestamp.Format(time.UnixDate))
+	}
+}
+
+// Have long lines and make sure that it comes out with PartialMetaData
+func TestCopierWithPartial(t *testing.T) {
+	stdoutLongLine := strings.Repeat("a", defaultBufSize)
+	stderrLongLine := strings.Repeat("b", defaultBufSize)
+	stdoutTrailingLine := "stdout trailing line"
+	stderrTrailingLine := "stderr trailing line"
+	normalStr := "This is an impartial message :)"
+
+	var stdout bytes.Buffer
+	var stderr bytes.Buffer
+	var normalMsg bytes.Buffer
+
+	for i := 0; i < 3; i++ {
+		if _, err := stdout.WriteString(stdoutLongLine); err != nil {
+			t.Fatal(err)
+		}
+		if _, err := stderr.WriteString(stderrLongLine); err != nil {
+			t.Fatal(err)
+		}
+	}
+
+	if _, err := stdout.WriteString(stdoutTrailingLine + "\n"); err != nil {
+		t.Fatal(err)
+	}
+	if _, err := stderr.WriteString(stderrTrailingLine + "\n"); err != nil {
+		t.Fatal(err)
+	}
+	if _, err := normalMsg.WriteString(normalStr + "\n"); err != nil {
+		t.Fatal(err)
+	}
+
+	var jsonBuf bytes.Buffer
+
+	jsonLog := &TestLoggerJSON{Encoder: json.NewEncoder(&jsonBuf)}
+
+	c := NewCopier(
+		map[string]io.Reader{
+			"stdout": &stdout,
+			"normal": &normalMsg,
+			"stderr": &stderr,
+		},
+		jsonLog)
+	c.Run()
+	wait := make(chan struct{})
+	go func() {
+		c.Wait()
+		close(wait)
+	}()
+	select {
+	case <-time.After(1 * time.Second):
+		t.Fatal("Copier failed to do its work in 1 second")
+	case <-wait:
+	}
+
+	dec := json.NewDecoder(&jsonBuf)
+	expectedMsgs := 9
+	recvMsgs := 0
+	var expectedPartID1, expectedPartID2 string
+	var expectedTS1, expectedTS2 time.Time
+
+	for {
+		var msg Message
+
+		if err := dec.Decode(&msg); err != nil {
+			if err == io.EOF {
+				break
+			}
+			t.Fatal(err)
+		}
+		if msg.Source != "stdout" && msg.Source != "stderr" && msg.Source != "normal" {
+			t.Fatalf("Wrong Source: %q, should be %q or %q or %q", msg.Source, "stdout", "stderr", "normal")
+		}
+
+		if msg.Source == "stdout" {
+			if string(msg.Line) != stdoutLongLine && string(msg.Line) != stdoutTrailingLine {
+				t.Fatalf("Wrong Line: %q, expected 'stdoutLongLine' or 'stdoutTrailingLine'", msg.Line)
+			}
+
+			if msg.PLogMetaData.ID == "" {
+				t.Fatalf("Expected partial metadata. Got nothing")
+			}
+
+			if msg.PLogMetaData.Ordinal == 1 {
+				expectedPartID1 = msg.PLogMetaData.ID
+				expectedTS1 = msg.Timestamp
+			} else {
+				checkIdentical(t, msg, expectedPartID1, expectedTS1)
+			}
+			if msg.PLogMetaData.Ordinal == 4 && !msg.PLogMetaData.Last {
+				t.Fatalf("Last is not set for last chunk")
+			}
+		}
+
+		if msg.Source == "stderr" {
+			if string(msg.Line) != stderrLongLine && string(msg.Line) != stderrTrailingLine {
+				t.Fatalf("Wrong Line: %q, expected 'stderrLongLine' or 'stderrTrailingLine'", msg.Line)
+			}
+
+			if msg.PLogMetaData.ID == "" {
+				t.Fatalf("Expected partial metadata. Got nothing")
+			}
+
+			if msg.PLogMetaData.Ordinal == 1 {
+				expectedPartID2 = msg.PLogMetaData.ID
+				expectedTS2 = msg.Timestamp
+			} else {
+				checkIdentical(t, msg, expectedPartID2, expectedTS2)
+			}
+			if msg.PLogMetaData.Ordinal == 4 && !msg.PLogMetaData.Last {
+				t.Fatalf("Last is not set for last chunk")
+			}
+		}
+
+		if msg.Source == "normal" && msg.PLogMetaData != nil {
+			t.Fatalf("Normal messages should not have PartialLogMetaData")
+		}
+		recvMsgs++
+	}
+
+	if expectedMsgs != recvMsgs {
+		t.Fatalf("Expected msgs: %d Recv msgs: %d", expectedMsgs, recvMsgs)
+	}
+}
+
 type BenchmarkLoggerDummy struct {
 type BenchmarkLoggerDummy struct {
 }
 }
 
 

+ 1 - 1
daemon/logger/journald/journald.go

@@ -108,7 +108,7 @@ func (s *journald) Log(msg *logger.Message) error {
 	for k, v := range s.vars {
 	for k, v := range s.vars {
 		vars[k] = v
 		vars[k] = v
 	}
 	}
-	if msg.Partial {
+	if msg.PLogMetaData != nil {
 		vars["CONTAINER_PARTIAL_MESSAGE"] = "true"
 		vars["CONTAINER_PARTIAL_MESSAGE"] = "true"
 	}
 	}
 
 

+ 1 - 1
daemon/logger/jsonfilelog/jsonfilelog.go

@@ -132,7 +132,7 @@ func (l *JSONFileLogger) Log(msg *logger.Message) error {
 
 
 func marshalMessage(msg *logger.Message, extra json.RawMessage, buf *bytes.Buffer) error {
 func marshalMessage(msg *logger.Message, extra json.RawMessage, buf *bytes.Buffer) error {
 	logLine := msg.Line
 	logLine := msg.Line
-	if !msg.Partial {
+	if msg.PLogMetaData == nil || (msg.PLogMetaData != nil && msg.PLogMetaData.Last) {
 		logLine = append(msg.Line, '\n')
 		logLine = append(msg.Line, '\n')
 	}
 	}
 	err := (&jsonlog.JSONLogs{
 	err := (&jsonlog.JSONLogs{

+ 1 - 1
daemon/logger/logger.go

@@ -60,7 +60,7 @@ func (m *Message) reset() {
 	m.Line = m.Line[:0]
 	m.Line = m.Line[:0]
 	m.Source = ""
 	m.Source = ""
 	m.Attrs = nil
 	m.Attrs = nil
-	m.Partial = false
+	m.PLogMetaData = nil
 
 
 	m.Err = nil
 	m.Err = nil
 }
 }

+ 3 - 3
daemon/logger/logger_test.go

@@ -6,9 +6,9 @@ import (
 
 
 func (m *Message) copy() *Message {
 func (m *Message) copy() *Message {
 	msg := &Message{
 	msg := &Message{
-		Source:    m.Source,
-		Partial:   m.Partial,
-		Timestamp: m.Timestamp,
+		Source:       m.Source,
+		PLogMetaData: m.PLogMetaData,
+		Timestamp:    m.Timestamp,
 	}
 	}
 
 
 	if m.Attrs != nil {
 	if m.Attrs != nil {