diff --git a/daemon/logger/fluentd/fluentd.go b/daemon/logger/fluentd/fluentd.go index cfa0ce5dc2..829b606c9f 100644 --- a/daemon/logger/fluentd/fluentd.go +++ b/daemon/logger/fluentd/fluentd.go @@ -3,7 +3,6 @@ package fluentd // import "github.com/docker/docker/daemon/logger/fluentd" import ( - "fmt" "math" "net" "net/url" @@ -13,6 +12,7 @@ import ( "github.com/docker/docker/daemon/logger" "github.com/docker/docker/daemon/logger/loggerutils" + "github.com/docker/docker/errdefs" "github.com/docker/docker/pkg/urlutil" units "github.com/docker/go-units" "github.com/fluent/fluent-logger-golang/fluent" @@ -38,21 +38,23 @@ type location struct { const ( name = "fluentd" - defaultProtocol = "tcp" + defaultBufferLimit = 1024 * 1024 defaultHost = "127.0.0.1" defaultPort = 24224 - defaultBufferLimit = 1024 * 1024 + defaultProtocol = "tcp" // logger tries to reconnect 2**32 - 1 times // failed (and panic) after 204 years [ 1.5 ** (2**32 - 1) - 1 seconds] - defaultRetryWait = 1000 defaultMaxRetries = math.MaxInt32 + defaultRetryWait = 1000 addressKey = "fluentd-address" + asyncKey = "fluentd-async" + asyncConnectKey = "fluentd-async-connect" // deprecated option (use fluent-async instead) bufferLimitKey = "fluentd-buffer-limit" - retryWaitKey = "fluentd-retry-wait" maxRetriesKey = "fluentd-max-retries" - asyncConnectKey = "fluentd-async-connect" + requestAckKey = "fluentd-request-ack" + retryWaitKey = "fluentd-retry-wait" subSecondPrecisionKey = "fluentd-sub-second-precision" ) @@ -69,72 +71,19 @@ func init() { // the context. The supported context configuration variable is // fluentd-address. func New(info logger.Info) (logger.Logger, error) { - loc, err := parseAddress(info.Config[addressKey]) + fluentConfig, err := parseConfig(info.Config) if err != nil { - return nil, err + return nil, errdefs.InvalidParameter(err) } tag, err := loggerutils.ParseLogTag(info, loggerutils.DefaultTemplate) if err != nil { - return nil, err + return nil, errdefs.InvalidParameter(err) } extra, err := info.ExtraAttributes(nil) if err != nil { - return nil, err - } - - bufferLimit := defaultBufferLimit - if info.Config[bufferLimitKey] != "" { - bl64, err := units.RAMInBytes(info.Config[bufferLimitKey]) - if err != nil { - return nil, err - } - bufferLimit = int(bl64) - } - - retryWait := defaultRetryWait - if info.Config[retryWaitKey] != "" { - rwd, err := time.ParseDuration(info.Config[retryWaitKey]) - if err != nil { - return nil, err - } - retryWait = int(rwd.Seconds() * 1000) - } - - maxRetries := defaultMaxRetries - if info.Config[maxRetriesKey] != "" { - mr64, err := strconv.ParseUint(info.Config[maxRetriesKey], 10, strconv.IntSize) - if err != nil { - return nil, err - } - maxRetries = int(mr64) - } - - asyncConnect := false - if info.Config[asyncConnectKey] != "" { - if asyncConnect, err = strconv.ParseBool(info.Config[asyncConnectKey]); err != nil { - return nil, err - } - } - - subSecondPrecision := false - if info.Config[subSecondPrecisionKey] != "" { - if subSecondPrecision, err = strconv.ParseBool(info.Config[subSecondPrecisionKey]); err != nil { - return nil, err - } - } - - fluentConfig := fluent.Config{ - FluentPort: loc.port, - FluentHost: loc.host, - FluentNetwork: loc.protocol, - FluentSocketPath: loc.path, - BufferLimit: bufferLimit, - RetryWait: retryWait, - MaxRetry: maxRetries, - Async: asyncConnect, - SubSecondPrecision: subSecondPrecision, + return nil, errdefs.InvalidParameter(err) } logrus.WithField("container", info.ContainerID).WithField("config", fluentConfig). @@ -194,22 +143,110 @@ func ValidateLogOpt(cfg map[string]string) error { case "labels": case "labels-regex": case "tag": + case addressKey: - case bufferLimitKey: - case retryWaitKey: - case maxRetriesKey: + case asyncKey: case asyncConnectKey: + case bufferLimitKey: + case maxRetriesKey: + case requestAckKey: + case retryWaitKey: case subSecondPrecisionKey: // Accepted default: - return fmt.Errorf("unknown log opt '%s' for fluentd log driver", key) + return errors.Errorf("unknown log opt '%s' for fluentd log driver", key) } } - _, err := parseAddress(cfg[addressKey]) + _, err := parseConfig(cfg) return err } +func parseConfig(cfg map[string]string) (fluent.Config, error) { + var config fluent.Config + + loc, err := parseAddress(cfg[addressKey]) + if err != nil { + return config, err + } + + bufferLimit := defaultBufferLimit + if cfg[bufferLimitKey] != "" { + bl64, err := units.RAMInBytes(cfg[bufferLimitKey]) + if err != nil { + return config, err + } + bufferLimit = int(bl64) + } + + retryWait := defaultRetryWait + if cfg[retryWaitKey] != "" { + rwd, err := time.ParseDuration(cfg[retryWaitKey]) + if err != nil { + return config, err + } + retryWait = int(rwd.Seconds() * 1000) + } + + maxRetries := defaultMaxRetries + if cfg[maxRetriesKey] != "" { + mr64, err := strconv.ParseUint(cfg[maxRetriesKey], 10, strconv.IntSize) + if err != nil { + return config, err + } + maxRetries = int(mr64) + } + + if cfg[asyncKey] != "" && cfg[asyncConnectKey] != "" { + return config, errors.Errorf("conflicting options: cannot specify both '%s' and '%s", asyncKey, asyncConnectKey) + } + + async := false + if cfg[asyncKey] != "" { + if async, err = strconv.ParseBool(cfg[asyncKey]); err != nil { + return config, err + } + } + + // TODO fluentd-async-connect is deprecated in driver v1.4.0. Remove after two stable releases + asyncConnect := false + if cfg[asyncConnectKey] != "" { + if asyncConnect, err = strconv.ParseBool(cfg[asyncConnectKey]); err != nil { + return config, err + } + } + + subSecondPrecision := false + if cfg[subSecondPrecisionKey] != "" { + if subSecondPrecision, err = strconv.ParseBool(cfg[subSecondPrecisionKey]); err != nil { + return config, err + } + } + + requestAck := false + if cfg[requestAckKey] != "" { + if requestAck, err = strconv.ParseBool(cfg[requestAckKey]); err != nil { + return config, err + } + } + + config = fluent.Config{ + FluentPort: loc.port, + FluentHost: loc.host, + FluentNetwork: loc.protocol, + FluentSocketPath: loc.path, + BufferLimit: bufferLimit, + RetryWait: retryWait, + MaxRetry: maxRetries, + Async: async, + AsyncConnect: asyncConnect, + SubSecondPrecision: subSecondPrecision, + RequestAck: requestAck, + } + + return config, nil +} + func parseAddress(address string) (*location, error) { if address == "" { return &location{ diff --git a/docs/api/version-history.md b/docs/api/version-history.md index a7ad67e3e1..8d583ee99a 100644 --- a/docs/api/version-history.md +++ b/docs/api/version-history.md @@ -85,6 +85,17 @@ keywords: "API, Docker, rcli, REST, documentation" on the node.label. The format of the label filter is `node.label=`/`node.label==` to return those with the specified labels, or `node.label!=`/`node.label!==` to return those without the specified labels. +* `POST /containers/create` now accepts a `fluentd-async` option in `HostConfig.LogConfig.Config` + when using the Fluentd logging driver. This option deprecates the `fluentd-async-connect` + option, which remains funtional, but will be removed in a future release. Users + are encouraged to use the `fluentd-async` option going forward. This change is + not versioned, and affects all API versions if the daemon has this patch. +* `POST /containers/create` now accepts a `fluentd-request-ack` option in + `HostConfig.LogConfig.Config` when using the Fluentd logging driver. If enabled, + the Fluentd logging driver sends the chunk option with a unique ID. The server + will respond with an acknowledgement. This option improves the reliability of + the message transmission. This change is not versioned, and affects all API + versions if the daemon has this patch. * `POST /containers/create`, `GET /containers/{id}/json`, and `GET /containers/json` now supports `BindOptions.NonRecursive`. * `POST /swarm/init` now accepts a `DataPathPort` property to set data path port number.