fluentd.go 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246
  1. // Package fluentd provides the log driver for forwarding server logs
  2. // to fluentd endpoints.
  3. package fluentd
  4. import (
  5. "fmt"
  6. "math"
  7. "net"
  8. "net/url"
  9. "strconv"
  10. "strings"
  11. "time"
  12. "github.com/Sirupsen/logrus"
  13. "github.com/docker/docker/daemon/logger"
  14. "github.com/docker/docker/daemon/logger/loggerutils"
  15. "github.com/docker/docker/pkg/urlutil"
  16. "github.com/docker/go-units"
  17. "github.com/fluent/fluent-logger-golang/fluent"
  18. "github.com/pkg/errors"
  19. )
  20. type fluentd struct {
  21. tag string
  22. containerID string
  23. containerName string
  24. writer *fluent.Fluent
  25. extra map[string]string
  26. }
  27. type location struct {
  28. protocol string
  29. host string
  30. port int
  31. path string
  32. }
  33. const (
  34. name = "fluentd"
  35. defaultProtocol = "tcp"
  36. defaultHost = "127.0.0.1"
  37. defaultPort = 24224
  38. defaultBufferLimit = 1024 * 1024
  39. // logger tries to reconnect 2**32 - 1 times
  40. // failed (and panic) after 204 years [ 1.5 ** (2**32 - 1) - 1 seconds]
  41. defaultRetryWait = 1000
  42. defaultMaxRetries = math.MaxInt32
  43. addressKey = "fluentd-address"
  44. bufferLimitKey = "fluentd-buffer-limit"
  45. retryWaitKey = "fluentd-retry-wait"
  46. maxRetriesKey = "fluentd-max-retries"
  47. asyncConnectKey = "fluentd-async-connect"
  48. )
  49. func init() {
  50. if err := logger.RegisterLogDriver(name, New); err != nil {
  51. logrus.Fatal(err)
  52. }
  53. if err := logger.RegisterLogOptValidator(name, ValidateLogOpt); err != nil {
  54. logrus.Fatal(err)
  55. }
  56. }
  57. // New creates a fluentd logger using the configuration passed in on
  58. // the context. The supported context configuration variable is
  59. // fluentd-address.
  60. func New(ctx logger.Context) (logger.Logger, error) {
  61. loc, err := parseAddress(ctx.Config[addressKey])
  62. if err != nil {
  63. return nil, err
  64. }
  65. tag, err := loggerutils.ParseLogTag(ctx, loggerutils.DefaultTemplate)
  66. if err != nil {
  67. return nil, err
  68. }
  69. extra := ctx.ExtraAttributes(nil)
  70. bufferLimit := defaultBufferLimit
  71. if ctx.Config[bufferLimitKey] != "" {
  72. bl64, err := units.RAMInBytes(ctx.Config[bufferLimitKey])
  73. if err != nil {
  74. return nil, err
  75. }
  76. bufferLimit = int(bl64)
  77. }
  78. retryWait := defaultRetryWait
  79. if ctx.Config[retryWaitKey] != "" {
  80. rwd, err := time.ParseDuration(ctx.Config[retryWaitKey])
  81. if err != nil {
  82. return nil, err
  83. }
  84. retryWait = int(rwd.Seconds() * 1000)
  85. }
  86. maxRetries := defaultMaxRetries
  87. if ctx.Config[maxRetriesKey] != "" {
  88. mr64, err := strconv.ParseUint(ctx.Config[maxRetriesKey], 10, strconv.IntSize)
  89. if err != nil {
  90. return nil, err
  91. }
  92. maxRetries = int(mr64)
  93. }
  94. asyncConnect := false
  95. if ctx.Config[asyncConnectKey] != "" {
  96. if asyncConnect, err = strconv.ParseBool(ctx.Config[asyncConnectKey]); err != nil {
  97. return nil, err
  98. }
  99. }
  100. fluentConfig := fluent.Config{
  101. FluentPort: loc.port,
  102. FluentHost: loc.host,
  103. FluentNetwork: loc.protocol,
  104. FluentSocketPath: loc.path,
  105. BufferLimit: bufferLimit,
  106. RetryWait: retryWait,
  107. MaxRetry: maxRetries,
  108. AsyncConnect: asyncConnect,
  109. }
  110. logrus.WithField("container", ctx.ContainerID).WithField("config", fluentConfig).
  111. Debug("logging driver fluentd configured")
  112. log, err := fluent.New(fluentConfig)
  113. if err != nil {
  114. return nil, err
  115. }
  116. return &fluentd{
  117. tag: tag,
  118. containerID: ctx.ContainerID,
  119. containerName: ctx.ContainerName,
  120. writer: log,
  121. extra: extra,
  122. }, nil
  123. }
  124. func (f *fluentd) Log(msg *logger.Message) error {
  125. data := map[string]string{
  126. "container_id": f.containerID,
  127. "container_name": f.containerName,
  128. "source": msg.Source,
  129. "log": string(msg.Line),
  130. }
  131. for k, v := range f.extra {
  132. data[k] = v
  133. }
  134. // fluent-logger-golang buffers logs from failures and disconnections,
  135. // and these are transferred again automatically.
  136. return f.writer.PostWithTime(f.tag, msg.Timestamp, data)
  137. }
  138. func (f *fluentd) Close() error {
  139. return f.writer.Close()
  140. }
  141. func (f *fluentd) Name() string {
  142. return name
  143. }
  144. // ValidateLogOpt looks for fluentd specific log option fluentd-address.
  145. func ValidateLogOpt(cfg map[string]string) error {
  146. for key := range cfg {
  147. switch key {
  148. case "env":
  149. case "labels":
  150. case "tag":
  151. case addressKey:
  152. case bufferLimitKey:
  153. case retryWaitKey:
  154. case maxRetriesKey:
  155. case asyncConnectKey:
  156. // Accepted
  157. default:
  158. return fmt.Errorf("unknown log opt '%s' for fluentd log driver", key)
  159. }
  160. }
  161. if _, err := parseAddress(cfg["fluentd-address"]); err != nil {
  162. return err
  163. }
  164. return nil
  165. }
  166. func parseAddress(address string) (*location, error) {
  167. if address == "" {
  168. return &location{
  169. protocol: defaultProtocol,
  170. host: defaultHost,
  171. port: defaultPort,
  172. path: "",
  173. }, nil
  174. }
  175. protocol := defaultProtocol
  176. givenAddress := address
  177. if urlutil.IsTransportURL(address) {
  178. url, err := url.Parse(address)
  179. if err != nil {
  180. return nil, errors.Wrapf(err, "invalid fluentd-address %s", givenAddress)
  181. }
  182. // unix and unixgram socket
  183. if url.Scheme == "unix" || url.Scheme == "unixgram" {
  184. return &location{
  185. protocol: url.Scheme,
  186. host: "",
  187. port: 0,
  188. path: url.Path,
  189. }, nil
  190. }
  191. // tcp|udp
  192. protocol = url.Scheme
  193. address = url.Host
  194. }
  195. host, port, err := net.SplitHostPort(address)
  196. if err != nil {
  197. if !strings.Contains(err.Error(), "missing port in address") {
  198. return nil, errors.Wrapf(err, "invalid fluentd-address %s", givenAddress)
  199. }
  200. return &location{
  201. protocol: protocol,
  202. host: host,
  203. port: defaultPort,
  204. path: "",
  205. }, nil
  206. }
  207. portnum, err := strconv.Atoi(port)
  208. if err != nil {
  209. return nil, errors.Wrapf(err, "invalid fluentd-address %s", givenAddress)
  210. }
  211. return &location{
  212. protocol: protocol,
  213. host: host,
  214. port: portnum,
  215. path: "",
  216. }, nil
  217. }