|
@@ -1,10 +1,13 @@
|
|
package fluent
|
|
package fluent
|
|
|
|
|
|
import (
|
|
import (
|
|
|
|
+ "context"
|
|
|
|
+ "crypto/tls"
|
|
"encoding/json"
|
|
"encoding/json"
|
|
"errors"
|
|
"errors"
|
|
"fmt"
|
|
"fmt"
|
|
"math"
|
|
"math"
|
|
|
|
+ "math/rand"
|
|
"net"
|
|
"net"
|
|
"os"
|
|
"os"
|
|
"reflect"
|
|
"reflect"
|
|
@@ -15,7 +18,6 @@ import (
|
|
"bytes"
|
|
"bytes"
|
|
"encoding/base64"
|
|
"encoding/base64"
|
|
"encoding/binary"
|
|
"encoding/binary"
|
|
- "math/rand"
|
|
|
|
|
|
|
|
"github.com/tinylib/msgp/msgp"
|
|
"github.com/tinylib/msgp/msgp"
|
|
)
|
|
)
|
|
@@ -35,22 +37,30 @@ const (
|
|
// Default sub-second precision value to false since it is only compatible
|
|
// Default sub-second precision value to false since it is only compatible
|
|
// with fluentd versions v0.14 and above.
|
|
// with fluentd versions v0.14 and above.
|
|
defaultSubSecondPrecision = false
|
|
defaultSubSecondPrecision = false
|
|
|
|
+
|
|
|
|
+ // Default value whether to skip checking insecure certs on TLS connections.
|
|
|
|
+ defaultTlsInsecureSkipVerify = false
|
|
)
|
|
)
|
|
|
|
|
|
|
|
+// randomGenerator is used by getUniqueId to generate ack hashes. Its value is replaced
|
|
|
|
+// during tests with a deterministic function.
|
|
|
|
+var randomGenerator = rand.Uint64
|
|
|
|
+
|
|
type Config struct {
|
|
type Config struct {
|
|
- FluentPort int `json:"fluent_port"`
|
|
|
|
- FluentHost string `json:"fluent_host"`
|
|
|
|
- FluentNetwork string `json:"fluent_network"`
|
|
|
|
- FluentSocketPath string `json:"fluent_socket_path"`
|
|
|
|
- Timeout time.Duration `json:"timeout"`
|
|
|
|
- WriteTimeout time.Duration `json:"write_timeout"`
|
|
|
|
- BufferLimit int `json:"buffer_limit"`
|
|
|
|
- RetryWait int `json:"retry_wait"`
|
|
|
|
- MaxRetry int `json:"max_retry"`
|
|
|
|
- MaxRetryWait int `json:"max_retry_wait"`
|
|
|
|
- TagPrefix string `json:"tag_prefix"`
|
|
|
|
- Async bool `json:"async"`
|
|
|
|
- ForceStopAsyncSend bool `json:"force_stop_async_send"`
|
|
|
|
|
|
+ FluentPort int `json:"fluent_port"`
|
|
|
|
+ FluentHost string `json:"fluent_host"`
|
|
|
|
+ FluentNetwork string `json:"fluent_network"`
|
|
|
|
+ FluentSocketPath string `json:"fluent_socket_path"`
|
|
|
|
+ Timeout time.Duration `json:"timeout"`
|
|
|
|
+ WriteTimeout time.Duration `json:"write_timeout"`
|
|
|
|
+ BufferLimit int `json:"buffer_limit"`
|
|
|
|
+ RetryWait int `json:"retry_wait"`
|
|
|
|
+ MaxRetry int `json:"max_retry"`
|
|
|
|
+ MaxRetryWait int `json:"max_retry_wait"`
|
|
|
|
+ TagPrefix string `json:"tag_prefix"`
|
|
|
|
+ Async bool `json:"async"`
|
|
|
|
+ ForceStopAsyncSend bool `json:"force_stop_async_send"`
|
|
|
|
+ AsyncResultCallback func(data []byte, err error)
|
|
// Deprecated: Use Async instead
|
|
// Deprecated: Use Async instead
|
|
AsyncConnect bool `json:"async_connect"`
|
|
AsyncConnect bool `json:"async_connect"`
|
|
MarshalAsJSON bool `json:"marshal_as_json"`
|
|
MarshalAsJSON bool `json:"marshal_as_json"`
|
|
@@ -63,6 +73,9 @@ type Config struct {
|
|
// respond with an acknowledgement. This option improves the reliability
|
|
// respond with an acknowledgement. This option improves the reliability
|
|
// of the message transmission.
|
|
// of the message transmission.
|
|
RequestAck bool `json:"request_ack"`
|
|
RequestAck bool `json:"request_ack"`
|
|
|
|
+
|
|
|
|
+ // Flag to skip verifying insecure certs on TLS connections
|
|
|
|
+ TlsInsecureSkipVerify bool `json: "tls_insecure_skip_verify"`
|
|
}
|
|
}
|
|
|
|
|
|
type ErrUnknownNetwork struct {
|
|
type ErrUnknownNetwork struct {
|
|
@@ -85,17 +98,24 @@ type msgToSend struct {
|
|
type Fluent struct {
|
|
type Fluent struct {
|
|
Config
|
|
Config
|
|
|
|
|
|
- dialer dialer
|
|
|
|
- stopRunning chan bool
|
|
|
|
- pending chan *msgToSend
|
|
|
|
- pendingMutex sync.RWMutex
|
|
|
|
- chanClosed bool
|
|
|
|
- wg sync.WaitGroup
|
|
|
|
-
|
|
|
|
- muconn sync.Mutex
|
|
|
|
|
|
+ dialer dialer
|
|
|
|
+ // stopRunning is used in async mode to signal to run() it should abort.
|
|
|
|
+ stopRunning chan struct{}
|
|
|
|
+ // cancelDialings is used by Close() to stop any in-progress dialing.
|
|
|
|
+ cancelDialings context.CancelFunc
|
|
|
|
+ pending chan *msgToSend
|
|
|
|
+ pendingMutex sync.RWMutex
|
|
|
|
+ closed bool
|
|
|
|
+ wg sync.WaitGroup
|
|
|
|
+
|
|
|
|
+ muconn sync.RWMutex
|
|
conn net.Conn
|
|
conn net.Conn
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+type dialer interface {
|
|
|
|
+ DialContext(ctx context.Context, network, address string) (net.Conn, error)
|
|
|
|
+}
|
|
|
|
+
|
|
// New creates a new Logger.
|
|
// New creates a new Logger.
|
|
func New(config Config) (*Fluent, error) {
|
|
func New(config Config) (*Fluent, error) {
|
|
if config.Timeout == 0 {
|
|
if config.Timeout == 0 {
|
|
@@ -106,10 +126,6 @@ func New(config Config) (*Fluent, error) {
|
|
})
|
|
})
|
|
}
|
|
}
|
|
|
|
|
|
-type dialer interface {
|
|
|
|
- Dial(string, string) (net.Conn, error)
|
|
|
|
-}
|
|
|
|
-
|
|
|
|
func newWithDialer(config Config, d dialer) (f *Fluent, err error) {
|
|
func newWithDialer(config Config, d dialer) (f *Fluent, err error) {
|
|
if config.FluentNetwork == "" {
|
|
if config.FluentNetwork == "" {
|
|
config.FluentNetwork = defaultNetwork
|
|
config.FluentNetwork = defaultNetwork
|
|
@@ -138,27 +154,36 @@ func newWithDialer(config Config, d dialer) (f *Fluent, err error) {
|
|
if config.MaxRetryWait == 0 {
|
|
if config.MaxRetryWait == 0 {
|
|
config.MaxRetryWait = defaultMaxRetryWait
|
|
config.MaxRetryWait = defaultMaxRetryWait
|
|
}
|
|
}
|
|
|
|
+ if !config.TlsInsecureSkipVerify {
|
|
|
|
+ config.TlsInsecureSkipVerify = defaultTlsInsecureSkipVerify
|
|
|
|
+ }
|
|
if config.AsyncConnect {
|
|
if config.AsyncConnect {
|
|
fmt.Fprintf(os.Stderr, "fluent#New: AsyncConnect is now deprecated, please use Async instead")
|
|
fmt.Fprintf(os.Stderr, "fluent#New: AsyncConnect is now deprecated, please use Async instead")
|
|
config.Async = config.Async || config.AsyncConnect
|
|
config.Async = config.Async || config.AsyncConnect
|
|
}
|
|
}
|
|
|
|
|
|
if config.Async {
|
|
if config.Async {
|
|
|
|
+ ctx, cancel := context.WithCancel(context.Background())
|
|
|
|
+
|
|
f = &Fluent{
|
|
f = &Fluent{
|
|
- Config: config,
|
|
|
|
- dialer: d,
|
|
|
|
- pending: make(chan *msgToSend, config.BufferLimit),
|
|
|
|
- pendingMutex: sync.RWMutex{},
|
|
|
|
- stopRunning: make(chan bool, 1),
|
|
|
|
|
|
+ Config: config,
|
|
|
|
+ dialer: d,
|
|
|
|
+ stopRunning: make(chan struct{}),
|
|
|
|
+ cancelDialings: cancel,
|
|
|
|
+ pending: make(chan *msgToSend, config.BufferLimit),
|
|
|
|
+ pendingMutex: sync.RWMutex{},
|
|
|
|
+ muconn: sync.RWMutex{},
|
|
}
|
|
}
|
|
|
|
+
|
|
f.wg.Add(1)
|
|
f.wg.Add(1)
|
|
- go f.run()
|
|
|
|
|
|
+ go f.run(ctx)
|
|
} else {
|
|
} else {
|
|
f = &Fluent{
|
|
f = &Fluent{
|
|
Config: config,
|
|
Config: config,
|
|
dialer: d,
|
|
dialer: d,
|
|
|
|
+ muconn: sync.RWMutex{},
|
|
}
|
|
}
|
|
- err = f.connect()
|
|
|
|
|
|
+ err = f.connect(context.Background())
|
|
}
|
|
}
|
|
return
|
|
return
|
|
}
|
|
}
|
|
@@ -211,13 +236,18 @@ func (f *Fluent) PostWithTime(tag string, tm time.Time, message interface{}) err
|
|
fields := msgtype.NumField()
|
|
fields := msgtype.NumField()
|
|
for i := 0; i < fields; i++ {
|
|
for i := 0; i < fields; i++ {
|
|
field := msgtype.Field(i)
|
|
field := msgtype.Field(i)
|
|
|
|
+ value := msg.FieldByIndex(field.Index)
|
|
|
|
+ // ignore unexported fields
|
|
|
|
+ if !value.CanInterface() {
|
|
|
|
+ continue
|
|
|
|
+ }
|
|
name := field.Name
|
|
name := field.Name
|
|
if n1 := field.Tag.Get("msg"); n1 != "" {
|
|
if n1 := field.Tag.Get("msg"); n1 != "" {
|
|
name = n1
|
|
name = n1
|
|
} else if n2 := field.Tag.Get("codec"); n2 != "" {
|
|
} else if n2 := field.Tag.Get("codec"); n2 != "" {
|
|
name = n2
|
|
name = n2
|
|
}
|
|
}
|
|
- kv[name] = msg.FieldByIndex(field.Index).Interface()
|
|
|
|
|
|
+ kv[name] = value.Interface()
|
|
}
|
|
}
|
|
return f.EncodeAndPostData(tag, tm, kv)
|
|
return f.EncodeAndPostData(tag, tm, kv)
|
|
}
|
|
}
|
|
@@ -254,8 +284,12 @@ func (f *Fluent) postRawData(msg *msgToSend) error {
|
|
if f.Config.Async {
|
|
if f.Config.Async {
|
|
return f.appendBuffer(msg)
|
|
return f.appendBuffer(msg)
|
|
}
|
|
}
|
|
|
|
+
|
|
// Synchronous write
|
|
// Synchronous write
|
|
- return f.write(msg)
|
|
|
|
|
|
+ if f.closed {
|
|
|
|
+ return fmt.Errorf("fluent#postRawData: Logger already closed")
|
|
|
|
+ }
|
|
|
|
+ return f.writeWithRetry(context.Background(), msg)
|
|
}
|
|
}
|
|
|
|
|
|
// For sending forward protocol adopted JSON
|
|
// For sending forward protocol adopted JSON
|
|
@@ -289,7 +323,7 @@ func getUniqueID(timeUnix int64) (string, error) {
|
|
enc.Close()
|
|
enc.Close()
|
|
return "", err
|
|
return "", err
|
|
}
|
|
}
|
|
- if err := binary.Write(enc, binary.LittleEndian, rand.Uint64()); err != nil {
|
|
|
|
|
|
+ if err := binary.Write(enc, binary.LittleEndian, randomGenerator()); err != nil {
|
|
enc.Close()
|
|
enc.Close()
|
|
return "", err
|
|
return "", err
|
|
}
|
|
}
|
|
@@ -325,32 +359,53 @@ func (f *Fluent) EncodeData(tag string, tm time.Time, message interface{}) (msg
|
|
return
|
|
return
|
|
}
|
|
}
|
|
|
|
|
|
-// Close closes the connection, waiting for pending logs to be sent
|
|
|
|
|
|
+// Close closes the connection, waiting for pending logs to be sent. If the client is
|
|
|
|
+// running in async mode, the run() goroutine exits before Close() returns.
|
|
func (f *Fluent) Close() (err error) {
|
|
func (f *Fluent) Close() (err error) {
|
|
- defer f.close(f.conn)
|
|
|
|
if f.Config.Async {
|
|
if f.Config.Async {
|
|
f.pendingMutex.Lock()
|
|
f.pendingMutex.Lock()
|
|
- if f.chanClosed {
|
|
|
|
|
|
+ if f.closed {
|
|
f.pendingMutex.Unlock()
|
|
f.pendingMutex.Unlock()
|
|
return nil
|
|
return nil
|
|
}
|
|
}
|
|
- f.chanClosed = true
|
|
|
|
|
|
+ f.closed = true
|
|
f.pendingMutex.Unlock()
|
|
f.pendingMutex.Unlock()
|
|
|
|
+
|
|
if f.Config.ForceStopAsyncSend {
|
|
if f.Config.ForceStopAsyncSend {
|
|
- f.stopRunning <- true
|
|
|
|
close(f.stopRunning)
|
|
close(f.stopRunning)
|
|
|
|
+ f.cancelDialings()
|
|
}
|
|
}
|
|
|
|
+
|
|
close(f.pending)
|
|
close(f.pending)
|
|
|
|
+ // If ForceStopAsyncSend is false, all logs in the channel have to be sent
|
|
|
|
+ // before closing the connection. At this point closed is true so no more
|
|
|
|
+ // logs are written to the channel and f.pending has been closed, so run()
|
|
|
|
+ // goroutine will exit as soon as all logs in the channel are sent.
|
|
|
|
+ if !f.Config.ForceStopAsyncSend {
|
|
|
|
+ f.wg.Wait()
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ f.muconn.Lock()
|
|
|
|
+ f.close()
|
|
|
|
+ f.closed = true
|
|
|
|
+ f.muconn.Unlock()
|
|
|
|
+
|
|
|
|
+ // If ForceStopAsyncSend is true, we shall close the connection before waiting for
|
|
|
|
+ // run() goroutine to exit to be sure we aren't waiting on ack message that might
|
|
|
|
+ // never come (eg. because fluentd server is down). However we want to be sure the
|
|
|
|
+ // run() goroutine stops before returning from Close().
|
|
|
|
+ if f.Config.ForceStopAsyncSend {
|
|
f.wg.Wait()
|
|
f.wg.Wait()
|
|
}
|
|
}
|
|
- return nil
|
|
|
|
|
|
+ return
|
|
}
|
|
}
|
|
|
|
|
|
// appendBuffer appends data to buffer with lock.
|
|
// appendBuffer appends data to buffer with lock.
|
|
func (f *Fluent) appendBuffer(msg *msgToSend) error {
|
|
func (f *Fluent) appendBuffer(msg *msgToSend) error {
|
|
f.pendingMutex.RLock()
|
|
f.pendingMutex.RLock()
|
|
defer f.pendingMutex.RUnlock()
|
|
defer f.pendingMutex.RUnlock()
|
|
- if f.chanClosed {
|
|
|
|
|
|
+ if f.closed {
|
|
return fmt.Errorf("fluent#appendBuffer: Logger already closed")
|
|
return fmt.Errorf("fluent#appendBuffer: Logger already closed")
|
|
}
|
|
}
|
|
select {
|
|
select {
|
|
@@ -361,58 +416,114 @@ func (f *Fluent) appendBuffer(msg *msgToSend) error {
|
|
return nil
|
|
return nil
|
|
}
|
|
}
|
|
|
|
|
|
-// close closes the connection.
|
|
|
|
-func (f *Fluent) close(c net.Conn) {
|
|
|
|
- f.muconn.Lock()
|
|
|
|
- if f.conn != nil && f.conn == c {
|
|
|
|
|
|
+// close closes the connection. Callers should take care of locking muconn first.
|
|
|
|
+func (f *Fluent) close() {
|
|
|
|
+ if f.conn != nil {
|
|
f.conn.Close()
|
|
f.conn.Close()
|
|
f.conn = nil
|
|
f.conn = nil
|
|
}
|
|
}
|
|
- f.muconn.Unlock()
|
|
|
|
}
|
|
}
|
|
|
|
|
|
-// connect establishes a new connection using the specified transport.
|
|
|
|
-func (f *Fluent) connect() (err error) {
|
|
|
|
|
|
+// connect establishes a new connection using the specified transport. Caller should
|
|
|
|
+// take care of locking muconn first.
|
|
|
|
+func (f *Fluent) connect(ctx context.Context) (err error) {
|
|
switch f.Config.FluentNetwork {
|
|
switch f.Config.FluentNetwork {
|
|
case "tcp":
|
|
case "tcp":
|
|
- f.conn, err = f.dialer.Dial(
|
|
|
|
|
|
+ f.conn, err = f.dialer.DialContext(ctx,
|
|
f.Config.FluentNetwork,
|
|
f.Config.FluentNetwork,
|
|
f.Config.FluentHost+":"+strconv.Itoa(f.Config.FluentPort))
|
|
f.Config.FluentHost+":"+strconv.Itoa(f.Config.FluentPort))
|
|
|
|
+ case "tls":
|
|
|
|
+ tlsConfig := &tls.Config{InsecureSkipVerify: f.Config.TlsInsecureSkipVerify}
|
|
|
|
+ f.conn, err = tls.DialWithDialer(
|
|
|
|
+ &net.Dialer{Timeout: f.Config.Timeout},
|
|
|
|
+ "tcp",
|
|
|
|
+ f.Config.FluentHost+":"+strconv.Itoa(f.Config.FluentPort), tlsConfig,
|
|
|
|
+ )
|
|
case "unix":
|
|
case "unix":
|
|
- f.conn, err = f.dialer.Dial(
|
|
|
|
|
|
+ f.conn, err = f.dialer.DialContext(ctx,
|
|
f.Config.FluentNetwork,
|
|
f.Config.FluentNetwork,
|
|
f.Config.FluentSocketPath)
|
|
f.Config.FluentSocketPath)
|
|
default:
|
|
default:
|
|
err = NewErrUnknownNetwork(f.Config.FluentNetwork)
|
|
err = NewErrUnknownNetwork(f.Config.FluentNetwork)
|
|
}
|
|
}
|
|
|
|
+
|
|
return err
|
|
return err
|
|
}
|
|
}
|
|
|
|
|
|
-func (f *Fluent) run() {
|
|
|
|
- drainEvents := false
|
|
|
|
- var emitEventDrainMsg sync.Once
|
|
|
|
|
|
+var errIsClosing = errors.New("fluent logger is closing")
|
|
|
|
+
|
|
|
|
+// Caller should take care of locking muconn first.
|
|
|
|
+func (f *Fluent) connectWithRetry(ctx context.Context) error {
|
|
|
|
+ // A Time channel is used instead of time.Sleep() to avoid blocking this
|
|
|
|
+ // goroutine during way too much time (because of the exponential back-off
|
|
|
|
+ // retry).
|
|
|
|
+ // time.NewTimer() is used instead of time.After() to avoid leaking the
|
|
|
|
+ // timer channel (cf. https://pkg.go.dev/time#After).
|
|
|
|
+ timeout := time.NewTimer(time.Duration(0))
|
|
|
|
+ defer func() {
|
|
|
|
+ // timeout.Stop() is called in a function literal instead of being
|
|
|
|
+ // defered directly as it's re-assigned below when the retry loop spins.
|
|
|
|
+ timeout.Stop()
|
|
|
|
+ }()
|
|
|
|
+
|
|
|
|
+ for i := 0; i < f.Config.MaxRetry; i++ {
|
|
|
|
+ select {
|
|
|
|
+ case <-timeout.C:
|
|
|
|
+ err := f.connect(ctx)
|
|
|
|
+ if err == nil {
|
|
|
|
+ return nil
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ if _, ok := err.(*ErrUnknownNetwork); ok {
|
|
|
|
+ return err
|
|
|
|
+ }
|
|
|
|
+ if err == context.Canceled {
|
|
|
|
+ return errIsClosing
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ waitTime := f.Config.RetryWait * e(defaultReconnectWaitIncreRate, float64(i-1))
|
|
|
|
+ if waitTime > f.Config.MaxRetryWait {
|
|
|
|
+ waitTime = f.Config.MaxRetryWait
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ timeout = time.NewTimer(time.Duration(waitTime) * time.Millisecond)
|
|
|
|
+ case <-ctx.Done():
|
|
|
|
+ return errIsClosing
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ return fmt.Errorf("could not connect to fluentd after %d retries", f.Config.MaxRetry)
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+// run is the goroutine used to unqueue and write logs in async mode. That
|
|
|
|
+// goroutine is meant to run during the whole life of the Fluent logger.
|
|
|
|
+func (f *Fluent) run(ctx context.Context) {
|
|
for {
|
|
for {
|
|
select {
|
|
select {
|
|
case entry, ok := <-f.pending:
|
|
case entry, ok := <-f.pending:
|
|
|
|
+ // f.stopRunning is closed before f.pending only when ForceStopAsyncSend
|
|
|
|
+ // is enabled. Otherwise, f.pending is closed when Close() is called.
|
|
if !ok {
|
|
if !ok {
|
|
f.wg.Done()
|
|
f.wg.Done()
|
|
return
|
|
return
|
|
}
|
|
}
|
|
- if drainEvents {
|
|
|
|
- emitEventDrainMsg.Do(func() { fmt.Fprintf(os.Stderr, "[%s] Discarding queued events...\n", time.Now().Format(time.RFC3339)) })
|
|
|
|
- continue
|
|
|
|
- }
|
|
|
|
- err := f.write(entry)
|
|
|
|
- if err != nil {
|
|
|
|
|
|
+
|
|
|
|
+ err := f.writeWithRetry(ctx, entry)
|
|
|
|
+ if err != nil && err != errIsClosing {
|
|
fmt.Fprintf(os.Stderr, "[%s] Unable to send logs to fluentd, reconnecting...\n", time.Now().Format(time.RFC3339))
|
|
fmt.Fprintf(os.Stderr, "[%s] Unable to send logs to fluentd, reconnecting...\n", time.Now().Format(time.RFC3339))
|
|
}
|
|
}
|
|
- }
|
|
|
|
- select {
|
|
|
|
- case stopRunning, ok := <-f.stopRunning:
|
|
|
|
- if stopRunning || !ok {
|
|
|
|
- drainEvents = true
|
|
|
|
|
|
+ if f.AsyncResultCallback != nil {
|
|
|
|
+ var data []byte
|
|
|
|
+ if entry != nil {
|
|
|
|
+ data = entry.data
|
|
|
|
+ }
|
|
|
|
+ f.AsyncResultCallback(data, err)
|
|
}
|
|
}
|
|
- default:
|
|
|
|
|
|
+ case <-f.stopRunning:
|
|
|
|
+ fmt.Fprintf(os.Stderr, "[%s] Discarding queued events...\n", time.Now().Format(time.RFC3339))
|
|
|
|
+
|
|
|
|
+ f.wg.Done()
|
|
|
|
+ return
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -421,63 +532,85 @@ func e(x, y float64) int {
|
|
return int(math.Pow(x, y))
|
|
return int(math.Pow(x, y))
|
|
}
|
|
}
|
|
|
|
|
|
-func (f *Fluent) write(msg *msgToSend) error {
|
|
|
|
- var c net.Conn
|
|
|
|
|
|
+func (f *Fluent) writeWithRetry(ctx context.Context, msg *msgToSend) error {
|
|
for i := 0; i < f.Config.MaxRetry; i++ {
|
|
for i := 0; i < f.Config.MaxRetry; i++ {
|
|
- c = f.conn
|
|
|
|
- // Connect if needed
|
|
|
|
- if c == nil {
|
|
|
|
- f.muconn.Lock()
|
|
|
|
- if f.conn == nil {
|
|
|
|
- err := f.connect()
|
|
|
|
- if err != nil {
|
|
|
|
- f.muconn.Unlock()
|
|
|
|
-
|
|
|
|
- if _, ok := err.(*ErrUnknownNetwork); ok {
|
|
|
|
- // do not retry on unknown network error
|
|
|
|
- break
|
|
|
|
- }
|
|
|
|
- waitTime := f.Config.RetryWait * e(defaultReconnectWaitIncreRate, float64(i-1))
|
|
|
|
- if waitTime > f.Config.MaxRetryWait {
|
|
|
|
- waitTime = f.Config.MaxRetryWait
|
|
|
|
- }
|
|
|
|
- time.Sleep(time.Duration(waitTime) * time.Millisecond)
|
|
|
|
- continue
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- c = f.conn
|
|
|
|
- f.muconn.Unlock()
|
|
|
|
|
|
+ if retry, err := f.write(ctx, msg); !retry {
|
|
|
|
+ return err
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ return fmt.Errorf("fluent#write: failed to write after %d attempts", f.Config.MaxRetry)
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+// write writes the provided msg to fluentd server. Its first return values is
|
|
|
|
+// a bool indicating whether the write should be retried.
|
|
|
|
+// This method relies on function literals to execute muconn.Unlock or
|
|
|
|
+// muconn.RUnlock in deferred calls to ensure the mutex is unlocked even in
|
|
|
|
+// the case of panic recovering.
|
|
|
|
+func (f *Fluent) write(ctx context.Context, msg *msgToSend) (bool, error) {
|
|
|
|
+ closer := func() {
|
|
|
|
+ f.muconn.Lock()
|
|
|
|
+ defer f.muconn.Unlock()
|
|
|
|
+
|
|
|
|
+ f.close()
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ if err := func() (err error) {
|
|
|
|
+ f.muconn.Lock()
|
|
|
|
+ defer f.muconn.Unlock()
|
|
|
|
+
|
|
|
|
+ if f.conn == nil {
|
|
|
|
+ err = f.connectWithRetry(ctx)
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ return err
|
|
|
|
+ }(); err != nil {
|
|
|
|
+ // Here, we don't want to retry the write since connectWithRetry already
|
|
|
|
+ // retries Config.MaxRetry times to connect.
|
|
|
|
+ return false, fmt.Errorf("fluent#write: %v", err)
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ if err := func() (err error) {
|
|
|
|
+ f.muconn.RLock()
|
|
|
|
+ defer f.muconn.RUnlock()
|
|
|
|
+
|
|
|
|
+ if f.conn == nil {
|
|
|
|
+ return fmt.Errorf("connection has been closed before writing to it")
|
|
}
|
|
}
|
|
|
|
|
|
- // We're connected, write msg
|
|
|
|
t := f.Config.WriteTimeout
|
|
t := f.Config.WriteTimeout
|
|
if time.Duration(0) < t {
|
|
if time.Duration(0) < t {
|
|
- c.SetWriteDeadline(time.Now().Add(t))
|
|
|
|
|
|
+ f.conn.SetWriteDeadline(time.Now().Add(t))
|
|
} else {
|
|
} else {
|
|
- c.SetWriteDeadline(time.Time{})
|
|
|
|
|
|
+ f.conn.SetWriteDeadline(time.Time{})
|
|
}
|
|
}
|
|
- _, err := c.Write(msg.data)
|
|
|
|
- if err != nil {
|
|
|
|
- f.close(c)
|
|
|
|
|
|
+
|
|
|
|
+ _, err = f.conn.Write(msg.data)
|
|
|
|
+ return err
|
|
|
|
+ }(); err != nil {
|
|
|
|
+ closer()
|
|
|
|
+ return true, fmt.Errorf("fluent#write: %v", err)
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // Acknowledgment check
|
|
|
|
+ if msg.ack != "" {
|
|
|
|
+ resp := &AckResp{}
|
|
|
|
+ var err error
|
|
|
|
+ if f.Config.MarshalAsJSON {
|
|
|
|
+ dec := json.NewDecoder(f.conn)
|
|
|
|
+ err = dec.Decode(resp)
|
|
} else {
|
|
} else {
|
|
- // Acknowledgment check
|
|
|
|
- if msg.ack != "" {
|
|
|
|
- resp := &AckResp{}
|
|
|
|
- if f.Config.MarshalAsJSON {
|
|
|
|
- dec := json.NewDecoder(c)
|
|
|
|
- err = dec.Decode(resp)
|
|
|
|
- } else {
|
|
|
|
- r := msgp.NewReader(c)
|
|
|
|
- err = resp.DecodeMsg(r)
|
|
|
|
- }
|
|
|
|
- if err != nil || resp.Ack != msg.ack {
|
|
|
|
- f.close(c)
|
|
|
|
- continue
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- return err
|
|
|
|
|
|
+ r := msgp.NewReader(f.conn)
|
|
|
|
+ err = resp.DecodeMsg(r)
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ if err != nil || resp.Ack != msg.ack {
|
|
|
|
+ fmt.Fprintf(os.Stderr, "fluent#write: message ack (%s) doesn't match expected one (%s). Closing connection...", resp.Ack, msg.ack)
|
|
|
|
+
|
|
|
|
+ closer()
|
|
|
|
+ return true, err
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- return fmt.Errorf("fluent#write: failed to reconnect, max retry: %v", f.Config.MaxRetry)
|
|
|
|
|
|
+ return false, nil
|
|
}
|
|
}
|