|
@@ -3,7 +3,6 @@
|
|
|
package fluentd // import "github.com/docker/docker/daemon/logger/fluentd"
|
|
|
|
|
|
import (
|
|
|
- "fmt"
|
|
|
"math"
|
|
|
"net"
|
|
|
"net/url"
|
|
@@ -13,6 +12,7 @@ import (
|
|
|
|
|
|
"github.com/docker/docker/daemon/logger"
|
|
|
"github.com/docker/docker/daemon/logger/loggerutils"
|
|
|
+ "github.com/docker/docker/errdefs"
|
|
|
"github.com/docker/docker/pkg/urlutil"
|
|
|
units "github.com/docker/go-units"
|
|
|
"github.com/fluent/fluent-logger-golang/fluent"
|
|
@@ -38,21 +38,23 @@ type location struct {
|
|
|
const (
|
|
|
name = "fluentd"
|
|
|
|
|
|
- defaultProtocol = "tcp"
|
|
|
+ defaultBufferLimit = 1024 * 1024
|
|
|
defaultHost = "127.0.0.1"
|
|
|
defaultPort = 24224
|
|
|
- defaultBufferLimit = 1024 * 1024
|
|
|
+ defaultProtocol = "tcp"
|
|
|
|
|
|
// logger tries to reconnect 2**32 - 1 times
|
|
|
// failed (and panic) after 204 years [ 1.5 ** (2**32 - 1) - 1 seconds]
|
|
|
- defaultRetryWait = 1000
|
|
|
defaultMaxRetries = math.MaxInt32
|
|
|
+ defaultRetryWait = 1000
|
|
|
|
|
|
addressKey = "fluentd-address"
|
|
|
+ asyncKey = "fluentd-async"
|
|
|
+ asyncConnectKey = "fluentd-async-connect" // deprecated option (use fluent-async instead)
|
|
|
bufferLimitKey = "fluentd-buffer-limit"
|
|
|
- retryWaitKey = "fluentd-retry-wait"
|
|
|
maxRetriesKey = "fluentd-max-retries"
|
|
|
- asyncConnectKey = "fluentd-async-connect"
|
|
|
+ requestAckKey = "fluentd-request-ack"
|
|
|
+ retryWaitKey = "fluentd-retry-wait"
|
|
|
subSecondPrecisionKey = "fluentd-sub-second-precision"
|
|
|
)
|
|
|
|
|
@@ -69,72 +71,19 @@ func init() {
|
|
|
// the context. The supported context configuration variable is
|
|
|
// fluentd-address.
|
|
|
func New(info logger.Info) (logger.Logger, error) {
|
|
|
- loc, err := parseAddress(info.Config[addressKey])
|
|
|
+ fluentConfig, err := parseConfig(info.Config)
|
|
|
if err != nil {
|
|
|
- return nil, err
|
|
|
+ return nil, errdefs.InvalidParameter(err)
|
|
|
}
|
|
|
|
|
|
tag, err := loggerutils.ParseLogTag(info, loggerutils.DefaultTemplate)
|
|
|
if err != nil {
|
|
|
- return nil, err
|
|
|
+ return nil, errdefs.InvalidParameter(err)
|
|
|
}
|
|
|
|
|
|
extra, err := info.ExtraAttributes(nil)
|
|
|
if err != nil {
|
|
|
- return nil, err
|
|
|
- }
|
|
|
-
|
|
|
- bufferLimit := defaultBufferLimit
|
|
|
- if info.Config[bufferLimitKey] != "" {
|
|
|
- bl64, err := units.RAMInBytes(info.Config[bufferLimitKey])
|
|
|
- if err != nil {
|
|
|
- return nil, err
|
|
|
- }
|
|
|
- bufferLimit = int(bl64)
|
|
|
- }
|
|
|
-
|
|
|
- retryWait := defaultRetryWait
|
|
|
- if info.Config[retryWaitKey] != "" {
|
|
|
- rwd, err := time.ParseDuration(info.Config[retryWaitKey])
|
|
|
- if err != nil {
|
|
|
- return nil, err
|
|
|
- }
|
|
|
- retryWait = int(rwd.Seconds() * 1000)
|
|
|
- }
|
|
|
-
|
|
|
- maxRetries := defaultMaxRetries
|
|
|
- if info.Config[maxRetriesKey] != "" {
|
|
|
- mr64, err := strconv.ParseUint(info.Config[maxRetriesKey], 10, strconv.IntSize)
|
|
|
- if err != nil {
|
|
|
- return nil, err
|
|
|
- }
|
|
|
- maxRetries = int(mr64)
|
|
|
- }
|
|
|
-
|
|
|
- asyncConnect := false
|
|
|
- if info.Config[asyncConnectKey] != "" {
|
|
|
- if asyncConnect, err = strconv.ParseBool(info.Config[asyncConnectKey]); err != nil {
|
|
|
- return nil, err
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- subSecondPrecision := false
|
|
|
- if info.Config[subSecondPrecisionKey] != "" {
|
|
|
- if subSecondPrecision, err = strconv.ParseBool(info.Config[subSecondPrecisionKey]); err != nil {
|
|
|
- return nil, err
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- fluentConfig := fluent.Config{
|
|
|
- FluentPort: loc.port,
|
|
|
- FluentHost: loc.host,
|
|
|
- FluentNetwork: loc.protocol,
|
|
|
- FluentSocketPath: loc.path,
|
|
|
- BufferLimit: bufferLimit,
|
|
|
- RetryWait: retryWait,
|
|
|
- MaxRetry: maxRetries,
|
|
|
- Async: asyncConnect,
|
|
|
- SubSecondPrecision: subSecondPrecision,
|
|
|
+ return nil, errdefs.InvalidParameter(err)
|
|
|
}
|
|
|
|
|
|
logrus.WithField("container", info.ContainerID).WithField("config", fluentConfig).
|
|
@@ -194,22 +143,110 @@ func ValidateLogOpt(cfg map[string]string) error {
|
|
|
case "labels":
|
|
|
case "labels-regex":
|
|
|
case "tag":
|
|
|
+
|
|
|
case addressKey:
|
|
|
+ case asyncKey:
|
|
|
+ case asyncConnectKey:
|
|
|
case bufferLimitKey:
|
|
|
- case retryWaitKey:
|
|
|
case maxRetriesKey:
|
|
|
- case asyncConnectKey:
|
|
|
+ case requestAckKey:
|
|
|
+ case retryWaitKey:
|
|
|
case subSecondPrecisionKey:
|
|
|
// Accepted
|
|
|
default:
|
|
|
- return fmt.Errorf("unknown log opt '%s' for fluentd log driver", key)
|
|
|
+ return errors.Errorf("unknown log opt '%s' for fluentd log driver", key)
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- _, err := parseAddress(cfg[addressKey])
|
|
|
+ _, err := parseConfig(cfg)
|
|
|
return err
|
|
|
}
|
|
|
|
|
|
+func parseConfig(cfg map[string]string) (fluent.Config, error) {
|
|
|
+ var config fluent.Config
|
|
|
+
|
|
|
+ loc, err := parseAddress(cfg[addressKey])
|
|
|
+ if err != nil {
|
|
|
+ return config, err
|
|
|
+ }
|
|
|
+
|
|
|
+ bufferLimit := defaultBufferLimit
|
|
|
+ if cfg[bufferLimitKey] != "" {
|
|
|
+ bl64, err := units.RAMInBytes(cfg[bufferLimitKey])
|
|
|
+ if err != nil {
|
|
|
+ return config, err
|
|
|
+ }
|
|
|
+ bufferLimit = int(bl64)
|
|
|
+ }
|
|
|
+
|
|
|
+ retryWait := defaultRetryWait
|
|
|
+ if cfg[retryWaitKey] != "" {
|
|
|
+ rwd, err := time.ParseDuration(cfg[retryWaitKey])
|
|
|
+ if err != nil {
|
|
|
+ return config, err
|
|
|
+ }
|
|
|
+ retryWait = int(rwd.Seconds() * 1000)
|
|
|
+ }
|
|
|
+
|
|
|
+ maxRetries := defaultMaxRetries
|
|
|
+ if cfg[maxRetriesKey] != "" {
|
|
|
+ mr64, err := strconv.ParseUint(cfg[maxRetriesKey], 10, strconv.IntSize)
|
|
|
+ if err != nil {
|
|
|
+ return config, err
|
|
|
+ }
|
|
|
+ maxRetries = int(mr64)
|
|
|
+ }
|
|
|
+
|
|
|
+ if cfg[asyncKey] != "" && cfg[asyncConnectKey] != "" {
|
|
|
+ return config, errors.Errorf("conflicting options: cannot specify both '%s' and '%s", asyncKey, asyncConnectKey)
|
|
|
+ }
|
|
|
+
|
|
|
+ async := false
|
|
|
+ if cfg[asyncKey] != "" {
|
|
|
+ if async, err = strconv.ParseBool(cfg[asyncKey]); err != nil {
|
|
|
+ return config, err
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // TODO fluentd-async-connect is deprecated in driver v1.4.0. Remove after two stable releases
|
|
|
+ asyncConnect := false
|
|
|
+ if cfg[asyncConnectKey] != "" {
|
|
|
+ if asyncConnect, err = strconv.ParseBool(cfg[asyncConnectKey]); err != nil {
|
|
|
+ return config, err
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ subSecondPrecision := false
|
|
|
+ if cfg[subSecondPrecisionKey] != "" {
|
|
|
+ if subSecondPrecision, err = strconv.ParseBool(cfg[subSecondPrecisionKey]); err != nil {
|
|
|
+ return config, err
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ requestAck := false
|
|
|
+ if cfg[requestAckKey] != "" {
|
|
|
+ if requestAck, err = strconv.ParseBool(cfg[requestAckKey]); err != nil {
|
|
|
+ return config, err
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ config = fluent.Config{
|
|
|
+ FluentPort: loc.port,
|
|
|
+ FluentHost: loc.host,
|
|
|
+ FluentNetwork: loc.protocol,
|
|
|
+ FluentSocketPath: loc.path,
|
|
|
+ BufferLimit: bufferLimit,
|
|
|
+ RetryWait: retryWait,
|
|
|
+ MaxRetry: maxRetries,
|
|
|
+ Async: async,
|
|
|
+ AsyncConnect: asyncConnect,
|
|
|
+ SubSecondPrecision: subSecondPrecision,
|
|
|
+ RequestAck: requestAck,
|
|
|
+ }
|
|
|
+
|
|
|
+ return config, nil
|
|
|
+}
|
|
|
+
|
|
|
func parseAddress(address string) (*location, error) {
|
|
|
if address == "" {
|
|
|
return &location{
|