fluentd.go 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263
  1. // Package fluentd provides the log driver for forwarding server logs
  2. // to fluentd endpoints.
  3. package fluentd // import "github.com/docker/docker/daemon/logger/fluentd"
  4. import (
  5. "fmt"
  6. "math"
  7. "net"
  8. "net/url"
  9. "strconv"
  10. "strings"
  11. "time"
  12. "github.com/docker/docker/daemon/logger"
  13. "github.com/docker/docker/daemon/logger/loggerutils"
  14. "github.com/docker/docker/pkg/urlutil"
  15. "github.com/docker/go-units"
  16. "github.com/fluent/fluent-logger-golang/fluent"
  17. "github.com/pkg/errors"
  18. "github.com/sirupsen/logrus"
  19. )
  20. type fluentd struct {
  21. tag string
  22. containerID string
  23. containerName string
  24. writer *fluent.Fluent
  25. extra map[string]string
  26. }
  27. type location struct {
  28. protocol string
  29. host string
  30. port int
  31. path string
  32. }
  33. const (
  34. name = "fluentd"
  35. defaultProtocol = "tcp"
  36. defaultHost = "127.0.0.1"
  37. defaultPort = 24224
  38. defaultBufferLimit = 1024 * 1024
  39. // logger tries to reconnect 2**32 - 1 times
  40. // failed (and panic) after 204 years [ 1.5 ** (2**32 - 1) - 1 seconds]
  41. defaultRetryWait = 1000
  42. defaultMaxRetries = math.MaxInt32
  43. addressKey = "fluentd-address"
  44. bufferLimitKey = "fluentd-buffer-limit"
  45. retryWaitKey = "fluentd-retry-wait"
  46. maxRetriesKey = "fluentd-max-retries"
  47. asyncConnectKey = "fluentd-async-connect"
  48. subSecondPrecisionKey = "fluentd-sub-second-precision"
  49. )
  50. func init() {
  51. if err := logger.RegisterLogDriver(name, New); err != nil {
  52. logrus.Fatal(err)
  53. }
  54. if err := logger.RegisterLogOptValidator(name, ValidateLogOpt); err != nil {
  55. logrus.Fatal(err)
  56. }
  57. }
  58. // New creates a fluentd logger using the configuration passed in on
  59. // the context. The supported context configuration variable is
  60. // fluentd-address.
  61. func New(info logger.Info) (logger.Logger, error) {
  62. loc, err := parseAddress(info.Config[addressKey])
  63. if err != nil {
  64. return nil, err
  65. }
  66. tag, err := loggerutils.ParseLogTag(info, loggerutils.DefaultTemplate)
  67. if err != nil {
  68. return nil, err
  69. }
  70. extra, err := info.ExtraAttributes(nil)
  71. if err != nil {
  72. return nil, err
  73. }
  74. bufferLimit := defaultBufferLimit
  75. if info.Config[bufferLimitKey] != "" {
  76. bl64, err := units.RAMInBytes(info.Config[bufferLimitKey])
  77. if err != nil {
  78. return nil, err
  79. }
  80. bufferLimit = int(bl64)
  81. }
  82. retryWait := defaultRetryWait
  83. if info.Config[retryWaitKey] != "" {
  84. rwd, err := time.ParseDuration(info.Config[retryWaitKey])
  85. if err != nil {
  86. return nil, err
  87. }
  88. retryWait = int(rwd.Seconds() * 1000)
  89. }
  90. maxRetries := defaultMaxRetries
  91. if info.Config[maxRetriesKey] != "" {
  92. mr64, err := strconv.ParseUint(info.Config[maxRetriesKey], 10, strconv.IntSize)
  93. if err != nil {
  94. return nil, err
  95. }
  96. maxRetries = int(mr64)
  97. }
  98. asyncConnect := false
  99. if info.Config[asyncConnectKey] != "" {
  100. if asyncConnect, err = strconv.ParseBool(info.Config[asyncConnectKey]); err != nil {
  101. return nil, err
  102. }
  103. }
  104. subSecondPrecision := false
  105. if info.Config[subSecondPrecisionKey] != "" {
  106. if subSecondPrecision, err = strconv.ParseBool(info.Config[subSecondPrecisionKey]); err != nil {
  107. return nil, err
  108. }
  109. }
  110. fluentConfig := fluent.Config{
  111. FluentPort: loc.port,
  112. FluentHost: loc.host,
  113. FluentNetwork: loc.protocol,
  114. FluentSocketPath: loc.path,
  115. BufferLimit: bufferLimit,
  116. RetryWait: retryWait,
  117. MaxRetry: maxRetries,
  118. AsyncConnect: asyncConnect,
  119. SubSecondPrecision: subSecondPrecision,
  120. }
  121. logrus.WithField("container", info.ContainerID).WithField("config", fluentConfig).
  122. Debug("logging driver fluentd configured")
  123. log, err := fluent.New(fluentConfig)
  124. if err != nil {
  125. return nil, err
  126. }
  127. return &fluentd{
  128. tag: tag,
  129. containerID: info.ContainerID,
  130. containerName: info.ContainerName,
  131. writer: log,
  132. extra: extra,
  133. }, nil
  134. }
  135. func (f *fluentd) Log(msg *logger.Message) error {
  136. data := map[string]string{
  137. "container_id": f.containerID,
  138. "container_name": f.containerName,
  139. "source": msg.Source,
  140. "log": string(msg.Line),
  141. }
  142. for k, v := range f.extra {
  143. data[k] = v
  144. }
  145. if msg.Partial {
  146. data["partial_message"] = "true"
  147. }
  148. ts := msg.Timestamp
  149. logger.PutMessage(msg)
  150. // fluent-logger-golang buffers logs from failures and disconnections,
  151. // and these are transferred again automatically.
  152. return f.writer.PostWithTime(f.tag, ts, data)
  153. }
