
Merge pull request #19831 from cloudflare/optimize-gelf

GELF logger: Add gelf-compression-type and gelf-compression-level
Antonio Murdaca 9 years ago
parent
commit
0f59b0b12c

+ 45 - 5
daemon/logger/gelf/gelf.go

@@ -6,9 +6,12 @@ package gelf
 
 import (
 	"bytes"
+	"compress/flate"
+	"encoding/json"
 	"fmt"
 	"net"
 	"net/url"
+	"strconv"
 	"time"
 
 	"github.com/Graylog2/go-gelf/gelf"
@@ -24,7 +27,7 @@ type gelfLogger struct {
 	writer   *gelf.Writer
 	ctx      logger.Context
 	hostname string
-	extra    map[string]interface{}
+	rawExtra json.RawMessage
 }
 
 func init() {
@@ -81,17 +84,43 @@ func New(ctx logger.Context) (logger.Logger, error) {
 		extra[k] = v
 	}
 
+	rawExtra, err := json.Marshal(extra)
+	if err != nil {
+		return nil, err
+	}
+
 	// create new gelfWriter
 	gelfWriter, err := gelf.NewWriter(address)
 	if err != nil {
 		return nil, fmt.Errorf("gelf: cannot connect to GELF endpoint: %s %v", address, err)
 	}
 
+	if v, ok := ctx.Config["gelf-compression-type"]; ok {
+		switch v {
+		case "gzip":
+			gelfWriter.CompressionType = gelf.CompressGzip
+		case "zlib":
+			gelfWriter.CompressionType = gelf.CompressZlib
+		case "none":
+			gelfWriter.CompressionType = gelf.CompressNone
+		default:
+			return nil, fmt.Errorf("gelf: invalid compression type %q", v)
+		}
+	}
+
+	if v, ok := ctx.Config["gelf-compression-level"]; ok {
+		val, err := strconv.Atoi(v)
+		if err != nil {
+			return nil, fmt.Errorf("gelf: invalid compression level %s, err %v", v, err)
+		}
+		gelfWriter.CompressionLevel = val
+	}
+
 	return &gelfLogger{
 		writer:   gelfWriter,
 		ctx:      ctx,
 		hostname: hostname,
-		extra:    extra,
+		rawExtra: rawExtra,
 	}, nil
 }
 
@@ -107,7 +136,7 @@ func (s *gelfLogger) Log(msg *logger.Message) error {
 		Short:    string(msg.Line),
 		TimeUnix: float64(msg.Timestamp.UnixNano()/int64(time.Millisecond)) / 1000.0,
 		Level:    level,
-		Extra:    s.extra,
+		RawExtra: s.rawExtra,
 	}
 
 	if err := s.writer.WriteMessage(&m); err != nil {
@@ -127,15 +156,26 @@ func (s *gelfLogger) Name() string {
 // ValidateLogOpt looks for gelf specific log options gelf-address, &
 // gelf-tag.
 func ValidateLogOpt(cfg map[string]string) error {
-	for key := range cfg {
+	for key, val := range cfg {
 		switch key {
 		case "gelf-address":
 		case "gelf-tag":
 		case "tag":
 		case "labels":
 		case "env":
+		case "gelf-compression-level":
+			i, err := strconv.Atoi(val)
+			if err != nil || i < flate.DefaultCompression || i > flate.BestCompression {
+				return fmt.Errorf("unknown value %q for log opt %q for gelf log driver", val, key)
+			}
+		case "gelf-compression-type":
+			switch val {
+			case "gzip", "zlib", "none":
+			default:
+				return fmt.Errorf("unknown value %q for log opt %q for gelf log driver", val, key)
+			}
 		default:
-			return fmt.Errorf("unknown log opt '%s' for gelf log driver", key)
+			return fmt.Errorf("unknown log opt %q for gelf log driver", key)
 		}
 	}
 

+ 10 - 0
docs/admin/logging/overview.md

@@ -152,6 +152,8 @@ The GELF logging driver supports the following options:
     --log-opt tag="database"
     --log-opt labels=label1,label2
     --log-opt env=env1,env2
+    --log-opt gelf-compression-type=gzip
+    --log-opt gelf-compression-level=1
 
 The `gelf-address` option specifies the remote GELF server address that the
 driver connects to. Currently, only `udp` is supported as the transport and you must
@@ -173,6 +175,14 @@ underscore (`_`).
     "_fizz": "buzz",
     // […]
 
+The `gelf-compression-type` option can be used to change how the GELF driver
+compresses each log message. The accepted values are `gzip`, `zlib` and `none`.
+`gzip` is chosen by default.
+
+The `gelf-compression-level` option can be used to change the level of compression
+when `gzip` or `zlib` is selected as `gelf-compression-type`. The accepted values
+range from -1 to 9 (BestCompression). Higher levels typically
+run slower but compress more. The default value is 1 (BestSpeed).
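
For reference, a minimal sketch (not part of this commit) of how these two options map onto the underlying go-gelf writer, mirroring the daemon/logger/gelf/gelf.go change earlier in this diff. The `configureWriter` helper, the options map, and the GELF address are illustrative only:

```go
package main

import (
	"compress/flate"
	"fmt"
	"strconv"

	"github.com/Graylog2/go-gelf/gelf"
)

// configureWriter applies the two new log options to a gelf.Writer, using
// the same switch/Atoi logic as the daemon change above.
func configureWriter(w *gelf.Writer, opts map[string]string) error {
	if v, ok := opts["gelf-compression-type"]; ok {
		switch v {
		case "gzip":
			w.CompressionType = gelf.CompressGzip
		case "zlib":
			w.CompressionType = gelf.CompressZlib
		case "none":
			w.CompressionType = gelf.CompressNone
		default:
			return fmt.Errorf("invalid compression type %q", v)
		}
	}
	if v, ok := opts["gelf-compression-level"]; ok {
		level, err := strconv.Atoi(v)
		if err != nil || level < flate.DefaultCompression || level > flate.BestCompression {
			return fmt.Errorf("invalid compression level %q", v)
		}
		w.CompressionLevel = level
	}
	return nil
}

func main() {
	// Placeholder endpoint; NewWriter dials UDP, so no server needs to be running.
	w, err := gelf.NewWriter("127.0.0.1:12201")
	if err != nil {
		panic(err)
	}
	opts := map[string]string{
		"gelf-compression-type":  "zlib",
		"gelf-compression-level": "6",
	}
	if err := configureWriter(w, opts); err != nil {
		panic(err)
	}
	fmt.Println("writer configured:", w.CompressionType, w.CompressionLevel)
}
```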
 
 ## fluentd options
 

+ 1 - 1
hack/vendor.sh

@@ -68,7 +68,7 @@ clone git github.com/syndtr/gocapability 2c00daeb6c3b45114c80ac44119e7b8801fdd85
 clone git github.com/golang/protobuf f7137ae6b19afbfd61a94b746fda3b3fe0491874
 
 # gelf logging driver deps
-clone git github.com/Graylog2/go-gelf 6c62a85f1d47a67f2a5144c0e745b325889a8120
+clone git github.com/Graylog2/go-gelf aab2f594e4585d43468ac57287b0dece9d806883
 
 clone git github.com/fluent/fluent-logger-golang v1.0.0
 # fluent-logger-golang deps

+ 5 - 7
vendor/src/github.com/Graylog2/go-gelf/gelf/reader.go

@@ -66,7 +66,6 @@ func (r *Reader) ReadMessage() (*Message, error) {
 	var (
 		err        error
 		n, length  int
-		buf        bytes.Buffer
 		cid, ocid  []byte
 		seq, total uint8
 		cHead      []byte
@@ -122,19 +121,18 @@ func (r *Reader) ReadMessage() (*Message, error) {
 		// zlib is slightly more complicated, but correct
 		cReader, err = zlib.NewReader(bytes.NewReader(cBuf))
 	} else {
-		return nil, fmt.Errorf("unknown magic: %x %v", cHead, cHead)
+		// compliance with https://github.com/Graylog2/graylog2-server
+		// treating all messages as uncompressed if they are not gzip, zlib or
+		// chunked
+		cReader = bytes.NewReader(cBuf)
 	}
 
 	if err != nil {
 		return nil, fmt.Errorf("NewReader: %s", err)
 	}
 
-	if _, err = io.Copy(&buf, cReader); err != nil {
-		return nil, fmt.Errorf("io.Copy: %s", err)
-	}
-
 	msg := new(Message)
-	if err := json.Unmarshal(buf.Bytes(), &msg); err != nil {
+	if err := json.NewDecoder(cReader).Decode(&msg); err != nil {
 		return nil, fmt.Errorf("json.Unmarshal: %s", err)
 	}
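
As a side note, here is a simplified standalone sketch of the fallback behaviour the comment above describes: inspect the leading bytes of a GELF datagram and fall back to plain JSON when the payload is neither gzip, zlib, nor a chunk header. The gzip (0x1f 0x8b) and GELF chunk (0x1e 0x0f) magic values are standard; the zlib test here is a simplified header check, not go-gelf's exact logic.

```go
package main

import (
	"bytes"
	"compress/gzip"
	"compress/zlib"
	"encoding/json"
	"fmt"
	"io"
)

// decodePayload picks a reader based on the payload's leading bytes and
// treats anything unrecognised as an uncompressed JSON message.
func decodePayload(p []byte) (map[string]interface{}, error) {
	var r io.Reader
	switch {
	case len(p) >= 2 && p[0] == 0x1f && p[1] == 0x8b: // gzip magic
		gr, err := gzip.NewReader(bytes.NewReader(p))
		if err != nil {
			return nil, err
		}
		r = gr
	case len(p) >= 2 && p[0] == 0x78: // common zlib CMF byte (simplified check)
		zr, err := zlib.NewReader(bytes.NewReader(p))
		if err != nil {
			return nil, err
		}
		r = zr
	case len(p) >= 2 && p[0] == 0x1e && p[1] == 0x0f: // chunked GELF header
		return nil, fmt.Errorf("chunked message: reassembly needed first")
	default: // anything else is treated as uncompressed JSON
		r = bytes.NewReader(p)
	}
	var msg map[string]interface{}
	if err := json.NewDecoder(r).Decode(&msg); err != nil {
		return nil, err
	}
	return msg, nil
}

func main() {
	msg, err := decodePayload([]byte(`{"version":"1.1","short_message":"hi"}`))
	fmt.Println(msg, err)
}
```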
 

+ 90 - 45
vendor/src/github.com/Graylog2/go-gelf/gelf/writer.go

@@ -41,6 +41,7 @@ type CompressType int
 const (
 	CompressGzip CompressType = iota
 	CompressZlib
+	CompressNone
 )
 
 // Message represents the contents of the GELF message.  It is gzipped
@@ -49,15 +50,14 @@ type Message struct {
 	Version  string                 `json:"version"`
 	Host     string                 `json:"host"`
 	Short    string                 `json:"short_message"`
-	Full     string                 `json:"full_message"`
+	Full     string                 `json:"full_message,omitempty"`
 	TimeUnix float64                `json:"timestamp"`
-	Level    int32                  `json:"level"`
-	Facility string                 `json:"facility"`
+	Level    int32                  `json:"level,omitempty"`
+	Facility string                 `json:"facility,omitempty"`
 	Extra    map[string]interface{} `json:"-"`
+	RawExtra json.RawMessage        `json:"-"`
 }
 
-type innerMessage Message //against circular (Un)MarshalJSON
-
 // Used to control GELF chunking.  Should be less than (MTU - len(UDP
 // header)).
 //
@@ -76,14 +76,14 @@ var (
 
 // Syslog severity levels
 const (
-  LOG_EMERG   = int32(0)
-  LOG_ALERT   = int32(1)
-  LOG_CRIT    = int32(2)
-  LOG_ERR     = int32(3)
-  LOG_WARNING = int32(4)
-  LOG_NOTICE  = int32(5)
-  LOG_INFO    = int32(6)
-  LOG_DEBUG   = int32(7)
+	LOG_EMERG   = int32(0)
+	LOG_ALERT   = int32(1)
+	LOG_CRIT    = int32(2)
+	LOG_ERR     = int32(3)
+	LOG_WARNING = int32(4)
+	LOG_NOTICE  = int32(5)
+	LOG_INFO    = int32(6)
+	LOG_DEBUG   = int32(7)
 )
 
 // numChunks returns the number of GELF chunks necessary to transmit
@@ -176,40 +176,70 @@ func (w *Writer) writeChunked(zBytes []byte) (err error) {
 	return nil
 }
 
+// 1k bytes buffer by default
+var bufPool = sync.Pool{
+	New: func() interface{} {
+		return bytes.NewBuffer(make([]byte, 0, 1024))
+	},
+}
+
+func newBuffer() *bytes.Buffer {
+	b := bufPool.Get().(*bytes.Buffer)
+	if b != nil {
+		b.Reset()
+		return b
+	}
+	return bytes.NewBuffer(nil)
+}
+
 // WriteMessage sends the specified message to the GELF server
 // specified in the call to New().  It assumes all the fields are
 // filled out appropriately.  In general, clients will want to use
 // Write, rather than WriteMessage.
 func (w *Writer) WriteMessage(m *Message) (err error) {
-	mBytes, err := json.Marshal(m)
-	if err != nil {
-		return
+	mBuf := newBuffer()
+	defer bufPool.Put(mBuf)
+	if err = m.MarshalJSONBuf(mBuf); err != nil {
+		return err
 	}
+	mBytes := mBuf.Bytes()
+
+	var (
+		zBuf   *bytes.Buffer
+		zBytes []byte
+	)
 
-	var zBuf bytes.Buffer
 	var zw io.WriteCloser
 	switch w.CompressionType {
 	case CompressGzip:
-		zw, err = gzip.NewWriterLevel(&zBuf, w.CompressionLevel)
+		zBuf = newBuffer()
+		defer bufPool.Put(zBuf)
+		zw, err = gzip.NewWriterLevel(zBuf, w.CompressionLevel)
 	case CompressZlib:
-		zw, err = zlib.NewWriterLevel(&zBuf, w.CompressionLevel)
+		zBuf = newBuffer()
+		defer bufPool.Put(zBuf)
+		zw, err = zlib.NewWriterLevel(zBuf, w.CompressionLevel)
+	case CompressNone:
+		zBytes = mBytes
 	default:
 		panic(fmt.Sprintf("unknown compression type %d",
 			w.CompressionType))
 	}
-	if err != nil {
-		return
-	}
-	if _, err = zw.Write(mBytes); err != nil {
-		return
+	if zw != nil {
+		if err != nil {
+			return
+		}
+		if _, err = zw.Write(mBytes); err != nil {
+			zw.Close()
+			return
+		}
+		zw.Close()
+		zBytes = zBuf.Bytes()
 	}
-	zw.Close()
 
-	zBytes := zBuf.Bytes()
 	if numChunks(zBytes) > 1 {
 		return w.writeChunked(zBytes)
 	}
-
 	n, err := w.conn.Write(zBytes)
 	if err != nil {
 		return
@@ -222,8 +252,8 @@ func (w *Writer) WriteMessage(m *Message) (err error) {
 }
 
 // Close connection and interrupt blocked Read or Write operations
-func (w *Writer) Close() (error) {
-  return w.conn.Close()
+func (w *Writer) Close() error {
+	return w.conn.Close()
 }
 
 /*
@@ -315,28 +345,43 @@ func (w *Writer) Write(p []byte) (n int, err error) {
 	return len(p), nil
 }
 
-func (m *Message) MarshalJSON() ([]byte, error) {
-	var err error
-	var b, eb []byte
-
-	extra := m.Extra
-	b, err = json.Marshal((*innerMessage)(m))
-	m.Extra = extra
+func (m *Message) MarshalJSONBuf(buf *bytes.Buffer) error {
+	b, err := json.Marshal(m)
 	if err != nil {
-		return nil, err
+		return err
 	}
-
-	if len(extra) == 0 {
-		return b, nil
+	// write up until the final }
+	if _, err = buf.Write(b[:len(b)-1]); err != nil {
+		return err
+	}
+	if len(m.Extra) > 0 {
+		eb, err := json.Marshal(m.Extra)
+		if err != nil {
+			return err
+		}
+		// merge serialized message + serialized extra map
+		if err = buf.WriteByte(','); err != nil {
+			return err
+		}
+		// write serialized extra bytes, without the enclosing braces
+		if _, err = buf.Write(eb[1 : len(eb)-1]); err != nil {
+			return err
+		}
 	}
 
-	if eb, err = json.Marshal(extra); err != nil {
-		return nil, err
+	if len(m.RawExtra) > 0 {
+		if err := buf.WriteByte(','); err != nil {
+			return err
+		}
+
+		// write serialized extra bytes, without the enclosing braces
+		if _, err = buf.Write(m.RawExtra[1 : len(m.RawExtra)-1]); err != nil {
+			return err
+		}
 	}
 
-	// merge serialized message + serialized extra map
-	b[len(b)-1] = ','
-	return append(b, eb[1:len(eb)]...), nil
+	// write the final closing brace
+	return buf.WriteByte('}')
 }
 
 func (m *Message) UnmarshalJSON(data []byte) error {
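
The MarshalJSONBuf change above merges the pre-serialized RawExtra fields into the message by splicing JSON objects at the byte level, instead of re-marshalling a combined map. A minimal standalone sketch of that technique, with illustrative type and field names:

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
)

type record struct {
	Host  string `json:"host"`
	Short string `json:"short_message"`
}

// mergeRawExtra appends the members of rawExtra (an already-serialized JSON
// object) to the serialized form of r without re-parsing either side.
func mergeRawExtra(r record, rawExtra json.RawMessage) ([]byte, error) {
	b, err := json.Marshal(r)
	if err != nil {
		return nil, err
	}
	buf := bytes.NewBuffer(b[:len(b)-1]) // keep everything up to the final '}'
	if len(rawExtra) > 2 {               // skip "{}": nothing to merge
		buf.WriteByte(',')
		buf.Write(rawExtra[1 : len(rawExtra)-1]) // strip the enclosing braces
	}
	buf.WriteByte('}')
	return buf.Bytes(), nil
}

func main() {
	extra := json.RawMessage(`{"_container_id":"abc123","_tag":"database"}`)
	out, err := mergeRawExtra(record{Host: "host1", Short: "hello"}, extra)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out))
	// {"host":"host1","short_message":"hello","_container_id":"abc123","_tag":"database"}
}
```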