123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265 |
- // Package gelf provides the log driver for forwarding server logs to
- // endpoints that support the Graylog Extended Log Format.
- package gelf // import "github.com/docker/docker/daemon/logger/gelf"
- import (
- "compress/flate"
- "encoding/json"
- "fmt"
- "net"
- "net/url"
- "strconv"
- "time"
- "github.com/Graylog2/go-gelf/gelf"
- "github.com/docker/docker/daemon/logger"
- "github.com/docker/docker/daemon/logger/loggerutils"
- )
- const name = "gelf"
- type gelfLogger struct {
- writer gelf.Writer
- info logger.Info
- hostname string
- rawExtra json.RawMessage
- }
- func init() {
- if err := logger.RegisterLogDriver(name, New); err != nil {
- panic(err)
- }
- if err := logger.RegisterLogOptValidator(name, ValidateLogOpt); err != nil {
- panic(err)
- }
- }
- // New creates a gelf logger using the configuration passed in on the
- // context. The supported context configuration variable is gelf-address.
- func New(info logger.Info) (logger.Logger, error) {
- // parse gelf address
- address, err := parseAddress(info.Config["gelf-address"])
- if err != nil {
- return nil, err
- }
- // collect extra data for GELF message
- hostname, err := info.Hostname()
- if err != nil {
- return nil, fmt.Errorf("gelf: cannot access hostname to set source field")
- }
- // parse log tag
- tag, err := loggerutils.ParseLogTag(info, loggerutils.DefaultTemplate)
- if err != nil {
- return nil, err
- }
- extra := map[string]interface{}{
- "_container_id": info.ContainerID,
- "_container_name": info.Name(),
- "_image_id": info.ContainerImageID,
- "_image_name": info.ContainerImageName,
- "_command": info.Command(),
- "_tag": tag,
- "_created": info.ContainerCreated,
- }
- extraAttrs, err := info.ExtraAttributes(func(key string) string {
- if key[0] == '_' {
- return key
- }
- return "_" + key
- })
- if err != nil {
- return nil, err
- }
- for k, v := range extraAttrs {
- extra[k] = v
- }
- rawExtra, err := json.Marshal(extra)
- if err != nil {
- return nil, err
- }
- var gelfWriter gelf.Writer
- if address.Scheme == "udp" {
- gelfWriter, err = newGELFUDPWriter(address.Host, info)
- if err != nil {
- return nil, err
- }
- } else if address.Scheme == "tcp" {
- gelfWriter, err = newGELFTCPWriter(address.Host, info)
- if err != nil {
- return nil, err
- }
- }
- return &gelfLogger{
- writer: gelfWriter,
- info: info,
- hostname: hostname,
- rawExtra: rawExtra,
- }, nil
- }
- // create new TCP gelfWriter
- func newGELFTCPWriter(address string, info logger.Info) (gelf.Writer, error) {
- gelfWriter, err := gelf.NewTCPWriter(address)
- if err != nil {
- return nil, fmt.Errorf("gelf: cannot connect to GELF endpoint: %s %v", address, err)
- }
- if v, ok := info.Config["gelf-tcp-max-reconnect"]; ok {
- i, err := strconv.Atoi(v)
- if err != nil || i < 0 {
- return nil, fmt.Errorf("gelf-tcp-max-reconnect must be a positive integer")
- }
- gelfWriter.MaxReconnect = i
- }
- if v, ok := info.Config["gelf-tcp-reconnect-delay"]; ok {
- i, err := strconv.Atoi(v)
- if err != nil || i < 0 {
- return nil, fmt.Errorf("gelf-tcp-reconnect-delay must be a positive integer")
- }
- gelfWriter.ReconnectDelay = time.Duration(i)
- }
- return gelfWriter, nil
- }
- // create new UDP gelfWriter
- func newGELFUDPWriter(address string, info logger.Info) (gelf.Writer, error) {
- gelfWriter, err := gelf.NewUDPWriter(address)
- if err != nil {
- return nil, fmt.Errorf("gelf: cannot connect to GELF endpoint: %s %v", address, err)
- }
- if v, ok := info.Config["gelf-compression-type"]; ok {
- switch v {
- case "gzip":
- gelfWriter.CompressionType = gelf.CompressGzip
- case "zlib":
- gelfWriter.CompressionType = gelf.CompressZlib
- case "none":
- gelfWriter.CompressionType = gelf.CompressNone
- default:
- return nil, fmt.Errorf("gelf: invalid compression type %q", v)
- }
- }
- if v, ok := info.Config["gelf-compression-level"]; ok {
- val, err := strconv.Atoi(v)
- if err != nil {
- return nil, fmt.Errorf("gelf: invalid compression level %s, err %v", v, err)
- }
- gelfWriter.CompressionLevel = val
- }
- return gelfWriter, nil
- }
- func (s *gelfLogger) Log(msg *logger.Message) error {
- if len(msg.Line) == 0 {
- return nil
- }
- level := gelf.LOG_INFO
- if msg.Source == "stderr" {
- level = gelf.LOG_ERR
- }
- m := gelf.Message{
- Version: "1.1",
- Host: s.hostname,
- Short: string(msg.Line),
- TimeUnix: float64(msg.Timestamp.UnixNano()/int64(time.Millisecond)) / 1000.0,
- Level: int32(level),
- RawExtra: s.rawExtra,
- }
- logger.PutMessage(msg)
- if err := s.writer.WriteMessage(&m); err != nil {
- return fmt.Errorf("gelf: cannot send GELF message: %v", err)
- }
- return nil
- }
- func (s *gelfLogger) Close() error {
- return s.writer.Close()
- }
- func (s *gelfLogger) Name() string {
- return name
- }
- // ValidateLogOpt looks for gelf specific log option gelf-address.
- func ValidateLogOpt(cfg map[string]string) error {
- address, err := parseAddress(cfg["gelf-address"])
- if err != nil {
- return err
- }
- for key, val := range cfg {
- switch key {
- case "gelf-address":
- case "tag":
- case "labels":
- case "labels-regex":
- case "env":
- case "env-regex":
- case "gelf-compression-level":
- if address.Scheme != "udp" {
- return fmt.Errorf("compression is only supported on UDP")
- }
- i, err := strconv.Atoi(val)
- if err != nil || i < flate.DefaultCompression || i > flate.BestCompression {
- return fmt.Errorf("unknown value %q for log opt %q for gelf log driver", val, key)
- }
- case "gelf-compression-type":
- if address.Scheme != "udp" {
- return fmt.Errorf("compression is only supported on UDP")
- }
- switch val {
- case "gzip", "zlib", "none":
- default:
- return fmt.Errorf("unknown value %q for log opt %q for gelf log driver", val, key)
- }
- case "gelf-tcp-max-reconnect", "gelf-tcp-reconnect-delay":
- if address.Scheme != "tcp" {
- return fmt.Errorf("%q is only valid for TCP", key)
- }
- i, err := strconv.Atoi(val)
- if err != nil || i < 0 {
- return fmt.Errorf("%q must be a positive integer", key)
- }
- default:
- return fmt.Errorf("unknown log opt %q for gelf log driver", key)
- }
- }
- return nil
- }
- func parseAddress(address string) (*url.URL, error) {
- if address == "" {
- return nil, fmt.Errorf("gelf-address is a required parameter")
- }
- addr, err := url.Parse(address)
- if err != nil {
- return nil, err
- }
- if addr.Scheme != "udp" && addr.Scheme != "tcp" {
- return nil, fmt.Errorf("gelf: endpoint needs to be TCP or UDP")
- }
- if _, _, err = net.SplitHostPort(addr.Host); err != nil {
- return nil, fmt.Errorf("gelf: please provide gelf-address as proto://host:port")
- }
- return addr, nil
- }
|