// Close closes the underlying fluent writer and returns the error it
// reports, if any.
func (f *fluentd) Close() error {
	return f.writer.Close()
}
// Name returns the name this log driver is registered under ("fluentd").
func (f *fluentd) Name() string {
	return name
}
  160. // ValidateLogOpt looks for fluentd specific log option fluentd-address.
  161. func ValidateLogOpt(cfg map[string]string) error {
  162. for key := range cfg {
  163. switch key {
  164. case "env":
  165. case "env-regex":
  166. case "labels":
  167. case "tag":
  168. case addressKey:
  169. case bufferLimitKey:
  170. case retryWaitKey:
  171. case maxRetriesKey:
  172. case asyncConnectKey:
  173. case subSecondPrecisionKey:
  174. // Accepted
  175. default:
  176. return fmt.Errorf("unknown log opt '%s' for fluentd log driver", key)
  177. }
  178. }
  179. _, err := parseAddress(cfg[addressKey])
  180. return err
  181. }
  182. func parseAddress(address string) (*location, error) {
  183. if address == "" {
  184. return &location{
  185. protocol: defaultProtocol,
  186. host: defaultHost,
  187. port: defaultPort,
  188. path: "",
  189. }, nil
  190. }
  191. protocol := defaultProtocol
  192. givenAddress := address
  193. if urlutil.IsTransportURL(address) {
  194. url, err := url.Parse(address)
  195. if err != nil {
  196. return nil, errors.Wrapf(err, "invalid fluentd-address %s", givenAddress)
  197. }
  198. // unix and unixgram socket
  199. if url.Scheme == "unix" || url.Scheme == "unixgram" {
  200. return &location{
  201. protocol: url.Scheme,
  202. host: "",
  203. port: 0,
  204. path: url.Path,
  205. }, nil
  206. }
  207. // tcp|udp
  208. protocol = url.Scheme
  209. address = url.Host
  210. }
  211. host, port, err := net.SplitHostPort(address)
  212. if err != nil {
  213. if !strings.Contains(err.Error(), "missing port in address") {
  214. return nil, errors.Wrapf(err, "invalid fluentd-address %s", givenAddress)
  215. }
  216. return &location{
  217. protocol: protocol,
  218. host: host,
  219. port: defaultPort,
  220. path: "",
  221. }, nil
  222. }
  223. portnum, err := strconv.Atoi(port)
  224. if err != nil {
  225. return nil, errors.Wrapf(err, "invalid fluentd-address %s", givenAddress)
  226. }
  227. return &location{
  228. protocol: protocol,
  229. host: host,
  230. port: portnum,
  231. path: "",
  232. }, nil
  233. }