fluentd.go 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197
  1. // Package fluentd provides the log driver for forwarding server logs
  2. // to fluentd endpoints.
  3. package fluentd
  4. import (
  5. "fmt"
  6. "math"
  7. "net"
  8. "strconv"
  9. "strings"
  10. "time"
  11. "github.com/Sirupsen/logrus"
  12. "github.com/docker/docker/daemon/logger"
  13. "github.com/docker/docker/daemon/logger/loggerutils"
  14. "github.com/docker/go-units"
  15. "github.com/fluent/fluent-logger-golang/fluent"
  16. )
  17. type fluentd struct {
  18. tag string
  19. containerID string
  20. containerName string
  21. writer *fluent.Fluent
  22. extra map[string]string
  23. }
  24. const (
  25. name = "fluentd"
  26. defaultHost = "127.0.0.1"
  27. defaultPort = 24224
  28. defaultBufferLimit = 1024 * 1024
  29. // logger tries to reconnect 2**32 - 1 times
  30. // failed (and panic) after 204 years [ 1.5 ** (2**32 - 1) - 1 seconds]
  31. defaultRetryWait = 1000
  32. defaultMaxRetries = math.MaxInt32
  33. addressKey = "fluentd-address"
  34. bufferLimitKey = "fluentd-buffer-limit"
  35. retryWaitKey = "fluentd-retry-wait"
  36. maxRetriesKey = "fluentd-max-retries"
  37. asyncConnectKey = "fluentd-async-connect"
  38. )
  39. func init() {
  40. if err := logger.RegisterLogDriver(name, New); err != nil {
  41. logrus.Fatal(err)
  42. }
  43. if err := logger.RegisterLogOptValidator(name, ValidateLogOpt); err != nil {
  44. logrus.Fatal(err)
  45. }
  46. }
  47. // New creates a fluentd logger using the configuration passed in on
  48. // the context. The supported context configuration variable is
  49. // fluentd-address.
  50. func New(ctx logger.Context) (logger.Logger, error) {
  51. host, port, err := parseAddress(ctx.Config[addressKey])
  52. if err != nil {
  53. return nil, err
  54. }
  55. tag, err := loggerutils.ParseLogTag(ctx, loggerutils.DefaultTemplate)
  56. if err != nil {
  57. return nil, err
  58. }
  59. extra := ctx.ExtraAttributes(nil)
  60. bufferLimit := defaultBufferLimit
  61. if ctx.Config[bufferLimitKey] != "" {
  62. bl64, err := units.RAMInBytes(ctx.Config[bufferLimitKey])
  63. if err != nil {
  64. return nil, err
  65. }
  66. bufferLimit = int(bl64)
  67. }
  68. retryWait := defaultRetryWait
  69. if ctx.Config[retryWaitKey] != "" {
  70. rwd, err := time.ParseDuration(ctx.Config[retryWaitKey])
  71. if err != nil {
  72. return nil, err
  73. }
  74. retryWait = int(rwd.Seconds() * 1000)
  75. }
  76. maxRetries := defaultMaxRetries
  77. if ctx.Config[maxRetriesKey] != "" {
  78. mr64, err := strconv.ParseUint(ctx.Config[maxRetriesKey], 10, strconv.IntSize)
  79. if err != nil {
  80. return nil, err
  81. }
  82. maxRetries = int(mr64)
  83. }
  84. asyncConnect := false
  85. if ctx.Config[asyncConnectKey] != "" {
  86. if asyncConnect, err = strconv.ParseBool(ctx.Config[asyncConnectKey]); err != nil {
  87. return nil, err
  88. }
  89. }
  90. fluentConfig := fluent.Config{
  91. FluentPort: port,
  92. FluentHost: host,
  93. BufferLimit: bufferLimit,
  94. RetryWait: retryWait,
  95. MaxRetry: maxRetries,
  96. AsyncConnect: asyncConnect,
  97. }
  98. logrus.WithField("container", ctx.ContainerID).WithField("config", fluentConfig).
  99. Debug("logging driver fluentd configured")
  100. log, err := fluent.New(fluentConfig)
  101. if err != nil {
  102. return nil, err
  103. }
  104. return &fluentd{
  105. tag: tag,
  106. containerID: ctx.ContainerID,
  107. containerName: ctx.ContainerName,
  108. writer: log,
  109. extra: extra,
  110. }, nil
  111. }
  112. func (f *fluentd) Log(msg *logger.Message) error {
  113. data := map[string]string{
  114. "container_id": f.containerID,
  115. "container_name": f.containerName,
  116. "source": msg.Source,
  117. "log": string(msg.Line),
  118. }
  119. for k, v := range f.extra {
  120. data[k] = v
  121. }
  122. // fluent-logger-golang buffers logs from failures and disconnections,
  123. // and these are transferred again automatically.
  124. return f.writer.PostWithTime(f.tag, msg.Timestamp, data)
  125. }
  126. func (f *fluentd) Close() error {
  127. return f.writer.Close()
  128. }
  129. func (f *fluentd) Name() string {
  130. return name
  131. }
  132. // ValidateLogOpt looks for fluentd specific log option fluentd-address.
  133. func ValidateLogOpt(cfg map[string]string) error {
  134. for key := range cfg {
  135. switch key {
  136. case "env":
  137. case "labels":
  138. case "tag":
  139. case addressKey:
  140. case bufferLimitKey:
  141. case retryWaitKey:
  142. case maxRetriesKey:
  143. case asyncConnectKey:
  144. // Accepted
  145. default:
  146. return fmt.Errorf("unknown log opt '%s' for fluentd log driver", key)
  147. }
  148. }
  149. if _, _, err := parseAddress(cfg["fluentd-address"]); err != nil {
  150. return err
  151. }
  152. return nil
  153. }
  154. func parseAddress(address string) (string, int, error) {
  155. if address == "" {
  156. return defaultHost, defaultPort, nil
  157. }
  158. host, port, err := net.SplitHostPort(address)
  159. if err != nil {
  160. if !strings.Contains(err.Error(), "missing port in address") {
  161. return "", 0, fmt.Errorf("invalid fluentd-address %s: %s", address, err)
  162. }
  163. return host, defaultPort, nil
  164. }
  165. portnum, err := strconv.Atoi(port)
  166. if err != nil {
  167. return "", 0, fmt.Errorf("invalid fluentd-address %s: %s", address, err)
  168. }
  169. return host, portnum, nil
  170. }