|
@@ -8,10 +8,12 @@ import (
|
|
|
"net"
|
|
|
"strconv"
|
|
|
"strings"
|
|
|
+ "time"
|
|
|
|
|
|
"github.com/Sirupsen/logrus"
|
|
|
"github.com/docker/docker/daemon/logger"
|
|
|
"github.com/docker/docker/daemon/logger/loggerutils"
|
|
|
+ "github.com/docker/go-units"
|
|
|
"github.com/fluent/fluent-logger-golang/fluent"
|
|
|
)
|
|
|
|
|
@@ -24,11 +26,25 @@ type fluentd struct {
|
|
|
}
|
|
|
|
|
|
const (
|
|
|
- name = "fluentd"
|
|
|
- defaultHostName = "localhost"
|
|
|
+ name = "fluentd"
|
|
|
+
|
|
|
+ defaultHost = "127.0.0.1"
|
|
|
defaultPort = 24224
|
|
|
+ defaultBufferLimit = 1024 * 1024
|
|
|
defaultTagPrefix = "docker"
|
|
|
- defaultBufferLimit = 1 * 1024 * 1024 // 1M buffer by default
|
|
|
+
|
|
|
+ // logger tries to reconnect 2**32 - 1 times
|
|
|
+ // failed (and panic) after 204 years [ 1.5 ** (2**32 - 1) - 1 seconds]
|
|
|
+ defaultRetryWait = 1000
|
|
|
+ defaultTimeout = 3 * time.Second
|
|
|
+ defaultMaxRetries = math.MaxInt32
|
|
|
+ defaultReconnectWaitIncreRate = 1.5
|
|
|
+
|
|
|
+ addressKey = "fluentd-address"
|
|
|
+ bufferLimitKey = "fluentd-buffer-limit"
|
|
|
+ retryWaitKey = "fluentd-retry-wait"
|
|
|
+ maxRetriesKey = "fluentd-max-retries"
|
|
|
+ asyncConnectKey = "fluentd-async-connect"
|
|
|
)
|
|
|
|
|
|
func init() {
|
|
@@ -44,7 +60,7 @@ func init() {
|
|
|
// the context. Supported context configuration variables are
|
|
|
// fluentd-address & fluentd-tag.
|
|
|
func New(ctx logger.Context) (logger.Logger, error) {
|
|
|
- host, port, err := parseAddress(ctx.Config["fluentd-address"])
|
|
|
+ host, port, err := parseAddress(ctx.Config[addressKey])
|
|
|
if err != nil {
|
|
|
return nil, err
|
|
|
}
|
|
@@ -53,11 +69,56 @@ func New(ctx logger.Context) (logger.Logger, error) {
|
|
|
if err != nil {
|
|
|
return nil, err
|
|
|
}
|
|
|
+
|
|
|
extra := ctx.ExtraAttributes(nil)
|
|
|
- logrus.Debugf("logging driver fluentd configured for container:%s, host:%s, port:%d, tag:%s, extra:%v.", ctx.ContainerID, host, port, tag, extra)
|
|
|
- // logger tries to reconnect 2**32 - 1 times
|
|
|
- // failed (and panic) after 204 years [ 1.5 ** (2**32 - 1) - 1 seconds]
|
|
|
- log, err := fluent.New(fluent.Config{FluentPort: port, FluentHost: host, RetryWait: 1000, MaxRetry: math.MaxInt32})
|
|
|
+
|
|
|
+ bufferLimit := defaultBufferLimit
|
|
|
+ if ctx.Config[bufferLimitKey] != "" {
|
|
|
+ bl64, err := units.RAMInBytes(ctx.Config[bufferLimitKey])
|
|
|
+ if err != nil {
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
+ bufferLimit = int(bl64)
|
|
|
+ }
|
|
|
+
|
|
|
+ retryWait := defaultRetryWait
|
|
|
+ if ctx.Config[retryWaitKey] != "" {
|
|
|
+ rwd, err := time.ParseDuration(ctx.Config[retryWaitKey])
|
|
|
+ if err != nil {
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
+ retryWait = int(rwd.Seconds() * 1000)
|
|
|
+ }
|
|
|
+
|
|
|
+ maxRetries := defaultMaxRetries
|
|
|
+ if ctx.Config[maxRetriesKey] != "" {
|
|
|
+ mr64, err := strconv.ParseUint(ctx.Config[maxRetriesKey], 10, strconv.IntSize)
|
|
|
+ if err != nil {
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
+ maxRetries = int(mr64)
|
|
|
+ }
|
|
|
+
|
|
|
+ asyncConnect := false
|
|
|
+ if ctx.Config[asyncConnectKey] != "" {
|
|
|
+ if asyncConnect, err = strconv.ParseBool(ctx.Config[asyncConnectKey]); err != nil {
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ fluentConfig := fluent.Config{
|
|
|
+ FluentPort: port,
|
|
|
+ FluentHost: host,
|
|
|
+ BufferLimit: bufferLimit,
|
|
|
+ RetryWait: retryWait,
|
|
|
+ MaxRetry: maxRetries,
|
|
|
+ AsyncConnect: asyncConnect,
|
|
|
+ }
|
|
|
+
|
|
|
+ logrus.WithField("container", ctx.ContainerID).WithField("config", fluentConfig).
|
|
|
+ Debug("logging driver fluentd configured")
|
|
|
+
|
|
|
+ log, err := fluent.New(fluentConfig)
|
|
|
if err != nil {
|
|
|
return nil, err
|
|
|
}
|
|
@@ -97,11 +158,16 @@ func (f *fluentd) Name() string {
|
|
|
func ValidateLogOpt(cfg map[string]string) error {
|
|
|
for key := range cfg {
|
|
|
switch key {
|
|
|
- case "fluentd-address":
|
|
|
+ case "env":
|
|
|
case "fluentd-tag":
|
|
|
- case "tag":
|
|
|
case "labels":
|
|
|
- case "env":
|
|
|
+ case "tag":
|
|
|
+ case addressKey:
|
|
|
+ case bufferLimitKey:
|
|
|
+ case retryWaitKey:
|
|
|
+ case maxRetriesKey:
|
|
|
+ case asyncConnectKey:
|
|
|
+ // Accepted
|
|
|
default:
|
|
|
return fmt.Errorf("unknown log opt '%s' for fluentd log driver", key)
|
|
|
}
|
|
@@ -116,7 +182,7 @@ func ValidateLogOpt(cfg map[string]string) error {
|
|
|
|
|
|
func parseAddress(address string) (string, int, error) {
|
|
|
if address == "" {
|
|
|
- return defaultHostName, defaultPort, nil
|
|
|
+ return defaultHost, defaultPort, nil
|
|
|
}
|
|
|
|
|
|
host, port, err := net.SplitHostPort(address)
|