read.go 2.7 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697
  1. package jsonfilelog // import "github.com/docker/docker/daemon/logger/jsonfilelog"
  2. import (
  3. "context"
  4. "encoding/json"
  5. "io"
  6. "github.com/docker/docker/api/types/backend"
  7. "github.com/docker/docker/daemon/logger"
  8. "github.com/docker/docker/daemon/logger/jsonfilelog/jsonlog"
  9. "github.com/docker/docker/daemon/logger/loggerutils"
  10. "github.com/docker/docker/pkg/tailfile"
  11. "github.com/sirupsen/logrus"
  12. )
  // maxJSONDecodeRetry is the upper bound on decode attempts made by a single
  // call of the closure returned by decodeFunc before it gives up and returns
  // the result of the last attempt.
  13. const maxJSONDecodeRetry = 20000
  14. // ReadLogs implements the logger's LogReader interface for the logs
  15. // created by this driver.
  16. func (l *JSONFileLogger) ReadLogs(config logger.ReadConfig) *logger.LogWatcher {
  17. logWatcher := logger.NewLogWatcher()
  18. go l.readLogs(logWatcher, config)
  19. return logWatcher
  20. }
  21. func (l *JSONFileLogger) readLogs(watcher *logger.LogWatcher, config logger.ReadConfig) {
  22. defer close(watcher.Msg)
  23. l.mu.Lock()
  24. l.readers[watcher] = struct{}{}
  25. l.mu.Unlock()
  26. l.writer.ReadLogs(config, watcher)
  27. l.mu.Lock()
  28. delete(l.readers, watcher)
  29. l.mu.Unlock()
  30. }
  31. func decodeLogLine(dec *json.Decoder, l *jsonlog.JSONLog) (*logger.Message, error) {
  32. l.Reset()
  33. if err := dec.Decode(l); err != nil {
  34. return nil, err
  35. }
  36. var attrs []backend.LogAttr
  37. if len(l.Attrs) != 0 {
  38. attrs = make([]backend.LogAttr, 0, len(l.Attrs))
  39. for k, v := range l.Attrs {
  40. attrs = append(attrs, backend.LogAttr{Key: k, Value: v})
  41. }
  42. }
  43. msg := &logger.Message{
  44. Source: l.Stream,
  45. Timestamp: l.Created,
  46. Line: []byte(l.Log),
  47. Attrs: attrs,
  48. }
  49. return msg, nil
  50. }
  51. // decodeFunc is used to create a decoder for the log file reader
  52. func decodeFunc(rdr io.Reader) func() (*logger.Message, error) {
  53. l := &jsonlog.JSONLog{}
  54. dec := json.NewDecoder(rdr)
  55. return func() (msg *logger.Message, err error) {
  56. for retries := 0; retries < maxJSONDecodeRetry; retries++ {
  57. msg, err = decodeLogLine(dec, l)
  58. if err == nil || err == io.EOF {
  59. break
  60. }
  61. logrus.WithError(err).WithField("retries", retries).Warn("got error while decoding json")
  62. // try again, could be due to a an incomplete json object as we read
  63. if _, ok := err.(*json.SyntaxError); ok {
  64. dec = json.NewDecoder(rdr)
  65. continue
  66. }
  67. // io.ErrUnexpectedEOF is returned from json.Decoder when there is
  68. // remaining data in the parser's buffer while an io.EOF occurs.
  69. // If the json logger writes a partial json log entry to the disk
  70. // while at the same time the decoder tries to decode it, the race condition happens.
  71. if err == io.ErrUnexpectedEOF {
  72. reader := io.MultiReader(dec.Buffered(), rdr)
  73. dec = json.NewDecoder(reader)
  74. continue
  75. }
  76. }
  77. return msg, err
  78. }
  79. }
  80. func getTailReader(ctx context.Context, r loggerutils.SizeReaderAt, req int) (io.Reader, int, error) {
  81. return tailfile.NewTailReader(ctx, r, req)
  82. }