fluentd.go 8.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324
  1. // Package fluentd provides the log driver for forwarding server logs
  2. // to fluentd endpoints.
  3. package fluentd // import "github.com/docker/docker/daemon/logger/fluentd"
  4. import (
  5. "math"
  6. "net"
  7. "net/url"
  8. "strconv"
  9. "strings"
  10. "time"
  11. "github.com/docker/docker/daemon/logger"
  12. "github.com/docker/docker/daemon/logger/loggerutils"
  13. "github.com/docker/docker/errdefs"
  14. "github.com/docker/docker/pkg/urlutil"
  15. units "github.com/docker/go-units"
  16. "github.com/fluent/fluent-logger-golang/fluent"
  17. "github.com/pkg/errors"
  18. "github.com/sirupsen/logrus"
  19. )
  20. type fluentd struct {
  21. tag string
  22. containerID string
  23. containerName string
  24. writer *fluent.Fluent
  25. extra map[string]string
  26. }
  27. type location struct {
  28. protocol string
  29. host string
  30. port int
  31. path string
  32. }
  33. const (
  34. name = "fluentd"
  35. defaultBufferLimit = 1024 * 1024
  36. defaultHost = "127.0.0.1"
  37. defaultPort = 24224
  38. defaultProtocol = "tcp"
  39. // logger tries to reconnect 2**32 - 1 times
  40. // failed (and panic) after 204 years [ 1.5 ** (2**32 - 1) - 1 seconds]
  41. defaultMaxRetries = math.MaxInt32
  42. defaultRetryWait = 1000
  43. minReconnectInterval = 100 * time.Millisecond
  44. maxReconnectInterval = 10 * time.Second
  45. addressKey = "fluentd-address"
  46. asyncKey = "fluentd-async"
  47. asyncConnectKey = "fluentd-async-connect" // deprecated option (use fluent-async instead)
  48. asyncReconnectIntervalKey = "fluentd-async-reconnect-interval"
  49. bufferLimitKey = "fluentd-buffer-limit"
  50. maxRetriesKey = "fluentd-max-retries"
  51. requestAckKey = "fluentd-request-ack"
  52. retryWaitKey = "fluentd-retry-wait"
  53. subSecondPrecisionKey = "fluentd-sub-second-precision"
  54. )
  55. func init() {
  56. if err := logger.RegisterLogDriver(name, New); err != nil {
  57. logrus.Fatal(err)
  58. }
  59. if err := logger.RegisterLogOptValidator(name, ValidateLogOpt); err != nil {
  60. logrus.Fatal(err)
  61. }
  62. }
  63. // New creates a fluentd logger using the configuration passed in on
  64. // the context. The supported context configuration variable is
  65. // fluentd-address.
  66. func New(info logger.Info) (logger.Logger, error) {
  67. fluentConfig, err := parseConfig(info.Config)
  68. if err != nil {
  69. return nil, errdefs.InvalidParameter(err)
  70. }
  71. tag, err := loggerutils.ParseLogTag(info, loggerutils.DefaultTemplate)
  72. if err != nil {
  73. return nil, errdefs.InvalidParameter(err)
  74. }
  75. extra, err := info.ExtraAttributes(nil)
  76. if err != nil {
  77. return nil, errdefs.InvalidParameter(err)
  78. }
  79. logrus.WithField("container", info.ContainerID).WithField("config", fluentConfig).
  80. Debug("logging driver fluentd configured")
  81. log, err := fluent.New(fluentConfig)
  82. if err != nil {
  83. return nil, err
  84. }
  85. return &fluentd{
  86. tag: tag,
  87. containerID: info.ContainerID,
  88. containerName: info.ContainerName,
  89. writer: log,
  90. extra: extra,
  91. }, nil
  92. }
  93. func (f *fluentd) Log(msg *logger.Message) error {
  94. data := map[string]string{
  95. "container_id": f.containerID,
  96. "container_name": f.containerName,
  97. "source": msg.Source,
  98. "log": string(msg.Line),
  99. }
  100. for k, v := range f.extra {
  101. data[k] = v
  102. }
  103. if msg.PLogMetaData != nil {
  104. data["partial_message"] = "true"
  105. data["partial_id"] = msg.PLogMetaData.ID
  106. data["partial_ordinal"] = strconv.Itoa(msg.PLogMetaData.Ordinal)
  107. data["partial_last"] = strconv.FormatBool(msg.PLogMetaData.Last)
  108. }
  109. ts := msg.Timestamp
  110. logger.PutMessage(msg)
  111. // fluent-logger-golang buffers logs from failures and disconnections,
  112. // and these are transferred again automatically.
  113. return f.writer.PostWithTime(f.tag, ts, data)
  114. }
  115. func (f *fluentd) Close() error {
  116. return f.writer.Close()
  117. }
  118. func (f *fluentd) Name() string {
  119. return name
  120. }
  121. // ValidateLogOpt looks for fluentd specific log option fluentd-address.
  122. func ValidateLogOpt(cfg map[string]string) error {
  123. for key := range cfg {
  124. switch key {
  125. case "env":
  126. case "env-regex":
  127. case "labels":
  128. case "labels-regex":
  129. case "tag":
  130. case addressKey:
  131. case asyncKey:
  132. case asyncConnectKey:
  133. case asyncReconnectIntervalKey:
  134. case bufferLimitKey:
  135. case maxRetriesKey:
  136. case requestAckKey:
  137. case retryWaitKey:
  138. case subSecondPrecisionKey:
  139. // Accepted
  140. default:
  141. return errors.Errorf("unknown log opt '%s' for fluentd log driver", key)
  142. }
  143. }
  144. _, err := parseConfig(cfg)
  145. return err
  146. }
  147. func parseConfig(cfg map[string]string) (fluent.Config, error) {
  148. var config fluent.Config
  149. loc, err := parseAddress(cfg[addressKey])
  150. if err != nil {
  151. return config, err
  152. }
  153. bufferLimit := defaultBufferLimit
  154. if cfg[bufferLimitKey] != "" {
  155. bl64, err := units.RAMInBytes(cfg[bufferLimitKey])
  156. if err != nil {
  157. return config, err
  158. }
  159. bufferLimit = int(bl64)
  160. }
  161. retryWait := defaultRetryWait
  162. if cfg[retryWaitKey] != "" {
  163. rwd, err := time.ParseDuration(cfg[retryWaitKey])
  164. if err != nil {
  165. return config, err
  166. }
  167. retryWait = int(rwd.Seconds() * 1000)
  168. }
  169. maxRetries := defaultMaxRetries
  170. if cfg[maxRetriesKey] != "" {
  171. mr64, err := strconv.ParseUint(cfg[maxRetriesKey], 10, strconv.IntSize)
  172. if err != nil {
  173. return config, err
  174. }
  175. maxRetries = int(mr64)
  176. }
  177. if cfg[asyncKey] != "" && cfg[asyncConnectKey] != "" {
  178. return config, errors.Errorf("conflicting options: cannot specify both '%s' and '%s", asyncKey, asyncConnectKey)
  179. }
  180. async := false
  181. if cfg[asyncKey] != "" {
  182. if async, err = strconv.ParseBool(cfg[asyncKey]); err != nil {
  183. return config, err
  184. }
  185. }
  186. // TODO fluentd-async-connect is deprecated in driver v1.4.0. Remove after two stable releases
  187. asyncConnect := false
  188. if cfg[asyncConnectKey] != "" {
  189. if asyncConnect, err = strconv.ParseBool(cfg[asyncConnectKey]); err != nil {
  190. return config, err
  191. }
  192. }
  193. asyncReconnectInterval := 0
  194. if cfg[asyncReconnectIntervalKey] != "" {
  195. interval, err := time.ParseDuration(cfg[asyncReconnectIntervalKey])
  196. if err != nil {
  197. return config, errors.Wrapf(err, "invalid value for %s", asyncReconnectIntervalKey)
  198. }
  199. if interval != 0 && (interval < minReconnectInterval || interval > maxReconnectInterval) {
  200. return config, errors.Errorf("invalid value for %s: value (%q) must be between %s and %s",
  201. asyncReconnectIntervalKey, interval, minReconnectInterval, maxReconnectInterval)
  202. }
  203. asyncReconnectInterval = int(interval.Milliseconds())
  204. }
  205. subSecondPrecision := false
  206. if cfg[subSecondPrecisionKey] != "" {
  207. if subSecondPrecision, err = strconv.ParseBool(cfg[subSecondPrecisionKey]); err != nil {
  208. return config, err
  209. }
  210. }
  211. requestAck := false
  212. if cfg[requestAckKey] != "" {
  213. if requestAck, err = strconv.ParseBool(cfg[requestAckKey]); err != nil {
  214. return config, err
  215. }
  216. }
  217. config = fluent.Config{
  218. FluentPort: loc.port,
  219. FluentHost: loc.host,
  220. FluentNetwork: loc.protocol,
  221. FluentSocketPath: loc.path,
  222. BufferLimit: bufferLimit,
  223. RetryWait: retryWait,
  224. MaxRetry: maxRetries,
  225. Async: async,
  226. AsyncConnect: asyncConnect,
  227. AsyncReconnectInterval: asyncReconnectInterval,
  228. SubSecondPrecision: subSecondPrecision,
  229. RequestAck: requestAck,
  230. ForceStopAsyncSend: async || asyncConnect,
  231. }
  232. return config, nil
  233. }
  234. func parseAddress(address string) (*location, error) {
  235. if address == "" {
  236. return &location{
  237. protocol: defaultProtocol,
  238. host: defaultHost,
  239. port: defaultPort,
  240. path: "",
  241. }, nil
  242. }
  243. protocol := defaultProtocol
  244. givenAddress := address
  245. if urlutil.IsTransportURL(address) {
  246. url, err := url.Parse(address)
  247. if err != nil {
  248. return nil, errors.Wrapf(err, "invalid fluentd-address %s", givenAddress)
  249. }
  250. // unix and unixgram socket
  251. if url.Scheme == "unix" || url.Scheme == "unixgram" {
  252. return &location{
  253. protocol: url.Scheme,
  254. host: "",
  255. port: 0,
  256. path: url.Path,
  257. }, nil
  258. }
  259. // tcp|udp
  260. protocol = url.Scheme
  261. address = url.Host
  262. }
  263. host, port, err := net.SplitHostPort(address)
  264. if err != nil {
  265. if !strings.Contains(err.Error(), "missing port in address") {
  266. return nil, errors.Wrapf(err, "invalid fluentd-address %s", givenAddress)
  267. }
  268. return &location{
  269. protocol: protocol,
  270. host: host,
  271. port: defaultPort,
  272. path: "",
  273. }, nil
  274. }
  275. portnum, err := strconv.Atoi(port)
  276. if err != nil {
  277. return nil, errors.Wrapf(err, "invalid fluentd-address %s", givenAddress)
  278. }
  279. return &location{
  280. protocol: protocol,
  281. host: host,
  282. port: portnum,
  283. path: "",
  284. }, nil
  285. }