fluentd.go 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250
  1. // Package fluentd provides the log driver for forwarding server logs
  2. // to fluentd endpoints.
  3. package fluentd
  4. import (
  5. "fmt"
  6. "math"
  7. "net"
  8. "net/url"
  9. "strconv"
  10. "strings"
  11. "time"
  12. "github.com/Sirupsen/logrus"
  13. "github.com/docker/docker/daemon/logger"
  14. "github.com/docker/docker/daemon/logger/loggerutils"
  15. "github.com/docker/docker/pkg/urlutil"
  16. "github.com/docker/go-units"
  17. "github.com/fluent/fluent-logger-golang/fluent"
  18. "github.com/pkg/errors"
  19. )
  20. type fluentd struct {
  21. tag string
  22. containerID string
  23. containerName string
  24. writer *fluent.Fluent
  25. extra map[string]string
  26. }
  27. type location struct {
  28. protocol string
  29. host string
  30. port int
  31. path string
  32. }
  33. const (
  34. name = "fluentd"
  35. defaultProtocol = "tcp"
  36. defaultHost = "127.0.0.1"
  37. defaultPort = 24224
  38. defaultBufferLimit = 1024 * 1024
  39. // logger tries to reconnect 2**32 - 1 times
  40. // failed (and panic) after 204 years [ 1.5 ** (2**32 - 1) - 1 seconds]
  41. defaultRetryWait = 1000
  42. defaultMaxRetries = math.MaxInt32
  43. addressKey = "fluentd-address"
  44. bufferLimitKey = "fluentd-buffer-limit"
  45. retryWaitKey = "fluentd-retry-wait"
  46. maxRetriesKey = "fluentd-max-retries"
  47. asyncConnectKey = "fluentd-async-connect"
  48. )
  49. func init() {
  50. if err := logger.RegisterLogDriver(name, New); err != nil {
  51. logrus.Fatal(err)
  52. }
  53. if err := logger.RegisterLogOptValidator(name, ValidateLogOpt); err != nil {
  54. logrus.Fatal(err)
  55. }
  56. }
  57. // New creates a fluentd logger using the configuration passed in on
  58. // the context. The supported context configuration variable is
  59. // fluentd-address.
  60. func New(info logger.Info) (logger.Logger, error) {
  61. loc, err := parseAddress(info.Config[addressKey])
  62. if err != nil {
  63. return nil, err
  64. }
  65. tag, err := loggerutils.ParseLogTag(info, loggerutils.DefaultTemplate)
  66. if err != nil {
  67. return nil, err
  68. }
  69. extra, err := info.ExtraAttributes(nil)
  70. if err != nil {
  71. return nil, err
  72. }
  73. bufferLimit := defaultBufferLimit
  74. if info.Config[bufferLimitKey] != "" {
  75. bl64, err := units.RAMInBytes(info.Config[bufferLimitKey])
  76. if err != nil {
  77. return nil, err
  78. }
  79. bufferLimit = int(bl64)
  80. }
  81. retryWait := defaultRetryWait
  82. if info.Config[retryWaitKey] != "" {
  83. rwd, err := time.ParseDuration(info.Config[retryWaitKey])
  84. if err != nil {
  85. return nil, err
  86. }
  87. retryWait = int(rwd.Seconds() * 1000)
  88. }
  89. maxRetries := defaultMaxRetries
  90. if info.Config[maxRetriesKey] != "" {
  91. mr64, err := strconv.ParseUint(info.Config[maxRetriesKey], 10, strconv.IntSize)
  92. if err != nil {
  93. return nil, err
  94. }
  95. maxRetries = int(mr64)
  96. }
  97. asyncConnect := false
  98. if info.Config[asyncConnectKey] != "" {
  99. if asyncConnect, err = strconv.ParseBool(info.Config[asyncConnectKey]); err != nil {
  100. return nil, err
  101. }
  102. }
  103. fluentConfig := fluent.Config{
  104. FluentPort: loc.port,
  105. FluentHost: loc.host,
  106. FluentNetwork: loc.protocol,
  107. FluentSocketPath: loc.path,
  108. BufferLimit: bufferLimit,
  109. RetryWait: retryWait,
  110. MaxRetry: maxRetries,
  111. AsyncConnect: asyncConnect,
  112. }
  113. logrus.WithField("container", info.ContainerID).WithField("config", fluentConfig).
  114. Debug("logging driver fluentd configured")
  115. log, err := fluent.New(fluentConfig)
  116. if err != nil {
  117. return nil, err
  118. }
  119. return &fluentd{
  120. tag: tag,
  121. containerID: info.ContainerID,
  122. containerName: info.ContainerName,
  123. writer: log,
  124. extra: extra,
  125. }, nil
  126. }
  127. func (f *fluentd) Log(msg *logger.Message) error {
  128. data := map[string]string{
  129. "container_id": f.containerID,
  130. "container_name": f.containerName,
  131. "source": msg.Source,
  132. "log": string(msg.Line),
  133. }
  134. for k, v := range f.extra {
  135. data[k] = v
  136. }
  137. ts := msg.Timestamp
  138. logger.PutMessage(msg)
  139. // fluent-logger-golang buffers logs from failures and disconnections,
  140. // and these are transferred again automatically.
  141. return f.writer.PostWithTime(f.tag, ts, data)
  142. }
  143. func (f *fluentd) Close() error {
  144. return f.writer.Close()
  145. }
  146. func (f *fluentd) Name() string {
  147. return name
  148. }
  149. // ValidateLogOpt looks for fluentd specific log option fluentd-address.
  150. func ValidateLogOpt(cfg map[string]string) error {
  151. for key := range cfg {
  152. switch key {
  153. case "env":
  154. case "env-regex":
  155. case "labels":
  156. case "tag":
  157. case addressKey:
  158. case bufferLimitKey:
  159. case retryWaitKey:
  160. case maxRetriesKey:
  161. case asyncConnectKey:
  162. // Accepted
  163. default:
  164. return fmt.Errorf("unknown log opt '%s' for fluentd log driver", key)
  165. }
  166. }
  167. _, err := parseAddress(cfg["fluentd-address"])
  168. return err
  169. }
  170. func parseAddress(address string) (*location, error) {
  171. if address == "" {
  172. return &location{
  173. protocol: defaultProtocol,
  174. host: defaultHost,
  175. port: defaultPort,
  176. path: "",
  177. }, nil
  178. }
  179. protocol := defaultProtocol
  180. givenAddress := address
  181. if urlutil.IsTransportURL(address) {
  182. url, err := url.Parse(address)
  183. if err != nil {
  184. return nil, errors.Wrapf(err, "invalid fluentd-address %s", givenAddress)
  185. }
  186. // unix and unixgram socket
  187. if url.Scheme == "unix" || url.Scheme == "unixgram" {
  188. return &location{
  189. protocol: url.Scheme,
  190. host: "",
  191. port: 0,
  192. path: url.Path,
  193. }, nil
  194. }
  195. // tcp|udp
  196. protocol = url.Scheme
  197. address = url.Host
  198. }
  199. host, port, err := net.SplitHostPort(address)
  200. if err != nil {
  201. if !strings.Contains(err.Error(), "missing port in address") {
  202. return nil, errors.Wrapf(err, "invalid fluentd-address %s", givenAddress)
  203. }
  204. return &location{
  205. protocol: protocol,
  206. host: host,
  207. port: defaultPort,
  208. path: "",
  209. }, nil
  210. }
  211. portnum, err := strconv.Atoi(port)
  212. if err != nil {
  213. return nil, errors.Wrapf(err, "invalid fluentd-address %s", givenAddress)
  214. }
  215. return &location{
  216. protocol: protocol,
  217. host: host,
  218. port: portnum,
  219. path: "",
  220. }, nil
  221. }