|
@@ -15,8 +15,9 @@ import (
|
|
|
"bytes"
|
|
|
"encoding/base64"
|
|
|
"encoding/binary"
|
|
|
- "github.com/tinylib/msgp/msgp"
|
|
|
"math/rand"
|
|
|
+
|
|
|
+ "github.com/tinylib/msgp/msgp"
|
|
|
)
|
|
|
|
|
|
const (
|
|
@@ -37,18 +38,19 @@ const (
|
|
|
)
|
|
|
|
|
|
type Config struct {
|
|
|
- FluentPort int `json:"fluent_port"`
|
|
|
- FluentHost string `json:"fluent_host"`
|
|
|
- FluentNetwork string `json:"fluent_network"`
|
|
|
- FluentSocketPath string `json:"fluent_socket_path"`
|
|
|
- Timeout time.Duration `json:"timeout"`
|
|
|
- WriteTimeout time.Duration `json:"write_timeout"`
|
|
|
- BufferLimit int `json:"buffer_limit"`
|
|
|
- RetryWait int `json:"retry_wait"`
|
|
|
- MaxRetry int `json:"max_retry"`
|
|
|
- MaxRetryWait int `json:"max_retry_wait"`
|
|
|
- TagPrefix string `json:"tag_prefix"`
|
|
|
- Async bool `json:"async"`
|
|
|
+ FluentPort int `json:"fluent_port"`
|
|
|
+ FluentHost string `json:"fluent_host"`
|
|
|
+ FluentNetwork string `json:"fluent_network"`
|
|
|
+ FluentSocketPath string `json:"fluent_socket_path"`
|
|
|
+ Timeout time.Duration `json:"timeout"`
|
|
|
+ WriteTimeout time.Duration `json:"write_timeout"`
|
|
|
+ BufferLimit int `json:"buffer_limit"`
|
|
|
+ RetryWait int `json:"retry_wait"`
|
|
|
+ MaxRetry int `json:"max_retry"`
|
|
|
+ MaxRetryWait int `json:"max_retry_wait"`
|
|
|
+ TagPrefix string `json:"tag_prefix"`
|
|
|
+ Async bool `json:"async"`
|
|
|
+ ForceStopAsyncSend bool `json:"force_stop_async_send"`
|
|
|
// Deprecated: Use Async instead
|
|
|
AsyncConnect bool `json:"async_connect"`
|
|
|
MarshalAsJSON bool `json:"marshal_as_json"`
|
|
@@ -63,6 +65,18 @@ type Config struct {
|
|
|
RequestAck bool `json:"request_ack"`
|
|
|
}
|
|
|
|
|
|
+type ErrUnknownNetwork struct {
|
|
|
+ network string
|
|
|
+}
|
|
|
+
|
|
|
+func (e *ErrUnknownNetwork) Error() string {
|
|
|
+ return "unknown network " + e.network
|
|
|
+}
|
|
|
+
|
|
|
+func NewErrUnknownNetwork(network string) error {
|
|
|
+ return &ErrUnknownNetwork{network}
|
|
|
+}
|
|
|
+
|
|
|
type msgToSend struct {
|
|
|
data []byte
|
|
|
ack string
|
|
@@ -71,15 +85,32 @@ type msgToSend struct {
|
|
|
type Fluent struct {
|
|
|
Config
|
|
|
|
|
|
- pending chan *msgToSend
|
|
|
- wg sync.WaitGroup
|
|
|
+ dialer dialer
|
|
|
+ stopRunning chan bool
|
|
|
+ pending chan *msgToSend
|
|
|
+ pendingMutex sync.RWMutex
|
|
|
+ chanClosed bool
|
|
|
+ wg sync.WaitGroup
|
|
|
|
|
|
muconn sync.Mutex
|
|
|
conn net.Conn
|
|
|
}
|
|
|
|
|
|
// New creates a new Logger.
|
|
|
-func New(config Config) (f *Fluent, err error) {
|
|
|
+func New(config Config) (*Fluent, error) {
|
|
|
+ if config.Timeout == 0 {
|
|
|
+ config.Timeout = defaultTimeout
|
|
|
+ }
|
|
|
+ return newWithDialer(config, &net.Dialer{
|
|
|
+ Timeout: config.Timeout,
|
|
|
+ })
|
|
|
+}
|
|
|
+
|
|
|
+type dialer interface {
|
|
|
+ Dial(string, string) (net.Conn, error)
|
|
|
+}
|
|
|
+
|
|
|
+func newWithDialer(config Config, d dialer) (f *Fluent, err error) {
|
|
|
if config.FluentNetwork == "" {
|
|
|
config.FluentNetwork = defaultNetwork
|
|
|
}
|
|
@@ -92,9 +123,6 @@ func New(config Config) (f *Fluent, err error) {
|
|
|
if config.FluentSocketPath == "" {
|
|
|
config.FluentSocketPath = defaultSocketPath
|
|
|
}
|
|
|
- if config.Timeout == 0 {
|
|
|
- config.Timeout = defaultTimeout
|
|
|
- }
|
|
|
if config.WriteTimeout == 0 {
|
|
|
config.WriteTimeout = defaultWriteTimeout
|
|
|
}
|
|
@@ -114,15 +142,22 @@ func New(config Config) (f *Fluent, err error) {
|
|
|
fmt.Fprintf(os.Stderr, "fluent#New: AsyncConnect is now deprecated, please use Async instead")
|
|
|
config.Async = config.Async || config.AsyncConnect
|
|
|
}
|
|
|
+
|
|
|
if config.Async {
|
|
|
f = &Fluent{
|
|
|
- Config: config,
|
|
|
- pending: make(chan *msgToSend, config.BufferLimit),
|
|
|
+ Config: config,
|
|
|
+ dialer: d,
|
|
|
+ pending: make(chan *msgToSend, config.BufferLimit),
|
|
|
+ pendingMutex: sync.RWMutex{},
|
|
|
+ stopRunning: make(chan bool, 1),
|
|
|
}
|
|
|
f.wg.Add(1)
|
|
|
go f.run()
|
|
|
} else {
|
|
|
- f = &Fluent{Config: config}
|
|
|
+ f = &Fluent{
|
|
|
+ Config: config,
|
|
|
+ dialer: d,
|
|
|
+ }
|
|
|
err = f.connect()
|
|
|
}
|
|
|
return
|
|
@@ -292,16 +327,32 @@ func (f *Fluent) EncodeData(tag string, tm time.Time, message interface{}) (msg
|
|
|
|
|
|
// Close closes the connection, waiting for pending logs to be sent
|
|
|
func (f *Fluent) Close() (err error) {
|
|
|
+ defer f.close(f.conn)
|
|
|
if f.Config.Async {
|
|
|
+ f.pendingMutex.Lock()
|
|
|
+ if f.chanClosed {
|
|
|
+ f.pendingMutex.Unlock()
|
|
|
+ return nil
|
|
|
+ }
|
|
|
+ f.chanClosed = true
|
|
|
+ f.pendingMutex.Unlock()
|
|
|
+ if f.Config.ForceStopAsyncSend {
|
|
|
+ f.stopRunning <- true
|
|
|
+ close(f.stopRunning)
|
|
|
+ }
|
|
|
close(f.pending)
|
|
|
f.wg.Wait()
|
|
|
}
|
|
|
- f.close()
|
|
|
- return
|
|
|
+ return nil
|
|
|
}
|
|
|
|
|
|
// appendBuffer appends data to buffer with lock.
|
|
|
func (f *Fluent) appendBuffer(msg *msgToSend) error {
|
|
|
+ f.pendingMutex.RLock()
|
|
|
+ defer f.pendingMutex.RUnlock()
|
|
|
+ if f.chanClosed {
|
|
|
+ return fmt.Errorf("fluent#appendBuffer: Logger already closed")
|
|
|
+ }
|
|
|
select {
|
|
|
case f.pending <- msg:
|
|
|
default:
|
|
@@ -311,9 +362,9 @@ func (f *Fluent) appendBuffer(msg *msgToSend) error {
|
|
|
}
|
|
|
|
|
|
// close closes the connection.
|
|
|
-func (f *Fluent) close() {
|
|
|
+func (f *Fluent) close(c net.Conn) {
|
|
|
f.muconn.Lock()
|
|
|
- if f.conn != nil {
|
|
|
+ if f.conn != nil && f.conn == c {
|
|
|
f.conn.Close()
|
|
|
f.conn = nil
|
|
|
}
|
|
@@ -322,19 +373,24 @@ func (f *Fluent) close() {
|
|
|
|
|
|
// connect establishes a new connection using the specified transport.
|
|
|
func (f *Fluent) connect() (err error) {
|
|
|
-
|
|
|
switch f.Config.FluentNetwork {
|
|
|
case "tcp":
|
|
|
- f.conn, err = net.DialTimeout(f.Config.FluentNetwork, f.Config.FluentHost+":"+strconv.Itoa(f.Config.FluentPort), f.Config.Timeout)
|
|
|
+ f.conn, err = f.dialer.Dial(
|
|
|
+ f.Config.FluentNetwork,
|
|
|
+ f.Config.FluentHost+":"+strconv.Itoa(f.Config.FluentPort))
|
|
|
case "unix":
|
|
|
- f.conn, err = net.DialTimeout(f.Config.FluentNetwork, f.Config.FluentSocketPath, f.Config.Timeout)
|
|
|
+ f.conn, err = f.dialer.Dial(
|
|
|
+ f.Config.FluentNetwork,
|
|
|
+ f.Config.FluentSocketPath)
|
|
|
default:
|
|
|
- err = net.UnknownNetworkError(f.Config.FluentNetwork)
|
|
|
+ err = NewErrUnknownNetwork(f.Config.FluentNetwork)
|
|
|
}
|
|
|
return err
|
|
|
}
|
|
|
|
|
|
func (f *Fluent) run() {
|
|
|
+ drainEvents := false
|
|
|
+ var emitEventDrainMsg sync.Once
|
|
|
for {
|
|
|
select {
|
|
|
case entry, ok := <-f.pending:
|
|
@@ -342,11 +398,22 @@ func (f *Fluent) run() {
|
|
|
f.wg.Done()
|
|
|
return
|
|
|
}
|
|
|
+ if drainEvents {
|
|
|
+ emitEventDrainMsg.Do(func() { fmt.Fprintf(os.Stderr, "[%s] Discarding queued events...\n", time.Now().Format(time.RFC3339)) })
|
|
|
+ continue
|
|
|
+ }
|
|
|
err := f.write(entry)
|
|
|
if err != nil {
|
|
|
fmt.Fprintf(os.Stderr, "[%s] Unable to send logs to fluentd, reconnecting...\n", time.Now().Format(time.RFC3339))
|
|
|
}
|
|
|
}
|
|
|
+ select {
|
|
|
+ case stopRunning, ok := <-f.stopRunning:
|
|
|
+ if stopRunning || !ok {
|
|
|
+ drainEvents = true
|
|
|
+ }
|
|
|
+ default:
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -355,48 +422,56 @@ func e(x, y float64) int {
|
|
|
}
|
|
|
|
|
|
func (f *Fluent) write(msg *msgToSend) error {
|
|
|
-
|
|
|
+ var c net.Conn
|
|
|
for i := 0; i < f.Config.MaxRetry; i++ {
|
|
|
-
|
|
|
+ c = f.conn
|
|
|
// Connect if needed
|
|
|
- f.muconn.Lock()
|
|
|
- if f.conn == nil {
|
|
|
- err := f.connect()
|
|
|
- if err != nil {
|
|
|
- f.muconn.Unlock()
|
|
|
- waitTime := f.Config.RetryWait * e(defaultReconnectWaitIncreRate, float64(i-1))
|
|
|
- if waitTime > f.Config.MaxRetryWait {
|
|
|
- waitTime = f.Config.MaxRetryWait
|
|
|
+ if c == nil {
|
|
|
+ f.muconn.Lock()
|
|
|
+ if f.conn == nil {
|
|
|
+ err := f.connect()
|
|
|
+ if err != nil {
|
|
|
+ f.muconn.Unlock()
|
|
|
+
|
|
|
+ if _, ok := err.(*ErrUnknownNetwork); ok {
|
|
|
+ // do not retry on unknown network error
|
|
|
+ break
|
|
|
+ }
|
|
|
+ waitTime := f.Config.RetryWait * e(defaultReconnectWaitIncreRate, float64(i-1))
|
|
|
+ if waitTime > f.Config.MaxRetryWait {
|
|
|
+ waitTime = f.Config.MaxRetryWait
|
|
|
+ }
|
|
|
+ time.Sleep(time.Duration(waitTime) * time.Millisecond)
|
|
|
+ continue
|
|
|
}
|
|
|
- time.Sleep(time.Duration(waitTime) * time.Millisecond)
|
|
|
- continue
|
|
|
}
|
|
|
+ c = f.conn
|
|
|
+ f.muconn.Unlock()
|
|
|
}
|
|
|
- f.muconn.Unlock()
|
|
|
|
|
|
// We're connected, write msg
|
|
|
t := f.Config.WriteTimeout
|
|
|
if time.Duration(0) < t {
|
|
|
- f.conn.SetWriteDeadline(time.Now().Add(t))
|
|
|
+ c.SetWriteDeadline(time.Now().Add(t))
|
|
|
} else {
|
|
|
- f.conn.SetWriteDeadline(time.Time{})
|
|
|
+ c.SetWriteDeadline(time.Time{})
|
|
|
}
|
|
|
- _, err := f.conn.Write(msg.data)
|
|
|
+ _, err := c.Write(msg.data)
|
|
|
if err != nil {
|
|
|
- f.close()
|
|
|
+ f.close(c)
|
|
|
} else {
|
|
|
// Acknowledgment check
|
|
|
if msg.ack != "" {
|
|
|
resp := &AckResp{}
|
|
|
if f.Config.MarshalAsJSON {
|
|
|
- dec := json.NewDecoder(f.conn)
|
|
|
+ dec := json.NewDecoder(c)
|
|
|
err = dec.Decode(resp)
|
|
|
} else {
|
|
|
- r := msgp.NewReader(f.conn)
|
|
|
+ r := msgp.NewReader(c)
|
|
|
err = resp.DecodeMsg(r)
|
|
|
}
|
|
|
if err != nil || resp.Ack != msg.ack {
|
|
|
- f.close()
|
|
|
+ f.close(c)
|
|
|
continue
|
|
|
}
|
|
|
}
|