fluentd.go 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260
  1. // Package fluentd provides the log driver for forwarding server logs
  2. // to fluentd endpoints.
  3. package fluentd
  4. import (
  5. "fmt"
  6. "math"
  7. "net"
  8. "net/url"
  9. "strconv"
  10. "strings"
  11. "time"
  12. "github.com/docker/docker/daemon/logger"
  13. "github.com/docker/docker/daemon/logger/loggerutils"
  14. "github.com/docker/docker/pkg/urlutil"
  15. "github.com/docker/go-units"
  16. "github.com/fluent/fluent-logger-golang/fluent"
  17. "github.com/pkg/errors"
  18. "github.com/sirupsen/logrus"
  19. )
  20. type fluentd struct {
  21. tag string
  22. containerID string
  23. containerName string
  24. writer *fluent.Fluent
  25. extra map[string]string
  26. }
  27. type location struct {
  28. protocol string
  29. host string
  30. port int
  31. path string
  32. }
  33. const (
  34. name = "fluentd"
  35. defaultProtocol = "tcp"
  36. defaultHost = "127.0.0.1"
  37. defaultPort = 24224
  38. defaultBufferLimit = 1024 * 1024
  39. // logger tries to reconnect 2**32 - 1 times
  40. // failed (and panic) after 204 years [ 1.5 ** (2**32 - 1) - 1 seconds]
  41. defaultRetryWait = 1000
  42. defaultMaxRetries = math.MaxInt32
  43. addressKey = "fluentd-address"
  44. bufferLimitKey = "fluentd-buffer-limit"
  45. retryWaitKey = "fluentd-retry-wait"
  46. maxRetriesKey = "fluentd-max-retries"
  47. asyncConnectKey = "fluentd-async-connect"
  48. subSecondPrecisionKey = "fluentd-sub-second-precision"
  49. )
  50. func init() {
  51. if err := logger.RegisterLogDriver(name, New); err != nil {
  52. logrus.Fatal(err)
  53. }
  54. if err := logger.RegisterLogOptValidator(name, ValidateLogOpt); err != nil {
  55. logrus.Fatal(err)
  56. }
  57. }
  58. // New creates a fluentd logger using the configuration passed in on
  59. // the context. The supported context configuration variable is
  60. // fluentd-address.
  61. func New(info logger.Info) (logger.Logger, error) {
  62. loc, err := parseAddress(info.Config[addressKey])
  63. if err != nil {
  64. return nil, err
  65. }
  66. tag, err := loggerutils.ParseLogTag(info, loggerutils.DefaultTemplate)
  67. if err != nil {
  68. return nil, err
  69. }
  70. extra, err := info.ExtraAttributes(nil)
  71. if err != nil {
  72. return nil, err
  73. }
  74. bufferLimit := defaultBufferLimit
  75. if info.Config[bufferLimitKey] != "" {
  76. bl64, err := units.RAMInBytes(info.Config[bufferLimitKey])
  77. if err != nil {
  78. return nil, err
  79. }
  80. bufferLimit = int(bl64)
  81. }
  82. retryWait := defaultRetryWait
  83. if info.Config[retryWaitKey] != "" {
  84. rwd, err := time.ParseDuration(info.Config[retryWaitKey])
  85. if err != nil {
  86. return nil, err
  87. }
  88. retryWait = int(rwd.Seconds() * 1000)
  89. }
  90. maxRetries := defaultMaxRetries
  91. if info.Config[maxRetriesKey] != "" {
  92. mr64, err := strconv.ParseUint(info.Config[maxRetriesKey], 10, strconv.IntSize)
  93. if err != nil {
  94. return nil, err
  95. }
  96. maxRetries = int(mr64)
  97. }
  98. asyncConnect := false
  99. if info.Config[asyncConnectKey] != "" {
  100. if asyncConnect, err = strconv.ParseBool(info.Config[asyncConnectKey]); err != nil {
  101. return nil, err
  102. }
  103. }
  104. subSecondPrecision := false
  105. if info.Config[subSecondPrecisionKey] != "" {
  106. if subSecondPrecision, err = strconv.ParseBool(info.Config[subSecondPrecisionKey]); err != nil {
  107. return nil, err
  108. }
  109. }
  110. fluentConfig := fluent.Config{
  111. FluentPort: loc.port,
  112. FluentHost: loc.host,
  113. FluentNetwork: loc.protocol,
  114. FluentSocketPath: loc.path,
  115. BufferLimit: bufferLimit,
  116. RetryWait: retryWait,
  117. MaxRetry: maxRetries,
  118. AsyncConnect: asyncConnect,
  119. SubSecondPrecision: subSecondPrecision,
  120. }
  121. logrus.WithField("container", info.ContainerID).WithField("config", fluentConfig).
  122. Debug("logging driver fluentd configured")
  123. log, err := fluent.New(fluentConfig)
  124. if err != nil {
  125. return nil, err
  126. }
  127. return &fluentd{
  128. tag: tag,
  129. containerID: info.ContainerID,
  130. containerName: info.ContainerName,
  131. writer: log,
  132. extra: extra,
  133. }, nil
  134. }
  135. func (f *fluentd) Log(msg *logger.Message) error {
  136. data := map[string]string{
  137. "container_id": f.containerID,
  138. "container_name": f.containerName,
  139. "source": msg.Source,
  140. "log": string(msg.Line),
  141. }
  142. for k, v := range f.extra {
  143. data[k] = v
  144. }
  145. ts := msg.Timestamp
  146. logger.PutMessage(msg)
  147. // fluent-logger-golang buffers logs from failures and disconnections,
  148. // and these are transferred again automatically.
  149. return f.writer.PostWithTime(f.tag, ts, data)
  150. }
  151. func (f *fluentd) Close() error {
  152. return f.writer.Close()
  153. }
  154. func (f *fluentd) Name() string {
  155. return name
  156. }
  157. // ValidateLogOpt looks for fluentd specific log option fluentd-address.
  158. func ValidateLogOpt(cfg map[string]string) error {
  159. for key := range cfg {
  160. switch key {
  161. case "env":
  162. case "env-regex":
  163. case "labels":
  164. case "tag":
  165. case addressKey:
  166. case bufferLimitKey:
  167. case retryWaitKey:
  168. case maxRetriesKey:
  169. case asyncConnectKey:
  170. case subSecondPrecisionKey:
  171. // Accepted
  172. default:
  173. return fmt.Errorf("unknown log opt '%s' for fluentd log driver", key)
  174. }
  175. }
  176. _, err := parseAddress(cfg[addressKey])
  177. return err
  178. }
  179. func parseAddress(address string) (*location, error) {
  180. if address == "" {
  181. return &location{
  182. protocol: defaultProtocol,
  183. host: defaultHost,
  184. port: defaultPort,
  185. path: "",
  186. }, nil
  187. }
  188. protocol := defaultProtocol
  189. givenAddress := address
  190. if urlutil.IsTransportURL(address) {
  191. url, err := url.Parse(address)
  192. if err != nil {
  193. return nil, errors.Wrapf(err, "invalid fluentd-address %s", givenAddress)
  194. }
  195. // unix and unixgram socket
  196. if url.Scheme == "unix" || url.Scheme == "unixgram" {
  197. return &location{
  198. protocol: url.Scheme,
  199. host: "",
  200. port: 0,
  201. path: url.Path,
  202. }, nil
  203. }
  204. // tcp|udp
  205. protocol = url.Scheme
  206. address = url.Host
  207. }
  208. host, port, err := net.SplitHostPort(address)
  209. if err != nil {
  210. if !strings.Contains(err.Error(), "missing port in address") {
  211. return nil, errors.Wrapf(err, "invalid fluentd-address %s", givenAddress)
  212. }
  213. return &location{
  214. protocol: protocol,
  215. host: host,
  216. port: defaultPort,
  217. path: "",
  218. }, nil
  219. }
  220. portnum, err := strconv.Atoi(port)
  221. if err != nil {
  222. return nil, errors.Wrapf(err, "invalid fluentd-address %s", givenAddress)
  223. }
  224. return &location{
  225. protocol: protocol,
  226. host: host,
  227. port: portnum,
  228. path: "",
  229. }, nil
  230. }