123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324 |
- // Package fluentd provides the log driver for forwarding server logs
- // to fluentd endpoints.
- package fluentd // import "github.com/docker/docker/daemon/logger/fluentd"
- import (
- "context"
- "math"
- "net/url"
- "strconv"
- "strings"
- "time"
- "github.com/containerd/log"
- "github.com/docker/docker/daemon/logger"
- "github.com/docker/docker/daemon/logger/loggerutils"
- "github.com/docker/docker/errdefs"
- units "github.com/docker/go-units"
- "github.com/fluent/fluent-logger-golang/fluent"
- "github.com/pkg/errors"
- )
- type fluentd struct {
- tag string
- containerID string
- containerName string
- writer *fluent.Fluent
- extra map[string]string
- }
// location is the parsed form of the fluentd-address log option.
type location struct {
	// protocol is the URL scheme of the address ("tcp", "tls" or "unix").
	protocol string

	// host is the host name or IP; left empty for unix sockets.
	host string

	// port is the port number; left zero for unix sockets.
	port int

	// path is the socket path; only set for unix sockets.
	path string
}
// name is the identifier under which this log driver is registered.
const name = "fluentd"

// Defaults applied when the corresponding log option is unset or empty.
const (
	defaultBufferLimit = 1024 * 1024 // bytes
	defaultHost        = "127.0.0.1"
	defaultPort        = 24224
	defaultProtocol    = "tcp"

	// defaultMaxRetries is the largest 32-bit signed value (2**31 - 1), which
	// makes the number of reconnect attempts effectively unlimited.
	defaultMaxRetries = math.MaxInt32

	// defaultRetryWait is expressed in milliseconds.
	defaultRetryWait = 1000
)

// Bounds accepted for a non-zero fluentd-async-reconnect-interval.
const (
	minReconnectInterval = 100 * time.Millisecond
	maxReconnectInterval = 10 * time.Second
)

