vendor: github.com/fluent/fluent-logger-golang v1.8.0
Updates the fluent logger library to v1.8.0. Following PRs/commits were merged since last bump: * [Add callback for error handling when using async](https://github.com/fluent/fluent-logger-golang/pull/97) * [Fix panic when accessing unexported struct field](https://github.com/fluent/fluent-logger-golang/pull/99) * [Properly stop logger during (re)connect failure](https://github.com/fluent/fluent-logger-golang/pull/82) * [Support a TLS-enabled connection](e5d6aa13b7
) See https://github.com/fluent/fluent-logger-golang/compare/v1.6.1..v1.8.0 Signed-off-by: Albin Kerouanton <albinker@gmail.com> (cherry picked from commite24d61b7ef
) Signed-off-by: Wesley <wppttt@amazon.com>
This commit is contained in:
parent
d6f3add5c6
commit
81fc02b7e1
3 changed files with 288 additions and 123 deletions
|
@ -106,7 +106,7 @@ github.com/godbus/dbus/v5 37bf87eef99d69c4f1d3528bd66e
|
|||
github.com/Graylog2/go-gelf 1550ee647df0510058c9d67a45c56f18911d80b8 # v2 branch
|
||||
|
||||
# fluent-logger-golang deps
|
||||
github.com/fluent/fluent-logger-golang b9b7fb02ccfee8ba4e69aa87386820c2bf24fd11 # v1.6.1
|
||||
github.com/fluent/fluent-logger-golang 0b652e850a9140d0b1db6390d8925d0601e952db # v1.8.0
|
||||
github.com/philhofer/fwd bb6d471dc95d4fe11e432687f8b70ff496cf3136 # v1.0.0
|
||||
github.com/tinylib/msgp af6442a0fcf6e2a1b824f70dd0c734f01e817751 # v1.1.0
|
||||
|
||||
|
|
36
vendor/github.com/fluent/fluent-logger-golang/README.md
generated
vendored
36
vendor/github.com/fluent/fluent-logger-golang/README.md
generated
vendored
|
@ -1,7 +1,7 @@
|
|||
fluent-logger-golang
|
||||
====
|
||||
|
||||
[![Build Status](https://travis-ci.org/fluent/fluent-logger-golang.png?branch=master)](https://travis-ci.org/fluent/fluent-logger-golang)
|
||||
[![Build Status](https://github.com/fluent/fluent-logger-golang/actions/workflows/ci.yaml/badge.svg?branch=master)](https://github.com/fluent/fluent-logger-golang/actions)
|
||||
[![GoDoc](https://godoc.org/github.com/fluent/fluent-logger-golang/fluent?status.svg)](https://godoc.org/github.com/fluent/fluent-logger-golang/fluent)
|
||||
|
||||
## A structured event logger for Fluentd (Golang)
|
||||
|
@ -60,7 +60,12 @@ f := fluent.New(fluent.Config{FluentPort: 80, FluentHost: "example.com"})
|
|||
|
||||
### FluentNetwork
|
||||
|
||||
Specify the network protocol, as "tcp" (use `FluentHost` and `FluentPort`) or "unix" (use `FluentSocketPath`).
|
||||
Specify the network protocol. The supported values are:
|
||||
|
||||
* "tcp" (use `FluentHost` and `FluentPort`)
|
||||
* "tls" (use`FluentHost` and `FluentPort`)
|
||||
* "unix" (use `FluentSocketPath`)
|
||||
|
||||
The default is "tcp".
|
||||
|
||||
### FluentHost
|
||||
|
@ -121,6 +126,12 @@ The default is false.
|
|||
When Async is enabled, immediately discard the event queue on close() and return (instead of trying MaxRetry times for each event in the queue before returning)
|
||||
The default is false.
|
||||
|
||||
### AsyncResultCallback
|
||||
|
||||
When Async is enabled, if this is callback is provided, it will be called on every write to Fluentd. The callback function
|
||||
takes two arguments - a `[]byte` of the message that was to be sent and an `error`. If the `error` is not nil this means the
|
||||
delivery of the message was unsuccessful.
|
||||
|
||||
### SubSecondPrecision
|
||||
|
||||
Enable time encoding as EventTime, which contains sub-second precision values. The messages encoded with this option can be received only by Fluentd v0.14 or later.
|
||||
|
@ -136,6 +147,10 @@ The default is false.
|
|||
Sets whether to request acknowledgment from Fluentd to increase the reliability
|
||||
of the connection. The default is false.
|
||||
|
||||
### TlsInsecureSkipVerify
|
||||
|
||||
Skip verifying the server certificate. Useful for development and testing. The default is false.
|
||||
|
||||
## FAQ
|
||||
|
||||
### Does this logger support the features of Fluentd Forward Protocol v1?
|
||||
|
@ -144,7 +159,24 @@ of the connection. The default is false.
|
|||
|
||||
This logger doesn't support those features. Patches are welcome!
|
||||
|
||||
### Is it allowed to call `Fluent.Post()` after connection close?
|
||||
|
||||
Before v1.8.0, the Fluent logger silently reopened connections whenever
|
||||
`Fluent.Post()` was called.
|
||||
|
||||
```go
|
||||
logger, _ := fluent.New(fluent.Config{})
|
||||
logger.Post(tag, data)
|
||||
logger.Close()
|
||||
logger.Post(tag, data) /* reopen connection */
|
||||
```
|
||||
|
||||
However, this behavior was confusing, in particular when multiple goroutines
|
||||
were involved. Starting v1.8.0, the logger no longer accepts `Fluent.Post()`
|
||||
after `Fluent.Close()`, and instead returns a "Logger already closed" error.
|
||||
|
||||
## Tests
|
||||
```
|
||||
|
||||
go test
|
||||
```
|
||||
|
|
373
vendor/github.com/fluent/fluent-logger-golang/fluent/fluent.go
generated
vendored
373
vendor/github.com/fluent/fluent-logger-golang/fluent/fluent.go
generated
vendored
|
@ -1,10 +1,13 @@
|
|||
package fluent
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/tls"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"math"
|
||||
"math/rand"
|
||||
"net"
|
||||
"os"
|
||||
"reflect"
|
||||
|
@ -15,7 +18,6 @@ import (
|
|||
"bytes"
|
||||
"encoding/base64"
|
||||
"encoding/binary"
|
||||
"math/rand"
|
||||
|
||||
"github.com/tinylib/msgp/msgp"
|
||||
)
|
||||
|
@ -35,22 +37,30 @@ const (
|
|||
// Default sub-second precision value to false since it is only compatible
|
||||
// with fluentd versions v0.14 and above.
|
||||
defaultSubSecondPrecision = false
|
||||
|
||||
// Default value whether to skip checking insecure certs on TLS connections.
|
||||
defaultTlsInsecureSkipVerify = false
|
||||
)
|
||||
|
||||
// randomGenerator is used by getUniqueId to generate ack hashes. Its value is replaced
|
||||
// during tests with a deterministic function.
|
||||
var randomGenerator = rand.Uint64
|
||||
|
||||
type Config struct {
|
||||
FluentPort int `json:"fluent_port"`
|
||||
FluentHost string `json:"fluent_host"`
|
||||
FluentNetwork string `json:"fluent_network"`
|
||||
FluentSocketPath string `json:"fluent_socket_path"`
|
||||
Timeout time.Duration `json:"timeout"`
|
||||
WriteTimeout time.Duration `json:"write_timeout"`
|
||||
BufferLimit int `json:"buffer_limit"`
|
||||
RetryWait int `json:"retry_wait"`
|
||||
MaxRetry int `json:"max_retry"`
|
||||
MaxRetryWait int `json:"max_retry_wait"`
|
||||
TagPrefix string `json:"tag_prefix"`
|
||||
Async bool `json:"async"`
|
||||
ForceStopAsyncSend bool `json:"force_stop_async_send"`
|
||||
FluentPort int `json:"fluent_port"`
|
||||
FluentHost string `json:"fluent_host"`
|
||||
FluentNetwork string `json:"fluent_network"`
|
||||
FluentSocketPath string `json:"fluent_socket_path"`
|
||||
Timeout time.Duration `json:"timeout"`
|
||||
WriteTimeout time.Duration `json:"write_timeout"`
|
||||
BufferLimit int `json:"buffer_limit"`
|
||||
RetryWait int `json:"retry_wait"`
|
||||
MaxRetry int `json:"max_retry"`
|
||||
MaxRetryWait int `json:"max_retry_wait"`
|
||||
TagPrefix string `json:"tag_prefix"`
|
||||
Async bool `json:"async"`
|
||||
ForceStopAsyncSend bool `json:"force_stop_async_send"`
|
||||
AsyncResultCallback func(data []byte, err error)
|
||||
// Deprecated: Use Async instead
|
||||
AsyncConnect bool `json:"async_connect"`
|
||||
MarshalAsJSON bool `json:"marshal_as_json"`
|
||||
|
@ -63,6 +73,9 @@ type Config struct {
|
|||
// respond with an acknowledgement. This option improves the reliability
|
||||
// of the message transmission.
|
||||
RequestAck bool `json:"request_ack"`
|
||||
|
||||
// Flag to skip verifying insecure certs on TLS connections
|
||||
TlsInsecureSkipVerify bool `json: "tls_insecure_skip_verify"`
|
||||
}
|
||||
|
||||
type ErrUnknownNetwork struct {
|
||||
|
@ -85,17 +98,24 @@ type msgToSend struct {
|
|||
type Fluent struct {
|
||||
Config
|
||||
|
||||
dialer dialer
|
||||
stopRunning chan bool
|
||||
pending chan *msgToSend
|
||||
pendingMutex sync.RWMutex
|
||||
chanClosed bool
|
||||
wg sync.WaitGroup
|
||||
dialer dialer
|
||||
// stopRunning is used in async mode to signal to run() it should abort.
|
||||
stopRunning chan struct{}
|
||||
// cancelDialings is used by Close() to stop any in-progress dialing.
|
||||
cancelDialings context.CancelFunc
|
||||
pending chan *msgToSend
|
||||
pendingMutex sync.RWMutex
|
||||
closed bool
|
||||
wg sync.WaitGroup
|
||||
|
||||
muconn sync.Mutex
|
||||
muconn sync.RWMutex
|
||||
conn net.Conn
|
||||
}
|
||||
|
||||
type dialer interface {
|
||||
DialContext(ctx context.Context, network, address string) (net.Conn, error)
|
||||
}
|
||||
|
||||
// New creates a new Logger.
|
||||
func New(config Config) (*Fluent, error) {
|
||||
if config.Timeout == 0 {
|
||||
|
@ -106,10 +126,6 @@ func New(config Config) (*Fluent, error) {
|
|||
})
|
||||
}
|
||||
|
||||
type dialer interface {
|
||||
Dial(string, string) (net.Conn, error)
|
||||
}
|
||||
|
||||
func newWithDialer(config Config, d dialer) (f *Fluent, err error) {
|
||||
if config.FluentNetwork == "" {
|
||||
config.FluentNetwork = defaultNetwork
|
||||
|
@ -138,27 +154,36 @@ func newWithDialer(config Config, d dialer) (f *Fluent, err error) {
|
|||
if config.MaxRetryWait == 0 {
|
||||
config.MaxRetryWait = defaultMaxRetryWait
|
||||
}
|
||||
if !config.TlsInsecureSkipVerify {
|
||||
config.TlsInsecureSkipVerify = defaultTlsInsecureSkipVerify
|
||||
}
|
||||
if config.AsyncConnect {
|
||||
fmt.Fprintf(os.Stderr, "fluent#New: AsyncConnect is now deprecated, please use Async instead")
|
||||
config.Async = config.Async || config.AsyncConnect
|
||||
}
|
||||
|
||||
if config.Async {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
|
||||
f = &Fluent{
|
||||
Config: config,
|
||||
dialer: d,
|
||||
pending: make(chan *msgToSend, config.BufferLimit),
|
||||
pendingMutex: sync.RWMutex{},
|
||||
stopRunning: make(chan bool, 1),
|
||||
Config: config,
|
||||
dialer: d,
|
||||
stopRunning: make(chan struct{}),
|
||||
cancelDialings: cancel,
|
||||
pending: make(chan *msgToSend, config.BufferLimit),
|
||||
pendingMutex: sync.RWMutex{},
|
||||
muconn: sync.RWMutex{},
|
||||
}
|
||||
|
||||
f.wg.Add(1)
|
||||
go f.run()
|
||||
go f.run(ctx)
|
||||
} else {
|
||||
f = &Fluent{
|
||||
Config: config,
|
||||
dialer: d,
|
||||
muconn: sync.RWMutex{},
|
||||
}
|
||||
err = f.connect()
|
||||
err = f.connect(context.Background())
|
||||
}
|
||||
return
|
||||
}
|
||||
|
@ -211,13 +236,18 @@ func (f *Fluent) PostWithTime(tag string, tm time.Time, message interface{}) err
|
|||
fields := msgtype.NumField()
|
||||
for i := 0; i < fields; i++ {
|
||||
field := msgtype.Field(i)
|
||||
value := msg.FieldByIndex(field.Index)
|
||||
// ignore unexported fields
|
||||
if !value.CanInterface() {
|
||||
continue
|
||||
}
|
||||
name := field.Name
|
||||
if n1 := field.Tag.Get("msg"); n1 != "" {
|
||||
name = n1
|
||||
} else if n2 := field.Tag.Get("codec"); n2 != "" {
|
||||
name = n2
|
||||
}
|
||||
kv[name] = msg.FieldByIndex(field.Index).Interface()
|
||||
kv[name] = value.Interface()
|
||||
}
|
||||
return f.EncodeAndPostData(tag, tm, kv)
|
||||
}
|
||||
|
@ -254,8 +284,12 @@ func (f *Fluent) postRawData(msg *msgToSend) error {
|
|||
if f.Config.Async {
|
||||
return f.appendBuffer(msg)
|
||||
}
|
||||
|
||||
// Synchronous write
|
||||
return f.write(msg)
|
||||
if f.closed {
|
||||
return fmt.Errorf("fluent#postRawData: Logger already closed")
|
||||
}
|
||||
return f.writeWithRetry(context.Background(), msg)
|
||||
}
|
||||
|
||||
// For sending forward protocol adopted JSON
|
||||
|
@ -289,7 +323,7 @@ func getUniqueID(timeUnix int64) (string, error) {
|
|||
enc.Close()
|
||||
return "", err
|
||||
}
|
||||
if err := binary.Write(enc, binary.LittleEndian, rand.Uint64()); err != nil {
|
||||
if err := binary.Write(enc, binary.LittleEndian, randomGenerator()); err != nil {
|
||||
enc.Close()
|
||||
return "", err
|
||||
}
|
||||
|
@ -325,32 +359,53 @@ func (f *Fluent) EncodeData(tag string, tm time.Time, message interface{}) (msg
|
|||
return
|
||||
}
|
||||
|
||||
// Close closes the connection, waiting for pending logs to be sent
|
||||
// Close closes the connection, waiting for pending logs to be sent. If the client is
|
||||
// running in async mode, the run() goroutine exits before Close() returns.
|
||||
func (f *Fluent) Close() (err error) {
|
||||
defer f.close(f.conn)
|
||||
if f.Config.Async {
|
||||
f.pendingMutex.Lock()
|
||||
if f.chanClosed {
|
||||
if f.closed {
|
||||
f.pendingMutex.Unlock()
|
||||
return nil
|
||||
}
|
||||
f.chanClosed = true
|
||||
f.closed = true
|
||||
f.pendingMutex.Unlock()
|
||||
|
||||
if f.Config.ForceStopAsyncSend {
|
||||
f.stopRunning <- true
|
||||
close(f.stopRunning)
|
||||
f.cancelDialings()
|
||||
}
|
||||
|
||||
close(f.pending)
|
||||
// If ForceStopAsyncSend is false, all logs in the channel have to be sent
|
||||
// before closing the connection. At this point closed is true so no more
|
||||
// logs are written to the channel and f.pending has been closed, so run()
|
||||
// goroutine will exit as soon as all logs in the channel are sent.
|
||||
if !f.Config.ForceStopAsyncSend {
|
||||
f.wg.Wait()
|
||||
}
|
||||
}
|
||||
|
||||
f.muconn.Lock()
|
||||
f.close()
|
||||
f.closed = true
|
||||
f.muconn.Unlock()
|
||||
|
||||
// If ForceStopAsyncSend is true, we shall close the connection before waiting for
|
||||
// run() goroutine to exit to be sure we aren't waiting on ack message that might
|
||||
// never come (eg. because fluentd server is down). However we want to be sure the
|
||||
// run() goroutine stops before returning from Close().
|
||||
if f.Config.ForceStopAsyncSend {
|
||||
f.wg.Wait()
|
||||
}
|
||||
return nil
|
||||
return
|
||||
}
|
||||
|
||||
// appendBuffer appends data to buffer with lock.
|
||||
func (f *Fluent) appendBuffer(msg *msgToSend) error {
|
||||
f.pendingMutex.RLock()
|
||||
defer f.pendingMutex.RUnlock()
|
||||
if f.chanClosed {
|
||||
if f.closed {
|
||||
return fmt.Errorf("fluent#appendBuffer: Logger already closed")
|
||||
}
|
||||
select {
|
||||
|
@ -361,58 +416,114 @@ func (f *Fluent) appendBuffer(msg *msgToSend) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// close closes the connection.
|
||||
func (f *Fluent) close(c net.Conn) {
|
||||
f.muconn.Lock()
|
||||
if f.conn != nil && f.conn == c {
|
||||
// close closes the connection. Callers should take care of locking muconn first.
|
||||
func (f *Fluent) close() {
|
||||
if f.conn != nil {
|
||||
f.conn.Close()
|
||||
f.conn = nil
|
||||
}
|
||||
f.muconn.Unlock()
|
||||
}
|
||||
|
||||
// connect establishes a new connection using the specified transport.
|
||||
func (f *Fluent) connect() (err error) {
|
||||
// connect establishes a new connection using the specified transport. Caller should
|
||||
// take care of locking muconn first.
|
||||
func (f *Fluent) connect(ctx context.Context) (err error) {
|
||||
switch f.Config.FluentNetwork {
|
||||
case "tcp":
|
||||
f.conn, err = f.dialer.Dial(
|
||||
f.conn, err = f.dialer.DialContext(ctx,
|
||||
f.Config.FluentNetwork,
|
||||
f.Config.FluentHost+":"+strconv.Itoa(f.Config.FluentPort))
|
||||
case "tls":
|
||||
tlsConfig := &tls.Config{InsecureSkipVerify: f.Config.TlsInsecureSkipVerify}
|
||||
f.conn, err = tls.DialWithDialer(
|
||||
&net.Dialer{Timeout: f.Config.Timeout},
|
||||
"tcp",
|
||||
f.Config.FluentHost+":"+strconv.Itoa(f.Config.FluentPort), tlsConfig,
|
||||
)
|
||||
case "unix":
|
||||
f.conn, err = f.dialer.Dial(
|
||||
f.conn, err = f.dialer.DialContext(ctx,
|
||||
f.Config.FluentNetwork,
|
||||
f.Config.FluentSocketPath)
|
||||
default:
|
||||
err = NewErrUnknownNetwork(f.Config.FluentNetwork)
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func (f *Fluent) run() {
|
||||
drainEvents := false
|
||||
var emitEventDrainMsg sync.Once
|
||||
var errIsClosing = errors.New("fluent logger is closing")
|
||||
|
||||
// Caller should take care of locking muconn first.
|
||||
func (f *Fluent) connectWithRetry(ctx context.Context) error {
|
||||
// A Time channel is used instead of time.Sleep() to avoid blocking this
|
||||
// goroutine during way too much time (because of the exponential back-off
|
||||
// retry).
|
||||
// time.NewTimer() is used instead of time.After() to avoid leaking the
|
||||
// timer channel (cf. https://pkg.go.dev/time#After).
|
||||
timeout := time.NewTimer(time.Duration(0))
|
||||
defer func() {
|
||||
// timeout.Stop() is called in a function literal instead of being
|
||||
// defered directly as it's re-assigned below when the retry loop spins.
|
||||
timeout.Stop()
|
||||
}()
|
||||
|
||||
for i := 0; i < f.Config.MaxRetry; i++ {
|
||||
select {
|
||||
case <-timeout.C:
|
||||
err := f.connect(ctx)
|
||||
if err == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
if _, ok := err.(*ErrUnknownNetwork); ok {
|
||||
return err
|
||||
}
|
||||
if err == context.Canceled {
|
||||
return errIsClosing
|
||||
}
|
||||
|
||||
waitTime := f.Config.RetryWait * e(defaultReconnectWaitIncreRate, float64(i-1))
|
||||
if waitTime > f.Config.MaxRetryWait {
|
||||
waitTime = f.Config.MaxRetryWait
|
||||
}
|
||||
|
||||
timeout = time.NewTimer(time.Duration(waitTime) * time.Millisecond)
|
||||
case <-ctx.Done():
|
||||
return errIsClosing
|
||||
}
|
||||
}
|
||||
|
||||
return fmt.Errorf("could not connect to fluentd after %d retries", f.Config.MaxRetry)
|
||||
}
|
||||
|
||||
// run is the goroutine used to unqueue and write logs in async mode. That
|
||||
// goroutine is meant to run during the whole life of the Fluent logger.
|
||||
func (f *Fluent) run(ctx context.Context) {
|
||||
for {
|
||||
select {
|
||||
case entry, ok := <-f.pending:
|
||||
// f.stopRunning is closed before f.pending only when ForceStopAsyncSend
|
||||
// is enabled. Otherwise, f.pending is closed when Close() is called.
|
||||
if !ok {
|
||||
f.wg.Done()
|
||||
return
|
||||
}
|
||||
if drainEvents {
|
||||
emitEventDrainMsg.Do(func() { fmt.Fprintf(os.Stderr, "[%s] Discarding queued events...\n", time.Now().Format(time.RFC3339)) })
|
||||
continue
|
||||
}
|
||||
err := f.write(entry)
|
||||
if err != nil {
|
||||
|
||||
err := f.writeWithRetry(ctx, entry)
|
||||
if err != nil && err != errIsClosing {
|
||||
fmt.Fprintf(os.Stderr, "[%s] Unable to send logs to fluentd, reconnecting...\n", time.Now().Format(time.RFC3339))
|
||||
}
|
||||
}
|
||||
select {
|
||||
case stopRunning, ok := <-f.stopRunning:
|
||||
if stopRunning || !ok {
|
||||
drainEvents = true
|
||||
if f.AsyncResultCallback != nil {
|
||||
var data []byte
|
||||
if entry != nil {
|
||||
data = entry.data
|
||||
}
|
||||
f.AsyncResultCallback(data, err)
|
||||
}
|
||||
default:
|
||||
case <-f.stopRunning:
|
||||
fmt.Fprintf(os.Stderr, "[%s] Discarding queued events...\n", time.Now().Format(time.RFC3339))
|
||||
|
||||
f.wg.Done()
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -421,63 +532,85 @@ func e(x, y float64) int {
|
|||
return int(math.Pow(x, y))
|
||||
}
|
||||
|
||||
func (f *Fluent) write(msg *msgToSend) error {
|
||||
var c net.Conn
|
||||
func (f *Fluent) writeWithRetry(ctx context.Context, msg *msgToSend) error {
|
||||
for i := 0; i < f.Config.MaxRetry; i++ {
|
||||
c = f.conn
|
||||
// Connect if needed
|
||||
if c == nil {
|
||||
f.muconn.Lock()
|
||||
if f.conn == nil {
|
||||
err := f.connect()
|
||||
if err != nil {
|
||||
f.muconn.Unlock()
|
||||
|
||||
if _, ok := err.(*ErrUnknownNetwork); ok {
|
||||
// do not retry on unknown network error
|
||||
break
|
||||
}
|
||||
waitTime := f.Config.RetryWait * e(defaultReconnectWaitIncreRate, float64(i-1))
|
||||
if waitTime > f.Config.MaxRetryWait {
|
||||
waitTime = f.Config.MaxRetryWait
|
||||
}
|
||||
time.Sleep(time.Duration(waitTime) * time.Millisecond)
|
||||
continue
|
||||
}
|
||||
}
|
||||
c = f.conn
|
||||
f.muconn.Unlock()
|
||||
}
|
||||
|
||||
// We're connected, write msg
|
||||
t := f.Config.WriteTimeout
|
||||
if time.Duration(0) < t {
|
||||
c.SetWriteDeadline(time.Now().Add(t))
|
||||
} else {
|
||||
c.SetWriteDeadline(time.Time{})
|
||||
}
|
||||
_, err := c.Write(msg.data)
|
||||
if err != nil {
|
||||
f.close(c)
|
||||
} else {
|
||||
// Acknowledgment check
|
||||
if msg.ack != "" {
|
||||
resp := &AckResp{}
|
||||
if f.Config.MarshalAsJSON {
|
||||
dec := json.NewDecoder(c)
|
||||
err = dec.Decode(resp)
|
||||
} else {
|
||||
r := msgp.NewReader(c)
|
||||
err = resp.DecodeMsg(r)
|
||||
}
|
||||
if err != nil || resp.Ack != msg.ack {
|
||||
f.close(c)
|
||||
continue
|
||||
}
|
||||
}
|
||||
if retry, err := f.write(ctx, msg); !retry {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return fmt.Errorf("fluent#write: failed to reconnect, max retry: %v", f.Config.MaxRetry)
|
||||
return fmt.Errorf("fluent#write: failed to write after %d attempts", f.Config.MaxRetry)
|
||||
}
|
||||
|
||||
// write writes the provided msg to fluentd server. Its first return values is
|
||||
// a bool indicating whether the write should be retried.
|
||||
// This method relies on function literals to execute muconn.Unlock or
|
||||
// muconn.RUnlock in deferred calls to ensure the mutex is unlocked even in
|
||||
// the case of panic recovering.
|
||||
func (f *Fluent) write(ctx context.Context, msg *msgToSend) (bool, error) {
|
||||
closer := func() {
|
||||
f.muconn.Lock()
|
||||
defer f.muconn.Unlock()
|
||||
|
||||
f.close()
|
||||
}
|
||||
|
||||
if err := func() (err error) {
|
||||
f.muconn.Lock()
|
||||
defer f.muconn.Unlock()
|
||||
|
||||
if f.conn == nil {
|
||||
err = f.connectWithRetry(ctx)
|
||||
}
|
||||
|
||||
return err
|
||||
}(); err != nil {
|
||||
// Here, we don't want to retry the write since connectWithRetry already
|
||||
// retries Config.MaxRetry times to connect.
|
||||
return false, fmt.Errorf("fluent#write: %v", err)
|
||||
}
|
||||
|
||||
if err := func() (err error) {
|
||||
f.muconn.RLock()
|
||||
defer f.muconn.RUnlock()
|
||||
|
||||
if f.conn == nil {
|
||||
return fmt.Errorf("connection has been closed before writing to it")
|
||||
}
|
||||
|
||||
t := f.Config.WriteTimeout
|
||||
if time.Duration(0) < t {
|
||||
f.conn.SetWriteDeadline(time.Now().Add(t))
|
||||
} else {
|
||||
f.conn.SetWriteDeadline(time.Time{})
|
||||
}
|
||||
|
||||
_, err = f.conn.Write(msg.data)
|
||||
return err
|
||||
}(); err != nil {
|
||||
closer()
|
||||
return true, fmt.Errorf("fluent#write: %v", err)
|
||||
}
|
||||
|
||||
// Acknowledgment check
|
||||
if msg.ack != "" {
|
||||
resp := &AckResp{}
|
||||
var err error
|
||||
if f.Config.MarshalAsJSON {
|
||||
dec := json.NewDecoder(f.conn)
|
||||
err = dec.Decode(resp)
|
||||
} else {
|
||||
r := msgp.NewReader(f.conn)
|
||||
err = resp.DecodeMsg(r)
|
||||
}
|
||||
|
||||
if err != nil || resp.Ack != msg.ack {
|
||||
fmt.Fprintf(os.Stderr, "fluent#write: message ack (%s) doesn't match expected one (%s). Closing connection...", resp.Ack, msg.ack)
|
||||
|
||||
closer()
|
||||
return true, err
|
||||
}
|
||||
}
|
||||
|
||||
return false, nil
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue