fluent.go 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616
  1. package fluent
  2. import (
  3. "context"
  4. "crypto/tls"
  5. "encoding/json"
  6. "errors"
  7. "fmt"
  8. "math"
  9. "math/rand"
  10. "net"
  11. "os"
  12. "reflect"
  13. "strconv"
  14. "sync"
  15. "time"
  16. "bytes"
  17. "encoding/base64"
  18. "encoding/binary"
  19. "github.com/tinylib/msgp/msgp"
  20. )
  // Default values applied by New/newWithDialer when the corresponding Config
  // field is left at its zero value.
  21. const (
  22. defaultHost = "127.0.0.1"
  23. defaultNetwork = "tcp"
  24. defaultSocketPath = ""
  25. defaultPort = 24224
  26. defaultTimeout = 3 * time.Second
  27. defaultWriteTimeout = time.Duration(0) // Write() will not time out
  28. defaultBufferLimit = 8 * 1024 // capacity (in messages) of the async pending channel
  29. defaultRetryWait = 500 // milliseconds; base wait between reconnection attempts
  30. defaultMaxRetryWait = 60000 // milliseconds; upper bound on the reconnection wait
  31. defaultMaxRetry = 13 // loop bound used by both connectWithRetry and writeWithRetry
  32. defaultReconnectWaitIncreRate = 1.5 // base of the exponential back-off in connectWithRetry
  33. // Default sub-second precision value to false since it is only compatible
  34. // with fluentd versions v0.14 and above.
  35. defaultSubSecondPrecision = false
  36. // Default value whether to skip checking insecure certs on TLS connections.
  37. defaultTlsInsecureSkipVerify = false
  38. )
  39. // randomGenerator is used by getUniqueID to generate ack hashes. Its value is replaced
  40. // during tests with a deterministic function.
  41. var randomGenerator = rand.Uint64
  // Config holds the settings of a Fluent logger. Most zero-valued fields are
  // replaced with package defaults when the logger is created.
  type Config struct {
  	// FluentPort and FluentHost form the address dialed for the "tcp" and
  	// "tls" networks.
  	FluentPort int    `json:"fluent_port"`
  	FluentHost string `json:"fluent_host"`
  	// FluentNetwork selects the transport: "tcp", "tls" or "unix".
  	FluentNetwork string `json:"fluent_network"`
  	// FluentSocketPath is the address dialed for the "unix" network.
  	FluentSocketPath string `json:"fluent_socket_path"`
  	// Timeout is the dial timeout.
  	Timeout time.Duration `json:"timeout"`
  	// WriteTimeout is the deadline applied to each write; zero disables it.
  	WriteTimeout time.Duration `json:"write_timeout"`
  	// BufferLimit is the capacity, in messages, of the async send queue.
  	BufferLimit int `json:"buffer_limit"`
  	// RetryWait and MaxRetryWait are expressed in milliseconds and bound the
  	// wait between reconnection attempts.
  	RetryWait int `json:"retry_wait"`
  	// MaxRetry is the maximum number of connection and write attempts.
  	MaxRetry     int `json:"max_retry"`
  	MaxRetryWait int `json:"max_retry_wait"`
  	// TagPrefix, when non-empty, is prepended to every tag followed by ".".
  	TagPrefix string `json:"tag_prefix"`
  	// Async queues messages and sends them from a background goroutine.
  	Async bool `json:"async"`
  	// ForceStopAsyncSend makes Close discard queued messages instead of
  	// waiting for them to be sent.
  	ForceStopAsyncSend bool `json:"force_stop_async_send"`
  	// AsyncResultCallback, when set, is invoked in async mode after each
  	// message has been processed, with the encoded payload and the send error
  	// (nil on success). Functions cannot be (un)marshalled, so the field is
  	// excluded from JSON; without the "-" tag json.Marshal(Config{}) fails.
  	AsyncResultCallback func(data []byte, err error) `json:"-"`
  	// Deprecated: Use Async instead
  	AsyncConnect  bool `json:"async_connect"`
  	MarshalAsJSON bool `json:"marshal_as_json"`

  	// Sub-second precision timestamps are only possible for those using fluentd
  	// v0.14+ and serializing their messages with msgpack.
  	SubSecondPrecision bool `json:"sub_second_precision"`

  	// RequestAck sends the chunk option with a unique ID. The server will
  	// respond with an acknowledgement. This option improves the reliability
  	// of the message transmission.
  	RequestAck bool `json:"request_ack"`

  	// Flag to skip verifying insecure certs on TLS connections.
  	// The tag must not contain a space after "json:", otherwise
  	// reflect.StructTag cannot parse it and the key is silently ignored.
  	TlsInsecureSkipVerify bool `json:"tls_insecure_skip_verify"`
  }
  // ErrUnknownNetwork reports a network name that is not one of the
  // supported transports.
  type ErrUnknownNetwork struct {
  	network string
  }

  // Compile-time check that *ErrUnknownNetwork satisfies error.
  var _ error = (*ErrUnknownNetwork)(nil)

  // Error implements the error interface.
  func (e *ErrUnknownNetwork) Error() string {
  	const prefix = "unknown network "
  	return prefix + e.network
  }

  // NewErrUnknownNetwork returns an *ErrUnknownNetwork for the given network
  // name, typed as error.
  func NewErrUnknownNetwork(network string) error {
  	unknown := ErrUnknownNetwork{network: network}
  	return &unknown
  }
  // msgToSend is an encoded message ready to be written to the connection.
  79. type msgToSend struct {
  // data is the serialized payload (JSON or msgpack, depending on Config).
  80. data []byte
  // ack is the chunk ID the server is expected to echo back; it is left empty
  // when Config.RequestAck is off, in which case no acknowledgement is read.
  81. ack string
  82. }
  // Fluent is a fluentd client. It embeds its Config and owns at most one
  // connection at a time.
  83. type Fluent struct {
  84. Config
  // dialer opens the "tcp" and "unix" connections.
  85. dialer dialer
  86. // stopRunning is used in async mode to signal to run() it should abort.
  87. stopRunning chan struct{}
  88. // cancelDialings is used by Close() to stop any in-progress dialing.
  89. cancelDialings context.CancelFunc
  // pending is the async queue of encoded messages; it is only created in
  // async mode, with a capacity of Config.BufferLimit.
  90. pending chan *msgToSend
  // pendingMutex is held (read) while enqueuing and (write) while Close marks
  // the logger closed, so no message is sent on pending after it is closed.
  91. pendingMutex sync.RWMutex
  // closed is set by Close().
  92. closed bool
  // wg tracks the run() goroutine started in async mode.
  93. wg sync.WaitGroup
  // muconn guards conn.
  94. muconn sync.RWMutex
  // conn is the current connection; nil when not connected.
  95. conn net.Conn
  96. }
  // dialer is the subset of *net.Dialer used to open connections; New passes
  // a *net.Dialer, and newWithDialer accepts any implementation.
  97. type dialer interface {
  98. DialContext(ctx context.Context, network, address string) (net.Conn, error)
  99. }
  100. // New creates a new Logger.
  101. func New(config Config) (*Fluent, error) {
  102. if config.Timeout == 0 {
  103. config.Timeout = defaultTimeout
  104. }
  105. return newWithDialer(config, &net.Dialer{
  106. Timeout: config.Timeout,
  107. })
  108. }
  109. func newWithDialer(config Config, d dialer) (f *Fluent, err error) {
  110. if config.FluentNetwork == "" {
  111. config.FluentNetwork = defaultNetwork
  112. }
  113. if config.FluentHost == "" {
  114. config.FluentHost = defaultHost
  115. }
  116. if config.FluentPort == 0 {
  117. config.FluentPort = defaultPort
  118. }
  119. if config.FluentSocketPath == "" {
  120. config.FluentSocketPath = defaultSocketPath
  121. }
  122. if config.WriteTimeout == 0 {
  123. config.WriteTimeout = defaultWriteTimeout
  124. }
  125. if config.BufferLimit == 0 {
  126. config.BufferLimit = defaultBufferLimit
  127. }
  128. if config.RetryWait == 0 {
  129. config.RetryWait = defaultRetryWait
  130. }
  131. if config.MaxRetry == 0 {
  132. config.MaxRetry = defaultMaxRetry
  133. }
  134. if config.MaxRetryWait == 0 {
  135. config.MaxRetryWait = defaultMaxRetryWait
  136. }
  137. if !config.TlsInsecureSkipVerify {
  138. config.TlsInsecureSkipVerify = defaultTlsInsecureSkipVerify
  139. }
  140. if config.AsyncConnect {
  141. fmt.Fprintf(os.Stderr, "fluent#New: AsyncConnect is now deprecated, please use Async instead")
  142. config.Async = config.Async || config.AsyncConnect
  143. }
  144. if config.Async {
  145. ctx, cancel := context.WithCancel(context.Background())
  146. f = &Fluent{
  147. Config: config,
  148. dialer: d,
  149. stopRunning: make(chan struct{}),
  150. cancelDialings: cancel,
  151. pending: make(chan *msgToSend, config.BufferLimit),
  152. pendingMutex: sync.RWMutex{},
  153. muconn: sync.RWMutex{},
  154. }
  155. f.wg.Add(1)
  156. go f.run(ctx)
  157. } else {
  158. f = &Fluent{
  159. Config: config,
  160. dialer: d,
  161. muconn: sync.RWMutex{},
  162. }
  163. err = f.connect(context.Background())
  164. }
  165. return
  166. }
  167. // Post writes the output for a logging event.
  168. //
  169. // Examples:
  170. //
  171. // // send map[string]
  172. // mapStringData := map[string]string{
  173. // "foo": "bar",
  174. // }
  175. // f.Post("tag_name", mapStringData)
  176. //
  177. // // send message with specified time
  178. // mapStringData := map[string]string{
  179. // "foo": "bar",
  180. // }
  181. // tm := time.Now()
  182. // f.PostWithTime("tag_name", tm, mapStringData)
  183. //
  184. // // send struct
  185. // structData := struct {
  186. // Name string `msg:"name"`
  187. // } {
  188. // "john smith",
  189. // }
  190. // f.Post("tag_name", structData)
  191. //
  192. func (f *Fluent) Post(tag string, message interface{}) error {
  193. timeNow := time.Now()
  194. return f.PostWithTime(tag, timeNow, message)
  195. }
  196. func (f *Fluent) PostWithTime(tag string, tm time.Time, message interface{}) error {
  197. if len(f.TagPrefix) > 0 {
  198. tag = f.TagPrefix + "." + tag
  199. }
  200. if m, ok := message.(msgp.Marshaler); ok {
  201. return f.EncodeAndPostData(tag, tm, m)
  202. }
  203. msg := reflect.ValueOf(message)
  204. msgtype := msg.Type()
  205. if msgtype.Kind() == reflect.Struct {
  206. // message should be tagged by "codec" or "msg"
  207. kv := make(map[string]interface{})
  208. fields := msgtype.NumField()
  209. for i := 0; i < fields; i++ {
  210. field := msgtype.Field(i)
  211. value := msg.FieldByIndex(field.Index)
  212. // ignore unexported fields
  213. if !value.CanInterface() {
  214. continue
  215. }
  216. name := field.Name
  217. if n1 := field.Tag.Get("msg"); n1 != "" {
  218. name = n1
  219. } else if n2 := field.Tag.Get("codec"); n2 != "" {
  220. name = n2
  221. }
  222. kv[name] = value.Interface()
  223. }
  224. return f.EncodeAndPostData(tag, tm, kv)
  225. }
  226. if msgtype.Kind() != reflect.Map {
  227. return errors.New("fluent#PostWithTime: message must be a map")
  228. } else if msgtype.Key().Kind() != reflect.String {
  229. return errors.New("fluent#PostWithTime: map keys must be strings")
  230. }
  231. kv := make(map[string]interface{})
  232. for _, k := range msg.MapKeys() {
  233. kv[k.String()] = msg.MapIndex(k).Interface()
  234. }
  235. return f.EncodeAndPostData(tag, tm, kv)
  236. }
  237. func (f *Fluent) EncodeAndPostData(tag string, tm time.Time, message interface{}) error {
  238. var msg *msgToSend
  239. var err error
  240. if msg, err = f.EncodeData(tag, tm, message); err != nil {
  241. return fmt.Errorf("fluent#EncodeAndPostData: can't convert '%#v' to msgpack:%v", message, err)
  242. }
  243. return f.postRawData(msg)
  244. }
  245. // Deprecated: Use EncodeAndPostData instead
  246. func (f *Fluent) PostRawData(msg *msgToSend) {
  247. f.postRawData(msg)
  248. }
  249. func (f *Fluent) postRawData(msg *msgToSend) error {
  250. if f.Config.Async {
  251. return f.appendBuffer(msg)
  252. }
  253. // Synchronous write
  254. if f.closed {
  255. return fmt.Errorf("fluent#postRawData: Logger already closed")
  256. }
  257. return f.writeWithRetry(context.Background(), msg)
  258. }
  259. // For sending forward protocol adopted JSON
  260. type MessageChunk struct {
  261. message Message
  262. }
  263. // Golang default marshaler does not support
  264. // ["value", "value2", {"key":"value"}] style marshaling.
  265. // So, it should write JSON marshaler by hand.
  266. func (chunk *MessageChunk) MarshalJSON() ([]byte, error) {
  267. data, err := json.Marshal(chunk.message.Record)
  268. if err != nil {
  269. return nil, err
  270. }
  271. option, err := json.Marshal(chunk.message.Option)
  272. if err != nil {
  273. return nil, err
  274. }
  275. return []byte(fmt.Sprintf("[\"%s\",%d,%s,%s]", chunk.message.Tag,
  276. chunk.message.Time, data, option)), err
  277. }
  278. // getUniqueID returns a base64 encoded unique ID that can be used for chunk/ack
  279. // mechanism, see
  280. // https://github.com/fluent/fluentd/wiki/Forward-Protocol-Specification-v1#option
  281. func getUniqueID(timeUnix int64) (string, error) {
  282. buf := bytes.NewBuffer(nil)
  283. enc := base64.NewEncoder(base64.StdEncoding, buf)
  284. if err := binary.Write(enc, binary.LittleEndian, timeUnix); err != nil {
  285. enc.Close()
  286. return "", err
  287. }
  288. if err := binary.Write(enc, binary.LittleEndian, randomGenerator()); err != nil {
  289. enc.Close()
  290. return "", err
  291. }
  292. // encoder needs to be closed before buf.String(), defer does not work
  293. // here
  294. enc.Close()
  295. return buf.String(), nil
  296. }
  297. func (f *Fluent) EncodeData(tag string, tm time.Time, message interface{}) (msg *msgToSend, err error) {
  298. option := make(map[string]string)
  299. msg = &msgToSend{}
  300. timeUnix := tm.Unix()
  301. if f.Config.RequestAck {
  302. var err error
  303. msg.ack, err = getUniqueID(timeUnix)
  304. if err != nil {
  305. return nil, err
  306. }
  307. option["chunk"] = msg.ack
  308. }
  309. if f.Config.MarshalAsJSON {
  310. m := Message{Tag: tag, Time: timeUnix, Record: message, Option: option}
  311. chunk := &MessageChunk{message: m}
  312. msg.data, err = json.Marshal(chunk)
  313. } else if f.Config.SubSecondPrecision {
  314. m := &MessageExt{Tag: tag, Time: EventTime(tm), Record: message, Option: option}
  315. msg.data, err = m.MarshalMsg(nil)
  316. } else {
  317. m := &Message{Tag: tag, Time: timeUnix, Record: message, Option: option}
  318. msg.data, err = m.MarshalMsg(nil)
  319. }
  320. return
  321. }
  322. // Close closes the connection, waiting for pending logs to be sent. If the client is
  323. // running in async mode, the run() goroutine exits before Close() returns.
  //
  // The named result err is never assigned, so Close always returns nil. In
  // async mode a second call returns early; in sync mode only the connection
  // is closed and the closed flag is set.
  324. func (f *Fluent) Close() (err error) {
  325. if f.Config.Async {
  // The write lock excludes appendBuffer (which holds the read lock while
  // sending on f.pending), so nothing is sent on f.pending after it is closed.
  326. f.pendingMutex.Lock()
  327. if f.closed {
  328. f.pendingMutex.Unlock()
  329. return nil
  330. }
  331. f.closed = true
  332. f.pendingMutex.Unlock()
  // Forced stop: tell run() to abort and cancel any in-progress dialing.
  333. if f.Config.ForceStopAsyncSend {
  334. close(f.stopRunning)
  335. f.cancelDialings()
  336. }
  337. close(f.pending)
  338. // If ForceStopAsyncSend is false, all logs in the channel have to be sent
  339. // before closing the connection. At this point closed is true so no more
  340. // logs are written to the channel and f.pending has been closed, so run()
  341. // goroutine will exit as soon as all logs in the channel are sent.
  342. if !f.Config.ForceStopAsyncSend {
  343. f.wg.Wait()
  344. }
  345. }
  // NOTE(review): closed is written here under muconn, above under
  // pendingMutex, and read by postRawData without any lock — confirm this is
  // acceptable for sync-mode callers.
  346. f.muconn.Lock()
  347. f.close()
  348. f.closed = true
  349. f.muconn.Unlock()
  350. // If ForceStopAsyncSend is true, we shall close the connection before waiting for
  351. // run() goroutine to exit to be sure we aren't waiting on ack message that might
  352. // never come (eg. because fluentd server is down). However we want to be sure the
  353. // run() goroutine stops before returning from Close().
  354. if f.Config.ForceStopAsyncSend {
  355. f.wg.Wait()
  356. }
  357. return
  358. }
  359. // appendBuffer appends data to buffer with lock.
  360. func (f *Fluent) appendBuffer(msg *msgToSend) error {
  361. f.pendingMutex.RLock()
  362. defer f.pendingMutex.RUnlock()
  363. if f.closed {
  364. return fmt.Errorf("fluent#appendBuffer: Logger already closed")
  365. }
  366. select {
  367. case f.pending <- msg:
  368. default:
  369. return fmt.Errorf("fluent#appendBuffer: Buffer full, limit %v", f.Config.BufferLimit)
  370. }
  371. return nil
  372. }
  373. // close closes the connection. Callers should take care of locking muconn first.
  374. func (f *Fluent) close() {
  375. if f.conn != nil {
  376. f.conn.Close()
  377. f.conn = nil
  378. }
  379. }
  380. // connect establishes a new connection using the specified transport. Caller should
  381. // take care of locking muconn first.
  382. func (f *Fluent) connect(ctx context.Context) (err error) {
  383. switch f.Config.FluentNetwork {
  384. case "tcp":
  385. f.conn, err = f.dialer.DialContext(ctx,
  386. f.Config.FluentNetwork,
  387. f.Config.FluentHost+":"+strconv.Itoa(f.Config.FluentPort))
  388. case "tls":
  389. tlsConfig := &tls.Config{InsecureSkipVerify: f.Config.TlsInsecureSkipVerify}
  390. f.conn, err = tls.DialWithDialer(
  391. &net.Dialer{Timeout: f.Config.Timeout},
  392. "tcp",
  393. f.Config.FluentHost+":"+strconv.Itoa(f.Config.FluentPort), tlsConfig,
  394. )
  395. case "unix":
  396. f.conn, err = f.dialer.DialContext(ctx,
  397. f.Config.FluentNetwork,
  398. f.Config.FluentSocketPath)
  399. default:
  400. err = NewErrUnknownNetwork(f.Config.FluentNetwork)
  401. }
  402. return err
  403. }
  // errIsClosing is returned by connectWithRetry when the dialing context is
  // done or the dial was cancelled; run() does not log it as a send failure.
  404. var errIsClosing = errors.New("fluent logger is closing")
  405. // Caller should take care of locking muconn first.
  406. func (f *Fluent) connectWithRetry(ctx context.Context) error {
  407. // A Time channel is used instead of time.Sleep() to avoid blocking this
  408. // goroutine during way too much time (because of the exponential back-off
  409. // retry).
  410. // time.NewTimer() is used instead of time.After() to avoid leaking the
  411. // timer channel (cf. https://pkg.go.dev/time#After).
  412. timeout := time.NewTimer(time.Duration(0))
  413. defer func() {
  414. // timeout.Stop() is called in a function literal instead of being
  415. // defered directly as it's re-assigned below when the retry loop spins.
  416. timeout.Stop()
  417. }()
  418. for i := 0; i < f.Config.MaxRetry; i++ {
  419. select {
  420. case <-timeout.C:
  421. err := f.connect(ctx)
  422. if err == nil {
  423. return nil
  424. }
  425. if _, ok := err.(*ErrUnknownNetwork); ok {
  426. return err
  427. }
  428. if err == context.Canceled {
  429. return errIsClosing
  430. }
  431. waitTime := f.Config.RetryWait * e(defaultReconnectWaitIncreRate, float64(i-1))
  432. if waitTime > f.Config.MaxRetryWait {
  433. waitTime = f.Config.MaxRetryWait
  434. }
  435. timeout = time.NewTimer(time.Duration(waitTime) * time.Millisecond)
  436. case <-ctx.Done():
  437. return errIsClosing
  438. }
  439. }
  440. return fmt.Errorf("could not connect to fluentd after %d retries", f.Config.MaxRetry)
  441. }
  442. // run is the goroutine used to unqueue and write logs in async mode. That
  443. // goroutine is meant to run during the whole life of the Fluent logger.
  444. func (f *Fluent) run(ctx context.Context) {
  445. for {
  446. select {
  447. case entry, ok := <-f.pending:
  448. // f.stopRunning is closed before f.pending only when ForceStopAsyncSend
  449. // is enabled. Otherwise, f.pending is closed when Close() is called.
  450. if !ok {
  451. f.wg.Done()
  452. return
  453. }
  454. err := f.writeWithRetry(ctx, entry)
  455. if err != nil && err != errIsClosing {
  456. fmt.Fprintf(os.Stderr, "[%s] Unable to send logs to fluentd, reconnecting...\n", time.Now().Format(time.RFC3339))
  457. }
  458. if f.AsyncResultCallback != nil {
  459. var data []byte
  460. if entry != nil {
  461. data = entry.data
  462. }
  463. f.AsyncResultCallback(data, err)
  464. }
  465. case <-f.stopRunning:
  466. fmt.Fprintf(os.Stderr, "[%s] Discarding queued events...\n", time.Now().Format(time.RFC3339))
  467. f.wg.Done()
  468. return
  469. }
  470. }
  471. }
  // e returns x raised to the power y, converted to int (the fractional part
  // is discarded).
  func e(x, y float64) int {
  	power := math.Pow(x, y)
  	return int(power)
  }
  475. func (f *Fluent) writeWithRetry(ctx context.Context, msg *msgToSend) error {
  476. for i := 0; i < f.Config.MaxRetry; i++ {
  477. if retry, err := f.write(ctx, msg); !retry {
  478. return err
  479. }
  480. }
  481. return fmt.Errorf("fluent#write: failed to write after %d attempts", f.Config.MaxRetry)
  482. }
  483. // write writes the provided msg to fluentd server. Its first return value is
  484. // a bool indicating whether the write should be retried.
  485. // This method relies on function literals to execute muconn.Unlock or
  486. // muconn.RUnlock in deferred calls to ensure the mutex is unlocked even in
  487. // the case of panic recovering.
  //
  // The work happens in three phases: (1) connect under the write lock if there
  // is no connection, (2) write the payload under the read lock, (3) when an
  // ack was requested, read the server response and compare it with msg.ack.
  // A failure in phase 1 is not retryable; failures in phases 2 and 3 close the
  // connection and ask the caller to retry.
  488. func (f *Fluent) write(ctx context.Context, msg *msgToSend) (bool, error) {
  // closer drops the connection under the write lock so the next attempt
  // reconnects.
  489. closer := func() {
  490. f.muconn.Lock()
  491. defer f.muconn.Unlock()
  492. f.close()
  493. }
  // Phase 1: (re)connect if needed.
  494. if err := func() (err error) {
  495. f.muconn.Lock()
  496. defer f.muconn.Unlock()
  497. if f.conn == nil {
  498. err = f.connectWithRetry(ctx)
  499. }
  500. return err
  501. }(); err != nil {
  502. // Here, we don't want to retry the write since connectWithRetry already
  503. // retries Config.MaxRetry times to connect.
  504. return false, fmt.Errorf("fluent#write: %v", err)
  505. }
  // Phase 2: write the payload.
  506. if err := func() (err error) {
  507. f.muconn.RLock()
  508. defer f.muconn.RUnlock()
  // The connection may have been closed between the two critical sections.
  509. if f.conn == nil {
  510. return fmt.Errorf("connection has been closed before writing to it")
  511. }
  // A positive WriteTimeout sets a deadline; otherwise any previous deadline
  // is cleared with the zero time.
  512. t := f.Config.WriteTimeout
  513. if time.Duration(0) < t {
  514. f.conn.SetWriteDeadline(time.Now().Add(t))
  515. } else {
  516. f.conn.SetWriteDeadline(time.Time{})
  517. }
  518. _, err = f.conn.Write(msg.data)
  519. return err
  520. }(); err != nil {
  521. closer()
  522. return true, fmt.Errorf("fluent#write: %v", err)
  523. }
  524. // Acknowledgment check
  // NOTE(review): f.conn is read below without holding muconn and no read
  // deadline is set — presumably a concurrent Close() could nil the connection
  // here; confirm.
  525. if msg.ack != "" {
  526. resp := &AckResp{}
  527. var err error
  528. if f.Config.MarshalAsJSON {
  529. dec := json.NewDecoder(f.conn)
  530. err = dec.Decode(resp)
  531. } else {
  532. r := msgp.NewReader(f.conn)
  533. err = resp.DecodeMsg(r)
  534. }
  // The mismatch message is also printed when decoding itself failed. When the
  // ack merely differs, err is nil and (true, nil) is returned, which the
  // caller treats as "retry".
  535. if err != nil || resp.Ack != msg.ack {
  536. fmt.Fprintf(os.Stderr, "fluent#write: message ack (%s) doesn't match expected one (%s). Closing connection...", resp.Ack, msg.ack)
  537. closer()
  538. return true, err
  539. }
  540. }
  541. return false, nil
  542. }