// Log option keys understood by this driver.
const (
	addressKey                = "fluentd-address"
	asyncKey                  = "fluentd-async"
	asyncConnectKey           = "fluentd-async-connect" // deprecated option (use fluentd-async instead)
	asyncReconnectIntervalKey = "fluentd-async-reconnect-interval"
	bufferLimitKey            = "fluentd-buffer-limit"
	maxRetriesKey             = "fluentd-max-retries"
	requestAckKey             = "fluentd-request-ack"
	retryWaitKey              = "fluentd-retry-wait"
	subSecondPrecisionKey     = "fluentd-sub-second-precision"
)
- func init() {
- if err := logger.RegisterLogDriver(name, New); err != nil {
- panic(err)
- }
- if err := logger.RegisterLogOptValidator(name, ValidateLogOpt); err != nil {
- panic(err)
- }
- }
- // New creates a fluentd logger using the configuration passed in on
- // the context. The supported context configuration variable is
- // fluentd-address.
- func New(info logger.Info) (logger.Logger, error) {
- fluentConfig, err := parseConfig(info.Config)
- if err != nil {
- return nil, errdefs.InvalidParameter(err)
- }
- tag, err := loggerutils.ParseLogTag(info, loggerutils.DefaultTemplate)
- if err != nil {
- return nil, errdefs.InvalidParameter(err)
- }
- extra, err := info.ExtraAttributes(nil)
- if err != nil {
- return nil, errdefs.InvalidParameter(err)
- }
- log.G(context.TODO()).WithField("container", info.ContainerID).WithField("config", fluentConfig).
- Debug("logging driver fluentd configured")
- log, err := fluent.New(fluentConfig)
- if err != nil {
- return nil, err
- }
- return &fluentd{
- tag: tag,
- containerID: info.ContainerID,
- containerName: info.ContainerName,
- writer: log,
- extra: extra,
- }, nil
- }
- func (f *fluentd) Log(msg *logger.Message) error {
- data := map[string]string{
- "container_id": f.containerID,
- "container_name": f.containerName,
- "source": msg.Source,
- "log": string(msg.Line),
- }
- for k, v := range f.extra {
- data[k] = v
- }
- if msg.PLogMetaData != nil {
- data["partial_message"] = "true"
- data["partial_id"] = msg.PLogMetaData.ID
- data["partial_ordinal"] = strconv.Itoa(msg.PLogMetaData.Ordinal)
- data["partial_last"] = strconv.FormatBool(msg.PLogMetaData.Last)
- }
- ts := msg.Timestamp
- logger.PutMessage(msg)
- // fluent-logger-golang buffers logs from failures and disconnections,
- // and these are transferred again automatically.
- return f.writer.PostWithTime(f.tag, ts, data)
- }
- func (f *fluentd) Close() error {
- return f.writer.Close()
- }
- func (f *fluentd) Name() string {
- return name
- }
- // ValidateLogOpt looks for fluentd specific log option fluentd-address.
- func ValidateLogOpt(cfg map[string]string) error {
- for key := range cfg {
- switch key {
- case "env":
- case "env-regex":
- case "labels":
- case "labels-regex":
- case "tag":
- case addressKey:
- case asyncKey:
- case asyncConnectKey:
- case asyncReconnectIntervalKey:
- case bufferLimitKey:
- case maxRetriesKey:
- case requestAckKey:
- case retryWaitKey:
- case subSecondPrecisionKey:
- // Accepted
- default:
- return errors.Errorf("unknown log opt '%s' for fluentd log driver", key)
- }
- }
- _, err := parseConfig(cfg)
- return err
- }
- func parseConfig(cfg map[string]string) (fluent.Config, error) {
- var config fluent.Config
- loc, err := parseAddress(cfg[addressKey])
- if err != nil {
- return config, errors.Wrapf(err, "invalid fluentd-address (%s)", cfg[addressKey])
- }
- bufferLimit := defaultBufferLimit
- if cfg[bufferLimitKey] != "" {
- bl64, err := units.RAMInBytes(cfg[bufferLimitKey])
- if err != nil {
- return config, err
- }
- bufferLimit = int(bl64)
- }
- retryWait := defaultRetryWait
- if cfg[retryWaitKey] != "" {
- rwd, err := time.ParseDuration(cfg[retryWaitKey])
- if err != nil {
- return config, err
- }
- retryWait = int(rwd.Seconds() * 1000)
- }
- maxRetries := defaultMaxRetries
- if cfg[maxRetriesKey] != "" {
- mr64, err := strconv.ParseUint(cfg[maxRetriesKey], 10, strconv.IntSize)
- if err != nil {
- return config, err
- }
- maxRetries = int(mr64)
- }
- if cfg[asyncKey] != "" && cfg[asyncConnectKey] != "" {
- return config, errors.Errorf("conflicting options: cannot specify both '%s' and '%s", asyncKey, asyncConnectKey)
- }
- async := false
- if cfg[asyncKey] != "" {
- if async, err = strconv.ParseBool(cfg[asyncKey]); err != nil {
- return config, err
- }
- }
- // TODO fluentd-async-connect is deprecated in driver v1.4.0. Remove after two stable releases
- asyncConnect := false
- if cfg[asyncConnectKey] != "" {
- if asyncConnect, err = strconv.ParseBool(cfg[asyncConnectKey]); err != nil {
- return config, err
- }
- }
- asyncReconnectInterval := 0
- if cfg[asyncReconnectIntervalKey] != "" {
- interval, err := time.ParseDuration(cfg[asyncReconnectIntervalKey])
- if err != nil {
- return config, errors.Wrapf(err, "invalid value for %s", asyncReconnectIntervalKey)
- }
- if interval != 0 && (interval < minReconnectInterval || interval > maxReconnectInterval) {
- return config, errors.Errorf("invalid value for %s: value (%q) must be between %s and %s",
- asyncReconnectIntervalKey, interval, minReconnectInterval, maxReconnectInterval)
- }
- asyncReconnectInterval = int(interval.Milliseconds())
- }
- subSecondPrecision := false
- if cfg[subSecondPrecisionKey] != "" {
- if subSecondPrecision, err = strconv.ParseBool(cfg[subSecondPrecisionKey]); err != nil {
- return config, err
- }
- }
- requestAck := false
- if cfg[requestAckKey] != "" {
- if requestAck, err = strconv.ParseBool(cfg[requestAckKey]); err != nil {
- return config, err
- }
- }
- config = fluent.Config{
- FluentPort: loc.port,
- FluentHost: loc.host,
- FluentNetwork: loc.protocol,
- FluentSocketPath: loc.path,
- BufferLimit: bufferLimit,
- RetryWait: retryWait,
- MaxRetry: maxRetries,
- Async: async,
- AsyncConnect: asyncConnect,
- AsyncReconnectInterval: asyncReconnectInterval,
- SubSecondPrecision: subSecondPrecision,
- RequestAck: requestAck,
- ForceStopAsyncSend: async || asyncConnect,
- }
- return config, nil
- }
- func parseAddress(address string) (*location, error) {
- if address == "" {
- return &location{
- protocol: defaultProtocol,
- host: defaultHost,
- port: defaultPort,
- path: "",
- }, nil
- }
- if !strings.Contains(address, "://") {
- address = defaultProtocol + "://" + address
- }
- addr, err := url.Parse(address)
- if err != nil {
- return nil, err
- }
- switch addr.Scheme {
- case "unix":
- if strings.TrimLeft(addr.Path, "/") == "" {
- return nil, errors.New("path is empty")
- }
- return &location{protocol: addr.Scheme, path: addr.Path}, nil
- case "tcp", "tls":
- // continue processing below
- default:
- return nil, errors.Errorf("unsupported scheme: '%s'", addr.Scheme)
- }
- if addr.Path != "" {
- return nil, errors.New("should not contain a path element")
- }
- host := defaultHost
- port := defaultPort
- if h := addr.Hostname(); h != "" {
- host = h
- }
- if p := addr.Port(); p != "" {
- // Port numbers are 16 bit: https://www.ietf.org/rfc/rfc793.html#section-3.1
- portNum, err := strconv.ParseUint(p, 10, 16)
- if err != nil {
- return nil, errors.Wrap(err, "invalid port")
- }
- port = int(portNum)
- }
- return &location{
- protocol: addr.Scheme,
- host: host,
- port: port,
- path: "",
- }, nil
- }
|