read.go 2.2 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889
  1. package jsonfilelog
  2. import (
  3. "encoding/json"
  4. "io"
  5. "github.com/docker/docker/api/types/backend"
  6. "github.com/docker/docker/daemon/logger"
  7. "github.com/docker/docker/daemon/logger/jsonfilelog/jsonlog"
  8. )
  // maxJSONDecodeRetry bounds the retry loop used when decoding a single log
  // entry: the loop counter in the function returned by decodeFunc is compared
  // against this value before every attempt.
  const maxJSONDecodeRetry = 20000
  10. // ReadLogs implements the logger's LogReader interface for the logs
  11. // created by this driver.
  12. func (l *JSONFileLogger) ReadLogs(config logger.ReadConfig) *logger.LogWatcher {
  13. logWatcher := logger.NewLogWatcher()
  14. go l.readLogs(logWatcher, config)
  15. return logWatcher
  16. }
  17. func (l *JSONFileLogger) readLogs(watcher *logger.LogWatcher, config logger.ReadConfig) {
  18. defer close(watcher.Msg)
  19. l.mu.Lock()
  20. l.readers[watcher] = struct{}{}
  21. l.mu.Unlock()
  22. l.writer.ReadLogs(config, watcher)
  23. l.mu.Lock()
  24. delete(l.readers, watcher)
  25. l.mu.Unlock()
  26. }
  27. func decodeLogLine(dec *json.Decoder, l *jsonlog.JSONLog) (*logger.Message, error) {
  28. l.Reset()
  29. if err := dec.Decode(l); err != nil {
  30. return nil, err
  31. }
  32. var attrs []backend.LogAttr
  33. if len(l.Attrs) != 0 {
  34. attrs = make([]backend.LogAttr, 0, len(l.Attrs))
  35. for k, v := range l.Attrs {
  36. attrs = append(attrs, backend.LogAttr{Key: k, Value: v})
  37. }
  38. }
  39. msg := &logger.Message{
  40. Source: l.Stream,
  41. Timestamp: l.Created,
  42. Line: []byte(l.Log),
  43. Attrs: attrs,
  44. }
  45. return msg, nil
  46. }
  47. // decodeFunc is used to create a decoder for the log file reader
  48. func decodeFunc(rdr io.Reader) func() (*logger.Message, error) {
  49. l := &jsonlog.JSONLog{}
  50. dec := json.NewDecoder(rdr)
  51. return func() (msg *logger.Message, err error) {
  52. for retries := 0; retries < maxJSONDecodeRetry; retries++ {
  53. msg, err = decodeLogLine(dec, l)
  54. if err == nil {
  55. break
  56. }
  57. // try again, could be due to a an incomplete json object as we read
  58. if _, ok := err.(*json.SyntaxError); ok {
  59. dec = json.NewDecoder(rdr)
  60. retries++
  61. continue
  62. }
  63. // io.ErrUnexpectedEOF is returned from json.Decoder when there is
  64. // remaining data in the parser's buffer while an io.EOF occurs.
  65. // If the json logger writes a partial json log entry to the disk
  66. // while at the same time the decoder tries to decode it, the race condition happens.
  67. if err == io.ErrUnexpectedEOF {
  68. reader := io.MultiReader(dec.Buffered(), rdr)
  69. dec = json.NewDecoder(reader)
  70. retries++
  71. }
  72. }
  73. return msg, err
  74. }
  75. }