Browse Source

Merge pull request #43100 from conorevans/conorevans/update-fluent

vendor: github.com/fluent/fluent-logger-golang v1.9.0
Brian Goff 3 years ago
parent
commit
520dfc36f9

+ 39 - 20
daemon/logger/fluentd/fluentd.go

@@ -48,14 +48,18 @@ const (
 	defaultMaxRetries = math.MaxInt32
 	defaultRetryWait  = 1000
 
-	addressKey            = "fluentd-address"
-	asyncKey              = "fluentd-async"
-	asyncConnectKey       = "fluentd-async-connect" // deprecated option (use fluent-async instead)
-	bufferLimitKey        = "fluentd-buffer-limit"
-	maxRetriesKey         = "fluentd-max-retries"
-	requestAckKey         = "fluentd-request-ack"
-	retryWaitKey          = "fluentd-retry-wait"
-	subSecondPrecisionKey = "fluentd-sub-second-precision"
+	minReconnectInterval = 100 * time.Millisecond
+	maxReconnectInterval = 10 * time.Second
+
+	addressKey                = "fluentd-address"
+	asyncKey                  = "fluentd-async"
+	asyncConnectKey           = "fluentd-async-connect" // deprecated option (use fluent-async instead)
+	asyncReconnectIntervalKey = "fluentd-async-reconnect-interval"
+	bufferLimitKey            = "fluentd-buffer-limit"
+	maxRetriesKey             = "fluentd-max-retries"
+	requestAckKey             = "fluentd-request-ack"
+	retryWaitKey              = "fluentd-retry-wait"
+	subSecondPrecisionKey     = "fluentd-sub-second-precision"
 )
 
 func init() {
@@ -147,6 +151,7 @@ func ValidateLogOpt(cfg map[string]string) error {
 		case addressKey:
 		case asyncKey:
 		case asyncConnectKey:
+		case asyncReconnectIntervalKey:
 		case bufferLimitKey:
 		case maxRetriesKey:
 		case requestAckKey:
@@ -216,6 +221,19 @@ func parseConfig(cfg map[string]string) (fluent.Config, error) {
 		}
 	}
 
+	asyncReconnectInterval := 0
+	if cfg[asyncReconnectIntervalKey] != "" {
+		interval, err := time.ParseDuration(cfg[asyncReconnectIntervalKey])
+		if err != nil {
+			return config, errors.Wrapf(err, "invalid value for %s", asyncReconnectIntervalKey)
+		}
+		if interval != 0 && (interval < minReconnectInterval || interval > maxReconnectInterval) {
+			return config, errors.Errorf("invalid value for %s: value (%q) must be between %s and %s",
+				asyncReconnectIntervalKey, interval, minReconnectInterval, maxReconnectInterval)
+		}
+		asyncReconnectInterval = int(interval.Milliseconds())
+	}
+
 	subSecondPrecision := false
 	if cfg[subSecondPrecisionKey] != "" {
 		if subSecondPrecision, err = strconv.ParseBool(cfg[subSecondPrecisionKey]); err != nil {
@@ -231,18 +249,19 @@ func parseConfig(cfg map[string]string) (fluent.Config, error) {
 	}
 
 	config = fluent.Config{
-		FluentPort:         loc.port,
-		FluentHost:         loc.host,
-		FluentNetwork:      loc.protocol,
-		FluentSocketPath:   loc.path,
-		BufferLimit:        bufferLimit,
-		RetryWait:          retryWait,
-		MaxRetry:           maxRetries,
-		Async:              async,
-		AsyncConnect:       asyncConnect,
-		SubSecondPrecision: subSecondPrecision,
-		RequestAck:         requestAck,
-		ForceStopAsyncSend: async || asyncConnect,
+		FluentPort:             loc.port,
+		FluentHost:             loc.host,
+		FluentNetwork:          loc.protocol,
+		FluentSocketPath:       loc.path,
+		BufferLimit:            bufferLimit,
+		RetryWait:              retryWait,
+		MaxRetry:               maxRetries,
+		Async:                  async,
+		AsyncConnect:           asyncConnect,
+		AsyncReconnectInterval: asyncReconnectInterval,
+		SubSecondPrecision:     subSecondPrecision,
+		RequestAck:             requestAck,
+		ForceStopAsyncSend:     async || asyncConnect,
 	}
 
 	return config, nil

+ 24 - 0
daemon/logger/fluentd/fluentd_test.go

@@ -0,0 +1,24 @@
+package fluentd // import "github.com/docker/docker/daemon/logger/fluentd"
+import (
+	"testing"
+
+	"gotest.tools/v3/assert"
+)
+
+func TestValidateLogOptReconnectInterval(t *testing.T) {
+	invalidIntervals := []string{"-1", "1", "-1s", "99ms", "11s"}
+	for _, v := range invalidIntervals {
+		t.Run("invalid "+v, func(t *testing.T) {
+			err := ValidateLogOpt(map[string]string{asyncReconnectIntervalKey: v})
+			assert.ErrorContains(t, err, "invalid value for fluentd-async-reconnect-interval:")
+		})
+	}
+
+	validIntervals := []string{"100ms", "10s"}
+	for _, v := range validIntervals {
+		t.Run("valid "+v, func(t *testing.T) {
+			err := ValidateLogOpt(map[string]string{asyncReconnectIntervalKey: v})
+			assert.NilError(t, err)
+		})
+	}
+}

+ 1 - 1
vendor.conf

@@ -103,7 +103,7 @@ github.com/godbus/dbus/v5                           c88335c0b1d28a30e7fc76d526a0
 github.com/Graylog2/go-gelf                         1550ee647df0510058c9d67a45c56f18911d80b8 # v2 branch
 
 # fluent-logger-golang deps
-github.com/fluent/fluent-logger-golang              0b652e850a9140d0b1db6390d8925d0601e952db # v1.8.0
+github.com/fluent/fluent-logger-golang              5538e904aeb515c10a624da620581bdf420d4b8a # v1.9.0
 github.com/philhofer/fwd                            bb6d471dc95d4fe11e432687f8b70ff496cf3136 # v1.0.0
 github.com/tinylib/msgp                             af6442a0fcf6e2a1b824f70dd0c734f01e817751 # v1.1.0
 

+ 5 - 0
vendor/github.com/fluent/fluent-logger-golang/README.md

@@ -132,6 +132,11 @@ When Async is enabled, if this is callback is provided, it will be called on eve
 takes two arguments - a `[]byte` of the message that was to be sent and an `error`. If the `error` is not nil this means the 
 delivery of the message was unsuccessful.
 
+### AsyncReconnectInterval
+When async is enabled, this option defines the interval (ms) at which the connection
+to the fluentd-address is re-established. This option is useful if the address
+may resolve to one or more IP addresses, e.g. a Consul service address.
+
 ### SubSecondPrecision
 
 Enable time encoding as EventTime, which contains sub-second precision values. The messages encoded with this option can be received only by Fluentd v0.14 or later.

+ 21 - 0
vendor/github.com/fluent/fluent-logger-golang/fluent/fluent.go

@@ -65,6 +65,11 @@ type Config struct {
 	AsyncConnect  bool `json:"async_connect"`
 	MarshalAsJSON bool `json:"marshal_as_json"`
 
+	// AsyncReconnectInterval defines the interval (ms) at which the connection
+	// to the fluentd-address is re-established. This option is useful if the address
+	// may resolve to one or more IP addresses, e.g. a Consul service address.
+	AsyncReconnectInterval int `json:"async_reconnect_interval"`
+
 	// Sub-second precision timestamps are only possible for those using fluentd
 	// v0.14+ and serializing their messages with msgpack.
 	SubSecondPrecision bool `json:"sub_second_precision"`
@@ -108,6 +113,9 @@ type Fluent struct {
 	closed         bool
 	wg             sync.WaitGroup
 
+	// time at which the most recent connection to fluentd-address was established.
+	latestReconnectTime time.Time
+
 	muconn sync.RWMutex
 	conn   net.Conn
 }
@@ -447,6 +455,10 @@ func (f *Fluent) connect(ctx context.Context) (err error) {
 		err = NewErrUnknownNetwork(f.Config.FluentNetwork)
 	}
 
+	if err == nil {
+		f.latestReconnectTime = time.Now()
+	}
+
 	return err
 }
 
@@ -508,6 +520,15 @@ func (f *Fluent) run(ctx context.Context) {
 				return
 			}
 
+			if f.AsyncReconnectInterval > 0 {
+				if time.Since(f.latestReconnectTime) > time.Duration(f.AsyncReconnectInterval)*time.Millisecond {
+					f.muconn.Lock()
+					f.close()
+					f.connectWithRetry(ctx)
+					f.muconn.Unlock()
+				}
+			}
+
 			err := f.writeWithRetry(ctx, entry)
 			if err != nil && err != errIsClosing {
 				fmt.Fprintf(os.Stderr, "[%s] Unable to send logs to fluentd, reconnecting...\n", time.Now().Format(time.RFC3339))

+ 1 - 1
vendor/github.com/fluent/fluent-logger-golang/fluent/version.go

@@ -1,3 +1,3 @@
 package fluent
 
-const Version = "1.4.0"
+const Version = "1.9.0"