fluentd.go 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200
  1. // Package fluentd provides the log driver for forwarding server logs
  2. // to fluentd endpoints.
  3. package fluentd
  4. import (
  5. "fmt"
  6. "math"
  7. "net"
  8. "strconv"
  9. "strings"
  10. "time"
  11. "github.com/Sirupsen/logrus"
  12. "github.com/docker/docker/daemon/logger"
  13. "github.com/docker/docker/daemon/logger/loggerutils"
  14. "github.com/docker/go-units"
  15. "github.com/fluent/fluent-logger-golang/fluent"
  16. )
  17. type fluentd struct {
  18. tag string
  19. containerID string
  20. containerName string
  21. writer *fluent.Fluent
  22. extra map[string]string
  23. }
  24. const (
  25. name = "fluentd"
  26. defaultHost = "127.0.0.1"
  27. defaultPort = 24224
  28. defaultBufferLimit = 1024 * 1024
  29. defaultTagPrefix = "docker"
  30. // logger tries to reconnect 2**32 - 1 times
  31. // failed (and panic) after 204 years [ 1.5 ** (2**32 - 1) - 1 seconds]
  32. defaultRetryWait = 1000
  33. defaultTimeout = 3 * time.Second
  34. defaultMaxRetries = math.MaxInt32
  35. defaultReconnectWaitIncreRate = 1.5
  36. addressKey = "fluentd-address"
  37. bufferLimitKey = "fluentd-buffer-limit"
  38. retryWaitKey = "fluentd-retry-wait"
  39. maxRetriesKey = "fluentd-max-retries"
  40. asyncConnectKey = "fluentd-async-connect"
  41. )
  42. func init() {
  43. if err := logger.RegisterLogDriver(name, New); err != nil {
  44. logrus.Fatal(err)
  45. }
  46. if err := logger.RegisterLogOptValidator(name, ValidateLogOpt); err != nil {
  47. logrus.Fatal(err)
  48. }
  49. }
  50. // New creates a fluentd logger using the configuration passed in on
  51. // the context. The supported context configuration variable is
  52. // fluentd-address.
  53. func New(ctx logger.Context) (logger.Logger, error) {
  54. host, port, err := parseAddress(ctx.Config[addressKey])
  55. if err != nil {
  56. return nil, err
  57. }
  58. tag, err := loggerutils.ParseLogTag(ctx, loggerutils.DefaultTemplate)
  59. if err != nil {
  60. return nil, err
  61. }
  62. extra := ctx.ExtraAttributes(nil)
  63. bufferLimit := defaultBufferLimit
  64. if ctx.Config[bufferLimitKey] != "" {
  65. bl64, err := units.RAMInBytes(ctx.Config[bufferLimitKey])
  66. if err != nil {
  67. return nil, err
  68. }
  69. bufferLimit = int(bl64)
  70. }
  71. retryWait := defaultRetryWait
  72. if ctx.Config[retryWaitKey] != "" {
  73. rwd, err := time.ParseDuration(ctx.Config[retryWaitKey])
  74. if err != nil {
  75. return nil, err
  76. }
  77. retryWait = int(rwd.Seconds() * 1000)
  78. }
  79. maxRetries := defaultMaxRetries
  80. if ctx.Config[maxRetriesKey] != "" {
  81. mr64, err := strconv.ParseUint(ctx.Config[maxRetriesKey], 10, strconv.IntSize)
  82. if err != nil {
  83. return nil, err
  84. }
  85. maxRetries = int(mr64)
  86. }
  87. asyncConnect := false
  88. if ctx.Config[asyncConnectKey] != "" {
  89. if asyncConnect, err = strconv.ParseBool(ctx.Config[asyncConnectKey]); err != nil {
  90. return nil, err
  91. }
  92. }
  93. fluentConfig := fluent.Config{
  94. FluentPort: port,
  95. FluentHost: host,
  96. BufferLimit: bufferLimit,
  97. RetryWait: retryWait,
  98. MaxRetry: maxRetries,
  99. AsyncConnect: asyncConnect,
  100. }
  101. logrus.WithField("container", ctx.ContainerID).WithField("config", fluentConfig).
  102. Debug("logging driver fluentd configured")
  103. log, err := fluent.New(fluentConfig)
  104. if err != nil {
  105. return nil, err
  106. }
  107. return &fluentd{
  108. tag: tag,
  109. containerID: ctx.ContainerID,
  110. containerName: ctx.ContainerName,
  111. writer: log,
  112. extra: extra,
  113. }, nil
  114. }
  115. func (f *fluentd) Log(msg *logger.Message) error {
  116. data := map[string]string{
  117. "container_id": f.containerID,
  118. "container_name": f.containerName,
  119. "source": msg.Source,
  120. "log": string(msg.Line),
  121. }
  122. for k, v := range f.extra {
  123. data[k] = v
  124. }
  125. // fluent-logger-golang buffers logs from failures and disconnections,
  126. // and these are transferred again automatically.
  127. return f.writer.PostWithTime(f.tag, msg.Timestamp, data)
  128. }
  129. func (f *fluentd) Close() error {
  130. return f.writer.Close()
  131. }
  132. func (f *fluentd) Name() string {
  133. return name
  134. }
  135. // ValidateLogOpt looks for fluentd specific log option fluentd-address.
  136. func ValidateLogOpt(cfg map[string]string) error {
  137. for key := range cfg {
  138. switch key {
  139. case "env":
  140. case "labels":
  141. case "tag":
  142. case addressKey:
  143. case bufferLimitKey:
  144. case retryWaitKey:
  145. case maxRetriesKey:
  146. case asyncConnectKey:
  147. // Accepted
  148. default:
  149. return fmt.Errorf("unknown log opt '%s' for fluentd log driver", key)
  150. }
  151. }
  152. if _, _, err := parseAddress(cfg["fluentd-address"]); err != nil {
  153. return err
  154. }
  155. return nil
  156. }
  157. func parseAddress(address string) (string, int, error) {
  158. if address == "" {
  159. return defaultHost, defaultPort, nil
  160. }
  161. host, port, err := net.SplitHostPort(address)
  162. if err != nil {
  163. if !strings.Contains(err.Error(), "missing port in address") {
  164. return "", 0, fmt.Errorf("invalid fluentd-address %s: %s", address, err)
  165. }
  166. return host, defaultPort, nil
  167. }
  168. portnum, err := strconv.Atoi(port)
  169. if err != nil {
  170. return "", 0, fmt.Errorf("invalid fluentd-address %s: %s", address, err)
  171. }
  172. return host, portnum, nil
  173. }