|
@@ -14,6 +14,8 @@ import (
|
|
|
|
|
|
const (
|
|
|
defaultHost = "127.0.0.1"
|
|
|
+ defaultNetwork = "tcp"
|
|
|
+ defaultSocketPath = ""
|
|
|
defaultPort = 24224
|
|
|
defaultTimeout = 3 * time.Second
|
|
|
defaultBufferLimit = 8 * 1024 * 1024
|
|
@@ -23,13 +25,16 @@ const (
|
|
|
)
|
|
|
|
|
|
type Config struct {
|
|
|
- FluentPort int
|
|
|
- FluentHost string
|
|
|
- Timeout time.Duration
|
|
|
- BufferLimit int
|
|
|
- RetryWait int
|
|
|
- MaxRetry int
|
|
|
- TagPrefix string
|
|
|
+ FluentPort int
|
|
|
+ FluentHost string
|
|
|
+ FluentNetwork string
|
|
|
+ FluentSocketPath string
|
|
|
+ Timeout time.Duration
|
|
|
+ BufferLimit int
|
|
|
+ RetryWait int
|
|
|
+ MaxRetry int
|
|
|
+ TagPrefix string
|
|
|
+ AsyncConnect bool
|
|
|
}
|
|
|
|
|
|
type Fluent struct {
|
|
@@ -42,12 +47,18 @@ type Fluent struct {
|
|
|
|
|
|
// New creates a new Logger.
|
|
|
func New(config Config) (f *Fluent, err error) {
|
|
|
+ if config.FluentNetwork == "" {
|
|
|
+ config.FluentNetwork = defaultNetwork
|
|
|
+ }
|
|
|
if config.FluentHost == "" {
|
|
|
config.FluentHost = defaultHost
|
|
|
}
|
|
|
if config.FluentPort == 0 {
|
|
|
config.FluentPort = defaultPort
|
|
|
}
|
|
|
+ if config.FluentSocketPath == "" {
|
|
|
+ config.FluentSocketPath = defaultSocketPath
|
|
|
+ }
|
|
|
if config.Timeout == 0 {
|
|
|
config.Timeout = defaultTimeout
|
|
|
}
|
|
@@ -60,8 +71,13 @@ func New(config Config) (f *Fluent, err error) {
|
|
|
if config.MaxRetry == 0 {
|
|
|
config.MaxRetry = defaultMaxRetry
|
|
|
}
|
|
|
- f = &Fluent{Config: config, reconnecting: false}
|
|
|
- err = f.connect()
|
|
|
+ if config.AsyncConnect {
|
|
|
+ f = &Fluent{Config: config, reconnecting: true}
|
|
|
+ f.reconnect()
|
|
|
+ } else {
|
|
|
+ f = &Fluent{Config: config, reconnecting: false}
|
|
|
+ err = f.connect()
|
|
|
+ }
|
|
|
return
|
|
|
}
|
|
|
|
|
@@ -171,9 +187,9 @@ func (f *Fluent) EncodeData(tag string, tm time.Time, message interface{}) (data
|
|
|
// Close closes the connection.
|
|
|
func (f *Fluent) Close() (err error) {
|
|
|
if len(f.pending) > 0 {
|
|
|
- _ = f.send()
|
|
|
+ err = f.send()
|
|
|
}
|
|
|
- err = f.close()
|
|
|
+ f.close()
|
|
|
return
|
|
|
}
|
|
|
|
|
@@ -194,7 +210,14 @@ func (f *Fluent) close() (err error) {
|
|
|
|
|
|
// connect establishes a new connection using the specified transport.
|
|
|
func (f *Fluent) connect() (err error) {
|
|
|
- f.conn, err = net.DialTimeout("tcp", f.Config.FluentHost+":"+strconv.Itoa(f.Config.FluentPort), f.Config.Timeout)
|
|
|
+ switch f.Config.FluentNetwork {
|
|
|
+ case "tcp":
|
|
|
+ f.conn, err = net.DialTimeout(f.Config.FluentNetwork, f.Config.FluentHost+":"+strconv.Itoa(f.Config.FluentPort), f.Config.Timeout)
|
|
|
+ case "unix":
|
|
|
+ f.conn, err = net.DialTimeout(f.Config.FluentNetwork, f.Config.FluentSocketPath, f.Config.Timeout)
|
|
|
+ default:
|
|
|
+ err = net.UnknownNetworkError(f.Config.FluentNetwork)
|
|
|
+ }
|
|
|
return
|
|
|
}
|
|
|
|