fluentd.go 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134
  1. // Package fluentd provides the log driver for forwarding server logs
  2. // to fluentd endpoints.
  3. package fluentd
  4. import (
  5. "fmt"
  6. "math"
  7. "net"
  8. "strconv"
  9. "strings"
  10. "github.com/Sirupsen/logrus"
  11. "github.com/docker/docker/daemon/logger"
  12. "github.com/docker/docker/daemon/logger/loggerutils"
  13. "github.com/fluent/fluent-logger-golang/fluent"
  14. )
  15. type fluentd struct {
  16. tag string
  17. containerID string
  18. containerName string
  19. writer *fluent.Fluent
  20. extra map[string]string
  21. }
  22. const (
  23. name = "fluentd"
  24. defaultHostName = "localhost"
  25. defaultPort = 24224
  26. defaultTagPrefix = "docker"
  27. )
  28. func init() {
  29. if err := logger.RegisterLogDriver(name, New); err != nil {
  30. logrus.Fatal(err)
  31. }
  32. if err := logger.RegisterLogOptValidator(name, ValidateLogOpt); err != nil {
  33. logrus.Fatal(err)
  34. }
  35. }
  36. // New creates a fluentd logger using the configuration passed in on
  37. // the context. Supported context configuration variables are
  38. // fluentd-address & fluentd-tag.
  39. func New(ctx logger.Context) (logger.Logger, error) {
  40. host, port, err := parseAddress(ctx.Config["fluentd-address"])
  41. if err != nil {
  42. return nil, err
  43. }
  44. tag, err := loggerutils.ParseLogTag(ctx, "docker.{{.ID}}")
  45. if err != nil {
  46. return nil, err
  47. }
  48. extra := ctx.ExtraAttributes(nil)
  49. logrus.Debugf("logging driver fluentd configured for container:%s, host:%s, port:%d, tag:%s, extra:%v.", ctx.ContainerID, host, port, tag, extra)
  50. // logger tries to recoonect 2**32 - 1 times
  51. // failed (and panic) after 204 years [ 1.5 ** (2**32 - 1) - 1 seconds]
  52. log, err := fluent.New(fluent.Config{FluentPort: port, FluentHost: host, RetryWait: 1000, MaxRetry: math.MaxInt32})
  53. if err != nil {
  54. return nil, err
  55. }
  56. return &fluentd{
  57. tag: tag,
  58. containerID: ctx.ContainerID,
  59. containerName: ctx.ContainerName,
  60. writer: log,
  61. extra: extra,
  62. }, nil
  63. }
  64. func (f *fluentd) Log(msg *logger.Message) error {
  65. data := map[string]string{
  66. "container_id": f.containerID,
  67. "container_name": f.containerName,
  68. "source": msg.Source,
  69. "log": string(msg.Line),
  70. }
  71. for k, v := range f.extra {
  72. data[k] = v
  73. }
  74. // fluent-logger-golang buffers logs from failures and disconnections,
  75. // and these are transferred again automatically.
  76. return f.writer.PostWithTime(f.tag, msg.Timestamp, data)
  77. }
  78. func (f *fluentd) Close() error {
  79. return f.writer.Close()
  80. }
  81. func (f *fluentd) Name() string {
  82. return name
  83. }
  84. // ValidateLogOpt looks for fluentd specific log options fluentd-address & fluentd-tag.
  85. func ValidateLogOpt(cfg map[string]string) error {
  86. for key := range cfg {
  87. switch key {
  88. case "fluentd-address":
  89. case "fluentd-tag":
  90. case "tag":
  91. case "labels":
  92. case "env":
  93. default:
  94. return fmt.Errorf("unknown log opt '%s' for fluentd log driver", key)
  95. }
  96. }
  97. if _, _, err := parseAddress(cfg["fluentd-address"]); err != nil {
  98. return err
  99. }
  100. return nil
  101. }
  102. func parseAddress(address string) (string, int, error) {
  103. if address == "" {
  104. return defaultHostName, defaultPort, nil
  105. }
  106. host, port, err := net.SplitHostPort(address)
  107. if err != nil {
  108. if !strings.Contains(err.Error(), "missing port in address") {
  109. return "", 0, fmt.Errorf("invalid fluentd-address %s: %s", address, err)
  110. }
  111. return host, defaultPort, nil
  112. }
  113. portnum, err := strconv.Atoi(port)
  114. if err != nil {
  115. return "", 0, fmt.Errorf("invalid fluentd-address %s: %s", address, err)
  116. }
  117. return host, portnum, nil
  118. }