|
@@ -65,6 +65,11 @@ type Config struct {
|
|
AsyncConnect bool `json:"async_connect"`
|
|
AsyncConnect bool `json:"async_connect"`
|
|
MarshalAsJSON bool `json:"marshal_as_json"`
|
|
MarshalAsJSON bool `json:"marshal_as_json"`
|
|
|
|
|
|
|
|
+ // AsyncReconnectInterval defines the interval (ms) at which the connection
|
|
|
|
+ // to the fluentd-address is re-established. This option is useful if the address
|
|
|
|
+ // may resolve to one or more IP addresses, e.g. a Consul service address.
|
|
|
|
+ AsyncReconnectInterval int `json:"async_reconnect_interval"`
|
|
|
|
+
|
|
// Sub-second precision timestamps are only possible for those using fluentd
|
|
// Sub-second precision timestamps are only possible for those using fluentd
|
|
// v0.14+ and serializing their messages with msgpack.
|
|
// v0.14+ and serializing their messages with msgpack.
|
|
SubSecondPrecision bool `json:"sub_second_precision"`
|
|
SubSecondPrecision bool `json:"sub_second_precision"`
|
|
@@ -108,6 +113,9 @@ type Fluent struct {
|
|
closed bool
|
|
closed bool
|
|
wg sync.WaitGroup
|
|
wg sync.WaitGroup
|
|
|
|
|
|
|
|
+ // time at which the most recent connection to fluentd-address was established.
|
|
|
|
+ latestReconnectTime time.Time
|
|
|
|
+
|
|
muconn sync.RWMutex
|
|
muconn sync.RWMutex
|
|
conn net.Conn
|
|
conn net.Conn
|
|
}
|
|
}
|
|
@@ -447,6 +455,10 @@ func (f *Fluent) connect(ctx context.Context) (err error) {
|
|
err = NewErrUnknownNetwork(f.Config.FluentNetwork)
|
|
err = NewErrUnknownNetwork(f.Config.FluentNetwork)
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ if err == nil {
|
|
|
|
+ f.latestReconnectTime = time.Now()
|
|
|
|
+ }
|
|
|
|
+
|
|
return err
|
|
return err
|
|
}
|
|
}
|
|
|
|
|
|
@@ -508,6 +520,15 @@ func (f *Fluent) run(ctx context.Context) {
|
|
return
|
|
return
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ if f.AsyncReconnectInterval > 0 {
|
|
|
|
+ if time.Since(f.latestReconnectTime) > time.Duration(f.AsyncReconnectInterval)*time.Millisecond {
|
|
|
|
+ f.muconn.Lock()
|
|
|
|
+ f.close()
|
|
|
|
+ f.connectWithRetry(ctx)
|
|
|
|
+ f.muconn.Unlock()
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
err := f.writeWithRetry(ctx, entry)
|
|
err := f.writeWithRetry(ctx, entry)
|
|
if err != nil && err != errIsClosing {
|
|
if err != nil && err != errIsClosing {
|
|
fmt.Fprintf(os.Stderr, "[%s] Unable to send logs to fluentd, reconnecting...\n", time.Now().Format(time.RFC3339))
|
|
fmt.Fprintf(os.Stderr, "[%s] Unable to send logs to fluentd, reconnecting...\n", time.Now().Format(time.RFC3339))
|