fluentd.go 8.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324
  1. // Package fluentd provides the log driver for forwarding server logs
  2. // to fluentd endpoints.
  3. package fluentd // import "github.com/docker/docker/daemon/logger/fluentd"
  4. import (
  5. "context"
  6. "math"
  7. "net/url"
  8. "strconv"
  9. "strings"
  10. "time"
  11. "github.com/containerd/log"
  12. "github.com/docker/docker/daemon/logger"
  13. "github.com/docker/docker/daemon/logger/loggerutils"
  14. "github.com/docker/docker/errdefs"
  15. units "github.com/docker/go-units"
  16. "github.com/fluent/fluent-logger-golang/fluent"
  17. "github.com/pkg/errors"
  18. )
  19. type fluentd struct {
  20. tag string
  21. containerID string
  22. containerName string
  23. writer *fluent.Fluent
  24. extra map[string]string
  25. }
  26. type location struct {
  27. protocol string
  28. host string
  29. port int
  30. path string
  31. }
  32. const (
  33. name = "fluentd"
  34. defaultBufferLimit = 1024 * 1024
  35. defaultHost = "127.0.0.1"
  36. defaultPort = 24224
  37. defaultProtocol = "tcp"
  38. // logger tries to reconnect 2**32 - 1 times
  39. // failed (and panic) after 204 years [ 1.5 ** (2**32 - 1) - 1 seconds]
  40. defaultMaxRetries = math.MaxInt32
  41. defaultRetryWait = 1000
  42. minReconnectInterval = 100 * time.Millisecond
  43. maxReconnectInterval = 10 * time.Second
  44. addressKey = "fluentd-address"
  45. asyncKey = "fluentd-async"
  46. asyncConnectKey = "fluentd-async-connect" // deprecated option (use fluent-async instead)
  47. asyncReconnectIntervalKey = "fluentd-async-reconnect-interval"
  48. bufferLimitKey = "fluentd-buffer-limit"
  49. maxRetriesKey = "fluentd-max-retries"
  50. requestAckKey = "fluentd-request-ack"
  51. retryWaitKey = "fluentd-retry-wait"
  52. subSecondPrecisionKey = "fluentd-sub-second-precision"
  53. )
  54. func init() {
  55. if err := logger.RegisterLogDriver(name, New); err != nil {
  56. panic(err)
  57. }
  58. if err := logger.RegisterLogOptValidator(name, ValidateLogOpt); err != nil {
  59. panic(err)
  60. }
  61. }
  62. // New creates a fluentd logger using the configuration passed in on
  63. // the context. The supported context configuration variable is
  64. // fluentd-address.
  65. func New(info logger.Info) (logger.Logger, error) {
  66. fluentConfig, err := parseConfig(info.Config)
  67. if err != nil {
  68. return nil, errdefs.InvalidParameter(err)
  69. }
  70. tag, err := loggerutils.ParseLogTag(info, loggerutils.DefaultTemplate)
  71. if err != nil {
  72. return nil, errdefs.InvalidParameter(err)
  73. }
  74. extra, err := info.ExtraAttributes(nil)
  75. if err != nil {
  76. return nil, errdefs.InvalidParameter(err)
  77. }
  78. log.G(context.TODO()).WithField("container", info.ContainerID).WithField("config", fluentConfig).
  79. Debug("logging driver fluentd configured")
  80. log, err := fluent.New(fluentConfig)
  81. if err != nil {
  82. return nil, err
  83. }
  84. return &fluentd{
  85. tag: tag,
  86. containerID: info.ContainerID,
  87. containerName: info.ContainerName,
  88. writer: log,
  89. extra: extra,
  90. }, nil
  91. }
  92. func (f *fluentd) Log(msg *logger.Message) error {
  93. data := map[string]string{
  94. "container_id": f.containerID,
  95. "container_name": f.containerName,
  96. "source": msg.Source,
  97. "log": string(msg.Line),
  98. }
  99. for k, v := range f.extra {
  100. data[k] = v
  101. }
  102. if msg.PLogMetaData != nil {
  103. data["partial_message"] = "true"
  104. data["partial_id"] = msg.PLogMetaData.ID
  105. data["partial_ordinal"] = strconv.Itoa(msg.PLogMetaData.Ordinal)
  106. data["partial_last"] = strconv.FormatBool(msg.PLogMetaData.Last)
  107. }
  108. ts := msg.Timestamp
  109. logger.PutMessage(msg)
  110. // fluent-logger-golang buffers logs from failures and disconnections,
  111. // and these are transferred again automatically.
  112. return f.writer.PostWithTime(f.tag, ts, data)
  113. }
  114. func (f *fluentd) Close() error {
  115. return f.writer.Close()
  116. }
  117. func (f *fluentd) Name() string {
  118. return name
  119. }
  120. // ValidateLogOpt looks for fluentd specific log option fluentd-address.
  121. func ValidateLogOpt(cfg map[string]string) error {
  122. for key := range cfg {
  123. switch key {
  124. case "env":
  125. case "env-regex":
  126. case "labels":
  127. case "labels-regex":
  128. case "tag":
  129. case addressKey:
  130. case asyncKey:
  131. case asyncConnectKey:
  132. case asyncReconnectIntervalKey:
  133. case bufferLimitKey:
  134. case maxRetriesKey:
  135. case requestAckKey:
  136. case retryWaitKey:
  137. case subSecondPrecisionKey:
  138. // Accepted
  139. default:
  140. return errors.Errorf("unknown log opt '%s' for fluentd log driver", key)
  141. }
  142. }
  143. _, err := parseConfig(cfg)
  144. return err
  145. }
  146. func parseConfig(cfg map[string]string) (fluent.Config, error) {
  147. var config fluent.Config
  148. loc, err := parseAddress(cfg[addressKey])
  149. if err != nil {
  150. return config, errors.Wrapf(err, "invalid fluentd-address (%s)", cfg[addressKey])
  151. }
  152. bufferLimit := defaultBufferLimit
  153. if cfg[bufferLimitKey] != "" {
  154. bl64, err := units.RAMInBytes(cfg[bufferLimitKey])
  155. if err != nil {
  156. return config, err
  157. }
  158. bufferLimit = int(bl64)
  159. }
  160. retryWait := defaultRetryWait
  161. if cfg[retryWaitKey] != "" {
  162. rwd, err := time.ParseDuration(cfg[retryWaitKey])
  163. if err != nil {
  164. return config, err
  165. }
  166. retryWait = int(rwd.Seconds() * 1000)
  167. }
  168. maxRetries := defaultMaxRetries
  169. if cfg[maxRetriesKey] != "" {
  170. mr64, err := strconv.ParseUint(cfg[maxRetriesKey], 10, strconv.IntSize)
  171. if err != nil {
  172. return config, err
  173. }
  174. maxRetries = int(mr64)
  175. }
  176. if cfg[asyncKey] != "" && cfg[asyncConnectKey] != "" {
  177. return config, errors.Errorf("conflicting options: cannot specify both '%s' and '%s", asyncKey, asyncConnectKey)
  178. }
  179. async := false
  180. if cfg[asyncKey] != "" {
  181. if async, err = strconv.ParseBool(cfg[asyncKey]); err != nil {
  182. return config, err
  183. }
  184. }
  185. // TODO fluentd-async-connect is deprecated in driver v1.4.0. Remove after two stable releases
  186. asyncConnect := false
  187. if cfg[asyncConnectKey] != "" {
  188. if asyncConnect, err = strconv.ParseBool(cfg[asyncConnectKey]); err != nil {
  189. return config, err
  190. }
  191. }
  192. asyncReconnectInterval := 0
  193. if cfg[asyncReconnectIntervalKey] != "" {
  194. interval, err := time.ParseDuration(cfg[asyncReconnectIntervalKey])
  195. if err != nil {
  196. return config, errors.Wrapf(err, "invalid value for %s", asyncReconnectIntervalKey)
  197. }
  198. if interval != 0 && (interval < minReconnectInterval || interval > maxReconnectInterval) {
  199. return config, errors.Errorf("invalid value for %s: value (%q) must be between %s and %s",
  200. asyncReconnectIntervalKey, interval, minReconnectInterval, maxReconnectInterval)
  201. }
  202. asyncReconnectInterval = int(interval.Milliseconds())
  203. }
  204. subSecondPrecision := false
  205. if cfg[subSecondPrecisionKey] != "" {
  206. if subSecondPrecision, err = strconv.ParseBool(cfg[subSecondPrecisionKey]); err != nil {
  207. return config, err
  208. }
  209. }
  210. requestAck := false
  211. if cfg[requestAckKey] != "" {
  212. if requestAck, err = strconv.ParseBool(cfg[requestAckKey]); err != nil {
  213. return config, err
  214. }
  215. }
  216. config = fluent.Config{
  217. FluentPort: loc.port,
  218. FluentHost: loc.host,
  219. FluentNetwork: loc.protocol,
  220. FluentSocketPath: loc.path,
  221. BufferLimit: bufferLimit,
  222. RetryWait: retryWait,
  223. MaxRetry: maxRetries,
  224. Async: async,
  225. AsyncConnect: asyncConnect,
  226. AsyncReconnectInterval: asyncReconnectInterval,
  227. SubSecondPrecision: subSecondPrecision,
  228. RequestAck: requestAck,
  229. ForceStopAsyncSend: async || asyncConnect,
  230. }
  231. return config, nil
  232. }
  233. func parseAddress(address string) (*location, error) {
  234. if address == "" {
  235. return &location{
  236. protocol: defaultProtocol,
  237. host: defaultHost,
  238. port: defaultPort,
  239. path: "",
  240. }, nil
  241. }
  242. if !strings.Contains(address, "://") {
  243. address = defaultProtocol + "://" + address
  244. }
  245. addr, err := url.Parse(address)
  246. if err != nil {
  247. return nil, err
  248. }
  249. switch addr.Scheme {
  250. case "unix":
  251. if strings.TrimLeft(addr.Path, "/") == "" {
  252. return nil, errors.New("path is empty")
  253. }
  254. return &location{protocol: addr.Scheme, path: addr.Path}, nil
  255. case "tcp", "tls":
  256. // continue processing below
  257. default:
  258. return nil, errors.Errorf("unsupported scheme: '%s'", addr.Scheme)
  259. }
  260. if addr.Path != "" {
  261. return nil, errors.New("should not contain a path element")
  262. }
  263. host := defaultHost
  264. port := defaultPort
  265. if h := addr.Hostname(); h != "" {
  266. host = h
  267. }
  268. if p := addr.Port(); p != "" {
  269. // Port numbers are 16 bit: https://www.ietf.org/rfc/rfc793.html#section-3.1
  270. portNum, err := strconv.ParseUint(p, 10, 16)
  271. if err != nil {
  272. return nil, errors.Wrap(err, "invalid port")
  273. }
  274. port = int(portNum)
  275. }
  276. return &location{
  277. protocol: addr.Scheme,
  278. host: host,
  279. port: port,
  280. path: "",
  281. }, nil
  282. }