fluentd.go 7.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305
  1. // Package fluentd provides the log driver for forwarding server logs
  2. // to fluentd endpoints.
  3. package fluentd // import "github.com/docker/docker/daemon/logger/fluentd"
  4. import (
  5. "math"
  6. "net"
  7. "net/url"
  8. "strconv"
  9. "strings"
  10. "time"
  11. "github.com/docker/docker/daemon/logger"
  12. "github.com/docker/docker/daemon/logger/loggerutils"
  13. "github.com/docker/docker/errdefs"
  14. "github.com/docker/docker/pkg/urlutil"
  15. units "github.com/docker/go-units"
  16. "github.com/fluent/fluent-logger-golang/fluent"
  17. "github.com/pkg/errors"
  18. "github.com/sirupsen/logrus"
  19. )
  20. type fluentd struct {
  21. tag string
  22. containerID string
  23. containerName string
  24. writer *fluent.Fluent
  25. extra map[string]string
  26. }
  27. type location struct {
  28. protocol string
  29. host string
  30. port int
  31. path string
  32. }
  33. const (
  34. name = "fluentd"
  35. defaultBufferLimit = 1024 * 1024
  36. defaultHost = "127.0.0.1"
  37. defaultPort = 24224
  38. defaultProtocol = "tcp"
  39. // logger tries to reconnect 2**32 - 1 times
  40. // failed (and panic) after 204 years [ 1.5 ** (2**32 - 1) - 1 seconds]
  41. defaultMaxRetries = math.MaxInt32
  42. defaultRetryWait = 1000
  43. addressKey = "fluentd-address"
  44. asyncKey = "fluentd-async"
  45. asyncConnectKey = "fluentd-async-connect" // deprecated option (use fluent-async instead)
  46. bufferLimitKey = "fluentd-buffer-limit"
  47. maxRetriesKey = "fluentd-max-retries"
  48. requestAckKey = "fluentd-request-ack"
  49. retryWaitKey = "fluentd-retry-wait"
  50. subSecondPrecisionKey = "fluentd-sub-second-precision"
  51. )
  52. func init() {
  53. if err := logger.RegisterLogDriver(name, New); err != nil {
  54. logrus.Fatal(err)
  55. }
  56. if err := logger.RegisterLogOptValidator(name, ValidateLogOpt); err != nil {
  57. logrus.Fatal(err)
  58. }
  59. }
  60. // New creates a fluentd logger using the configuration passed in on
  61. // the context. The supported context configuration variable is
  62. // fluentd-address.
  63. func New(info logger.Info) (logger.Logger, error) {
  64. fluentConfig, err := parseConfig(info.Config)
  65. if err != nil {
  66. return nil, errdefs.InvalidParameter(err)
  67. }
  68. tag, err := loggerutils.ParseLogTag(info, loggerutils.DefaultTemplate)
  69. if err != nil {
  70. return nil, errdefs.InvalidParameter(err)
  71. }
  72. extra, err := info.ExtraAttributes(nil)
  73. if err != nil {
  74. return nil, errdefs.InvalidParameter(err)
  75. }
  76. logrus.WithField("container", info.ContainerID).WithField("config", fluentConfig).
  77. Debug("logging driver fluentd configured")
  78. log, err := fluent.New(fluentConfig)
  79. if err != nil {
  80. return nil, err
  81. }
  82. return &fluentd{
  83. tag: tag,
  84. containerID: info.ContainerID,
  85. containerName: info.ContainerName,
  86. writer: log,
  87. extra: extra,
  88. }, nil
  89. }
  90. func (f *fluentd) Log(msg *logger.Message) error {
  91. data := map[string]string{
  92. "container_id": f.containerID,
  93. "container_name": f.containerName,
  94. "source": msg.Source,
  95. "log": string(msg.Line),
  96. }
  97. for k, v := range f.extra {
  98. data[k] = v
  99. }
  100. if msg.PLogMetaData != nil {
  101. data["partial_message"] = "true"
  102. data["partial_id"] = msg.PLogMetaData.ID
  103. data["partial_ordinal"] = strconv.Itoa(msg.PLogMetaData.Ordinal)
  104. data["partial_last"] = strconv.FormatBool(msg.PLogMetaData.Last)
  105. }
  106. ts := msg.Timestamp
  107. logger.PutMessage(msg)
  108. // fluent-logger-golang buffers logs from failures and disconnections,
  109. // and these are transferred again automatically.
  110. return f.writer.PostWithTime(f.tag, ts, data)
  111. }
  112. func (f *fluentd) Close() error {
  113. return f.writer.Close()
  114. }
  115. func (f *fluentd) Name() string {
  116. return name
  117. }
  118. // ValidateLogOpt looks for fluentd specific log option fluentd-address.
  119. func ValidateLogOpt(cfg map[string]string) error {
  120. for key := range cfg {
  121. switch key {
  122. case "env":
  123. case "env-regex":
  124. case "labels":
  125. case "labels-regex":
  126. case "tag":
  127. case addressKey:
  128. case asyncKey:
  129. case asyncConnectKey:
  130. case bufferLimitKey:
  131. case maxRetriesKey:
  132. case requestAckKey:
  133. case retryWaitKey:
  134. case subSecondPrecisionKey:
  135. // Accepted
  136. default:
  137. return errors.Errorf("unknown log opt '%s' for fluentd log driver", key)
  138. }
  139. }
  140. _, err := parseConfig(cfg)
  141. return err
  142. }
  143. func parseConfig(cfg map[string]string) (fluent.Config, error) {
  144. var config fluent.Config
  145. loc, err := parseAddress(cfg[addressKey])
  146. if err != nil {
  147. return config, err
  148. }
  149. bufferLimit := defaultBufferLimit
  150. if cfg[bufferLimitKey] != "" {
  151. bl64, err := units.RAMInBytes(cfg[bufferLimitKey])
  152. if err != nil {
  153. return config, err
  154. }
  155. bufferLimit = int(bl64)
  156. }
  157. retryWait := defaultRetryWait
  158. if cfg[retryWaitKey] != "" {
  159. rwd, err := time.ParseDuration(cfg[retryWaitKey])
  160. if err != nil {
  161. return config, err
  162. }
  163. retryWait = int(rwd.Seconds() * 1000)
  164. }
  165. maxRetries := defaultMaxRetries
  166. if cfg[maxRetriesKey] != "" {
  167. mr64, err := strconv.ParseUint(cfg[maxRetriesKey], 10, strconv.IntSize)
  168. if err != nil {
  169. return config, err
  170. }
  171. maxRetries = int(mr64)
  172. }
  173. if cfg[asyncKey] != "" && cfg[asyncConnectKey] != "" {
  174. return config, errors.Errorf("conflicting options: cannot specify both '%s' and '%s", asyncKey, asyncConnectKey)
  175. }
  176. async := false
  177. if cfg[asyncKey] != "" {
  178. if async, err = strconv.ParseBool(cfg[asyncKey]); err != nil {
  179. return config, err
  180. }
  181. }
  182. // TODO fluentd-async-connect is deprecated in driver v1.4.0. Remove after two stable releases
  183. asyncConnect := false
  184. if cfg[asyncConnectKey] != "" {
  185. if asyncConnect, err = strconv.ParseBool(cfg[asyncConnectKey]); err != nil {
  186. return config, err
  187. }
  188. }
  189. subSecondPrecision := false
  190. if cfg[subSecondPrecisionKey] != "" {
  191. if subSecondPrecision, err = strconv.ParseBool(cfg[subSecondPrecisionKey]); err != nil {
  192. return config, err
  193. }
  194. }
  195. requestAck := false
  196. if cfg[requestAckKey] != "" {
  197. if requestAck, err = strconv.ParseBool(cfg[requestAckKey]); err != nil {
  198. return config, err
  199. }
  200. }
  201. config = fluent.Config{
  202. FluentPort: loc.port,
  203. FluentHost: loc.host,
  204. FluentNetwork: loc.protocol,
  205. FluentSocketPath: loc.path,
  206. BufferLimit: bufferLimit,
  207. RetryWait: retryWait,
  208. MaxRetry: maxRetries,
  209. Async: async,
  210. AsyncConnect: asyncConnect,
  211. SubSecondPrecision: subSecondPrecision,
  212. RequestAck: requestAck,
  213. ForceStopAsyncSend: async || asyncConnect,
  214. }
  215. return config, nil
  216. }
  217. func parseAddress(address string) (*location, error) {
  218. if address == "" {
  219. return &location{
  220. protocol: defaultProtocol,
  221. host: defaultHost,
  222. port: defaultPort,
  223. path: "",
  224. }, nil
  225. }
  226. protocol := defaultProtocol
  227. givenAddress := address
  228. if urlutil.IsTransportURL(address) {
  229. url, err := url.Parse(address)
  230. if err != nil {
  231. return nil, errors.Wrapf(err, "invalid fluentd-address %s", givenAddress)
  232. }
  233. // unix and unixgram socket
  234. if url.Scheme == "unix" || url.Scheme == "unixgram" {
  235. return &location{
  236. protocol: url.Scheme,
  237. host: "",
  238. port: 0,
  239. path: url.Path,
  240. }, nil
  241. }
  242. // tcp|udp
  243. protocol = url.Scheme
  244. address = url.Host
  245. }
  246. host, port, err := net.SplitHostPort(address)
  247. if err != nil {
  248. if !strings.Contains(err.Error(), "missing port in address") {
  249. return nil, errors.Wrapf(err, "invalid fluentd-address %s", givenAddress)
  250. }
  251. return &location{
  252. protocol: protocol,
  253. host: host,
  254. port: defaultPort,
  255. path: "",
  256. }, nil
  257. }
  258. portnum, err := strconv.Atoi(port)
  259. if err != nil {
  260. return nil, errors.Wrapf(err, "invalid fluentd-address %s", givenAddress)
  261. }
  262. return &location{
  263. protocol: protocol,
  264. host: host,
  265. port: portnum,
  266. path: "",
  267. }, nil
  268. }