fluentd.go 8.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323
  1. // Package fluentd provides the log driver for forwarding server logs
  2. // to fluentd endpoints.
  3. package fluentd // import "github.com/docker/docker/daemon/logger/fluentd"
  4. import (
  5. "math"
  6. "net/url"
  7. "strconv"
  8. "strings"
  9. "time"
  10. "github.com/docker/docker/daemon/logger"
  11. "github.com/docker/docker/daemon/logger/loggerutils"
  12. "github.com/docker/docker/errdefs"
  13. units "github.com/docker/go-units"
  14. "github.com/fluent/fluent-logger-golang/fluent"
  15. "github.com/pkg/errors"
  16. "github.com/sirupsen/logrus"
  17. )
  18. type fluentd struct {
  19. tag string
  20. containerID string
  21. containerName string
  22. writer *fluent.Fluent
  23. extra map[string]string
  24. }
  25. type location struct {
  26. protocol string
  27. host string
  28. port int
  29. path string
  30. }
  31. const (
  32. name = "fluentd"
  33. defaultBufferLimit = 1024 * 1024
  34. defaultHost = "127.0.0.1"
  35. defaultPort = 24224
  36. defaultProtocol = "tcp"
  37. // logger tries to reconnect 2**32 - 1 times
  38. // failed (and panic) after 204 years [ 1.5 ** (2**32 - 1) - 1 seconds]
  39. defaultMaxRetries = math.MaxInt32
  40. defaultRetryWait = 1000
  41. minReconnectInterval = 100 * time.Millisecond
  42. maxReconnectInterval = 10 * time.Second
  43. addressKey = "fluentd-address"
  44. asyncKey = "fluentd-async"
  45. asyncConnectKey = "fluentd-async-connect" // deprecated option (use fluent-async instead)
  46. asyncReconnectIntervalKey = "fluentd-async-reconnect-interval"
  47. bufferLimitKey = "fluentd-buffer-limit"
  48. maxRetriesKey = "fluentd-max-retries"
  49. requestAckKey = "fluentd-request-ack"
  50. retryWaitKey = "fluentd-retry-wait"
  51. subSecondPrecisionKey = "fluentd-sub-second-precision"
  52. )
  53. func init() {
  54. if err := logger.RegisterLogDriver(name, New); err != nil {
  55. logrus.Fatal(err)
  56. }
  57. if err := logger.RegisterLogOptValidator(name, ValidateLogOpt); err != nil {
  58. logrus.Fatal(err)
  59. }
  60. }
  61. // New creates a fluentd logger using the configuration passed in on
  62. // the context. The supported context configuration variable is
  63. // fluentd-address.
  64. func New(info logger.Info) (logger.Logger, error) {
  65. fluentConfig, err := parseConfig(info.Config)
  66. if err != nil {
  67. return nil, errdefs.InvalidParameter(err)
  68. }
  69. tag, err := loggerutils.ParseLogTag(info, loggerutils.DefaultTemplate)
  70. if err != nil {
  71. return nil, errdefs.InvalidParameter(err)
  72. }
  73. extra, err := info.ExtraAttributes(nil)
  74. if err != nil {
  75. return nil, errdefs.InvalidParameter(err)
  76. }
  77. logrus.WithField("container", info.ContainerID).WithField("config", fluentConfig).
  78. Debug("logging driver fluentd configured")
  79. log, err := fluent.New(fluentConfig)
  80. if err != nil {
  81. return nil, err
  82. }
  83. return &fluentd{
  84. tag: tag,
  85. containerID: info.ContainerID,
  86. containerName: info.ContainerName,
  87. writer: log,
  88. extra: extra,
  89. }, nil
  90. }
  91. func (f *fluentd) Log(msg *logger.Message) error {
  92. data := map[string]string{
  93. "container_id": f.containerID,
  94. "container_name": f.containerName,
  95. "source": msg.Source,
  96. "log": string(msg.Line),
  97. }
  98. for k, v := range f.extra {
  99. data[k] = v
  100. }
  101. if msg.PLogMetaData != nil {
  102. data["partial_message"] = "true"
  103. data["partial_id"] = msg.PLogMetaData.ID
  104. data["partial_ordinal"] = strconv.Itoa(msg.PLogMetaData.Ordinal)
  105. data["partial_last"] = strconv.FormatBool(msg.PLogMetaData.Last)
  106. }
  107. ts := msg.Timestamp
  108. logger.PutMessage(msg)
  109. // fluent-logger-golang buffers logs from failures and disconnections,
  110. // and these are transferred again automatically.
  111. return f.writer.PostWithTime(f.tag, ts, data)
  112. }
  113. func (f *fluentd) Close() error {
  114. return f.writer.Close()
  115. }
  116. func (f *fluentd) Name() string {
  117. return name
  118. }
  119. // ValidateLogOpt looks for fluentd specific log option fluentd-address.
  120. func ValidateLogOpt(cfg map[string]string) error {
  121. for key := range cfg {
  122. switch key {
  123. case "env":
  124. case "env-regex":
  125. case "labels":
  126. case "labels-regex":
  127. case "tag":
  128. case addressKey:
  129. case asyncKey:
  130. case asyncConnectKey:
  131. case asyncReconnectIntervalKey:
  132. case bufferLimitKey:
  133. case maxRetriesKey:
  134. case requestAckKey:
  135. case retryWaitKey:
  136. case subSecondPrecisionKey:
  137. // Accepted
  138. default:
  139. return errors.Errorf("unknown log opt '%s' for fluentd log driver", key)
  140. }
  141. }
  142. _, err := parseConfig(cfg)
  143. return err
  144. }
  145. func parseConfig(cfg map[string]string) (fluent.Config, error) {
  146. var config fluent.Config
  147. loc, err := parseAddress(cfg[addressKey])
  148. if err != nil {
  149. return config, errors.Wrapf(err, "invalid fluentd-address (%s)", cfg[addressKey])
  150. }
  151. bufferLimit := defaultBufferLimit
  152. if cfg[bufferLimitKey] != "" {
  153. bl64, err := units.RAMInBytes(cfg[bufferLimitKey])
  154. if err != nil {
  155. return config, err
  156. }
  157. bufferLimit = int(bl64)
  158. }
  159. retryWait := defaultRetryWait
  160. if cfg[retryWaitKey] != "" {
  161. rwd, err := time.ParseDuration(cfg[retryWaitKey])
  162. if err != nil {
  163. return config, err
  164. }
  165. retryWait = int(rwd.Seconds() * 1000)
  166. }
  167. maxRetries := defaultMaxRetries
  168. if cfg[maxRetriesKey] != "" {
  169. mr64, err := strconv.ParseUint(cfg[maxRetriesKey], 10, strconv.IntSize)
  170. if err != nil {
  171. return config, err
  172. }
  173. maxRetries = int(mr64)
  174. }
  175. if cfg[asyncKey] != "" && cfg[asyncConnectKey] != "" {
  176. return config, errors.Errorf("conflicting options: cannot specify both '%s' and '%s", asyncKey, asyncConnectKey)
  177. }
  178. async := false
  179. if cfg[asyncKey] != "" {
  180. if async, err = strconv.ParseBool(cfg[asyncKey]); err != nil {
  181. return config, err
  182. }
  183. }
  184. // TODO fluentd-async-connect is deprecated in driver v1.4.0. Remove after two stable releases
  185. asyncConnect := false
  186. if cfg[asyncConnectKey] != "" {
  187. if asyncConnect, err = strconv.ParseBool(cfg[asyncConnectKey]); err != nil {
  188. return config, err
  189. }
  190. }
  191. asyncReconnectInterval := 0
  192. if cfg[asyncReconnectIntervalKey] != "" {
  193. interval, err := time.ParseDuration(cfg[asyncReconnectIntervalKey])
  194. if err != nil {
  195. return config, errors.Wrapf(err, "invalid value for %s", asyncReconnectIntervalKey)
  196. }
  197. if interval != 0 && (interval < minReconnectInterval || interval > maxReconnectInterval) {
  198. return config, errors.Errorf("invalid value for %s: value (%q) must be between %s and %s",
  199. asyncReconnectIntervalKey, interval, minReconnectInterval, maxReconnectInterval)
  200. }
  201. asyncReconnectInterval = int(interval.Milliseconds())
  202. }
  203. subSecondPrecision := false
  204. if cfg[subSecondPrecisionKey] != "" {
  205. if subSecondPrecision, err = strconv.ParseBool(cfg[subSecondPrecisionKey]); err != nil {
  206. return config, err
  207. }
  208. }
  209. requestAck := false
  210. if cfg[requestAckKey] != "" {
  211. if requestAck, err = strconv.ParseBool(cfg[requestAckKey]); err != nil {
  212. return config, err
  213. }
  214. }
  215. config = fluent.Config{
  216. FluentPort: loc.port,
  217. FluentHost: loc.host,
  218. FluentNetwork: loc.protocol,
  219. FluentSocketPath: loc.path,
  220. BufferLimit: bufferLimit,
  221. RetryWait: retryWait,
  222. MaxRetry: maxRetries,
  223. Async: async,
  224. AsyncConnect: asyncConnect,
  225. AsyncReconnectInterval: asyncReconnectInterval,
  226. SubSecondPrecision: subSecondPrecision,
  227. RequestAck: requestAck,
  228. ForceStopAsyncSend: async || asyncConnect,
  229. }
  230. return config, nil
  231. }
  232. func parseAddress(address string) (*location, error) {
  233. if address == "" {
  234. return &location{
  235. protocol: defaultProtocol,
  236. host: defaultHost,
  237. port: defaultPort,
  238. path: "",
  239. }, nil
  240. }
  241. if !strings.Contains(address, "://") {
  242. address = defaultProtocol + "://" + address
  243. }
  244. addr, err := url.Parse(address)
  245. if err != nil {
  246. return nil, err
  247. }
  248. switch addr.Scheme {
  249. case "unix":
  250. if strings.TrimLeft(addr.Path, "/") == "" {
  251. return nil, errors.New("path is empty")
  252. }
  253. return &location{protocol: addr.Scheme, path: addr.Path}, nil
  254. case "tcp", "tls":
  255. // continue processing below
  256. default:
  257. return nil, errors.Errorf("unsupported scheme: '%s'", addr.Scheme)
  258. }
  259. if addr.Path != "" {
  260. return nil, errors.New("should not contain a path element")
  261. }
  262. host := defaultHost
  263. port := defaultPort
  264. if h := addr.Hostname(); h != "" {
  265. host = h
  266. }
  267. if p := addr.Port(); p != "" {
  268. // Port numbers are 16 bit: https://www.ietf.org/rfc/rfc793.html#section-3.1
  269. portNum, err := strconv.ParseUint(p, 10, 16)
  270. if err != nil {
  271. return nil, errors.Wrap(err, "invalid port")
  272. }
  273. port = int(portNum)
  274. }
  275. return &location{
  276. protocol: addr.Scheme,
  277. host: host,
  278. port: port,
  279. path: "",
  280. }, nil
  281. }