read.go 4.9 KB


  1. package jsonfilelog
  2. import (
  3. "bytes"
  4. "encoding/json"
  5. "fmt"
  6. "io"
  7. "os"
  8. "time"
  9. "github.com/Sirupsen/logrus"
  10. "github.com/docker/docker/daemon/logger"
  11. "github.com/docker/docker/pkg/filenotify"
  12. "github.com/docker/docker/pkg/ioutils"
  13. "github.com/docker/docker/pkg/jsonlog"
  14. "github.com/docker/docker/pkg/tailfile"
  15. )
  const (
  	// maxJSONDecodeRetry bounds the retry counter used by followLogs: a
  	// failed decode is retried only while the counter is <= this value,
  	// and the counter is reset after every successful decode.
  	maxJSONDecodeRetry = 20000
  )
  17. func decodeLogLine(dec *json.Decoder, l *jsonlog.JSONLog) (*logger.Message, error) {
  18. l.Reset()
  19. if err := dec.Decode(l); err != nil {
  20. return nil, err
  21. }
  22. msg := &logger.Message{
  23. Source: l.Stream,
  24. Timestamp: l.Created,
  25. Line: []byte(l.Log),
  26. }
  27. return msg, nil
  28. }
  29. // ReadLogs implements the logger's LogReader interface for the logs
  30. // created by this driver.
  31. func (l *JSONFileLogger) ReadLogs(config logger.ReadConfig) *logger.LogWatcher {
  32. logWatcher := logger.NewLogWatcher()
  33. go l.readLogs(logWatcher, config)
  34. return logWatcher
  35. }
  36. func (l *JSONFileLogger) readLogs(logWatcher *logger.LogWatcher, config logger.ReadConfig) {
  37. defer close(logWatcher.Msg)
  38. pth := l.ctx.LogPath
  39. var files []io.ReadSeeker
  40. for i := l.n; i > 1; i-- {
  41. f, err := os.Open(fmt.Sprintf("%s.%d", pth, i-1))
  42. if err != nil {
  43. if !os.IsNotExist(err) {
  44. logWatcher.Err <- err
  45. break
  46. }
  47. continue
  48. }
  49. defer f.Close()
  50. files = append(files, f)
  51. }
  52. latestFile, err := os.Open(pth)
  53. if err != nil {
  54. logWatcher.Err <- err
  55. return
  56. }
  57. defer latestFile.Close()
  58. files = append(files, latestFile)
  59. tailer := ioutils.MultiReadSeeker(files...)
  60. if config.Tail != 0 {
  61. tailFile(tailer, logWatcher, config.Tail, config.Since)
  62. }
  63. if !config.Follow {
  64. return
  65. }
  66. if config.Tail >= 0 {
  67. latestFile.Seek(0, os.SEEK_END)
  68. }
  69. l.mu.Lock()
  70. l.readers[logWatcher] = struct{}{}
  71. l.mu.Unlock()
  72. notifyRotate := l.notifyRotate.Subscribe()
  73. followLogs(latestFile, logWatcher, notifyRotate, config.Since)
  74. l.mu.Lock()
  75. delete(l.readers, logWatcher)
  76. l.mu.Unlock()
  77. l.notifyRotate.Evict(notifyRotate)
  78. }
  79. func tailFile(f io.ReadSeeker, logWatcher *logger.LogWatcher, tail int, since time.Time) {
  80. var rdr io.Reader = f
  81. if tail > 0 {
  82. ls, err := tailfile.TailFile(f, tail)
  83. if err != nil {
  84. logWatcher.Err <- err
  85. return
  86. }
  87. rdr = bytes.NewBuffer(bytes.Join(ls, []byte("\n")))
  88. }
  89. dec := json.NewDecoder(rdr)
  90. l := &jsonlog.JSONLog{}
  91. for {
  92. msg, err := decodeLogLine(dec, l)
  93. if err != nil {
  94. if err != io.EOF {
  95. logWatcher.Err <- err
  96. }
  97. return
  98. }
  99. if !since.IsZero() && msg.Timestamp.Before(since) {
  100. continue
  101. }
  102. logWatcher.Msg <- msg
  103. }
  104. }
  105. func followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate chan interface{}, since time.Time) {
  106. dec := json.NewDecoder(f)
  107. l := &jsonlog.JSONLog{}
  108. fileWatcher, err := filenotify.New()
  109. if err != nil {
  110. logWatcher.Err <- err
  111. }
  112. defer fileWatcher.Close()
  113. var retries int
  114. for {
  115. msg, err := decodeLogLine(dec, l)
  116. if err != nil {
  117. if err != io.EOF {
  118. // try again because this shouldn't happen
  119. if _, ok := err.(*json.SyntaxError); ok && retries <= maxJSONDecodeRetry {
  120. dec = json.NewDecoder(f)
  121. retries++
  122. continue
  123. }
  124. // io.ErrUnexpectedEOF is returned from json.Decoder when there is
  125. // remaining data in the parser's buffer while an io.EOF occurs.
  126. // If the json logger writes a partial json log entry to the disk
  127. // while at the same time the decoder tries to decode it, the race codition happens.
  128. if err == io.ErrUnexpectedEOF && retries <= maxJSONDecodeRetry {
  129. reader := io.MultiReader(dec.Buffered(), f)
  130. dec = json.NewDecoder(reader)
  131. retries++
  132. continue
  133. }
  134. logWatcher.Err <- err
  135. return
  136. }
  137. logrus.WithField("logger", "json-file").Debugf("waiting for events")
  138. if err := fileWatcher.Add(f.Name()); err != nil {
  139. logrus.WithField("logger", "json-file").Warn("falling back to file poller")
  140. fileWatcher.Close()
  141. fileWatcher = filenotify.NewPollingWatcher()
  142. if err := fileWatcher.Add(f.Name()); err != nil {
  143. logrus.Errorf("error watching log file for modifications: %v", err)
  144. logWatcher.Err <- err
  145. }
  146. }
  147. select {
  148. case <-fileWatcher.Events():
  149. dec = json.NewDecoder(f)
  150. fileWatcher.Remove(f.Name())
  151. continue
  152. case <-fileWatcher.Errors():
  153. fileWatcher.Remove(f.Name())
  154. logWatcher.Err <- err
  155. return
  156. case <-logWatcher.WatchClose():
  157. fileWatcher.Remove(f.Name())
  158. return
  159. case <-notifyRotate:
  160. f, err = os.Open(f.Name())
  161. if err != nil {
  162. logWatcher.Err <- err
  163. return
  164. }
  165. dec = json.NewDecoder(f)
  166. fileWatcher.Remove(f.Name())
  167. fileWatcher.Add(f.Name())
  168. continue
  169. }
  170. }
  171. retries = 0 // reset retries since we've succeeded
  172. if !since.IsZero() && msg.Timestamp.Before(since) {
  173. continue
  174. }
  175. select {
  176. case logWatcher.Msg <- msg:
  177. case <-logWatcher.WatchClose():
  178. logWatcher.Msg <- msg
  179. for {
  180. msg, err := decodeLogLine(dec, l)
  181. if err != nil {
  182. return
  183. }
  184. if !since.IsZero() && msg.Timestamp.Before(since) {
  185. continue
  186. }
  187. logWatcher.Msg <- msg
  188. }
  189. }
  190. }
  191. }